From e05dc905e5db65de50ab21d16f2a0184c4bcf35e Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Tue, 14 Feb 2023 16:53:43 +0300 Subject: [PATCH] handleQueue thread close / onObjectClose method --- commonspace/object/tree/synctree/synctree.go | 10 ++++------ commonspace/space.go | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index c63dcc96..08bffb01 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -15,7 +15,6 @@ import ( "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" "go.uber.org/zap" - "sync/atomic" ) var ( @@ -46,7 +45,7 @@ type syncTree struct { syncStatus syncstatus.StatusUpdater notifiable HeadNotifiable listener updatelistener.UpdateListener - treeUsage *atomic.Int32 + onClose func(id string) isClosed bool isDeleted bool } @@ -69,7 +68,7 @@ type BuildDeps struct { AclList list.AclList SpaceStorage spacestorage.SpaceStorage TreeStorage treestorage.TreeStorage - TreeUsage *atomic.Int32 + OnClose func(id string) SyncStatus syncstatus.StatusUpdater PeerGetter ResponsiblePeersGetter WaitTreeRemoteSync bool @@ -106,7 +105,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy ObjectTree: objTree, syncClient: syncClient, notifiable: deps.HeadNotifiable, - treeUsage: deps.TreeUsage, + onClose: deps.OnClose, listener: deps.Listener, syncStatus: deps.SyncStatus, } @@ -213,7 +212,7 @@ func (s *syncTree) Close() (err error) { if s.isClosed { return ErrSyncTreeClosed } - s.treeUsage.Add(-1) + s.onClose(s.Id()) s.isClosed = true return } @@ -239,7 +238,6 @@ func (s *syncTree) afterBuild() { if s.listener != nil { s.listener.Rebuild(s) } - s.treeUsage.Add(1) if s.notifiable != nil { s.notifiable.UpdateHeads(s.Id(), s.Heads()) } diff --git a/commonspace/space.go b/commonspace/space.go index b29cd513..7188b67a 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -128,7 +128,7 @@ type space struct { handleQueue multiqueue.MultiQueue[HandleMessage] isClosed *atomic.Bool - treesUsed atomic.Int32 + treesUsed *atomic.Int32 } func (s *space) LastUsage() time.Time { @@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea Listener: listener, AclList: s.aclList, SpaceStorage: s.storage, - TreeUsage: &s.treesUsed, + OnClose: func(id string) {}, SyncStatus: s.syncStatus, PeerGetter: s.peerManager, } @@ -326,12 +326,16 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t Listener: opts.Listener, AclList: s.aclList, SpaceStorage: s.storage, - TreeUsage: &s.treesUsed, + OnClose: s.onObjectClose, SyncStatus: s.syncStatus, WaitTreeRemoteSync: opts.WaitTreeRemoteSync, PeerGetter: s.peerManager, } - return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) + if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil { + return nil, err + } + s.treesUsed.Add(1) + return } func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { @@ -392,6 +396,11 @@ func (s *space) handleMessage(msg HandleMessage) { } } +func (s *space) onObjectClose(id string) { + s.treesUsed.Add(-1) + _ = s.handleQueue.CloseThread(id) +} + func (s *space) Close() error { if s.isClosed.Swap(true) { log.Warn("call space.Close on closed space", zap.String("id", s.id))