diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 3d1a9a24..98d434c2 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -18,6 +18,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" "github.com/zeebo/errs" @@ -65,6 +66,9 @@ func NewSpaceId(id string, repKey uint64) string { } type Space interface { + ocache.ObjectLocker + ocache.ObjectLastUsage + Id() string Init(ctx context.Context) error @@ -98,13 +102,29 @@ type space struct { configuration nodeconf.Configuration settingsDocument settingsdocument.SettingsDocument - isClosed atomic.Bool + isClosed atomic.Bool + treesUsed atomic.Int32 +} + +func (s *space) StartTree() { + s.treesUsed.Add(1) + log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("starting tree") +} + +func (s *space) CloseTree() { + s.treesUsed.Add(-1) + log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("closing tree") } func (s *space) LastUsage() time.Time { return s.syncService.LastUsage() } +func (s *space) Locked() bool { + log.With(zap.Bool("locked", s.treesUsed.Load() > 1)).Debug("space lock status check") + return s.treesUsed.Load() > 1 +} + func (s *space) Id() string { return s.id } @@ -131,6 +151,7 @@ func (s *space) Description() (desc SpaceDescription, err error) { } func (s *space) Init(ctx context.Context) (err error) { + log.With(zap.String("spaceId", s.id)).Debug("initializing space") s.storage = newCommonStorage(s.storage) header, err := s.storage.SpaceHeader() @@ -207,14 +228,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay return } deps := synctree.CreateDeps{ - SpaceId: s.id, - Payload: payload, - StreamPool: s.syncService.StreamPool(), - Configuration: s.configuration, - HeadNotifiable: s.diffService, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, + SpaceId: s.id, + Payload: payload, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + SpaceStorage: s.storage, + TreeUsageController: s, } return synctree.DeriveSyncTree(ctx, deps) } @@ -225,14 +247,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay return } deps := synctree.CreateDeps{ - SpaceId: s.id, - Payload: payload, - StreamPool: s.syncService.StreamPool(), - Configuration: s.configuration, - HeadNotifiable: s.diffService, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, + SpaceId: s.id, + Payload: payload, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + SpaceStorage: s.storage, + TreeUsageController: s, } return synctree.CreateSyncTree(ctx, deps) } @@ -243,13 +266,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene return } deps := synctree.BuildDeps{ - SpaceId: s.id, - StreamPool: s.syncService.StreamPool(), - Configuration: s.configuration, - HeadNotifiable: s.diffService, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, + SpaceId: s.id, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + SpaceStorage: s.storage, + TreeUsageController: s, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 983f5698..1dfd9243 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -9,6 +9,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" + "go.uber.org/zap" "time" ) @@ -36,6 +37,7 @@ type syncService struct { stopStreamLoop context.CancelFunc connector nodeconf.ConfConnector streamLoopDone chan struct{} + log *zap.SugaredLogger // TODO: change to logger } func NewSyncService( @@ -62,6 +64,7 @@ func newSyncService( connector: connector, clientFactory: clientFactory, spaceId: spaceId, + log: log.With(zap.String("id", spaceId)), streamLoopDone: make(chan struct{}), } } @@ -83,6 +86,7 @@ func (s *syncService) LastUsage() time.Time { } func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + s.log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId)).Debug("handling message") obj, err := s.objectGetter.GetObject(ctx, message.ObjectId) if err != nil { return @@ -93,18 +97,21 @@ func (s *syncService) HandleMessage(ctx context.Context, senderId string, messag func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { defer close(s.streamLoopDone) checkResponsiblePeers := func() { + s.log.Debug("dialing responsible peers") respPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId) if err != nil { + s.log.Error("failed to dial peers", zap.Error(err)) return } for _, p := range respPeers { if s.streamPool.HasActiveStream(p.Id()) { + s.log.Debug("has active stream for", zap.String("id", p.Id())) continue } stream, err := s.clientFactory.Client(p).Stream(ctx) if err != nil { err = rpcerr.Unwrap(err) - log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err) + s.log.Errorf("failed to open stream: %v", err) // so here probably the request is failed because there is no such space, // but diffService should handle such cases by sending pushSpace continue @@ -113,9 +120,10 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) if err != nil { err = rpcerr.Unwrap(err) - log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) + s.log.Errorf("failed to send first message to stream: %v", err) continue } + s.log.Debug("continue reading stream for", zap.String("id", p.Id())) s.streamPool.AddAndReadStreamAsync(stream) } } diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index fbc643ac..4bdcb8d5 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -28,15 +28,21 @@ type SyncTree interface { synchandler.SyncHandler } +type TreeUsageController interface { + StartTree() + CloseTree() +} + // SyncTree sends head updates to sync service and also sends new changes to update listener type syncTree struct { tree.ObjectTree synchandler.SyncHandler - syncClient SyncClient - notifiable diffservice.HeadNotifiable - listener updatelistener.UpdateListener - isClosed bool - isDeleted bool + syncClient SyncClient + notifiable diffservice.HeadNotifiable + listener updatelistener.UpdateListener + usageController TreeUsageController + isClosed bool + isDeleted bool } var log = logger.NewNamed("commonspace.synctree").Sugar() @@ -47,25 +53,27 @@ var buildObjectTree = tree.BuildObjectTree var createSyncClient = newSyncClient type CreateDeps struct { - SpaceId string - Payload tree.ObjectTreeCreatePayload - Configuration nodeconf.Configuration - HeadNotifiable diffservice.HeadNotifiable - StreamPool syncservice.StreamPool - Listener updatelistener.UpdateListener - AclList list.ACLList - SpaceStorage spacestorage.SpaceStorage + SpaceId string + Payload tree.ObjectTreeCreatePayload + Configuration nodeconf.Configuration + HeadNotifiable diffservice.HeadNotifiable + StreamPool syncservice.StreamPool + Listener updatelistener.UpdateListener + AclList list.ACLList + SpaceStorage spacestorage.SpaceStorage + TreeUsageController TreeUsageController } type BuildDeps struct { - SpaceId string - StreamPool syncservice.StreamPool - Configuration nodeconf.Configuration - HeadNotifiable diffservice.HeadNotifiable - Listener updatelistener.UpdateListener - AclList list.ACLList - SpaceStorage spacestorage.SpaceStorage - TreeStorage storage.TreeStorage + SpaceId string + StreamPool syncservice.StreamPool + Configuration nodeconf.Configuration + HeadNotifiable diffservice.HeadNotifiable + Listener updatelistener.UpdateListener + AclList list.ACLList + SpaceStorage spacestorage.SpaceStorage + TreeStorage storage.TreeStorage + TreeUsageController TreeUsageController } func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) { @@ -79,10 +87,11 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error sharedFactory, deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + usageController: deps.TreeUsageController, + listener: deps.Listener, } syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler @@ -92,6 +101,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error if syncTree.listener != nil { syncTree.listener.Rebuild(syncTree) } + if syncTree.usageController != nil { + syncTree.usageController.StartTree() + } headUpdate := syncClient.CreateHeadUpdate(t, nil) err = syncClient.BroadcastAsync(headUpdate) @@ -109,10 +121,11 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error GetRequestFactory(), deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + usageController: deps.TreeUsageController, + listener: deps.Listener, } syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler @@ -123,6 +136,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error if syncTree.listener != nil { syncTree.listener.Rebuild(syncTree) } + if syncTree.usageController != nil { + syncTree.usageController.StartTree() + } headUpdate := syncClient.CreateHeadUpdate(t, nil) err = syncClient.BroadcastAsync(headUpdate) @@ -204,10 +220,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy GetRequestFactory(), deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + usageController: deps.TreeUsageController, + listener: deps.Listener, } syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler @@ -217,6 +234,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy if syncTree.listener != nil { syncTree.listener.Rebuild(syncTree) } + if syncTree.usageController != nil { + syncTree.usageController.StartTree() + } headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // here we will have different behaviour based on who is sending this update @@ -310,6 +330,9 @@ func (s *syncTree) Close() (err error) { if s.isClosed { return ErrSyncTreeClosed } + if s.usageController != nil { + s.usageController.CloseTree() + } s.isClosed = true return }