Change ocache to include wait for closing

This commit is contained in:
mcrakhman 2022-10-13 13:27:18 +02:00 committed by Mikhail Iudin
parent 003a8688d8
commit 00251a7439
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0

View File

@ -89,6 +89,8 @@ type entry struct {
load chan struct{}
loadErr error
value Object
isClosing bool
close chan struct{}
}
func (e *entry) locked() bool {
@ -148,6 +150,7 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
ok bool
load bool
)
Load:
c.mu.Lock()
if c.closed {
c.mu.Unlock()
@ -161,11 +164,18 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
}
c.data[id] = e
}
e.lastUsage = c.timeNow()
if !c.noRefCounter {
e.refCount++
closing := e.isClosing
if !e.isClosing {
e.lastUsage = c.timeNow()
if !c.noRefCounter {
e.refCount++
}
}
c.mu.Unlock()
if closing {
<-e.close
goto Load
}
if load {
go c.load(ctx, id, e)
@ -181,17 +191,18 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) {
c.mu.Lock()
val, ok := c.data[id]
c.mu.Unlock()
if !ok {
if !ok || val.isClosing {
c.mu.Unlock()
return nil, ErrNotExists
}
c.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-val.load:
return val.value, val.loadErr
}
<-val.load
return val.value, val.loadErr
}
func (c *oCache) load(ctx context.Context, id string, e *entry) {
@ -238,17 +249,30 @@ func (c *oCache) Reset(id string) bool {
func (c *oCache) Remove(id string) (ok bool, err error) {
c.mu.Lock()
e, ok := c.data[id]
if ok {
delete(c.data, id)
if c.closed {
c.mu.Unlock()
err = ErrClosed
return
}
var e *entry
e, ok = c.data[id]
if !ok || e.isClosing {
c.mu.Unlock()
return
}
e.isClosing = true
e.close = make(chan struct{})
c.mu.Unlock()
if ok {
<-e.load
if e.value != nil {
err = e.value.Close()
}
<-e.load
if e.value != nil {
err = e.value.Close()
}
c.mu.Lock()
close(e.close)
delete(c.data, e.id)
c.mu.Unlock()
return
}
@ -288,7 +312,7 @@ func (c *oCache) ForEach(f func(obj Object) (isContinue bool)) {
for _, v := range c.data {
select {
case <-v.load:
if v.value != nil {
if v.value != nil && !v.isClosing {
objects = append(objects, v.value)
}
default:
@ -323,18 +347,23 @@ func (c *oCache) GC() {
}
deadline := c.timeNow().Add(-c.ttl)
var toClose []*entry
for k, e := range c.data {
for _, e := range c.data {
if e.isClosing {
continue
}
lu := e.lastUsage
if lug, ok := e.value.(ObjectLastUsage); ok {
lu = lug.LastUsage()
}
if !e.locked() && e.refCount <= 0 && lu.Before(deadline) {
delete(c.data, k)
e.isClosing = true
e.close = make(chan struct{})
toClose = append(toClose, e)
}
}
size := len(c.data)
c.mu.Unlock()
c.log.Infof("GC: removed %d; cache size: %d", len(toClose), size)
for _, e := range toClose {
<-e.load
@ -344,6 +373,13 @@ func (c *oCache) GC() {
}
}
}
c.mu.Lock()
for _, e := range toClose {
close(e.close)
delete(c.data, e.id)
}
c.mu.Unlock()
}
func (c *oCache) Len() int {