diff --git a/common/commonspace/cache/treecache.go b/common/commonspace/cache/treecache.go index 9d688f50..6703d004 100644 --- a/common/commonspace/cache/treecache.go +++ b/common/commonspace/cache/treecache.go @@ -3,7 +3,6 @@ package cache import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" ) @@ -21,5 +20,4 @@ type TreeResult struct { type TreeCache interface { app.ComponentRunnable GetTree(ctx context.Context, id string) (TreeResult, error) - AddTree(ctx context.Context, payload storage.TreeStorageCreatePayload) error } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 91657bb7..1d6e981f 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -22,8 +22,8 @@ type Space interface { SyncService() syncservice.SyncService DiffService() diffservice.DiffService - CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) - BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) + CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener synctree.UpdateListener) (tree.ObjectTree, error) + BuildTree(ctx context.Context, id string, listener synctree.UpdateListener) (tree.ObjectTree, error) Close() error } @@ -65,11 +65,11 @@ func (s *space) DiffService() diffservice.DiffService { return s.diffService } -func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) { - return synctree.CreateSyncTree(payload, s.syncService, listener, nil, s.storage.CreateTreeStorage) +func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener synctree.UpdateListener) (tree.ObjectTree, error) { + return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, nil, s.storage.CreateTreeStorage) } -func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (t tree.ObjectTree, err error) { +func (s *space) BuildTree(ctx context.Context, id string, listener synctree.UpdateListener) (t tree.ObjectTree, err error) { getTreeRemote := func() (*spacesyncproto.ObjectSyncMessage, error) { // TODO: add empty context handling (when this is not happening due to head update) peerId, err := syncservice.GetPeerIdFromStreamContext(ctx) @@ -115,7 +115,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTr return } } - return synctree.BuildSyncTree(s.syncService, store.(treestorage.TreeStorage), listener, s.aclList) + return synctree.BuildSyncTree(ctx, s.syncService, store.(treestorage.TreeStorage), listener, s.aclList) } func (s *space) getObjectIds() []string { diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 65197270..0e2626e9 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -120,6 +120,7 @@ Loop: delete(s.peerStreams, id) continue Loop default: + break } streams = append(streams, stream) } diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index d9dcb4e4..e45c3ffa 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -10,24 +10,35 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" ) +type UpdateListener interface { + Update(tree tree.ObjectTree) + Rebuild(tree tree.ObjectTree) +} + type SyncTree struct { tree.ObjectTree syncService syncservice.SyncService + listener UpdateListener } func CreateSyncTree( + ctx context.Context, payload tree.ObjectTreeCreatePayload, syncService syncservice.SyncService, - listener tree.ObjectTreeUpdateListener, + listener UpdateListener, aclList list.ACLList, createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { - t, err = tree.CreateObjectTree(payload, listener, aclList, createStorage) + t, err = tree.CreateObjectTree(payload, aclList, createStorage) if err != nil { return } + t = &SyncTree{ + ObjectTree: t, + syncService: syncService, + listener: listener, + } - // TODO: use context where it is needed - err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ + err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ Heads: t.Heads(), SnapshotPath: t.SnapshotPath(), }) @@ -35,25 +46,31 @@ func CreateSyncTree( } func BuildSyncTree( + ctx context.Context, syncService syncservice.SyncService, treeStorage storage.TreeStorage, - listener tree.ObjectTreeUpdateListener, + listener UpdateListener, aclList list.ACLList) (t tree.ObjectTree, err error) { - return buildSyncTree(syncService, treeStorage, listener, aclList) + return buildSyncTree(ctx, syncService, treeStorage, listener, aclList) } func buildSyncTree( + ctx context.Context, syncService syncservice.SyncService, treeStorage storage.TreeStorage, - listener tree.ObjectTreeUpdateListener, + listener UpdateListener, aclList list.ACLList) (t tree.ObjectTree, err error) { - t, err = tree.BuildObjectTree(treeStorage, listener, aclList) + t, err = tree.BuildObjectTree(treeStorage, aclList) if err != nil { return } + t = &SyncTree{ + ObjectTree: t, + syncService: syncService, + listener: listener, + } - // TODO: use context where it is needed - err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ + err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ Heads: t.Heads(), SnapshotPath: t.SnapshotPath(), }) @@ -75,9 +92,18 @@ func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeCo func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeChangeWithId) (res tree.AddResult, err error) { res, err = s.AddRawChanges(ctx, changes...) - if err != nil || res.Mode == tree.Nothing { + if err != nil { return } + switch res.Mode { + case tree.Nothing: + return + case tree.Append: + s.listener.Update(s) + case tree.Rebuild: + s.listener.Rebuild(s) + } + err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ Heads: res.Heads, Changes: res.Added, @@ -85,3 +111,7 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeC }) return } + +func (s *SyncTree) Tree() tree.ObjectTree { + return s +} diff --git a/pkg/acl/tree/objecttree.go b/pkg/acl/tree/objecttree.go index 186db5af..c2b3e0b9 100644 --- a/pkg/acl/tree/objecttree.go +++ b/pkg/acl/tree/objecttree.go @@ -11,11 +11,6 @@ import ( "sync" ) -type ObjectTreeUpdateListener interface { - Update(tree ObjectTree) - Rebuild(tree ObjectTree) -} - type RWLocker interface { sync.Locker RLock() @@ -71,7 +66,6 @@ type objectTree struct { rawChangeLoader *rawChangeLoader treeBuilder *treeBuilder aclList list.ACLList - updateListener ObjectTreeUpdateListener id string header *aclpb.TreeHeader @@ -94,7 +88,6 @@ type objectTreeDeps struct { changeBuilder ChangeBuilder treeBuilder *treeBuilder treeStorage storage.TreeStorage - updateListener ObjectTreeUpdateListener validator ObjectTreeValidator rawChangeLoader *rawChangeLoader aclList list.ACLList @@ -102,7 +95,6 @@ type objectTreeDeps struct { func defaultObjectTreeDeps( treeStorage storage.TreeStorage, - listener ObjectTreeUpdateListener, aclList list.ACLList) objectTreeDeps { keychain := common.NewKeychain() @@ -112,7 +104,6 @@ func defaultObjectTreeDeps( changeBuilder: changeBuilder, treeBuilder: treeBuilder, treeStorage: treeStorage, - updateListener: listener, validator: newTreeValidator(), rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder), aclList: aclList, @@ -211,8 +202,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt } func (ot *objectTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawTreeChangeWithId) (addResult AddResult, err error) { - var mode Mode - mode, addResult, err = ot.addRawChanges(ctx, rawChanges...) + addResult, err = ot.addRawChanges(ctx, rawChanges...) if err != nil { return } @@ -230,26 +220,10 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.Ra // setting heads err = ot.treeStorage.SetHeads(ot.tree.Heads()) - if err != nil { - return - } - - if ot.updateListener == nil { - return - } - - switch mode { - case Append: - ot.updateListener.Update(ot) - case Rebuild: - ot.updateListener.Rebuild(ot) - default: - break - } return } -func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*aclpb.RawTreeChangeWithId) (mode Mode, addResult AddResult, err error) { +func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*aclpb.RawTreeChangeWithId) (addResult AddResult, err error) { // resetting buffers ot.tmpChangesBuf = ot.tmpChangesBuf[:0] ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0] diff --git a/pkg/acl/tree/objecttreefactory.go b/pkg/acl/tree/objecttreefactory.go index a62c8add..44dc1fb5 100644 --- a/pkg/acl/tree/objecttreefactory.go +++ b/pkg/acl/tree/objecttreefactory.go @@ -19,19 +19,18 @@ type ObjectTreeCreatePayload struct { TreeType aclpb.TreeHeaderType } -func BuildObjectTree(treeStorage storage.TreeStorage, listener ObjectTreeUpdateListener, aclList list.ACLList) (ObjectTree, error) { - deps := defaultObjectTreeDeps(treeStorage, listener, aclList) +func BuildObjectTree(treeStorage storage.TreeStorage, aclList list.ACLList) (ObjectTree, error) { + deps := defaultObjectTreeDeps(treeStorage, aclList) return buildObjectTree(deps) } func CreateObjectTree( payload ObjectTreeCreatePayload, - listener ObjectTreeUpdateListener, aclList list.ACLList, createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { aclList.RLock() var ( - deps = defaultObjectTreeDeps(nil, listener, aclList) + deps = defaultObjectTreeDeps(nil, aclList) state = aclList.ACLState() aclId = aclList.ID() aclHeadId = aclList.Head().Id @@ -91,7 +90,6 @@ func CreateObjectTree( func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { objTree := &objectTree{ treeStorage: deps.treeStorage, - updateListener: deps.updateListener, treeBuilder: deps.treeBuilder, validator: deps.validator, aclList: deps.aclList, diff --git a/service/sync/message/service.go b/service/sync/message/service.go deleted file mode 100644 index 6e1ee015..00000000 --- a/service/sync/message/service.go +++ /dev/null @@ -1,125 +0,0 @@ -package message - -import ( - "context" - "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - pool2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" - "github.com/gogo/protobuf/proto" - "sync" - "time" -) - -var log = logger.NewNamed("messageservice") - -const CName = "MessageService" - -type service struct { - nodes []*node.Node - requestHandler requesthandler.RequestHandler - pool pool2.Pool - sync.RWMutex -} - -func New() app.Component { - return &service{} -} - -type Service interface { - SendMessageAsync(peerId string, msg *syncproto.Sync) error - SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error -} - -func (s *service) Init(a *app.App) (err error) { - s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) - s.nodes = a.MustComponent(node.CName).(node.Service).Nodes() - s.pool = a.MustComponent(pool2.CName).(pool2.Pool) - s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage) - return nil -} - -func (s *service) Name() (name string) { - return CName -} - -func (s *service) Run(ctx context.Context) (err error) { - return nil -} - -func (s *service) Close(ctx context.Context) (err error) { - return nil -} - -func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) { - defer func() { - if err != nil { - msg.AckError(syncproto.System_Error_UNKNOWN, err.Error()) - } else { - msg.Ack() - } - }() - - syncMsg := &syncproto.Sync{} - err = proto.Unmarshal(msg.Data, syncMsg) - if err != nil { - return - } - - timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - err = s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg) - return -} - -func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) { - _, err = s.pool.DialAndAddPeer(context.Background(), peerId) - if err != nil { - return - } - - marshalled, err := proto.Marshal(msg) - if err != nil { - return - } - - go s.sendAsync(peerId, msgInfo(msg), marshalled) - return -} - -func (s *service) SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error { - for _, rp := range s.nodes { - s.SendMessageAsync(rp.PeerId, msg) - } - return nil -} - -func (s *service) sendAsync(peerId string, msgTypeStr string, marshalled []byte) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - return s.pool.SendAndWait(ctx, peerId, &syncproto.Message{ - Header: &syncproto.Header{ - Type: syncproto.MessageType_MessageTypeSync, - DebugInfo: msgTypeStr, - }, - Data: marshalled, - }) -} - -func msgInfo(content *syncproto.Sync) (syncMethod string) { - msg := content.GetMessage() - switch { - case msg.GetFullSyncRequest() != nil: - syncMethod = "FullSyncRequest" - case msg.GetFullSyncResponse() != nil: - syncMethod = "FullSyncResponse" - case msg.GetHeadUpdate() != nil: - syncMethod = "HeadUpdate" - } - syncMethod = fmt.Sprintf("method: %s, treeType: %s", syncMethod, content.TreeHeader.DocType.String()) - return -} diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go deleted file mode 100644 index 9c17171f..00000000 --- a/service/sync/requesthandler/requesthandler.go +++ /dev/null @@ -1,291 +0,0 @@ -package requesthandler - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" - "go.uber.org/zap" -) - -type requestHandler struct { - treeCache treecache.Service - account account.Service - messageService MessageSender -} - -var log = logger.NewNamed("requesthandler") - -func New() app.Component { - return &requestHandler{} -} - -type RequestHandler interface { - HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error) -} - -type MessageSender interface { - SendMessageAsync(peerId string, msg *syncproto.Sync) error - SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error -} - -const CName = "SyncRequestHandler" - -func (r *requestHandler) Init(a *app.App) (err error) { - r.treeCache = a.MustComponent(treecache.CName).(treecache.Service) - r.account = a.MustComponent(account.CName).(account.Service) - r.messageService = a.MustComponent("MessageService").(MessageSender) - return nil -} - -func (r *requestHandler) Name() (name string) { - return CName -} - -func (r *requestHandler) Run(ctx context.Context) (err error) { - return nil -} - -func (r *requestHandler) Close(ctx context.Context) (err error) { - return nil -} - -func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error { - msg := content.GetMessage() - switch { - case msg.GetFullSyncRequest() != nil: - return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId()) - case msg.GetFullSyncResponse() != nil: - return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId()) - case msg.GetHeadUpdate() != nil: - return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId()) - } - return nil -} - -func (r *requestHandler) HandleHeadUpdate( - ctx context.Context, - senderId string, - update *syncproto.SyncHeadUpdate, - header *aclpb.Header, - treeId string) (err error) { - - var ( - fullRequest *syncproto.SyncFullRequest - snapshotPath []string - result tree.AddResult - ) - log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). - Debug("processing head update") - - err = r.treeCache.Do(ctx, treeId, func(obj any) error { - objTree := obj.(tree.ObjectTree) - objTree.Lock() - defer objTree.Unlock() - - if slice.UnsortedEquals(update.Heads, objTree.Heads()) { - return nil - } - - result, err = objTree.AddRawChanges(ctx, update.Changes...) - if err != nil { - return err - } - - // if we couldn't add all the changes - shouldFullSync := len(update.Changes) != len(result.Added) - snapshotPath = objTree.SnapshotPath() - if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(objTree) - if err != nil { - return err - } - } - return nil - }) - - // if there are no such tree - if err == storage.ErrUnknownTreeId { - fullRequest = &syncproto.SyncFullRequest{} - } - // if we have incompatible heads, or we haven't seen the tree at all - if fullRequest != nil { - return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId)) - } - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - - // otherwise sending heads update message - newUpdate := &syncproto.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) -} - -func (r *requestHandler) HandleFullSyncRequest( - ctx context.Context, - senderId string, - request *syncproto.SyncFullRequest, - header *aclpb.Header, - treeId string) (err error) { - - var fullResponse *syncproto.SyncFullResponse - err = r.treeCache.Do(ctx, treeId, func(obj any) error { - objTree := obj.(tree.ObjectTree) - objTree.Lock() - defer objTree.Unlock() - - fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) - if err != nil { - return err - } - return nil - }) - - if err != nil { - return err - } - return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) -} - -func (r *requestHandler) HandleFullSyncResponse( - ctx context.Context, - senderId string, - response *syncproto.SyncFullResponse, - header *aclpb.Header, - treeId string) (err error) { - - var ( - snapshotPath []string - result tree.AddResult - ) - - err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { - objTree := obj.(tree.ObjectTree) - objTree.Lock() - defer objTree.Unlock() - - // if we already have the heads for whatever reason - if slice.UnsortedEquals(response.Heads, objTree.Heads()) { - return nil - } - - result, err = objTree.AddRawChanges(ctx, response.Changes...) - if err != nil { - return err - } - snapshotPath = objTree.SnapshotPath() - return nil - }) - - // if error or nothing has changed - if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId { - return err - } - // if we have a new tree - if err == storage.ErrUnknownTreeId { - err = r.createTree(ctx, response, header, treeId) - if err != nil { - return err - } - result = tree.AddResult{ - OldHeads: []string{}, - Heads: response.Heads, - Added: response.Changes, - } - } - // sending heads update message - newUpdate := &syncproto.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) -} - -func (r *requestHandler) HandleACLList( - ctx context.Context, - senderId string, - req *syncproto.SyncACLList, - header *aclpb.Header, - id string) (err error) { - - err = r.treeCache.Do(ctx, id, func(obj interface{}) error { - return nil - }) - // do nothing if already added - if err == nil { - return nil - } - // if not found then add to storage - if err == storage.ErrUnknownTreeId { - return r.createACLList(ctx, req, header, id) - } - return err -} - -func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { - return &syncproto.SyncFullRequest{ - Heads: t.Heads(), - SnapshotPath: t.SnapshotPath(), - }, nil -} - -func (r *requestHandler) prepareFullSyncResponse( - treeId string, - theirPath, theirHeads []string, - t tree.ObjectTree) (*syncproto.SyncFullResponse, error) { - ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads) - if err != nil { - return nil, err - } - - return &syncproto.SyncFullResponse{ - Heads: t.Heads(), - Changes: ourChanges, - SnapshotPath: t.SnapshotPath(), - }, nil -} - -func (r *requestHandler) createTree( - ctx context.Context, - response *syncproto.SyncFullResponse, - header *aclpb.Header, - treeId string) error { - - return r.treeCache.Add( - ctx, - treeId, - storage.TreeStorageCreatePayload{ - TreeId: treeId, - Header: header, - Changes: response.Changes, - Heads: response.Heads, - }) -} - -func (r *requestHandler) createACLList( - ctx context.Context, - req *syncproto.SyncACLList, - header *aclpb.Header, - treeId string) error { - - return r.treeCache.Add( - ctx, - treeId, - storage.ACLListStorageCreatePayload{ - ListId: treeId, - Header: header, - Records: req.Records, - }) -}