diff --git a/pkg/acl/acltree/acltree.go b/pkg/acl/acltree/acltree.go index 078feb92..0cfcac3f 100644 --- a/pkg/acl/acltree/acltree.go +++ b/pkg/acl/acltree/acltree.go @@ -50,7 +50,7 @@ type ACLTree interface { ID() string Header() *treepb.TreeHeader ACLState() *ACLState - AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error) + AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*aclpb.RawChange, error) AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error) Heads() []string Root() *Change @@ -59,6 +59,7 @@ type ACLTree interface { HasChange(string) bool SnapshotPath() []string ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) + Storage() treestorage.TreeStorage Close() error } @@ -239,7 +240,11 @@ func (a *aclTree) ACLState() *ACLState { return a.aclState } -func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*Change, error) { +func (a *aclTree) Storage() treestorage.TreeStorage { + return a.treeStorage +} + +func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*aclpb.RawChange, error) { // TODO: add snapshot creation logic defer func() { // TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree) @@ -257,12 +262,13 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild return nil, err } a.fullTree.AddFast(ch) - - err = a.treeStorage.AddRawChange(&aclpb.RawChange{ + rawCh := &aclpb.RawChange{ Payload: marshalled, Signature: ch.Signature(), Id: ch.Id, - }) + } + + err = a.treeStorage.AddRawChange(rawCh) if err != nil { return nil, err } @@ -271,7 +277,7 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild if err != nil { return nil, err } - return ch, nil + return rawCh, nil } func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) { diff --git a/pkg/acl/acltree/change.go b/pkg/acl/acltree/change.go index 67eedf00..c768ef23 100644 --- a/pkg/acl/acltree/change.go +++ b/pkg/acl/acltree/change.go @@ -23,7 +23,6 @@ type Change struct { SnapshotId string IsSnapshot bool DecryptedDocumentChange []byte - Raw *aclpb.RawChange // this will not be present on all changes, we only need it sometimes Content *aclpb.ACLChange Sign []byte diff --git a/service/sync/document/service.go b/service/sync/document/service.go index 075db778..7b8e8f08 100644 --- a/service/sync/document/service.go +++ b/service/sync/document/service.go @@ -1 +1,156 @@ package document + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/testchangepb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" + "github.com/gogo/protobuf/proto" +) + +var CName = "DocumentService" + +type service struct { + messageService message.Service + treeCache treecache.Service + account account.Service +} + +type Service interface { + UpdateDocument(ctx context.Context, id, text string) error + CreateDocument(ctx context.Context, text string) (string, error) +} + +func NewService() app.Component { + return &service{} +} + +func (s *service) Init(ctx context.Context, a *app.App) (err error) { + s.account = a.MustComponent(account.CName).(account.Service) + s.messageService = a.MustComponent(message.CName).(message.Service) + s.treeCache = a.MustComponent(treecache.CName).(treecache.Service) + return nil +} + +func (s *service) Name() (name string) { + return CName +} + +func (s *service) Run(ctx context.Context) (err error) { + return nil +} + +func (s *service) Close(ctx context.Context) (err error) { + return nil +} + +func (s *service) UpdateDocument(ctx context.Context, id, text string) (err error) { + var ( + ch *aclpb.RawChange + header *treepb.TreeHeader + snapshotPath []string + heads []string + ) + + err = s.treeCache.Do(ctx, id, func(tree acltree.ACLTree) error { + ch, err = tree.AddContent(ctx, func(builder acltree.ChangeBuilder) error { + builder.AddChangeContent( + &testchangepb.PlainTextChangeData{ + Content: []*testchangepb.PlainTextChangeContent{ + createAppendTextChangeContent(text), + }, + }) + return nil + }) + if err != nil { + return err + } + + id = tree.ID() + heads = tree.Heads() + header = tree.Header() + snapshotPath = tree.SnapshotPath() + return nil + }) + if err != nil { + return err + } + + return s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{ + Heads: heads, + Changes: []*aclpb.RawChange{ch}, + TreeId: "", + SnapshotPath: snapshotPath, + TreeHeader: header, + })) +} + +func (s *service) CreateDocument(ctx context.Context, text string) (id string, err error) { + acc := s.account.Account() + var ( + ch *aclpb.RawChange + header *treepb.TreeHeader + snapshotPath []string + heads []string + ) + + err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error { + err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin) + if err != nil { + return err + } + builder.AddChangeContent(createInitialChangeContent(text)) + return nil + }, func(tree acltree.ACLTree) error { + id = tree.ID() + heads = tree.Heads() + header = tree.Header() + snapshotPath = tree.SnapshotPath() + ch, err = tree.Storage().GetChange(ctx, heads[0]) + if err != nil { + return err + } + + return nil + }) + if err != nil { + return "", err + } + + err = s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{ + Heads: heads, + Changes: []*aclpb.RawChange{ch}, + TreeId: "", + SnapshotPath: snapshotPath, + TreeHeader: header, + })) + if err != nil { + return "", err + } + return id, err +} + +func createInitialChangeContent(text string) proto.Marshaler { + return &testchangepb.PlainTextChangeData{ + Content: []*testchangepb.PlainTextChangeContent{ + createAppendTextChangeContent(text), + }, + Snapshot: &testchangepb.PlainTextChangeSnapshot{Text: text}, + } +} + +func createAppendTextChangeContent(text string) *testchangepb.PlainTextChangeContent { + return &testchangepb.PlainTextChangeContent{ + Value: &testchangepb.PlainTextChangeContentValueOfTextAppend{ + TextAppend: &testchangepb.PlainTextChangeTextAppend{ + Text: text, + }, + }, + } +} diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index ba30a39c..83269ee5 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -95,7 +95,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, } // if we have incompatible heads, or we haven't seen the tree at all if fullRequest != nil { - return r.messageService.SendMessage(senderId, wrapFullRequest(fullRequest)) + return r.messageService.SendMessage(senderId, syncpb.WrapFullRequest(fullRequest)) } // if error or nothing has changed if err != nil || len(result.Added) == 0 { @@ -109,7 +109,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, TreeId: update.TreeId, TreeHeader: update.TreeHeader, } - return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate)) + return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) { @@ -138,7 +138,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str if err != nil { return err } - err = r.messageService.SendMessage(senderId, wrapFullResponse(fullResponse)) + err = r.messageService.SendMessage(senderId, syncpb.WrapFullResponse(fullResponse)) // if error or nothing has changed if err != nil || len(result.Added) == 0 { return err @@ -152,7 +152,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str TreeId: request.TreeId, TreeHeader: request.TreeHeader, } - return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate)) + return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncpb.SyncFullResponse) (err error) { @@ -188,7 +188,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st SnapshotPath: snapshotPath, TreeId: response.TreeId, } - return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate)) + return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { @@ -247,21 +247,3 @@ func (r *requestHandler) createTree(ctx context.Context, response *syncpb.SyncFu return nil }) } - -func wrapHeadUpdate(update *syncpb.SyncHeadUpdate) *syncpb.SyncContent { - return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{ - Value: &syncpb.SyncContentValueValueOfHeadUpdate{HeadUpdate: update}, - }} -} - -func wrapFullRequest(request *syncpb.SyncFullRequest) *syncpb.SyncContent { - return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{ - Value: &syncpb.SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request}, - }} -} - -func wrapFullResponse(response *syncpb.SyncFullResponse) *syncpb.SyncContent { - return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{ - Value: &syncpb.SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response}, - }} -} diff --git a/service/sync/syncpb/helpers.go b/service/sync/syncpb/helpers.go new file mode 100644 index 00000000..52d7fddd --- /dev/null +++ b/service/sync/syncpb/helpers.go @@ -0,0 +1,19 @@ +package syncpb + +func WrapHeadUpdate(update *SyncHeadUpdate) *SyncContent { + return &SyncContent{Message: &SyncContentValue{ + Value: &SyncContentValueValueOfHeadUpdate{HeadUpdate: update}, + }} +} + +func WrapFullRequest(request *SyncFullRequest) *SyncContent { + return &SyncContent{Message: &SyncContentValue{ + Value: &SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request}, + }} +} + +func WrapFullResponse(response *SyncFullResponse) *SyncContent { + return &SyncContent{Message: &SyncContentValue{ + Value: &SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response}, + }} +}