Change sync logic

This commit is contained in:
mcrakhman 2022-09-07 22:01:11 +02:00 committed by Mikhail Iudin
parent 246e0aa674
commit cd0f601afa
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
3 changed files with 120 additions and 223 deletions

View File

@ -76,6 +76,7 @@ type objectTree struct {
changeBuilder ChangeBuilder changeBuilder ChangeBuilder
updateListener ObjectTreeUpdateListener updateListener ObjectTreeUpdateListener
validator ObjectTreeValidator validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader
treeBuilder *treeBuilder treeBuilder *treeBuilder
aclList list.ACLList aclList list.ACLList
@ -102,6 +103,7 @@ type objectTreeDeps struct {
treeStorage storage.TreeStorage treeStorage storage.TreeStorage
updateListener ObjectTreeUpdateListener updateListener ObjectTreeUpdateListener
validator ObjectTreeValidator validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader
aclList list.ACLList aclList list.ACLList
} }
@ -119,6 +121,7 @@ func defaultObjectTreeDeps(
treeStorage: treeStorage, treeStorage: treeStorage,
updateListener: listener, updateListener: listener,
validator: newTreeValidator(), validator: newTreeValidator(),
rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder),
aclList: aclList, aclList: aclList,
} }
} }
@ -131,6 +134,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
validator: deps.validator, validator: deps.validator,
aclList: deps.aclList, aclList: deps.aclList,
changeBuilder: deps.changeBuilder, changeBuilder: deps.changeBuilder,
rawChangeLoader: deps.rawChangeLoader,
tree: nil, tree: nil,
keys: make(map[uint64]*symmetric.Key), keys: make(map[uint64]*symmetric.Key),
tmpChangesBuf: make([]*Change, 0, 10), tmpChangesBuf: make([]*Change, 0, 10),
@ -531,66 +535,16 @@ func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath, theirHeads []string)
if commonSnapshot == ot.tree.RootId() { if commonSnapshot == ot.tree.RootId() {
return ot.getChangesFromTree(theirHeads) return ot.getChangesFromTree(theirHeads)
} else { } else {
return ot.getChangesFromDB(commonSnapshot, theirHeads, needFullDocument) return ot.getChangesFromDB(commonSnapshot, theirHeads)
} }
} }
func (ot *objectTree) getChangesFromTree(theirHeads []string) (rawChanges []*aclpb.RawChange, err error) { func (ot *objectTree) getChangesFromTree(theirHeads []string) (rawChanges []*aclpb.RawChange, err error) {
ot.tree.dfsPrev( return ot.rawChangeLoader.LoadFromTree(ot.tree, theirHeads)
ot.tree.HeadsChanges(),
theirHeads,
func(ch *Change) bool {
var marshalled []byte
marshalled, err = ch.Content.Marshal()
if err != nil {
return false
}
raw := &aclpb.RawChange{
Payload: marshalled,
Signature: ch.Signature(),
Id: ch.Id,
}
rawChanges = append(rawChanges, raw)
return true
},
func(changes []*Change) {})
return
} }
func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) { func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string) (rawChanges []*aclpb.RawChange, err error) {
load := func(id string) (*Change, error) { return ot.rawChangeLoader.LoadFromStorage(commonSnapshot, ot.tree.headIds, theirHeads)
raw, err := ot.treeStorage.GetRawChange(context.Background(), id)
if err != nil {
return nil, err
}
ch, err := NewChangeFromRaw(raw)
if err != nil {
return nil, err
}
rawChanges = append(rawChanges, raw)
return ch, nil
}
// setting breakpoints to include head and common snapshot
// that means that we will ignore all changes which start at theirHeads
breakpoints := make([]string, 0, len(theirHeads)+1)
breakpoints = append(breakpoints, commonSnapshot)
breakpoints = append(breakpoints, theirHeads...)
_, err = ot.treeBuilder.dfs(ot.tree.Heads(), breakpoints, load)
if err != nil {
return
}
if needStartSnapshot {
// adding snapshot to raw changes
_, err = load(commonSnapshot)
}
return
} }
func (ot *objectTree) snapshotPathIsActual() bool { func (ot *objectTree) snapshotPathIsActual() bool {

View File

@ -71,6 +71,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb.
// if we stopped at breakpoints or there are no breakpoints // if we stopped at breakpoints or there are no breakpoints
if !rootVisited || len(breakpoints) == 0 { if !rootVisited || len(breakpoints) == 0 {
// in this case we will add root if there are no breakpoints
return convert(results) return convert(results)
} }
@ -81,6 +82,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb.
} }
// doing another dfs to get all changes before breakpoints, we need to exclude them from results // doing another dfs to get all changes before breakpoints, we need to exclude them from results
// if we don't have some breakpoints we will just ignore them
t.dfsPrev( t.dfsPrev(
stack, stack,
[]string{}, []string{},
@ -98,35 +100,12 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb.
return convert(results) return convert(results)
} }
func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) {
var ok bool
if entry, ok = r.cache[id]; ok {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
rawChange, err := r.treeStorage.GetRawChange(ctx, id)
if err != nil {
return
}
change, err := r.changeBuilder.ConvertFromRaw(rawChange)
if err != nil {
return
}
entry = rawCacheEntry{
change: change,
rawChange: rawChange,
}
r.cache[id] = entry
return
}
func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*aclpb.RawChange, error) { func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*aclpb.RawChange, error) {
// initializing buffers // resetting cache
r.idStack = r.idStack[:0] r.cache = make(map[string]rawCacheEntry)
defer func() {
r.cache = nil
}()
// updating map // updating map
bufPosMap := make(map[string]int) bufPosMap := make(map[string]int)
@ -142,6 +121,10 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
shouldVisit func(counter int, mapExists bool) bool, shouldVisit func(counter int, mapExists bool) bool,
visit func(prevCounter int, entry rawCacheEntry) int) bool { visit func(prevCounter int, entry rawCacheEntry) int) bool {
// resetting stack
r.idStack = r.idStack[:0]
r.idStack = append(r.idStack, heads...)
commonSnapshotVisited := false commonSnapshotVisited := false
for len(r.idStack) > 0 { for len(r.idStack) > 0 {
id := r.idStack[len(r.idStack)-1] id := r.idStack[len(r.idStack)-1]
@ -152,6 +135,7 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
continue continue
} }
// TODO: add proper error handling, we must ignore errors on missing breakpoints though
entry, err := r.loadEntry(id) entry, err := r.loadEntry(id)
if err != nil { if err != nil {
continue continue
@ -189,16 +173,22 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
}) })
// checking if we stopped at breakpoints // checking if we stopped at breakpoints
if !rootVisited || len(breakpoints) == 0 { if !rootVisited {
return buffer, nil return buffer, nil
} }
// resetting stack // if there are no breakpoints then we should load root also
r.idStack = r.idStack[:0] if len(breakpoints) == 0 {
r.idStack = append(r.idStack, breakpoints...) common, err := r.loadEntry(commonSnapshot)
if err != nil {
return nil, err
}
buffer = append(buffer, common.rawChange)
return buffer, nil
}
// marking all visited as nil // marking all visited as nil
dfs(commonSnapshot, heads, len(buffer), dfs(commonSnapshot, breakpoints, len(buffer),
func(counter int, mapExists bool) bool { func(counter int, mapExists bool) bool {
return !mapExists || counter < len(buffer) return !mapExists || counter < len(buffer)
}, },
@ -208,6 +198,8 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
} }
return len(buffer) + 1 return len(buffer) + 1
}) })
// discarding visited
discardFromSlice(buffer, func(change *aclpb.RawChange) bool { discardFromSlice(buffer, func(change *aclpb.RawChange) bool {
return change == nil return change == nil
}) })
@ -215,6 +207,32 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi
return buffer, nil return buffer, nil
} }
func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) {
var ok bool
if entry, ok = r.cache[id]; ok {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
rawChange, err := r.treeStorage.GetRawChange(ctx, id)
if err != nil {
return
}
change, err := r.changeBuilder.ConvertFromRaw(rawChange)
if err != nil {
return
}
entry = rawCacheEntry{
change: change,
rawChange: rawChange,
}
r.cache[id] = entry
return
}
func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) { func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) {
var ( var (
finishedIdx = 0 finishedIdx = 0

View File

@ -5,7 +5,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
@ -87,42 +86,34 @@ func (r *requestHandler) HandleHeadUpdate(
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing head update") Debug("processing head update")
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { err = r.treeCache.Do(ctx, treeId, func(obj any) error {
docTree := obj.(tree.ObjectTree) objTree := obj.(tree.ObjectTree)
docTree.Lock() objTree.Lock()
defer docTree.Unlock() defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, docTree.Heads()) { if slice.UnsortedEquals(update.Heads, objTree.Heads()) {
return nil return nil
} }
return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error { result, err = objTree.AddRawChanges(ctx, update.Changes...)
aclTree := obj.(list.ACLList)
aclTree.RLock()
defer aclTree.RUnlock()
// TODO: check if we already have those changes
result, err = docTree.AddRawChanges(ctx, aclTree, update.Changes...)
if err != nil { if err != nil {
return err return err
} }
log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", docTree.Heads())).
Debug("comparing heads after head update") // if we couldn't add all the changes
shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads()) shouldFullSync := len(update.Changes) != len(result.Added)
snapshotPath = docTree.SnapshotPath() snapshotPath = objTree.SnapshotPath()
if shouldFullSync { if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.SnapshotPath, docTree) fullRequest, err = r.prepareFullSyncRequest(objTree)
if err != nil { if err != nil {
return err return err
} }
} }
return nil return nil
}) })
})
// if there are no such tree // if there are no such tree
if err == storage.ErrUnknownTreeId { if err == storage.ErrUnknownTreeId {
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request
fullRequest = &syncproto.SyncFullRequest{} fullRequest = &syncproto.SyncFullRequest{}
} }
// if we have incompatible heads, or we haven't seen the tree at all // if we have incompatible heads, or we haven't seen the tree at all
@ -133,7 +124,7 @@ func (r *requestHandler) HandleHeadUpdate(
if err != nil || len(result.Added) == 0 { if err != nil || len(result.Added) == 0 {
return err return err
} }
log.Info("res", zap.Int("result added", len(result.Added)))
// otherwise sending heads update message // otherwise sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{ newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads, Heads: result.Heads,
@ -150,61 +141,23 @@ func (r *requestHandler) HandleFullSyncRequest(
header *aclpb.Header, header *aclpb.Header,
treeId string) (err error) { treeId string) (err error) {
var ( var fullResponse *syncproto.SyncFullResponse
fullResponse *syncproto.SyncFullResponse err = r.treeCache.Do(ctx, treeId, func(obj any) error {
snapshotPath []string objTree := obj.(tree.ObjectTree)
result tree.AddResult objTree.Lock()
) defer objTree.Unlock()
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing full sync request")
log.Info("getting doc tree from treeCache", zap.String("treeId", treeId)) fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree)
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
docTree := obj.(tree.ObjectTree)
docTree.Lock()
defer docTree.Unlock()
//if slice.UnsortedEquals(request.Heads, docTree.Heads()) {
// return nil
//}
log.Info("getting tree from treeCache", zap.String("aclId", docTree.Header().AclListId))
return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error {
aclTree := obj.(list.ACLList)
aclTree.RLock()
defer aclTree.RUnlock()
// TODO: check if we already have those changes
// if we have non-empty request
if len(request.Heads) != 0 {
result, err = docTree.AddRawChanges(ctx, aclTree, request.Changes...)
if err != nil {
return err
}
}
snapshotPath = docTree.SnapshotPath()
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Changes, docTree)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
}) })
})
if err != nil { if err != nil {
return err return err
} }
err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
} }
func (r *requestHandler) HandleFullSyncResponse( func (r *requestHandler) HandleFullSyncResponse(
@ -218,31 +171,24 @@ func (r *requestHandler) HandleFullSyncResponse(
snapshotPath []string snapshotPath []string
result tree.AddResult result tree.AddResult
) )
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing full sync response")
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
docTree := obj.(tree.ObjectTree) objTree := obj.(tree.ObjectTree)
docTree.Lock() objTree.Lock()
defer docTree.Unlock() defer objTree.Unlock()
if slice.UnsortedEquals(response.Heads, docTree.Heads()) { // if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil return nil
} }
return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error { result, err = objTree.AddRawChanges(ctx, response.Changes...)
aclTree := obj.(list.ACLList)
aclTree.RLock()
defer aclTree.RUnlock()
// TODO: check if we already have those changes
result, err = docTree.AddRawChanges(ctx, aclTree, response.Changes...)
if err != nil { if err != nil {
return err return err
} }
snapshotPath = docTree.SnapshotPath() snapshotPath = objTree.SnapshotPath()
return nil return nil
}) })
})
// if error or nothing has changed // if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId { if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
@ -290,46 +236,25 @@ func (r *requestHandler) HandleACLList(
return err return err
} }
func (r *requestHandler) prepareFullSyncRequest(theirPath []string, t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
if err != nil {
return nil, err
}
return &syncproto.SyncFullRequest{ return &syncproto.SyncFullRequest{
Heads: t.Heads(), Heads: t.Heads(),
Changes: ourChanges,
SnapshotPath: t.SnapshotPath(), SnapshotPath: t.SnapshotPath(),
}, nil }, nil
} }
func (r *requestHandler) prepareFullSyncResponse( func (r *requestHandler) prepareFullSyncResponse(
treeId string, treeId string,
theirPath []string, theirPath, theirHeads []string,
theirChanges []*aclpb.RawChange,
t tree.ObjectTree) (*syncproto.SyncFullResponse, error) { t tree.ObjectTree) (*syncproto.SyncFullResponse, error) {
// TODO: we can probably use the common snapshot calculated on the request step from previous peer ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
theirMap := make(map[string]struct{})
for _, ch := range theirChanges {
theirMap[ch.Id] = struct{}{}
}
// filtering our changes, so we will not send the same changes back
var final []*aclpb.RawChange
for _, ch := range ourChanges {
if _, exists := theirMap[ch.Id]; !exists {
final = append(final, ch)
}
}
log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)).
Debug("preparing changes for tree")
return &syncproto.SyncFullResponse{ return &syncproto.SyncFullResponse{
Heads: t.Heads(), Heads: t.Heads(),
Changes: final, Changes: ourChanges,
SnapshotPath: t.SnapshotPath(), SnapshotPath: t.SnapshotPath(),
}, nil }, nil
} }