diff --git a/pkg/acl/tree/objecttree.go b/pkg/acl/tree/objecttree.go index b56616ca..2fc0eaa5 100644 --- a/pkg/acl/tree/objecttree.go +++ b/pkg/acl/tree/objecttree.go @@ -72,12 +72,13 @@ type ObjectTree interface { } type objectTree struct { - treeStorage storage.TreeStorage - changeBuilder ChangeBuilder - updateListener ObjectTreeUpdateListener - validator ObjectTreeValidator - treeBuilder *treeBuilder - aclList list.ACLList + treeStorage storage.TreeStorage + changeBuilder ChangeBuilder + updateListener ObjectTreeUpdateListener + validator ObjectTreeValidator + rawChangeLoader *rawChangeLoader + treeBuilder *treeBuilder + aclList list.ACLList id string header *aclpb.Header @@ -97,12 +98,13 @@ type objectTree struct { } type objectTreeDeps struct { - changeBuilder ChangeBuilder - treeBuilder *treeBuilder - treeStorage storage.TreeStorage - updateListener ObjectTreeUpdateListener - validator ObjectTreeValidator - aclList list.ACLList + changeBuilder ChangeBuilder + treeBuilder *treeBuilder + treeStorage storage.TreeStorage + updateListener ObjectTreeUpdateListener + validator ObjectTreeValidator + rawChangeLoader *rawChangeLoader + aclList list.ACLList } func defaultObjectTreeDeps( @@ -114,12 +116,13 @@ func defaultObjectTreeDeps( changeBuilder := newChangeBuilder(keychain) treeBuilder := newTreeBuilder(treeStorage, changeBuilder) return objectTreeDeps{ - changeBuilder: changeBuilder, - treeBuilder: treeBuilder, - treeStorage: treeStorage, - updateListener: listener, - validator: newTreeValidator(), - aclList: aclList, + changeBuilder: changeBuilder, + treeBuilder: treeBuilder, + treeStorage: treeStorage, + updateListener: listener, + validator: newTreeValidator(), + rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder), + aclList: aclList, } } @@ -131,6 +134,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { validator: deps.validator, aclList: deps.aclList, changeBuilder: deps.changeBuilder, + rawChangeLoader: deps.rawChangeLoader, tree: nil, keys: make(map[uint64]*symmetric.Key), tmpChangesBuf: make([]*Change, 0, 10), @@ -531,66 +535,16 @@ func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath, theirHeads []string) if commonSnapshot == ot.tree.RootId() { return ot.getChangesFromTree(theirHeads) } else { - return ot.getChangesFromDB(commonSnapshot, theirHeads, needFullDocument) + return ot.getChangesFromDB(commonSnapshot, theirHeads) } } func (ot *objectTree) getChangesFromTree(theirHeads []string) (rawChanges []*aclpb.RawChange, err error) { - ot.tree.dfsPrev( - ot.tree.HeadsChanges(), - theirHeads, - func(ch *Change) bool { - var marshalled []byte - marshalled, err = ch.Content.Marshal() - if err != nil { - return false - } - - raw := &aclpb.RawChange{ - Payload: marshalled, - Signature: ch.Signature(), - Id: ch.Id, - } - rawChanges = append(rawChanges, raw) - return true - }, - func(changes []*Change) {}) - - return + return ot.rawChangeLoader.LoadFromTree(ot.tree, theirHeads) } -func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) { - load := func(id string) (*Change, error) { - raw, err := ot.treeStorage.GetRawChange(context.Background(), id) - if err != nil { - return nil, err - } - - ch, err := NewChangeFromRaw(raw) - if err != nil { - return nil, err - } - - rawChanges = append(rawChanges, raw) - return ch, nil - } - // setting breakpoints to include head and common snapshot - // that means that we will ignore all changes which start at theirHeads - breakpoints := make([]string, 0, len(theirHeads)+1) - breakpoints = append(breakpoints, commonSnapshot) - breakpoints = append(breakpoints, theirHeads...) - - _, err = ot.treeBuilder.dfs(ot.tree.Heads(), breakpoints, load) - if err != nil { - return - } - - if needStartSnapshot { - // adding snapshot to raw changes - _, err = load(commonSnapshot) - } - - return +func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string) (rawChanges []*aclpb.RawChange, err error) { + return ot.rawChangeLoader.LoadFromStorage(commonSnapshot, ot.tree.headIds, theirHeads) } func (ot *objectTree) snapshotPathIsActual() bool { diff --git a/pkg/acl/tree/rawloader.go b/pkg/acl/tree/rawloader.go index 473b2017..27cb7ba7 100644 --- a/pkg/acl/tree/rawloader.go +++ b/pkg/acl/tree/rawloader.go @@ -71,6 +71,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb. // if we stopped at breakpoints or there are no breakpoints if !rootVisited || len(breakpoints) == 0 { + // in this case we will add root if there are no breakpoints return convert(results) } @@ -81,6 +82,7 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb. } // doing another dfs to get all changes before breakpoints, we need to exclude them from results + // if we don't have some breakpoints we will just ignore them t.dfsPrev( stack, []string{}, @@ -98,35 +100,12 @@ func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb. return convert(results) } -func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) { - var ok bool - if entry, ok = r.cache[id]; ok { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - rawChange, err := r.treeStorage.GetRawChange(ctx, id) - if err != nil { - return - } - - change, err := r.changeBuilder.ConvertFromRaw(rawChange) - if err != nil { - return - } - entry = rawCacheEntry{ - change: change, - rawChange: rawChange, - } - r.cache[id] = entry - return -} - func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*aclpb.RawChange, error) { - // initializing buffers - r.idStack = r.idStack[:0] + // resetting cache + r.cache = make(map[string]rawCacheEntry) + defer func() { + r.cache = nil + }() // updating map bufPosMap := make(map[string]int) @@ -142,6 +121,10 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi shouldVisit func(counter int, mapExists bool) bool, visit func(prevCounter int, entry rawCacheEntry) int) bool { + // resetting stack + r.idStack = r.idStack[:0] + r.idStack = append(r.idStack, heads...) + commonSnapshotVisited := false for len(r.idStack) > 0 { id := r.idStack[len(r.idStack)-1] @@ -152,6 +135,7 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi continue } + // TODO: add proper error handling, we must ignore errors on missing breakpoints though entry, err := r.loadEntry(id) if err != nil { continue @@ -189,16 +173,22 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi }) // checking if we stopped at breakpoints - if !rootVisited || len(breakpoints) == 0 { + if !rootVisited { return buffer, nil } - // resetting stack - r.idStack = r.idStack[:0] - r.idStack = append(r.idStack, breakpoints...) + // if there are no breakpoints then we should load root also + if len(breakpoints) == 0 { + common, err := r.loadEntry(commonSnapshot) + if err != nil { + return nil, err + } + buffer = append(buffer, common.rawChange) + return buffer, nil + } // marking all visited as nil - dfs(commonSnapshot, heads, len(buffer), + dfs(commonSnapshot, breakpoints, len(buffer), func(counter int, mapExists bool) bool { return !mapExists || counter < len(buffer) }, @@ -208,6 +198,8 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi } return len(buffer) + 1 }) + + // discarding visited discardFromSlice(buffer, func(change *aclpb.RawChange) bool { return change == nil }) @@ -215,6 +207,32 @@ func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoi return buffer, nil } +func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) { + var ok bool + if entry, ok = r.cache[id]; ok { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + rawChange, err := r.treeStorage.GetRawChange(ctx, id) + if err != nil { + return + } + + change, err := r.changeBuilder.ConvertFromRaw(rawChange) + if err != nil { + return + } + entry = rawCacheEntry{ + change: change, + rawChange: rawChange, + } + r.cache[id] = entry + return +} + func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) { var ( finishedIdx = 0 diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index c7212bc9..0b50d156 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -5,7 +5,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" @@ -87,42 +86,34 @@ func (r *requestHandler) HandleHeadUpdate( log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). Debug("processing head update") - err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { - docTree := obj.(tree.ObjectTree) - docTree.Lock() - defer docTree.Unlock() + err = r.treeCache.Do(ctx, treeId, func(obj any) error { + objTree := obj.(tree.ObjectTree) + objTree.Lock() + defer objTree.Unlock() - if slice.UnsortedEquals(update.Heads, docTree.Heads()) { + if slice.UnsortedEquals(update.Heads, objTree.Heads()) { return nil } - return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error { - aclTree := obj.(list.ACLList) - aclTree.RLock() - defer aclTree.RUnlock() + result, err = objTree.AddRawChanges(ctx, update.Changes...) + if err != nil { + return err + } - // TODO: check if we already have those changes - result, err = docTree.AddRawChanges(ctx, aclTree, update.Changes...) + // if we couldn't add all the changes + shouldFullSync := len(update.Changes) != len(result.Added) + snapshotPath = objTree.SnapshotPath() + if shouldFullSync { + fullRequest, err = r.prepareFullSyncRequest(objTree) if err != nil { return err } - log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", docTree.Heads())). - Debug("comparing heads after head update") - shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads()) - snapshotPath = docTree.SnapshotPath() - if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(update.SnapshotPath, docTree) - if err != nil { - return err - } - } - return nil - }) + } + return nil }) // if there are no such tree if err == storage.ErrUnknownTreeId { - // TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request fullRequest = &syncproto.SyncFullRequest{} } // if we have incompatible heads, or we haven't seen the tree at all @@ -133,7 +124,7 @@ func (r *requestHandler) HandleHeadUpdate( if err != nil || len(result.Added) == 0 { return err } - log.Info("res", zap.Int("result added", len(result.Added))) + // otherwise sending heads update message newUpdate := &syncproto.SyncHeadUpdate{ Heads: result.Heads, @@ -150,61 +141,23 @@ func (r *requestHandler) HandleFullSyncRequest( header *aclpb.Header, treeId string) (err error) { - var ( - fullResponse *syncproto.SyncFullResponse - snapshotPath []string - result tree.AddResult - ) - log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). - Debug("processing full sync request") + var fullResponse *syncproto.SyncFullResponse + err = r.treeCache.Do(ctx, treeId, func(obj any) error { + objTree := obj.(tree.ObjectTree) + objTree.Lock() + defer objTree.Unlock() - log.Info("getting doc tree from treeCache", zap.String("treeId", treeId)) - err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { - docTree := obj.(tree.ObjectTree) - docTree.Lock() - defer docTree.Unlock() - - //if slice.UnsortedEquals(request.Heads, docTree.Heads()) { - // return nil - //} - log.Info("getting tree from treeCache", zap.String("aclId", docTree.Header().AclListId)) - return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error { - aclTree := obj.(list.ACLList) - aclTree.RLock() - defer aclTree.RUnlock() - // TODO: check if we already have those changes - // if we have non-empty request - if len(request.Heads) != 0 { - result, err = docTree.AddRawChanges(ctx, aclTree, request.Changes...) - if err != nil { - return err - } - } - snapshotPath = docTree.SnapshotPath() - fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Changes, docTree) - if err != nil { - return err - } - return nil - }) + fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) + if err != nil { + return err + } + return nil }) if err != nil { return err } - err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - - // otherwise sending heads update message - newUpdate := &syncproto.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) + return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) } func (r *requestHandler) HandleFullSyncResponse( @@ -218,30 +171,23 @@ func (r *requestHandler) HandleFullSyncResponse( snapshotPath []string result tree.AddResult ) - log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). - Debug("processing full sync response") err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { - docTree := obj.(tree.ObjectTree) - docTree.Lock() - defer docTree.Unlock() + objTree := obj.(tree.ObjectTree) + objTree.Lock() + defer objTree.Unlock() - if slice.UnsortedEquals(response.Heads, docTree.Heads()) { + // if we already have the heads for whatever reason + if slice.UnsortedEquals(response.Heads, objTree.Heads()) { return nil } - return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error { - aclTree := obj.(list.ACLList) - aclTree.RLock() - defer aclTree.RUnlock() - // TODO: check if we already have those changes - result, err = docTree.AddRawChanges(ctx, aclTree, response.Changes...) - if err != nil { - return err - } - snapshotPath = docTree.SnapshotPath() - return nil - }) + result, err = objTree.AddRawChanges(ctx, response.Changes...) + if err != nil { + return err + } + snapshotPath = objTree.SnapshotPath() + return nil }) // if error or nothing has changed @@ -290,46 +236,25 @@ func (r *requestHandler) HandleACLList( return err } -func (r *requestHandler) prepareFullSyncRequest(theirPath []string, t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { - ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath) - if err != nil { - return nil, err - } +func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { return &syncproto.SyncFullRequest{ Heads: t.Heads(), - Changes: ourChanges, SnapshotPath: t.SnapshotPath(), }, nil } func (r *requestHandler) prepareFullSyncResponse( treeId string, - theirPath []string, - theirChanges []*aclpb.RawChange, + theirPath, theirHeads []string, t tree.ObjectTree) (*syncproto.SyncFullResponse, error) { - // TODO: we can probably use the common snapshot calculated on the request step from previous peer - ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath) + ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads) if err != nil { return nil, err } - theirMap := make(map[string]struct{}) - for _, ch := range theirChanges { - theirMap[ch.Id] = struct{}{} - } - - // filtering our changes, so we will not send the same changes back - var final []*aclpb.RawChange - for _, ch := range ourChanges { - if _, exists := theirMap[ch.Id]; !exists { - final = append(final, ch) - } - } - log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)). - Debug("preparing changes for tree") return &syncproto.SyncFullResponse{ Heads: t.Heads(), - Changes: final, + Changes: ourChanges, SnapshotPath: t.SnapshotPath(), }, nil }