From 0d0bd82dc76b0b91908abe718893148e7367e9f7 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 3 Dec 2022 11:48:18 +0100 Subject: [PATCH] Simplify space locked solution --- common/commonspace/space.go | 62 ++++++-------- common/commonspace/synctree/synctree.go | 107 +++++++++++------------- util/cmd/debug/api/service.go | 17 ++++ 3 files changed, 94 insertions(+), 92 deletions(-) diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 98d434c2..4dc3db76 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -106,16 +106,6 @@ type space struct { treesUsed atomic.Int32 } -func (s *space) StartTree() { - s.treesUsed.Add(1) - log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("starting tree") -} - -func (s *space) CloseTree() { - s.treesUsed.Add(-1) - log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("closing tree") -} - func (s *space) LastUsage() time.Time { return s.syncService.LastUsage() } @@ -228,15 +218,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay return } deps := synctree.CreateDeps{ - SpaceId: s.id, - Payload: payload, - StreamPool: s.syncService.StreamPool(), - Configuration: s.configuration, - HeadNotifiable: s.diffService, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, - TreeUsageController: s, + SpaceId: s.id, + Payload: payload, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + SpaceStorage: s.storage, + TreeUsage: &s.treesUsed, } return synctree.DeriveSyncTree(ctx, deps) } @@ -247,15 +237,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay return } deps := synctree.CreateDeps{ - SpaceId: s.id, - Payload: payload, - StreamPool: s.syncService.StreamPool(), - Configuration: s.configuration, - HeadNotifiable: s.diffService, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, - TreeUsageController: s, + SpaceId: s.id, + Payload: payload, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + SpaceStorage: s.storage, + TreeUsage: &s.treesUsed, } return synctree.CreateSyncTree(ctx, deps) } @@ -266,14 +256,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene return } deps := synctree.BuildDeps{ - SpaceId: s.id, - StreamPool: s.syncService.StreamPool(), - Configuration: s.configuration, - HeadNotifiable: s.diffService, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, - TreeUsageController: s, + SpaceId: s.id, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + SpaceStorage: s.storage, + TreeUsage: &s.treesUsed, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 4bdcb8d5..69fe04a4 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -3,6 +3,7 @@ package synctree import ( "context" "errors" + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" @@ -16,6 +17,8 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "sync/atomic" ) var ( @@ -28,21 +31,16 @@ type SyncTree interface { synchandler.SyncHandler } -type TreeUsageController interface { - StartTree() - CloseTree() -} - // SyncTree sends head updates to sync service and also sends new changes to update listener type syncTree struct { tree.ObjectTree synchandler.SyncHandler - syncClient SyncClient - notifiable diffservice.HeadNotifiable - listener updatelistener.UpdateListener - usageController TreeUsageController - isClosed bool - isDeleted bool + syncClient SyncClient + notifiable diffservice.HeadNotifiable + listener updatelistener.UpdateListener + treeUsage *atomic.Int32 + isClosed bool + isDeleted bool } var log = logger.NewNamed("commonspace.synctree").Sugar() @@ -53,27 +51,27 @@ var buildObjectTree = tree.BuildObjectTree var createSyncClient = newSyncClient type CreateDeps struct { - SpaceId string - Payload tree.ObjectTreeCreatePayload - Configuration nodeconf.Configuration - HeadNotifiable diffservice.HeadNotifiable - StreamPool syncservice.StreamPool - Listener updatelistener.UpdateListener - AclList list.ACLList - SpaceStorage spacestorage.SpaceStorage - TreeUsageController TreeUsageController + SpaceId string + Payload tree.ObjectTreeCreatePayload + Configuration nodeconf.Configuration + HeadNotifiable diffservice.HeadNotifiable + StreamPool syncservice.StreamPool + Listener updatelistener.UpdateListener + AclList list.ACLList + SpaceStorage spacestorage.SpaceStorage + TreeUsage *atomic.Int32 } type BuildDeps struct { - SpaceId string - StreamPool syncservice.StreamPool - Configuration nodeconf.Configuration - HeadNotifiable diffservice.HeadNotifiable - Listener updatelistener.UpdateListener - AclList list.ACLList - SpaceStorage spacestorage.SpaceStorage - TreeStorage storage.TreeStorage - TreeUsageController TreeUsageController + SpaceId string + StreamPool syncservice.StreamPool + Configuration nodeconf.Configuration + HeadNotifiable diffservice.HeadNotifiable + Listener updatelistener.UpdateListener + AclList list.ACLList + SpaceStorage spacestorage.SpaceStorage + TreeStorage storage.TreeStorage + TreeUsage *atomic.Int32 } func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) { @@ -87,11 +85,11 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error sharedFactory, deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - usageController: deps.TreeUsageController, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + treeUsage: deps.TreeUsage, + listener: deps.Listener, } syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler @@ -101,9 +99,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error if syncTree.listener != nil { syncTree.listener.Rebuild(syncTree) } - if syncTree.usageController != nil { - syncTree.usageController.StartTree() - } + syncTree.treeUsage.Add(1) headUpdate := syncClient.CreateHeadUpdate(t, nil) err = syncClient.BroadcastAsync(headUpdate) @@ -121,11 +117,11 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error GetRequestFactory(), deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - usageController: deps.TreeUsageController, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + treeUsage: deps.TreeUsage, + listener: deps.Listener, } syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler @@ -136,9 +132,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error if syncTree.listener != nil { syncTree.listener.Rebuild(syncTree) } - if syncTree.usageController != nil { - syncTree.usageController.StartTree() - } + syncTree.treeUsage.Add(1) headUpdate := syncClient.CreateHeadUpdate(t, nil) err = syncClient.BroadcastAsync(headUpdate) @@ -188,6 +182,10 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t if err != nil { return } + if resp.GetContent().GetFullSyncResponse() == nil { + err = fmt.Errorf("expected to get full sync response, but got something else") + return + } fullSyncResp := resp.GetContent().GetFullSyncResponse() payload := storage.TreeStorageCreatePayload{ @@ -197,6 +195,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t } // basically building tree with in-memory storage and validating that it was without errors + log.With(zap.String("id", id)).Debug("validating tree") err = tree.ValidateRawTree(payload, deps.AclList) if err != nil { return @@ -220,11 +219,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy GetRequestFactory(), deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - usageController: deps.TreeUsageController, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + treeUsage: deps.TreeUsage, + listener: deps.Listener, } syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler @@ -234,9 +233,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy if syncTree.listener != nil { syncTree.listener.Rebuild(syncTree) } - if syncTree.usageController != nil { - syncTree.usageController.StartTree() - } + syncTree.treeUsage.Add(1) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // here we will have different behaviour based on who is sending this update @@ -330,9 +327,7 @@ func (s *syncTree) Close() (err error) { if s.isClosed { return ErrSyncTreeClosed } - if s.usageController != nil { - s.usageController.CloseTree() - } + s.treeUsage.Add(-1) s.isClosed = true return } diff --git a/util/cmd/debug/api/service.go b/util/cmd/debug/api/service.go index 607509a1..4f7fc740 100644 --- a/util/cmd/debug/api/service.go +++ b/util/cmd/debug/api/service.go @@ -209,6 +209,23 @@ func (s *service) registerClientCommands() { } return }} + s.clientCommands["all-spaces"] = Command{Cmd: func(server peers.Peer, params []string) (res string, err error) { + if len(params) != 0 { + err = ErrIncorrectParamsCount + return + } + resp, err := client.AllSpaces(context.Background(), server.Address, &apiproto.AllSpacesRequest{}) + if err != nil { + return + } + for treeIdx, spaceId := range resp.SpaceIds { + res += spaceId + if treeIdx != len(resp.SpaceIds)-1 { + res += "\n" + } + } + return + }} } func (s *service) registerNodeCommands() {