Ocache remove with ctx
This commit is contained in:
parent
96b43de5f0
commit
65e1577bfd
@ -85,7 +85,7 @@ type OCache interface {
|
|||||||
// Returns error when object exists
|
// Returns error when object exists
|
||||||
Add(id string, value Object) (err error)
|
Add(id string, value Object) (err error)
|
||||||
// Remove closes and removes object
|
// Remove closes and removes object
|
||||||
Remove(id string) (ok bool, err error)
|
Remove(ctx context.Context, id string) (ok bool, err error)
|
||||||
// ForEach iterates over all loaded objects, breaks when callback returns false
|
// ForEach iterates over all loaded objects, breaks when callback returns false
|
||||||
ForEach(f func(v Object) (isContinue bool))
|
ForEach(f func(v Object) (isContinue bool))
|
||||||
// GC frees not used and expired objects
|
// GC frees not used and expired objects
|
||||||
@ -170,7 +170,7 @@ func (c *oCache) load(ctx context.Context, id string, e *entry) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *oCache) Remove(id string) (ok bool, err error) {
|
func (c *oCache) Remove(ctx context.Context, id string) (ok bool, err error) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if c.closed {
|
if c.closed {
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@ -183,13 +183,12 @@ func (c *oCache) Remove(id string) (ok bool, err error) {
|
|||||||
return false, ErrNotExists
|
return false, ErrNotExists
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
return c.remove(e)
|
return c.remove(ctx, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *oCache) remove(e *entry) (ok bool, err error) {
|
func (c *oCache) remove(ctx context.Context, e *entry) (ok bool, err error) {
|
||||||
<-e.load
|
if _, err = e.waitLoad(ctx, e.id); err != nil {
|
||||||
if e.value == nil {
|
return false, err
|
||||||
return false, ErrNotExists
|
|
||||||
}
|
}
|
||||||
_, curState := e.setClosing(true)
|
_, curState := e.setClosing(true)
|
||||||
if curState == entryStateClosing {
|
if curState == entryStateClosing {
|
||||||
@ -320,7 +319,7 @@ func (c *oCache) Close() (err error) {
|
|||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
for _, e := range toClose {
|
for _, e := range toClose {
|
||||||
if _, err := c.remove(e); err != nil && err != ErrNotExists {
|
if _, err := c.remove(context.Background(), e); err != nil && err != ErrNotExists {
|
||||||
c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", err)
|
c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -13,6 +13,8 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ctx = context.Background()
|
||||||
|
|
||||||
type testObject struct {
|
type testObject struct {
|
||||||
name string
|
name string
|
||||||
closeErr error
|
closeErr error
|
||||||
@ -281,7 +283,7 @@ func Test_OCache_Remove(t *testing.T) {
|
|||||||
assert.Equal(t, 1, c.Len())
|
assert.Equal(t, 1, c.Len())
|
||||||
// removing the object, so we will wait on closing
|
// removing the object, so we will wait on closing
|
||||||
go func() {
|
go func() {
|
||||||
_, err := c.Remove("id")
|
_, err := c.Remove(ctx, "id")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}()
|
}()
|
||||||
time.Sleep(time.Millisecond * 20)
|
time.Sleep(time.Millisecond * 20)
|
||||||
@ -318,7 +320,7 @@ func Test_OCache_Remove(t *testing.T) {
|
|||||||
time.Sleep(time.Millisecond * 20)
|
time.Sleep(time.Millisecond * 20)
|
||||||
var events []string
|
var events []string
|
||||||
go func() {
|
go func() {
|
||||||
ok, err := c.Remove("id")
|
ok, err := c.Remove(ctx, "id")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
events = append(events, "remove")
|
events = append(events, "remove")
|
||||||
@ -347,7 +349,7 @@ func Test_OCache_Remove(t *testing.T) {
|
|||||||
time.Sleep(time.Millisecond * 20)
|
time.Sleep(time.Millisecond * 20)
|
||||||
var events []string
|
var events []string
|
||||||
go func() {
|
go func() {
|
||||||
ok, err := c.Remove("id")
|
ok, err := c.Remove(ctx, "id")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
events = append(events, "remove")
|
events = append(events, "remove")
|
||||||
@ -372,7 +374,7 @@ func Test_OCache_Remove(t *testing.T) {
|
|||||||
require.NotNil(t, val)
|
require.NotNil(t, val)
|
||||||
assert.Equal(t, 1, c.Len())
|
assert.Equal(t, 1, c.Len())
|
||||||
go func() {
|
go func() {
|
||||||
ok, err := c.Remove("id")
|
ok, err := c.Remove(ctx, "id")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
close(removeCh)
|
close(removeCh)
|
||||||
@ -430,7 +432,7 @@ func TestOCacheFuzzy(t *testing.T) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for j := 0; j < 10; j++ {
|
for j := 0; j < 10; j++ {
|
||||||
for i := 0; i < max; i++ {
|
for i := 0; i < max; i++ {
|
||||||
c.Remove(getId(i))
|
c.Remove(ctx, getId(i))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -477,7 +479,7 @@ func TestOCacheFuzzy(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
for j := 0; j < 10; j++ {
|
for j := 0; j < 10; j++ {
|
||||||
for i := 0; i < max; i++ {
|
for i := 0; i < max; i++ {
|
||||||
c.Remove(getId(i))
|
c.Remove(ctx, getId(i))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
@ -49,7 +49,7 @@ func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) {
|
|||||||
default:
|
default:
|
||||||
return pr, nil
|
return pr, nil
|
||||||
}
|
}
|
||||||
_, _ = p.cache.Remove(id)
|
_, _ = p.cache.Remove(ctx, id)
|
||||||
return p.Get(ctx, id)
|
return p.Get(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user