From de1a5baacad57b0c49e139269922092f6d0effd1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Sep 2022 22:01:11 +0200 Subject: [PATCH] Change sync logic --- pkg/acl/tree/objecttree.go | 98 +++++-------------- pkg/acl/tree/rawloader.go | 84 +++++++++------- service/sync/requesthandler/requesthandler.go | 68 ++----------- 3 files changed, 87 insertions(+), 163 deletions(-) diff --git a/pkg/acl/tree/objecttree.go b/pkg/acl/tree/objecttree.go index b56616ca..2fc0eaa5 100644 --- a/pkg/acl/tree/objecttree.go +++ b/pkg/acl/tree/objecttree.go @@ -72,12 +72,13 @@ type ObjectTree interface { } type objectTree struct { - treeStorage storage.TreeStorage - changeBuilder ChangeBuilder - updateListener ObjectTreeUpdateListener - validator ObjectTreeValidator - treeBuilder *treeBuilder - aclList list.ACLList + treeStorage storage.TreeStorage + changeBuilder ChangeBuilder + updateListener ObjectTreeUpdateListener + validator ObjectTreeValidator + rawChangeLoader *rawChangeLoader + treeBuilder *treeBuilder + aclList list.ACLList id string header *aclpb.Header @@ -97,12 +98,13 @@ type objectTree struct { } type objectTreeDeps struct { - changeBuilder ChangeBuilder - treeBuilder *treeBuilder - treeStorage storage.TreeStorage - updateListener ObjectTreeUpdateListener - validator ObjectTreeValidator - aclList list.ACLList + changeBuilder ChangeBuilder + treeBuilder *treeBuilder + treeStorage storage.TreeStorage + updateListener ObjectTreeUpdateListener + validator ObjectTreeValidator + rawChangeLoader *rawChangeLoader + aclList list.ACLList } func defaultObjectTreeDeps( @@ -114,12 +116,13 @@ func defaultObjectTreeDeps( changeBuilder := newChangeBuilder(keychain) treeBuilder := newTreeBuilder(treeStorage, changeBuilder) return objectTreeDeps{ - changeBuilder: changeBuilder, - treeBuilder: treeBuilder, - treeStorage: treeStorage, - updateListener: listener, - validator: newTreeValidator(), - aclList: aclList, + changeBuilder: changeBuilder, + treeBuilder: treeBuilder, + treeStorage: treeStorage, + updateListener: listener, + validator: newTreeValidator(), + rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder), + aclList: aclList, } } @@ -131,6 +134,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { validator: deps.validator, aclList: deps.aclList, changeBuilder: deps.changeBuilder, + rawChangeLoader: deps.rawChangeLoader, tree: nil, keys: make(map[uint64]*symmetric.Key), tmpChangesBuf: make([]*Change, 0, 10), @@ -531,66 +535,16 @@ func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath, theirHeads []string) if commonSnapshot == ot.tree.RootId() { return ot.getChangesFromTree(theirHeads) } else { - return ot.getChangesFromDB(commonSnapshot, theirHeads, needFullDocument) + return ot.getChangesFromDB(commonSnapshot, theirHeads) } } func (ot *objectTree) getChangesFromTree(theirHeads []string) (rawChanges []*aclpb.RawChange, err error) { - ot.tree.dfsPrev( - ot.tree.HeadsChanges(), - theirHeads, - func(ch *Change) bool { - var marshalled []byte - marshalled, err = ch.Content.Marshal() - if err != nil { - return false - } - - raw := &aclpb.RawChange{ - Payload: marshalled, - Signature: ch.Signature(), - Id: ch.Id, - } - rawChanges = append(rawChanges, raw) - return true - }, - func(changes []*Change) {}) - - return + return ot.rawChangeLoader.LoadFromTree(ot.tree, theirHeads) } -func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) { - load := func(id string) (*Change, error) { - raw, err := ot.treeStorage.GetRawChange(context.Background(), id) - if err != nil { - return nil, err - } - - ch, err := NewChangeFromRaw(raw) - if err != nil { - return nil, err - } - - rawChanges = append(rawChanges, raw) - return ch, nil - } - // setting breakpoints to include head and common snapshot - // that means that we will ignore all changes which start at theirHeads - breakpoints := make([]string, 0, len(theirHeads)+1) - breakpoints = append(breakpoints, commonSnapshot) - breakpoints = append(breakpoints, theirHeads...) - - _, err = ot.treeBuilder.dfs(ot.tree.Heads(), breakpoints, load) - if err != nil { - return - } - - if needStartSnapshot { - // adding snapshot to raw changes - _, err = load(commonSnapshot) - } - - return +func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string) (rawChanges []*aclpb.RawChange, err error) { + return ot.rawChangeLoader.LoadFromStorage(commonSnapshot, ot.tree.headIds, theirHeads) } func (ot *objectTree) snapshotPathIsActual() bool { diff --git a/pkg/acl/tree/rawloader.go b/pkg/acl/tree/rawloader.go index 473b2017..27cb7ba7 100644 --- a/pkg/acl/tree/rawloader.go +++ b/pkg/acl/tree/rawloader.go @@ -71,6 +71,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb. // if we stopped at breakpoints or there are no breakpoints if !rootVisited || len(breakpoints) == 0 { + // in this case we will add root if there are no breakpoints return convert(results) } @@ -81,6 +82,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb. } // doing another dfs to get all changes before breakpoints, we need to exclude them from results + // if we don't have some breakpoints we will just ignore them t.dfsPrev( stack, []string{}, @@ -98,35 +100,12 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb. return convert(results) } -func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) { - var ok bool - if entry, ok = r.cache[id]; ok { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - rawChange, err := r.treeStorage.GetRawChange(ctx, id) - if err != nil { - return - } - - change, err := r.changeBuilder.ConvertFromRaw(rawChange) - if err != nil { - return - } - entry = rawCacheEntry{ - change: change, - rawChange: rawChange, - } - r.cache[id] = entry - return -} - func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*aclpb.RawChange, error) { - // initializing buffers - r.idStack = r.idStack[:0] + // resetting cache + r.cache = make(map[string]rawCacheEntry) + defer func() { + r.cache = nil + }() // updating map bufPosMap := make(map[string]int) @@ -142,6 +121,10 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi shouldVisit func(counter int, mapExists bool) bool, visit func(prevCounter int, entry rawCacheEntry) int) bool { + // resetting stack + r.idStack = r.idStack[:0] + r.idStack = append(r.idStack, heads...) + commonSnapshotVisited := false for len(r.idStack) > 0 { id := r.idStack[len(r.idStack)-1] @@ -152,6 +135,7 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi continue } + // TODO: add proper error handling, we must ignore errors on missing breakpoints though entry, err := r.loadEntry(id) if err != nil { continue @@ -189,16 +173,22 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi }) // checking if we stopped at breakpoints - if !rootVisited || len(breakpoints) == 0 { + if !rootVisited { return buffer, nil } - // resetting stack - r.idStack = r.idStack[:0] - r.idStack = append(r.idStack, breakpoints...) + // if there are no breakpoints then we should load root also + if len(breakpoints) == 0 { + common, err := r.loadEntry(commonSnapshot) + if err != nil { + return nil, err + } + buffer = append(buffer, common.rawChange) + return buffer, nil + } // marking all visited as nil - dfs(commonSnapshot, heads, len(buffer), + dfs(commonSnapshot, breakpoints, len(buffer), func(counter int, mapExists bool) bool { return !mapExists || counter < len(buffer) }, @@ -208,6 +198,8 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi } return len(buffer) + 1 }) + + // discarding visited discardFromSlice(buffer, func(change *aclpb.RawChange) bool { return change == nil }) @@ -215,6 +207,32 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi return buffer, nil } +func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) { + var ok bool + if entry, ok = r.cache[id]; ok { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + rawChange, err := r.treeStorage.GetRawChange(ctx, id) + if err != nil { + return + } + + change, err := r.changeBuilder.ConvertFromRaw(rawChange) + if err != nil { + return + } + entry = rawCacheEntry{ + change: change, + rawChange: rawChange, + } + r.cache[id] = entry + return +} + func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) { var ( finishedIdx = 0 diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index c9108acf..0b50d156 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -101,10 +101,10 @@ func (r *requestHandler) HandleHeadUpdate( } // if we couldn't add all the changes - shouldFullSync := !slice.UnsortedEquals(update.Heads, objTree.Heads()) + shouldFullSync := len(update.Changes) != len(result.Added) snapshotPath = objTree.SnapshotPath() if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(update.SnapshotPath, objTree) + fullRequest, err = r.prepareFullSyncRequest(objTree) if err != nil { return err } @@ -141,26 +141,13 @@ func (r *requestHandler) HandleFullSyncRequest( header *aclpb.Header, treeId string) (err error) { - var ( - fullResponse *syncproto.SyncFullResponse - snapshotPath []string - result tree.AddResult - ) + var fullResponse *syncproto.SyncFullResponse err = r.treeCache.Do(ctx, treeId, func(obj any) error { objTree := obj.(tree.ObjectTree) objTree.Lock() defer objTree.Unlock() - log.Info("getting tree from treeCache", zap.String("aclId", objTree.Header().AclListId)) - // if we have non-empty request - if len(request.Heads) != 0 { - result, err = objTree.AddRawChanges(ctx, request.Changes...) - if err != nil { - return err - } - } - snapshotPath = objTree.SnapshotPath() - fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Changes, objTree) + fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) if err != nil { return err } @@ -170,19 +157,7 @@ func (r *requestHandler) HandleFullSyncRequest( if err != nil { return err } - err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - - // otherwise sending heads update message - newUpdate := &syncproto.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) + return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) } func (r *requestHandler) HandleFullSyncResponse( @@ -196,19 +171,17 @@ func (r *requestHandler) HandleFullSyncResponse( snapshotPath []string result tree.AddResult ) - log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). - Debug("processing full sync response") err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { objTree := obj.(tree.ObjectTree) objTree.Lock() defer objTree.Unlock() + // if we already have the heads for whatever reason if slice.UnsortedEquals(response.Heads, objTree.Heads()) { return nil } - // TODO: check if we already have those changes result, err = objTree.AddRawChanges(ctx, response.Changes...) if err != nil { return err @@ -263,46 +236,25 @@ func (r *requestHandler) HandleACLList( return err } -func (r *requestHandler) prepareFullSyncRequest(theirPath []string, t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { - ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath) - if err != nil { - return nil, err - } +func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { return &syncproto.SyncFullRequest{ Heads: t.Heads(), - Changes: ourChanges, SnapshotPath: t.SnapshotPath(), }, nil } func (r *requestHandler) prepareFullSyncResponse( treeId string, - theirPath []string, - theirChanges []*aclpb.RawChange, + theirPath, theirHeads []string, t tree.ObjectTree) (*syncproto.SyncFullResponse, error) { - // TODO: we can probably use the common snapshot calculated on the request step from previous peer - ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath) + ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads) if err != nil { return nil, err } - theirMap := make(map[string]struct{}) - for _, ch := range theirChanges { - theirMap[ch.Id] = struct{}{} - } - - // filtering our changes, so we will not send the same changes back - var final []*aclpb.RawChange - for _, ch := range ourChanges { - if _, exists := theirMap[ch.Id]; !exists { - final = append(final, ch) - } - } - log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)). - Debug("preparing changes for tree") return &syncproto.SyncFullResponse{ Heads: t.Heads(), - Changes: final, + Changes: ourChanges, SnapshotPath: t.SnapshotPath(), }, nil }