Merge pull request #26 from anyproto/cache-cancel-load-close

Cancel on cache close
This commit is contained in:
Mikhail Rakhmanov 2023-06-15 17:38:14 +02:00 committed by GitHub
commit 2ea446184d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 3 deletions

View File

@ -2,9 +2,10 @@ package ocache
import ( import (
"context" "context"
"go.uber.org/zap"
"sync" "sync"
"time" "time"
"go.uber.org/zap"
) )
type entryState int type entryState int
@ -25,6 +26,7 @@ type entry struct {
value Object value Object
close chan struct{} close chan struct{}
mx sync.Mutex mx sync.Mutex
cancel context.CancelFunc
} }
func newEntry(id string, value Object, state entryState) *entry { func newEntry(id string, value Object, state entryState) *entry {
@ -49,6 +51,20 @@ func (e *entry) isClosing() bool {
return e.state == entryStateClosed || e.state == entryStateClosing return e.state == entryStateClosed || e.state == entryStateClosing
} }
func (e *entry) setCancel(cancel context.CancelFunc) {
e.mx.Lock()
defer e.mx.Unlock()
e.cancel = cancel
}
func (e *entry) cancelLoad() {
e.mx.Lock()
defer e.mx.Unlock()
if e.cancel != nil {
e.cancel()
}
}
func (e *entry) waitLoad(ctx context.Context, id string) (value Object, err error) { func (e *entry) waitLoad(ctx context.Context, id string) (value Object, err error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -3,10 +3,11 @@ package ocache
import ( import (
"context" "context"
"errors" "errors"
"github.com/anyproto/any-sync/app/logger"
"go.uber.org/zap"
"sync" "sync"
"time" "time"
"github.com/anyproto/any-sync/app/logger"
"go.uber.org/zap"
) )
var ( var (
@ -157,7 +158,10 @@ func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error)
func (c *oCache) load(ctx context.Context, id string, e *entry) { func (c *oCache) load(ctx context.Context, id string, e *entry) {
defer close(e.load) defer close(e.load)
ctx, cancel := context.WithCancel(ctx)
e.setCancel(cancel)
value, err := c.loadFunc(ctx, id) value, err := c.loadFunc(ctx, id)
cancel()
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -315,6 +319,7 @@ func (c *oCache) Close() (err error) {
close(c.closeCh) close(c.closeCh)
var toClose []*entry var toClose []*entry
for _, e := range c.data { for _, e := range c.data {
e.cancelLoad()
toClose = append(toClose, e) toClose = append(toClose, e)
} }
c.mu.Unlock() c.mu.Unlock()

View File

@ -386,6 +386,25 @@ func Test_OCache_Remove(t *testing.T) {
}) })
} }
func TestOCacheCancelWhenRemove(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
}
}, WithTTL(time.Millisecond*10))
stopLoad := make(chan struct{})
var err error
go func() {
_, err = c.Get(context.TODO(), "id")
stopLoad <- struct{}{}
}()
time.Sleep(time.Millisecond * 10)
c.Close()
<-stopLoad
require.Equal(t, context.Canceled, err)
}
func TestOCacheFuzzy(t *testing.T) { func TestOCacheFuzzy(t *testing.T) {
t.Run("test many objects gc, get and remove simultaneously, close after", func(t *testing.T) { t.Run("test many objects gc, get and remove simultaneously, close after", func(t *testing.T) {
tryCloseIds := make(map[string]bool) tryCloseIds := make(map[string]bool)