Add document service
This commit is contained in:
parent
57c09e263d
commit
35528f6024
@ -50,7 +50,7 @@ type ACLTree interface {
|
||||
ID() string
|
||||
Header() *treepb.TreeHeader
|
||||
ACLState() *ACLState
|
||||
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error)
|
||||
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*aclpb.RawChange, error)
|
||||
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
|
||||
Heads() []string
|
||||
Root() *Change
|
||||
@ -59,6 +59,7 @@ type ACLTree interface {
|
||||
HasChange(string) bool
|
||||
SnapshotPath() []string
|
||||
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
|
||||
Storage() treestorage.TreeStorage
|
||||
|
||||
Close() error
|
||||
}
|
||||
@ -239,7 +240,11 @@ func (a *aclTree) ACLState() *ACLState {
|
||||
return a.aclState
|
||||
}
|
||||
|
||||
func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*Change, error) {
|
||||
func (a *aclTree) Storage() treestorage.TreeStorage {
|
||||
return a.treeStorage
|
||||
}
|
||||
|
||||
func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*aclpb.RawChange, error) {
|
||||
// TODO: add snapshot creation logic
|
||||
defer func() {
|
||||
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
|
||||
@ -257,12 +262,13 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild
|
||||
return nil, err
|
||||
}
|
||||
a.fullTree.AddFast(ch)
|
||||
|
||||
err = a.treeStorage.AddRawChange(&aclpb.RawChange{
|
||||
rawCh := &aclpb.RawChange{
|
||||
Payload: marshalled,
|
||||
Signature: ch.Signature(),
|
||||
Id: ch.Id,
|
||||
})
|
||||
}
|
||||
|
||||
err = a.treeStorage.AddRawChange(rawCh)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -271,7 +277,7 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ch, nil
|
||||
return rawCh, nil
|
||||
}
|
||||
|
||||
func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) {
|
||||
|
||||
@ -23,7 +23,6 @@ type Change struct {
|
||||
SnapshotId string
|
||||
IsSnapshot bool
|
||||
DecryptedDocumentChange []byte
|
||||
Raw *aclpb.RawChange // this will not be present on all changes, we only need it sometimes
|
||||
|
||||
Content *aclpb.ACLChange
|
||||
Sign []byte
|
||||
|
||||
@ -1 +1,156 @@
|
||||
package document
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/testutils/testchanges/testchangepb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
var CName = "DocumentService"
|
||||
|
||||
type service struct {
|
||||
messageService message.Service
|
||||
treeCache treecache.Service
|
||||
account account.Service
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
UpdateDocument(ctx context.Context, id, text string) error
|
||||
CreateDocument(ctx context.Context, text string) (string, error)
|
||||
}
|
||||
|
||||
func NewService() app.Component {
|
||||
return &service{}
|
||||
}
|
||||
|
||||
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
|
||||
s.account = a.MustComponent(account.CName).(account.Service)
|
||||
s.messageService = a.MustComponent(message.CName).(message.Service)
|
||||
s.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Name() (name string) {
|
||||
return CName
|
||||
}
|
||||
|
||||
func (s *service) Run(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Close(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) UpdateDocument(ctx context.Context, id, text string) (err error) {
|
||||
var (
|
||||
ch *aclpb.RawChange
|
||||
header *treepb.TreeHeader
|
||||
snapshotPath []string
|
||||
heads []string
|
||||
)
|
||||
|
||||
err = s.treeCache.Do(ctx, id, func(tree acltree.ACLTree) error {
|
||||
ch, err = tree.AddContent(ctx, func(builder acltree.ChangeBuilder) error {
|
||||
builder.AddChangeContent(
|
||||
&testchangepb.PlainTextChangeData{
|
||||
Content: []*testchangepb.PlainTextChangeContent{
|
||||
createAppendTextChangeContent(text),
|
||||
},
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id = tree.ID()
|
||||
heads = tree.Heads()
|
||||
header = tree.Header()
|
||||
snapshotPath = tree.SnapshotPath()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{
|
||||
Heads: heads,
|
||||
Changes: []*aclpb.RawChange{ch},
|
||||
TreeId: "",
|
||||
SnapshotPath: snapshotPath,
|
||||
TreeHeader: header,
|
||||
}))
|
||||
}
|
||||
|
||||
func (s *service) CreateDocument(ctx context.Context, text string) (id string, err error) {
|
||||
acc := s.account.Account()
|
||||
var (
|
||||
ch *aclpb.RawChange
|
||||
header *treepb.TreeHeader
|
||||
snapshotPath []string
|
||||
heads []string
|
||||
)
|
||||
|
||||
err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error {
|
||||
err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
builder.AddChangeContent(createInitialChangeContent(text))
|
||||
return nil
|
||||
}, func(tree acltree.ACLTree) error {
|
||||
id = tree.ID()
|
||||
heads = tree.Heads()
|
||||
header = tree.Header()
|
||||
snapshotPath = tree.SnapshotPath()
|
||||
ch, err = tree.Storage().GetChange(ctx, heads[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
err = s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{
|
||||
Heads: heads,
|
||||
Changes: []*aclpb.RawChange{ch},
|
||||
TreeId: "",
|
||||
SnapshotPath: snapshotPath,
|
||||
TreeHeader: header,
|
||||
}))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return id, err
|
||||
}
|
||||
|
||||
func createInitialChangeContent(text string) proto.Marshaler {
|
||||
return &testchangepb.PlainTextChangeData{
|
||||
Content: []*testchangepb.PlainTextChangeContent{
|
||||
createAppendTextChangeContent(text),
|
||||
},
|
||||
Snapshot: &testchangepb.PlainTextChangeSnapshot{Text: text},
|
||||
}
|
||||
}
|
||||
|
||||
func createAppendTextChangeContent(text string) *testchangepb.PlainTextChangeContent {
|
||||
return &testchangepb.PlainTextChangeContent{
|
||||
Value: &testchangepb.PlainTextChangeContentValueOfTextAppend{
|
||||
TextAppend: &testchangepb.PlainTextChangeTextAppend{
|
||||
Text: text,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,7 +95,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
||||
}
|
||||
// if we have incompatible heads, or we haven't seen the tree at all
|
||||
if fullRequest != nil {
|
||||
return r.messageService.SendMessage(senderId, wrapFullRequest(fullRequest))
|
||||
return r.messageService.SendMessage(senderId, syncpb.WrapFullRequest(fullRequest))
|
||||
}
|
||||
// if error or nothing has changed
|
||||
if err != nil || len(result.Added) == 0 {
|
||||
@ -109,7 +109,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
||||
TreeId: update.TreeId,
|
||||
TreeHeader: update.TreeHeader,
|
||||
}
|
||||
return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate))
|
||||
return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate))
|
||||
}
|
||||
|
||||
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) {
|
||||
@ -138,7 +138,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.messageService.SendMessage(senderId, wrapFullResponse(fullResponse))
|
||||
err = r.messageService.SendMessage(senderId, syncpb.WrapFullResponse(fullResponse))
|
||||
// if error or nothing has changed
|
||||
if err != nil || len(result.Added) == 0 {
|
||||
return err
|
||||
@ -152,7 +152,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
||||
TreeId: request.TreeId,
|
||||
TreeHeader: request.TreeHeader,
|
||||
}
|
||||
return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate))
|
||||
return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate))
|
||||
}
|
||||
|
||||
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncpb.SyncFullResponse) (err error) {
|
||||
@ -188,7 +188,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
||||
SnapshotPath: snapshotPath,
|
||||
TreeId: response.TreeId,
|
||||
}
|
||||
return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate))
|
||||
return r.messageService.SendMessage("", syncpb.WrapHeadUpdate(newUpdate))
|
||||
}
|
||||
|
||||
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
|
||||
@ -247,21 +247,3 @@ func (r *requestHandler) createTree(ctx context.Context, response *syncpb.SyncFu
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func wrapHeadUpdate(update *syncpb.SyncHeadUpdate) *syncpb.SyncContent {
|
||||
return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{
|
||||
Value: &syncpb.SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
|
||||
}}
|
||||
}
|
||||
|
||||
func wrapFullRequest(request *syncpb.SyncFullRequest) *syncpb.SyncContent {
|
||||
return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{
|
||||
Value: &syncpb.SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
|
||||
}}
|
||||
}
|
||||
|
||||
func wrapFullResponse(response *syncpb.SyncFullResponse) *syncpb.SyncContent {
|
||||
return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{
|
||||
Value: &syncpb.SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
|
||||
}}
|
||||
}
|
||||
|
||||
19
service/sync/syncpb/helpers.go
Normal file
19
service/sync/syncpb/helpers.go
Normal file
@ -0,0 +1,19 @@
|
||||
package syncpb
|
||||
|
||||
func WrapHeadUpdate(update *SyncHeadUpdate) *SyncContent {
|
||||
return &SyncContent{Message: &SyncContentValue{
|
||||
Value: &SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
|
||||
}}
|
||||
}
|
||||
|
||||
func WrapFullRequest(request *SyncFullRequest) *SyncContent {
|
||||
return &SyncContent{Message: &SyncContentValue{
|
||||
Value: &SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
|
||||
}}
|
||||
}
|
||||
|
||||
func WrapFullResponse(response *SyncFullResponse) *SyncContent {
|
||||
return &SyncContent{Message: &SyncContentValue{
|
||||
Value: &SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
|
||||
}}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user