diff --git a/service/account/service.go b/service/account/service.go index b64a6a83..67adf192 100644 --- a/service/account/service.go +++ b/service/account/service.go @@ -26,7 +26,7 @@ func (s *service) Account() *account.AccountData { } type StaticAccount struct { - SigningKey string `yaml:"siginingKey"` + SigningKey string `yaml:"signingKey"` EncryptionKey string `yaml:"encryptionKey"` } diff --git a/service/sync/client/client.go b/service/sync/client/client.go new file mode 100644 index 00000000..8cc292de --- /dev/null +++ b/service/sync/client/client.go @@ -0,0 +1,56 @@ +package client + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" +) + +const CName = "SyncClient" + +type client struct { + handler requesthandler.RequestHandler +} + +func NewClient() app.Component { + return &client{} +} + +type Client interface { + NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error + RequestFullSync(id string, request *syncpb.SyncFullRequest) error + SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error +} + +func (c *client) Init(ctx context.Context, a *app.App) (err error) { + c.handler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) + return nil +} + +func (c *client) Name() (name string) { + return CName +} + +func (c *client) Run(ctx context.Context) (err error) { + return nil +} + +func (c *client) Close(ctx context.Context) (err error) { + return nil +} + +func (c *client) NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error { + //TODO implement me + panic("implement me") +} + +func (c *client) RequestFullSync(id string, request *syncpb.SyncFullRequest) error { + //TODO implement me + panic("implement me") +} + +func (c *client) SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error { + //TODO implement me + panic("implement me") +} diff --git a/service/sync/requesthandler.go b/service/sync/requesthandler.go deleted file mode 100644 index 349f59d3..00000000 --- a/service/sync/requesthandler.go +++ /dev/null @@ -1,86 +0,0 @@ -package sync - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" -) - -type requestHander struct { - treeCache treecache.Service - client SyncClient -} - -func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) { - var ( - fullRequest *syncpb.SyncFullRequest - snapshotPath []string - result acltree.AddResult - ) - - err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - result, err = tree.AddRawChanges(ctx, update.Changes...) - if err != nil { - return err - } - shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) - snapshotPath = tree.SnapshotPath() - if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.SnapshotPath, tree) - if err != nil { - return err - } - } - return nil - }) - // if there are no such tree - if err == treestorage.ErrUnknownTreeId { - fullRequest = &syncpb.SyncFullRequest{ - TreeId: update.TreeId, - } - } - // if we have incompatible heads, or we haven't seen the tree at all - if fullRequest != nil { - return r.client.RequestFullSync(senderId, fullRequest) - } - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - // otherwise sending heads update message - newUpdate := &syncpb.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - TreeId: update.TreeId, - } - err = r.client.NotifyHeadsChanged(newUpdate) - return -} - -func (r *requestHander) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error { - // TODO: add case of new tree - return nil -} - -func (r *requestHander) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error { - // TODO: add case of new tree - return nil -} - -func (r *requestHander) prepareFullSyncRequest(treeId string, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { - ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) - if err != nil { - return nil, err - } - return &syncpb.SyncFullRequest{ - Heads: tree.Heads(), - Changes: ourChanges, - TreeId: treeId, - SnapshotPath: tree.SnapshotPath(), - }, nil -} diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go new file mode 100644 index 00000000..702cf909 --- /dev/null +++ b/service/sync/requesthandler/requesthandler.go @@ -0,0 +1,207 @@ +package requesthandler + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/client" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" +) + +type requestHandler struct { + treeCache treecache.Service + client client.Client +} + +func NewRequestHandler() app.Component { + return &requestHandler{} +} + +type RequestHandler interface { + HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) + HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) + HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) +} + +const CName = "SyncRequestHandler" + +func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) { + r.treeCache = a.MustComponent(treecache.CName).(treecache.Service) + return nil +} + +func (r *requestHandler) Name() (name string) { + return CName +} + +func (r *requestHandler) Run(ctx context.Context) (err error) { + return nil +} + +func (r *requestHandler) Close(ctx context.Context) (err error) { + return nil +} + +func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) { + var ( + fullRequest *syncpb.SyncFullRequest + snapshotPath []string + result acltree.AddResult + ) + + err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { + // TODO: check if we already have those changes + result, err = tree.AddRawChanges(ctx, update.Changes...) + if err != nil { + return err + } + shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) + snapshotPath = tree.SnapshotPath() + if shouldFullSync { + fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.SnapshotPath, tree) + if err != nil { + return err + } + } + return nil + }) + // if there are no such tree + if err == treestorage.ErrUnknownTreeId { + fullRequest = &syncpb.SyncFullRequest{ + TreeId: update.TreeId, + } + } + // if we have incompatible heads, or we haven't seen the tree at all + if fullRequest != nil { + return r.client.RequestFullSync(senderId, fullRequest) + } + // if error or nothing has changed + if err != nil || len(result.Added) == 0 { + return err + } + // otherwise sending heads update message + newUpdate := &syncpb.SyncHeadUpdate{ + Heads: result.Heads, + Changes: result.Added, + SnapshotPath: snapshotPath, + TreeId: update.TreeId, + } + return r.client.NotifyHeadsChanged(newUpdate) +} + +func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) { + var ( + fullResponse *syncpb.SyncFullResponse + snapshotPath []string + result acltree.AddResult + ) + + err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { + // TODO: check if we already have those changes + result, err = tree.AddRawChanges(ctx, request.Changes...) + if err != nil { + return err + } + snapshotPath = tree.SnapshotPath() + fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + err = r.client.SendFullSyncResponse(senderId, fullResponse) + // if error or nothing has changed + if err != nil || len(result.Added) == 0 { + return err + } + + // otherwise sending heads update message + newUpdate := &syncpb.SyncHeadUpdate{ + Heads: result.Heads, + Changes: result.Added, + SnapshotPath: snapshotPath, + TreeId: request.TreeId, + } + return r.client.NotifyHeadsChanged(newUpdate) +} + +func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) { + var ( + snapshotPath []string + result acltree.AddResult + ) + + err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { + // TODO: check if we already have those changes + result, err = tree.AddRawChanges(ctx, request.Changes...) + if err != nil { + return err + } + snapshotPath = tree.SnapshotPath() + return nil + }) + if err != nil { + return err + } + // if error or nothing has changed + if err != nil || len(result.Added) == 0 { + return err + } + + // TODO: probably here we should not send an update message, because the other node had already sent it after updating with our data + // otherwise sending heads update message + newUpdate := &syncpb.SyncHeadUpdate{ + Heads: result.Heads, + Changes: result.Added, + SnapshotPath: snapshotPath, + TreeId: request.TreeId, + } + return r.client.NotifyHeadsChanged(newUpdate) +} + +func (r *requestHandler) prepareFullSyncRequest(treeId string, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { + ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) + if err != nil { + return nil, err + } + return &syncpb.SyncFullRequest{ + Heads: tree.Heads(), + Changes: ourChanges, + TreeId: treeId, + SnapshotPath: tree.SnapshotPath(), + }, nil +} + +func (r *requestHandler) prepareFullSyncResponse(treeId string, theirPath []string, theirChanges []*aclpb.RawChange, tree acltree.ACLTree) (*syncpb.SyncFullResponse, error) { + // TODO: we can probably use the common snapshot calculated on the request step from previous peer + ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) + if err != nil { + return nil, err + } + theirMap := make(map[string]struct{}) + for _, ch := range theirChanges { + theirMap[ch.Id] = struct{}{} + } + + // filtering our changes, so we will not send the same changes back + var final []*aclpb.RawChange + for _, ch := range ourChanges { + if _, exists := theirMap[ch.Id]; exists { + final = append(final, ch) + } + } + + return &syncpb.SyncFullResponse{ + Heads: tree.Heads(), + Changes: final, + TreeId: treeId, + SnapshotPath: tree.SnapshotPath(), + }, nil +} diff --git a/service/sync/service.go b/service/sync/service.go deleted file mode 100644 index fd6839bb..00000000 --- a/service/sync/service.go +++ /dev/null @@ -1,43 +0,0 @@ -package sync - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" -) - -type service struct { -} - -const CName = "SyncService" - -func (s *service) Init(ctx context.Context, a *app.App) (err error) { - return nil -} - -func (s *service) Name() (name string) { - return CName -} - -func (s *service) Run(ctx context.Context) (err error) { - ch := make(chan *PubSubPayload) - err = s.pubSub.Listen(ch) - if err != nil { - return - } - return nil -} - -func (s *service) Close(ctx context.Context) (err error) { - return nil -} - -func (s *service) listen(ctx context.Context, ch chan *PubSubPayload) { - for { - select { - case <-ctx.Done(): - return - case payload := <-ch: - // TODO: get object from object service and try to perform sync - } - } -} diff --git a/service/sync/syncclient.go b/service/sync/syncclient.go deleted file mode 100644 index 95354982..00000000 --- a/service/sync/syncclient.go +++ /dev/null @@ -1,8 +0,0 @@ -package sync - -import "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" - -type SyncClient interface { - NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error - RequestFullSync(id string, request *syncpb.SyncFullRequest) error -}