From 18716eebb42dc95d8175bb22a072f46e83ec8303 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 15 Jun 2023 13:08:03 +0200 Subject: [PATCH] Cancel on cache close --- app/ocache/entry.go | 18 +++++++++++++++++- app/ocache/ocache.go | 9 +++++++-- app/ocache/ocache_test.go | 19 +++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/app/ocache/entry.go b/app/ocache/entry.go index fbe60e26..f95af1cc 100644 --- a/app/ocache/entry.go +++ b/app/ocache/entry.go @@ -2,9 +2,10 @@ package ocache import ( "context" - "go.uber.org/zap" "sync" "time" + + "go.uber.org/zap" ) type entryState int @@ -25,6 +26,7 @@ type entry struct { value Object close chan struct{} mx sync.Mutex + cancel context.CancelFunc } func newEntry(id string, value Object, state entryState) *entry { @@ -49,6 +51,20 @@ func (e *entry) isClosing() bool { return e.state == entryStateClosed || e.state == entryStateClosing } +func (e *entry) setCancel(cancel context.CancelFunc) { + e.mx.Lock() + defer e.mx.Unlock() + e.cancel = cancel +} + +func (e *entry) cancelLoad() { + e.mx.Lock() + defer e.mx.Unlock() + if e.cancel != nil { + e.cancel() + } +} + func (e *entry) waitLoad(ctx context.Context, id string) (value Object, err error) { select { case <-ctx.Done(): diff --git a/app/ocache/ocache.go b/app/ocache/ocache.go index 4f130b89..c3bc9a0a 100644 --- a/app/ocache/ocache.go +++ b/app/ocache/ocache.go @@ -3,10 +3,11 @@ package ocache import ( "context" "errors" - "github.com/anyproto/any-sync/app/logger" - "go.uber.org/zap" "sync" "time" + + "github.com/anyproto/any-sync/app/logger" + "go.uber.org/zap" ) var ( @@ -157,7 +158,10 @@ func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) func (c *oCache) load(ctx context.Context, id string, e *entry) { defer close(e.load) + ctx, cancel := context.WithCancel(ctx) + e.setCancel(cancel) value, err := c.loadFunc(ctx, id) + cancel() c.mu.Lock() defer c.mu.Unlock() @@ -315,6 +319,7 @@ func (c *oCache) Close() (err error) { close(c.closeCh) var toClose []*entry for _, e := range c.data { + e.cancelLoad() toClose = append(toClose, e) } c.mu.Unlock() diff --git a/app/ocache/ocache_test.go b/app/ocache/ocache_test.go index d78345b0..85a9c551 100644 --- a/app/ocache/ocache_test.go +++ b/app/ocache/ocache_test.go @@ -386,6 +386,25 @@ func Test_OCache_Remove(t *testing.T) { }) } +func TestOCacheCancelWhenRemove(t *testing.T) { + c := New(func(ctx context.Context, id string) (value Object, err error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + } + }, WithTTL(time.Millisecond*10)) + stopLoad := make(chan struct{}) + var err error + go func() { + _, err = c.Get(context.TODO(), "id") + stopLoad <- struct{}{} + }() + time.Sleep(time.Millisecond * 10) + c.Close() + <-stopLoad + require.Equal(t, context.Canceled, err) +} + func TestOCacheFuzzy(t *testing.T) { t.Run("test many objects gc, get and remove simultaneously, close after", func(t *testing.T) { tryCloseIds := make(map[string]bool)