diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index fc04468d..e53373b5 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -47,9 +47,8 @@ func (s *syncHandler) HandleHeadUpdate( treeId string) (err error) { var ( - fullRequest *spacesyncproto.ObjectFullSyncRequest - snapshotPath []string - result tree.AddResult + fullRequest *spacesyncproto.ObjectFullSyncRequest + result tree.AddResult // in case update changes are empty then we want to sync the whole tree sendChangesOnFullSync = len(update.Changes) == 0 ) @@ -76,7 +75,6 @@ func (s *syncHandler) HandleHeadUpdate( // if we couldn't add all the changes shouldFullSync := len(update.Changes) != len(result.Added) - snapshotPath = objTree.SnapshotPath() if shouldFullSync { fullRequest, err = s.prepareFullSyncRequest(objTree, sendChangesOnFullSync) if err != nil { @@ -94,18 +92,7 @@ func (s *syncHandler) HandleHeadUpdate( if fullRequest != nil { return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId)) } - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - - // otherwise sending heads update message - newUpdate := &spacesyncproto.ObjectHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) + return } func (s *syncHandler) HandleFullSyncRequest( @@ -148,10 +135,7 @@ func (s *syncHandler) HandleFullSyncResponse( header *aclpb.TreeHeader, treeId string) (err error) { - var ( - snapshotPath []string - result tree.AddResult - ) + var result tree.AddResult res, err := s.treeCache.GetTree(ctx, treeId) if err != nil { @@ -159,24 +143,21 @@ func (s *syncHandler) HandleFullSyncResponse( } err = func() error { - syncTree := res.Tree - syncTree.Lock() + objTree := res.Tree + objTree.Lock() defer res.Release() - defer syncTree.Unlock() + defer objTree.Unlock() // if we already have the heads for whatever reason - if slice.UnsortedEquals(response.Heads, syncTree.Heads()) { + if slice.UnsortedEquals(response.Heads, objTree.Heads()) { return nil } - // syncTree -> syncService: HeadUpdate() - // AddRawChanges -> syncTree.addRawChanges(); syncService.HeadUpdate() - result, err = syncTree.AddRawChanges(ctx, response.Changes...) + result, err = objTree.AddRawChanges(ctx, response.Changes...) if err != nil { return err } - snapshotPath = syncTree.SnapshotPath() return nil }() @@ -186,23 +167,9 @@ func (s *syncHandler) HandleFullSyncResponse( } // if we have a new tree if err == storage.ErrUnknownTreeId { - err = s.addTree(ctx, response, header, treeId) - if err != nil { - return err - } - result = tree.AddResult{ - OldHeads: []string{}, - Heads: response.Heads, - Added: response.Changes, - } + return s.addTree(ctx, response, header, treeId) } - // sending heads update message - newUpdate := &spacesyncproto.ObjectHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) + return } func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) { diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go new file mode 100644 index 00000000..8a948735 --- /dev/null +++ b/common/commonspace/synctree/synctree.go @@ -0,0 +1,151 @@ +package synctree + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" +) + +type SyncTree struct { + objTree tree.ObjectTree + syncService syncservice.SyncService +} + +func CreateSyncTree( + syncService syncservice.SyncService, + payload tree.ObjectTreeCreatePayload, + listener tree.ObjectTreeUpdateListener, + aclList list.ACLList, + createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { + t, err = tree.CreateObjectTree(payload, listener, aclList, createStorage) + if err != nil { + return + } + + // TODO: use context where it is needed + err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ + Heads: t.Heads(), + SnapshotPath: t.SnapshotPath(), + }) + return +} + +func BuildSyncTree( + syncService syncservice.SyncService, + treeStorage storage.TreeStorage, + listener tree.ObjectTreeUpdateListener, + aclList list.ACLList) (t tree.ObjectTree, err error) { + return buildSyncTree(syncService, treeStorage, listener, aclList) +} + +func buildSyncTree( + syncService syncservice.SyncService, + treeStorage storage.TreeStorage, + listener tree.ObjectTreeUpdateListener, + aclList list.ACLList) (t tree.ObjectTree, err error) { + t, err = tree.BuildObjectTree(treeStorage, listener, aclList) + if err != nil { + return + } + + // TODO: use context where it is needed + err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ + Heads: t.Heads(), + SnapshotPath: t.SnapshotPath(), + }) + return +} + +func (s *SyncTree) Lock() { + s.objTree.Lock() +} + +func (s *SyncTree) Unlock() { + s.objTree.Unlock() +} + +func (s *SyncTree) RLock() { + s.objTree.RLock() +} + +func (s *SyncTree) RUnlock() { + s.objTree.RUnlock() +} + +func (s *SyncTree) ID() string { + return s.objTree.ID() +} + +func (s *SyncTree) Header() *aclpb.TreeHeader { + return s.objTree.Header() +} + +func (s *SyncTree) Heads() []string { + return s.objTree.Heads() +} + +func (s *SyncTree) Root() *tree.Change { + return s.objTree.Root() +} + +func (s *SyncTree) HasChange(id string) bool { + return s.objTree.HasChange(id) +} + +func (s *SyncTree) Iterate(convert tree.ChangeConvertFunc, iterate tree.ChangeIterateFunc) error { + return s.objTree.Iterate(convert, iterate) +} + +func (s *SyncTree) IterateFrom(id string, convert tree.ChangeConvertFunc, iterate tree.ChangeIterateFunc) error { + return s.objTree.IterateFrom(id, convert, iterate) +} + +func (s *SyncTree) SnapshotPath() []string { + return s.objTree.SnapshotPath() +} + +func (s *SyncTree) ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*aclpb.RawTreeChangeWithId, error) { + return s.objTree.ChangesAfterCommonSnapshot(snapshotPath, heads) +} + +func (s *SyncTree) Storage() storage.TreeStorage { + return s.objTree.Storage() +} + +func (s *SyncTree) DebugDump() (string, error) { + return s.objTree.DebugDump() +} + +func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeContent) (res tree.AddResult, err error) { + res, err = s.objTree.AddContent(ctx, content) + if err != nil { + return + } + err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ + Heads: res.Heads, + Changes: res.Added, + SnapshotPath: s.SnapshotPath(), + }) + return +} + +func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeChangeWithId) (res tree.AddResult, err error) { + res, err = s.objTree.AddRawChanges(ctx, changes...) + if err != nil || res.Mode == tree.Nothing { + return + } + err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ + Heads: res.Heads, + Changes: res.Added, + SnapshotPath: s.SnapshotPath(), + }) + return +} + +func (s *SyncTree) Close() error { + return s.objTree.Close() +} diff --git a/pkg/acl/tree/objecttree.go b/pkg/acl/tree/objecttree.go index 31365b87..186db5af 100644 --- a/pkg/acl/tree/objecttree.go +++ b/pkg/acl/tree/objecttree.go @@ -149,12 +149,6 @@ func (ot *objectTree) Storage() storage.TreeStorage { } func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) { - defer func() { - if err == nil && ot.updateListener != nil { - ot.updateListener.Update(ot) - } - }() - payload, err := ot.prepareBuilderContent(content) if err != nil { return