diff --git a/pkg/ocache/ocache.go b/pkg/ocache/ocache.go index 44dc9d42..998faf6d 100644 --- a/pkg/ocache/ocache.go +++ b/pkg/ocache/ocache.go @@ -89,6 +89,8 @@ type entry struct { load chan struct{} loadErr error value Object + isClosing bool + close chan struct{} } func (e *entry) locked() bool { @@ -148,6 +150,7 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { ok bool load bool ) +Load: c.mu.Lock() if c.closed { c.mu.Unlock() @@ -161,11 +164,18 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { } c.data[id] = e } - e.lastUsage = c.timeNow() - if !c.noRefCounter { - e.refCount++ + closing := e.isClosing + if !e.isClosing { + e.lastUsage = c.timeNow() + if !c.noRefCounter { + e.refCount++ + } } c.mu.Unlock() + if closing { + <-e.close + goto Load + } if load { go c.load(ctx, id, e) @@ -181,17 +191,18 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) { c.mu.Lock() val, ok := c.data[id] - c.mu.Unlock() - if !ok { + if !ok || val.isClosing { + c.mu.Unlock() return nil, ErrNotExists } + c.mu.Unlock() + select { case <-ctx.Done(): return nil, ctx.Err() case <-val.load: + return val.value, val.loadErr } - <-val.load - return val.value, val.loadErr } func (c *oCache) load(ctx context.Context, id string, e *entry) { @@ -238,17 +249,30 @@ func (c *oCache) Reset(id string) bool { func (c *oCache) Remove(id string) (ok bool, err error) { c.mu.Lock() - e, ok := c.data[id] - if ok { - delete(c.data, id) + if c.closed { + c.mu.Unlock() + err = ErrClosed + return } + var e *entry + e, ok = c.data[id] + if !ok || e.isClosing { + c.mu.Unlock() + return + } + e.isClosing = true + e.close = make(chan struct{}) c.mu.Unlock() - if ok { - <-e.load - if e.value != nil { - err = e.value.Close() - } + + <-e.load + if e.value != nil { + err = e.value.Close() } + c.mu.Lock() + close(e.close) + delete(c.data, e.id) + c.mu.Unlock() + return } @@ -288,7 +312,7 @@ func (c *oCache) ForEach(f func(obj Object) (isContinue bool)) { for _, v := range c.data { select { case <-v.load: - if v.value != nil { + if v.value != nil && !v.isClosing { objects = append(objects, v.value) } default: @@ -323,18 +347,23 @@ func (c *oCache) GC() { } deadline := c.timeNow().Add(-c.ttl) var toClose []*entry - for k, e := range c.data { + for _, e := range c.data { + if e.isClosing { + continue + } lu := e.lastUsage if lug, ok := e.value.(ObjectLastUsage); ok { lu = lug.LastUsage() } if !e.locked() && e.refCount <= 0 && lu.Before(deadline) { - delete(c.data, k) + e.isClosing = true + e.close = make(chan struct{}) toClose = append(toClose, e) } } size := len(c.data) c.mu.Unlock() + c.log.Infof("GC: removed %d; cache size: %d", len(toClose), size) for _, e := range toClose { <-e.load @@ -344,6 +373,13 @@ func (c *oCache) GC() { } } } + + c.mu.Lock() + for _, e := range toClose { + close(e.close) + delete(c.data, e.id) + } + c.mu.Unlock() } func (c *oCache) Len() int {