Update sync layer to work with different trees

This commit is contained in:
mcrakhman 2022-08-11 15:33:39 +02:00
parent 37eacb5d61
commit 028cb99586
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
5 changed files with 206 additions and 86 deletions

View File

@ -51,22 +51,11 @@ var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot")
type ACLTree interface { type ACLTree interface {
RWLocker RWLocker
ID() string CommonTree
Header() *treepb.TreeHeader
ACLState() *ACLState ACLState() *ACLState
AddContent(ctx context.Context, f func(builder ACLChangeBuilder) error) (*aclpb.RawChange, error) AddContent(ctx context.Context, f func(builder ACLChangeBuilder) error) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error) AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
Heads() []string
Root() *Change
Iterate(func(change *Change) bool)
IterateFrom(string, func(change *Change) bool)
HasChange(string) bool
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Storage() treestorage.TreeStorage
DebugDump() (string, error)
Close() error
} }
type aclTree struct { type aclTree struct {
@ -389,10 +378,6 @@ func (a *aclTree) Root() *Change {
return a.tree.Root() return a.tree.Root()
} }
func (a *aclTree) Close() error {
return nil
}
func (a *aclTree) SnapshotPath() []string { func (a *aclTree) SnapshotPath() []string {
// TODO: think about caching this // TODO: think about caching this

View File

@ -0,0 +1,21 @@
package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
)
type CommonTree interface {
ID() string
Header() *treepb.TreeHeader
Heads() []string
Root() *Change
Iterate(func(change *Change) bool)
IterateFrom(string, func(change *Change) bool)
HasChange(string) bool
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Storage() treestorage.TreeStorage
DebugDump() (string, error)
}

View File

@ -21,21 +21,9 @@ type TreeUpdateListener interface {
type DocTree interface { type DocTree interface {
RWLocker RWLocker
ID() string CommonTree
Header() *treepb.TreeHeader
AddContent(ctx context.Context, aclTree ACLTree, content proto.Marshaler, isSnapshot bool) (*aclpb.RawChange, error) AddContent(ctx context.Context, aclTree ACLTree, content proto.Marshaler, isSnapshot bool) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, aclTree ACLTree, changes ...*aclpb.RawChange) (AddResult, error) AddRawChanges(ctx context.Context, aclTree ACLTree, changes ...*aclpb.RawChange) (AddResult, error)
Heads() []string
Root() *Change
Iterate(func(change *Change) bool)
IterateFrom(string, func(change *Change) bool)
HasChange(string) bool
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Storage() treestorage.TreeStorage
DebugDump() (string, error)
Close() error
} }
type docTree struct { type docTree struct {
@ -372,10 +360,6 @@ func (d *docTree) Root() *Change {
return d.tree.Root() return d.tree.Root()
} }
func (d *docTree) Close() error {
return nil
}
func (d *docTree) SnapshotPath() []string { func (d *docTree) SnapshotPath() []string {
// TODO: think about caching this // TODO: think about caching this

View File

@ -77,6 +77,8 @@ func (s *service) UpdateDocumentTree(ctx context.Context, id, text string) (err
err = s.treeCache.Do(ctx, id, func(obj interface{}) error { err = s.treeCache.Do(ctx, id, func(obj interface{}) error {
docTree := obj.(tree.DocTree) docTree := obj.(tree.DocTree)
docTree.Lock()
defer docTree.Unlock()
err = s.treeCache.Do(ctx, docTree.Header().AclTreeId, func(obj interface{}) error { err = s.treeCache.Do(ctx, docTree.Header().AclTreeId, func(obj interface{}) error {
aclTree := obj.(tree.ACLTree) aclTree := obj.(tree.ACLTree)
aclTree.RLock() aclTree.RLock()

View File

@ -2,10 +2,11 @@ package requesthandler
import ( import (
"context" "context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
@ -23,6 +24,8 @@ type requestHandler struct {
var log = logger.NewNamed("requesthandler") var log = logger.NewNamed("requesthandler")
var ErrIncorrectDocType = errors.New("incorrec doc type")
func New() app.Component { func New() app.Component {
return &requestHandler{} return &requestHandler{}
} }
@ -74,29 +77,74 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
var ( var (
fullRequest *syncproto.SyncFullRequest fullRequest *syncproto.SyncFullRequest
snapshotPath []string snapshotPath []string
result acltree.AddResult result tree.AddResult
) )
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
Debug("processing head update") Debug("processing head update")
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { updateACLTree := func() {
// TODO: check if we already have those changes err = r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error {
result, err = tree.AddRawChanges(ctx, update.Changes...) t := obj.(tree.ACLTree)
if err != nil { t.Lock()
return err defer t.Unlock()
} // TODO: check if we already have those changes
log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", tree.Heads())). result, err = t.AddRawChanges(ctx, update.Changes...)
Debug("comparing heads after head update")
shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads())
snapshotPath = tree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, tree)
if err != nil { if err != nil {
return err return err
} }
} log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", t.Heads())).
return nil Debug("comparing heads after head update")
}) shouldFullSync := !slice.UnsortedEquals(update.Heads, t.Heads())
snapshotPath = t.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, t)
if err != nil {
return err
}
}
return nil
})
}
updateDocTree := func() {
err = r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error {
docTree := obj.(tree.DocTree)
docTree.Lock()
defer docTree.Unlock()
return r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error {
aclTree := obj.(tree.ACLTree)
aclTree.RLock()
defer aclTree.RUnlock()
// TODO: check if we already have those changes
result, err = docTree.AddRawChanges(ctx, aclTree, update.Changes...)
if err != nil {
return err
}
log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", docTree.Heads())).
Debug("comparing heads after head update")
shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads())
snapshotPath = docTree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, docTree)
if err != nil {
return err
}
}
return nil
})
})
}
switch update.TreeHeader.Type {
case treepb.TreeHeader_ACLTree:
updateACLTree()
case treepb.TreeHeader_DocTree:
updateDocTree()
default:
return ErrIncorrectDocType
}
// if there are no such tree // if there are no such tree
if err == treestorage.ErrUnknownTreeId { if err == treestorage.ErrUnknownTreeId {
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request // TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request
@ -128,27 +176,71 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
var ( var (
fullResponse *syncproto.SyncFullResponse fullResponse *syncproto.SyncFullResponse
snapshotPath []string snapshotPath []string
result acltree.AddResult result tree.AddResult
) )
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
Debug("processing full sync request") Debug("processing full sync request")
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { requestACLTree := func() {
// TODO: check if we already have those changes err = r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error {
// if we have non-empty request t := obj.(tree.ACLTree)
if len(request.Heads) != 0 { t.Lock()
result, err = tree.AddRawChanges(ctx, request.Changes...) defer t.Unlock()
// TODO: check if we already have those changes
// if we have non-empty request
if len(request.Heads) != 0 {
result, err = t.AddRawChanges(ctx, request.Changes...)
if err != nil {
return err
}
}
snapshotPath = t.SnapshotPath()
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, t)
if err != nil { if err != nil {
return err return err
} }
} return nil
snapshotPath = tree.SnapshotPath() })
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree) }
if err != nil {
return err requestDocTree := func() {
} err = r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error {
return nil docTree := obj.(tree.DocTree)
}) docTree.Lock()
defer docTree.Unlock()
return r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error {
aclTree := obj.(tree.ACLTree)
aclTree.RLock()
defer aclTree.RUnlock()
// TODO: check if we already have those changes
// if we have non-empty request
if len(request.Heads) != 0 {
result, err = docTree.AddRawChanges(ctx, aclTree, request.Changes...)
if err != nil {
return err
}
}
snapshotPath = docTree.SnapshotPath()
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, docTree)
if err != nil {
return err
}
return nil
})
})
}
switch request.TreeHeader.Type {
case treepb.TreeHeader_ACLTree:
requestACLTree()
case treepb.TreeHeader_DocTree:
requestDocTree()
default:
return ErrIncorrectDocType
}
if err != nil { if err != nil {
return err return err
} }
@ -172,20 +264,56 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) { func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) {
var ( var (
snapshotPath []string snapshotPath []string
result acltree.AddResult result tree.AddResult
) )
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
Debug("processing full sync response") Debug("processing full sync response")
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error { responseACLTree := func() {
// TODO: check if we already have those changes err = r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error {
result, err = tree.AddRawChanges(ctx, response.Changes...) t := obj.(tree.ACLTree)
if err != nil { t.Lock()
return err defer t.Unlock()
} // TODO: check if we already have those changes
snapshotPath = tree.SnapshotPath() result, err = t.AddRawChanges(ctx, response.Changes...)
return nil if err != nil {
}) return err
}
snapshotPath = t.SnapshotPath()
return nil
})
}
responseDocTree := func() {
err = r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error {
docTree := obj.(tree.DocTree)
docTree.Lock()
defer docTree.Unlock()
return r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error {
aclTree := obj.(tree.ACLTree)
aclTree.RLock()
defer aclTree.RUnlock()
// TODO: check if we already have those changes
result, err = docTree.AddRawChanges(ctx, aclTree, response.Changes...)
if err != nil {
return err
}
snapshotPath = docTree.SnapshotPath()
return nil
})
})
}
switch response.TreeHeader.Type {
case treepb.TreeHeader_ACLTree:
responseACLTree()
case treepb.TreeHeader_DocTree:
responseDocTree()
default:
return ErrIncorrectDocType
}
// if error or nothing has changed // if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != treestorage.ErrUnknownTreeId { if (err != nil || len(result.Added) == 0) && err != treestorage.ErrUnknownTreeId {
return err return err
@ -196,7 +324,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
if err != nil { if err != nil {
return err return err
} }
result = acltree.AddResult{ result = tree.AddResult{
OldHeads: []string{}, OldHeads: []string{},
Heads: response.Heads, Heads: response.Heads,
Added: response.Changes, Added: response.Changes,
@ -212,16 +340,16 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
} }
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) { func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, t tree.CommonTree) (*syncproto.SyncFullRequest, error) {
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &syncproto.SyncFullRequest{ return &syncproto.SyncFullRequest{
Heads: tree.Heads(), Heads: t.Heads(),
Changes: ourChanges, Changes: ourChanges,
TreeId: treeId, TreeId: treeId,
SnapshotPath: tree.SnapshotPath(), SnapshotPath: t.SnapshotPath(),
TreeHeader: header, TreeHeader: header,
}, nil }, nil
} }
@ -230,9 +358,9 @@ func (r *requestHandler) prepareFullSyncResponse(
treeId string, treeId string,
theirPath []string, theirPath []string,
theirChanges []*aclpb.RawChange, theirChanges []*aclpb.RawChange,
tree acltree.ACLTree) (*syncproto.SyncFullResponse, error) { t tree.CommonTree) (*syncproto.SyncFullResponse, error) {
// TODO: we can probably use the common snapshot calculated on the request step from previous peer // TODO: we can probably use the common snapshot calculated on the request step from previous peer
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -252,11 +380,11 @@ func (r *requestHandler) prepareFullSyncResponse(
Debug("preparing changes for tree") Debug("preparing changes for tree")
return &syncproto.SyncFullResponse{ return &syncproto.SyncFullResponse{
Heads: tree.Heads(), Heads: t.Heads(),
Changes: final, Changes: final,
TreeId: treeId, TreeId: treeId,
SnapshotPath: tree.SnapshotPath(), SnapshotPath: t.SnapshotPath(),
TreeHeader: tree.Header(), TreeHeader: t.Header(),
}, nil }, nil
} }
@ -266,7 +394,7 @@ func (r *requestHandler) createTree(ctx context.Context, response *syncproto.Syn
response.TreeId, response.TreeId,
response.TreeHeader, response.TreeHeader,
response.Changes, response.Changes,
func(tree acltree.ACLTree) error { func(obj interface{}) error {
return nil return nil
}) })
} }