Merge pull request #21 from anytypeio/fix-ocache-closing-deadlock
Fix ocache closing deadlock
This commit is contained in:
commit
95b7927a4c
@ -176,9 +176,13 @@ Load:
|
|||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
if closing {
|
if closing {
|
||||||
<-e.close
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-e.close:
|
||||||
goto Load
|
goto Load
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if load {
|
if load {
|
||||||
go c.load(ctx, id, e)
|
go c.load(ctx, id, e)
|
||||||
|
|||||||
@ -5,18 +5,21 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type commonGetter struct {
|
type commonGetter struct {
|
||||||
treegetter.TreeGetter
|
treegetter.TreeGetter
|
||||||
spaceId string
|
spaceId string
|
||||||
reservedObjects []syncobjectgetter.SyncObject
|
reservedObjects []syncobjectgetter.SyncObject
|
||||||
|
spaceIsClosed *atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter {
|
func newCommonGetter(spaceId string, getter treegetter.TreeGetter, spaceIsClosed *atomic.Bool) *commonGetter {
|
||||||
return &commonGetter{
|
return &commonGetter{
|
||||||
TreeGetter: getter,
|
TreeGetter: getter,
|
||||||
spaceId: spaceId,
|
spaceId: spaceId,
|
||||||
|
spaceIsClosed: spaceIsClosed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,6 +28,9 @@ func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
|
func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
|
||||||
|
if c.spaceIsClosed.Load() {
|
||||||
|
return nil, ErrSpaceClosed
|
||||||
|
}
|
||||||
if obj := c.getReservedObject(treeId); obj != nil {
|
if obj := c.getReservedObject(treeId); obj != nil {
|
||||||
return obj.(objecttree.ObjectTree), nil
|
return obj.(objecttree.ObjectTree), nil
|
||||||
}
|
}
|
||||||
@ -41,6 +47,9 @@ func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) {
|
func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) {
|
||||||
|
if c.spaceIsClosed.Load() {
|
||||||
|
return nil, ErrSpaceClosed
|
||||||
|
}
|
||||||
if obj := c.getReservedObject(objectId); obj != nil {
|
if obj := c.getReservedObject(objectId); obj != nil {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -58,7 +58,7 @@ func NewHeadSync(
|
|||||||
l := log.With(zap.String("spaceId", spaceId))
|
l := log.With(zap.String("spaceId", spaceId))
|
||||||
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||||
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
|
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
|
||||||
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l)
|
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
|
||||||
|
|
||||||
return &headSync{
|
return &headSync{
|
||||||
spaceId: spaceId,
|
spaceId: spaceId,
|
||||||
|
|||||||
@ -127,7 +127,7 @@ type space struct {
|
|||||||
|
|
||||||
handleQueue multiqueue.MultiQueue[HandleMessage]
|
handleQueue multiqueue.MultiQueue[HandleMessage]
|
||||||
|
|
||||||
isClosed atomic.Bool
|
isClosed *atomic.Bool
|
||||||
treesUsed atomic.Int32
|
treesUsed atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/net/peer"
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
"github.com/anytypeio/any-sync/net/pool"
|
"github.com/anytypeio/any-sync/net/pool"
|
||||||
"github.com/anytypeio/any-sync/nodeconf"
|
"github.com/anytypeio/any-sync/nodeconf"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
const CName = "common.commonspace"
|
const CName = "common.commonspace"
|
||||||
@ -116,7 +117,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
lastConfiguration := s.configurationService.GetLast()
|
lastConfiguration := s.configurationService.GetLast()
|
||||||
getter := newCommonGetter(st.Id(), s.treeGetter)
|
var spaceIsClosed = &atomic.Bool{}
|
||||||
|
getter := newCommonGetter(st.Id(), s.treeGetter, spaceIsClosed)
|
||||||
syncStatus := syncstatus.NewNoOpSyncStatus()
|
syncStatus := syncstatus.NewNoOpSyncStatus()
|
||||||
// this will work only for clients, not the best solution, but...
|
// this will work only for clients, not the best solution, but...
|
||||||
if !lastConfiguration.IsResponsible(st.Id()) {
|
if !lastConfiguration.IsResponsible(st.Id()) {
|
||||||
@ -141,6 +143,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||||||
configuration: lastConfiguration,
|
configuration: lastConfiguration,
|
||||||
peerManager: peerManager,
|
peerManager: peerManager,
|
||||||
storage: st,
|
storage: st,
|
||||||
|
isClosed: spaceIsClosed,
|
||||||
}
|
}
|
||||||
return sp, nil
|
return sp, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user