diff --git a/app/ocache/ocache.go b/app/ocache/ocache.go index 7375c643..8c9011e1 100644 --- a/app/ocache/ocache.go +++ b/app/ocache/ocache.go @@ -85,7 +85,7 @@ type OCache interface { // Returns error when object exists Add(id string, value Object) (err error) // Remove closes and removes object - Remove(id string) (ok bool, err error) + Remove(ctx context.Context, id string) (ok bool, err error) // ForEach iterates over all loaded objects, breaks when callback returns false ForEach(f func(v Object) (isContinue bool)) // GC frees not used and expired objects @@ -170,7 +170,7 @@ func (c *oCache) load(ctx context.Context, id string, e *entry) { } } -func (c *oCache) Remove(id string) (ok bool, err error) { +func (c *oCache) Remove(ctx context.Context, id string) (ok bool, err error) { c.mu.Lock() if c.closed { c.mu.Unlock() @@ -183,13 +183,12 @@ func (c *oCache) Remove(id string) (ok bool, err error) { return false, ErrNotExists } c.mu.Unlock() - return c.remove(e) + return c.remove(ctx, e) } -func (c *oCache) remove(e *entry) (ok bool, err error) { - <-e.load - if e.value == nil { - return false, ErrNotExists +func (c *oCache) remove(ctx context.Context, e *entry) (ok bool, err error) { + if _, err = e.waitLoad(ctx, e.id); err != nil { + return false, err } _, curState := e.setClosing(true) if curState == entryStateClosing { @@ -320,7 +319,7 @@ func (c *oCache) Close() (err error) { } c.mu.Unlock() for _, e := range toClose { - if _, err := c.remove(e); err != nil && err != ErrNotExists { + if _, err := c.remove(context.Background(), e); err != nil && err != ErrNotExists { c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", err) } } diff --git a/app/ocache/ocache_test.go b/app/ocache/ocache_test.go index 843586b7..d78345b0 100644 --- a/app/ocache/ocache_test.go +++ b/app/ocache/ocache_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" ) +var ctx = context.Background() + type testObject struct { name string closeErr error @@ -281,7 +283,7 @@ func Test_OCache_Remove(t *testing.T) { assert.Equal(t, 1, c.Len()) // removing the object, so we will wait on closing go func() { - _, err := c.Remove("id") + _, err := c.Remove(ctx, "id") require.NoError(t, err) }() time.Sleep(time.Millisecond * 20) @@ -318,7 +320,7 @@ func Test_OCache_Remove(t *testing.T) { time.Sleep(time.Millisecond * 20) var events []string go func() { - ok, err := c.Remove("id") + ok, err := c.Remove(ctx, "id") require.NoError(t, err) require.True(t, ok) events = append(events, "remove") @@ -347,7 +349,7 @@ func Test_OCache_Remove(t *testing.T) { time.Sleep(time.Millisecond * 20) var events []string go func() { - ok, err := c.Remove("id") + ok, err := c.Remove(ctx, "id") require.NoError(t, err) require.False(t, ok) events = append(events, "remove") @@ -372,7 +374,7 @@ func Test_OCache_Remove(t *testing.T) { require.NotNil(t, val) assert.Equal(t, 1, c.Len()) go func() { - ok, err := c.Remove("id") + ok, err := c.Remove(ctx, "id") require.NoError(t, err) require.True(t, ok) close(removeCh) @@ -430,7 +432,7 @@ func TestOCacheFuzzy(t *testing.T) { defer wg.Done() for j := 0; j < 10; j++ { for i := 0; i < max; i++ { - c.Remove(getId(i)) + c.Remove(ctx, getId(i)) } } }() @@ -477,7 +479,7 @@ func TestOCacheFuzzy(t *testing.T) { go func() { for j := 0; j < 10; j++ { for i := 0; i < max; i++ { - c.Remove(getId(i)) + c.Remove(ctx, getId(i)) } } }() diff --git a/net/pool/pool.go b/net/pool/pool.go index e5a8cc1d..287ab512 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -49,7 +49,7 @@ func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) { default: return pr, nil } - _, _ = p.cache.Remove(id) + _, _ = p.cache.Remove(ctx, id) return p.Get(ctx, id) }