From 5df4f13938882eeea6d3f0aab7e8f89d47d2613f Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 17 Dec 2022 16:55:15 +0100 Subject: [PATCH] Add preliminary interfaces for syncstatus --- common/commonspace/diffservice/diffsyncer.go | 16 ++++++++++++++-- .../commonspace/statusservice/statusservice.go | 10 ++++++++++ common/commonspace/synctree/synctree.go | 17 +++++++++++------ common/commonspace/synctree/synctreehandler.go | 14 ++++++++++---- common/pkg/acl/treechangeproto/treechange.go | 14 ++++++++++++++ util/cmd/debug/commands/scripts.go | 2 +- 6 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 common/commonspace/statusservice/statusservice.go diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 49947c0a..962e5cac 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -5,6 +5,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" @@ -51,6 +52,7 @@ type diffSyncer struct { clientFactory spacesyncproto.ClientFactory log *zap.Logger deletionState deletionstate.DeletionState + statusService statusservice.StatusService } func (d *diffSyncer) Init(deletionState deletionstate.DeletionState) { @@ -91,8 +93,14 @@ func (d *diffSyncer) Sync(ctx context.Context) error { } func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { - cl := d.clientFactory.Client(p) - rdiff := remotediff.NewRemoteDiff(d.spaceId, cl) + var ( + cl = d.clientFactory.Client(p) + rdiff = remotediff.NewRemoteDiff(d.spaceId, cl) + stateCounter uint64 = 0 + ) + if d.statusService != nil { + stateCounter = d.statusService.StateCounter() + } newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) err = rpcerr.Unwrap(err) if err != nil && err != spacesyncproto.ErrSpaceMissing { @@ -105,6 +113,10 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) // not syncing ids which were removed through settings document filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds) + if d.statusService != nil { + d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter) + } + ctx = peer.CtxWithPeerId(ctx, p.Id()) d.pingTreesInCache(ctx, filteredIds) diff --git a/common/commonspace/statusservice/statusservice.go b/common/commonspace/statusservice/statusservice.go new file mode 100644 index 00000000..0d79c315 --- /dev/null +++ b/common/commonspace/statusservice/statusservice.go @@ -0,0 +1,10 @@ +package statusservice + +type StatusService interface { + HeadsChange(treeId string, heads []string) + HeadsReceive(senderId, treeId string, heads []string) + Watch(treeId string, ch chan struct{}) + Unwatch(treeId string) + StateCounter() uint64 + RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) +} diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 221e71e3..ddbdd19e 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" @@ -39,12 +40,13 @@ type SyncTree interface { type syncTree struct { tree.ObjectTree synchandler.SyncHandler - syncClient SyncClient - notifiable HeadNotifiable - listener updatelistener.UpdateListener - treeUsage *atomic.Int32 - isClosed bool - isDeleted bool + syncClient SyncClient + statusService statusservice.StatusService + notifiable HeadNotifiable + listener updatelistener.UpdateListener + treeUsage *atomic.Int32 + isClosed bool + isDeleted bool } var log = logger.NewNamed("commonspace.synctree").Sugar() @@ -241,6 +243,9 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo if s.notifiable != nil { s.notifiable.UpdateHeads(s.ID(), res.Heads) } + if s.statusService != nil { + s.statusService.HeadsChange(s.ID(), res.Heads) + } headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) err = s.syncClient.BroadcastAsync(headUpdate) return diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index 923965c4..96b6bdcb 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -3,6 +3,7 @@ package synctree import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" @@ -13,10 +14,11 @@ import ( ) type syncTreeHandler struct { - objTree tree.ObjectTree - syncClient SyncClient - handlerLock sync.Mutex - queue ReceiveQueue + objTree tree.ObjectTree + syncClient SyncClient + statusService statusservice.StatusService + handlerLock sync.Mutex + queue ReceiveQueue } const maxQueueSize = 5 @@ -37,6 +39,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms return } + if s.statusService != nil { + s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled)) + } + queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId) if queueFull { return diff --git a/common/pkg/acl/treechangeproto/treechange.go b/common/pkg/acl/treechangeproto/treechange.go index 9e4f1854..3aba1fb0 100644 --- a/common/pkg/acl/treechangeproto/treechange.go +++ b/common/pkg/acl/treechangeproto/treechange.go @@ -35,3 +35,17 @@ func WrapError(err error, rootChange *RawTreeChangeWithId) *TreeSyncMessage { RootChange: rootChange, } } + +func GetHeads(msg *TreeSyncMessage) (heads []string) { + content := msg.GetContent() + switch { + case content.GetHeadUpdate() != nil: + return content.GetHeadUpdate().Heads + case content.GetFullSyncRequest() != nil: + return content.GetFullSyncRequest().Heads + case content.GetFullSyncResponse() != nil: + return content.GetFullSyncResponse().Heads + default: + return nil + } +} diff --git a/util/cmd/debug/commands/scripts.go b/util/cmd/debug/commands/scripts.go index 02fb6c55..5f1c14f4 100644 --- a/util/cmd/debug/commands/scripts.go +++ b/util/cmd/debug/commands/scripts.go @@ -48,7 +48,7 @@ func (s *service) registerScripts() { SpaceId: space, DocumentId: document, Text: args[0], - IsSnapshot: rand.Int()%2 == 0, + IsSnapshot: rand.Int()%10 == 0, }) if err != nil { mError.Add(err)