Update cache logic to remove from entries on close

This commit is contained in:
mcrakhman 2023-03-09 20:25:01 +01:00 committed by Mikhail Iudin
parent 53b3e6e706
commit 7feadbfb80
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/util/slice"
"go.uber.org/zap" "go.uber.org/zap"
"sync" "sync"
"time" "time"
@ -143,17 +142,6 @@ Load:
return e.waitLoad(ctx, id) return e.waitLoad(ctx, id)
} }
func (c *oCache) metricsGet(hit bool) {
if c.metrics == nil {
return
}
if hit {
c.metrics.hit.Inc()
} else {
c.metrics.miss.Inc()
}
}
func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) { func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) {
c.mu.Lock() c.mu.Lock()
val, ok := c.data[id] val, ok := c.data[id]
@ -191,13 +179,13 @@ func (c *oCache) Remove(id string) (ok bool, err error) {
e, ok := c.data[id] e, ok := c.data[id]
if !ok { if !ok {
c.mu.Unlock() c.mu.Unlock()
return return false, ErrNotExists
} }
c.mu.Unlock() c.mu.Unlock()
return c.remove(e, true) return c.remove(e)
} }
func (c *oCache) remove(e *entry, remData bool) (ok bool, err error) { func (c *oCache) remove(e *entry) (ok bool, err error) {
<-e.load <-e.load
if e.value == nil { if e.value == nil {
return false, ErrNotExists return false, ErrNotExists
@ -205,16 +193,11 @@ func (c *oCache) remove(e *entry, remData bool) (ok bool, err error) {
_, curState := e.setClosing(true) _, curState := e.setClosing(true)
if curState == entryStateClosing { if curState == entryStateClosing {
err = e.value.Close() err = e.value.Close()
c.mu.Lock()
e.setClosed() e.setClosed()
}
if !remData {
return
}
c.mu.Lock()
if curState == entryStateClosing {
delete(c.data, e.id) delete(c.data, e.id)
c.mu.Unlock()
} }
c.mu.Unlock()
return return
} }
@ -284,47 +267,35 @@ func (c *oCache) GC() {
deadline := c.timeNow().Add(-c.ttl) deadline := c.timeNow().Add(-c.ttl)
var toClose []*entry var toClose []*entry
for _, e := range c.data { for _, e := range c.data {
if e.getState() != entryStateActive { if e.isActive() && e.lastUsage.Before(deadline) {
continue
}
if e.lastUsage.Before(deadline) {
e.close = make(chan struct{}) e.close = make(chan struct{})
toClose = append(toClose, e) toClose = append(toClose, e)
} }
} }
size := len(c.data) size := len(c.data)
c.mu.Unlock() c.mu.Unlock()
closedNum := 0
for idx, e := range toClose { for _, e := range toClose {
prevState, _ := e.setClosing(false) prevState, _ := e.setClosing(false)
if prevState == entryStateClosing || prevState == entryStateClosed { if prevState == entryStateClosing || prevState == entryStateClosed {
toClose[idx] = nil
continue continue
} }
ok, err := e.value.TryClose() closed, err := e.value.TryClose()
if !ok {
e.setActive(true)
toClose[idx] = nil
continue
} else {
e.setClosed()
}
if err != nil { if err != nil {
c.log.With("object_id", e.id).Warnf("GC: object close error: %v", err) c.log.With("object_id", e.id).Warnf("GC: object close error: %v", err)
} }
if !closed {
e.setActive(true)
continue
} else {
closedNum++
c.mu.Lock()
e.setClosed()
delete(c.data, e.id)
c.mu.Unlock()
}
} }
toClose = slice.DiscardFromSlice(toClose, func(e *entry) bool { c.metricsClosed(closedNum, size)
return e == nil
})
c.log.Infof("GC: removed %d; cache size: %d", len(toClose), size)
if len(toClose) > 0 && c.metrics != nil {
c.metrics.gc.Add(float64(len(toClose)))
}
c.mu.Lock()
for _, e := range toClose {
delete(c.data, e.id)
}
c.mu.Unlock()
} }
func (c *oCache) Len() int { func (c *oCache) Len() int {
@ -347,9 +318,28 @@ func (c *oCache) Close() (err error) {
} }
c.mu.Unlock() c.mu.Unlock()
for _, e := range toClose { for _, e := range toClose {
if _, err := c.remove(e, false); err != ErrNotExists { if _, err := c.remove(e); err != ErrNotExists {
c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", err) c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", err)
} }
} }
return nil return nil
} }
func (c *oCache) metricsGet(hit bool) {
if c.metrics == nil {
return
}
if hit {
c.metrics.hit.Inc()
} else {
c.metrics.miss.Inc()
}
}
func (c *oCache) metricsClosed(closedLen, size int) {
c.log.Infof("GC: removed %d; cache size: %d", closedLen, size)
if c.metrics == nil || closedLen == 0 {
return
}
c.metrics.gc.Add(float64(closedLen))
}