diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 1b8a15dc..feee7e11 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -154,24 +154,21 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) { for _, tId := range trees { + log := d.log.With(zap.String("treeId", tId)) tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { - d.log.InfoCtx(ctx, "can't load tree", zap.Error(err)) + log.WarnCtx(ctx, "can't load tree", zap.Error(err)) continue } syncTree, ok := tree.(synctree.SyncTree) if !ok { - d.log.InfoCtx(ctx, "not a sync tree", zap.String("objectId", tId)) + log.WarnCtx(ctx, "not a sync tree") continue } - // the idea why we call it directly is that if we try to get it from cache - // it may be already there (i.e. loaded) - // and build func will not be called, thus we won't sync the tree - // therefore we just do it manually if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { - d.log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err), zap.String("treeId", tId)) + log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) } else { - d.log.DebugCtx(ctx, "success synctree.SyncWithPeer", zap.String("treeId", tId)) + log.DebugCtx(ctx, "success synctree.SyncWithPeer") } } } diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index d75f76c1..ef909833 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -116,13 +116,25 @@ type objectTree struct { } func (ot *objectTree) rebuildFromStorage(theirHeads []string, newChanges []*Change) (err error) { + oldTree := ot.tree ot.treeBuilder.Reset() - ot.tree, err = ot.treeBuilder.Build(theirHeads, newChanges) if err != nil { return } + // in case there are new heads + if theirHeads != nil && oldTree != nil { + // checking that old root is still in tree + rootCh, rootExists := ot.tree.attached[oldTree.RootId()] + + // checking the case where theirHeads were actually below prevHeads + // so if we did load some extra data in the tree, let's reduce it to old root + if slice.UnsortedEquals(oldTree.headIds, ot.tree.headIds) && rootExists && ot.tree.RootId() != oldTree.RootId() { + ot.tree.makeRootAndRemove(rootCh) + } + } + // during building the tree we may have marked some changes as possible roots, // but obviously they are not roots, because of the way how we construct the tree ot.tree.clearPossibleRoots() @@ -187,7 +199,7 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont panic(err) } - err = ot.treeStorage.TransactionAdd([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id}) + err = ot.treeStorage.AddRawChangesSetHeads([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id}) if err != nil { return } @@ -284,7 +296,11 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang addResult.Mode = Rebuild } - err = ot.treeStorage.TransactionAdd(addResult.Added, addResult.Heads) + err = ot.treeStorage.AddRawChangesSetHeads(addResult.Added, addResult.Heads) + if err != nil { + // rolling back all changes made to inmemory state + ot.rebuildFromStorage(nil, nil) + } return } @@ -347,7 +363,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang } // checks if we need to go to database - isOldSnapshot := func(ch *Change) bool { + snapshotNotInTree := func(ch *Change) bool { if ch.SnapshotId == ot.tree.RootId() { return false } @@ -362,26 +378,12 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang shouldRebuildFromStorage := false // checking if we have some changes with different snapshot and then rebuilding - for idx, ch := range ot.newChangesBuf { - if isOldSnapshot(ch) { - var exists bool - // checking if it exists in the storage, if yes, then at some point it was added to the tree - // thus we don't need to look at this change - exists, err = ot.treeStorage.HasChange(ctx, ch.Id) - if err != nil { - return - } - if exists { - // marking as nil to delete after - ot.newChangesBuf[idx] = nil - continue - } - // we haven't seen the change, and it refers to old snapshot, so we should rebuild + for _, ch := range ot.newChangesBuf { + if snapshotNotInTree(ch) { shouldRebuildFromStorage = true + break } } - // discarding all previously seen changes - ot.newChangesBuf = slice.DiscardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil }) if shouldRebuildFromStorage { err = ot.rebuildFromStorage(changesPayload.NewHeads, ot.newChangesBuf) @@ -549,22 +551,8 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate } func (ot *objectTree) HasChanges(chs ...string) bool { - hasChange := func(s string) bool { - _, attachedExists := ot.tree.attached[s] - if attachedExists { - return attachedExists - } - - has, err := ot.treeStorage.HasChange(context.Background(), s) - if err != nil { - return false - } - - return has - } - for _, ch := range chs { - if !hasChange(ch) { + if _, attachedExists := ot.tree.attached[ch]; !attachedExists { return false } } diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index 9dc5e7be..26cdb202 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -2,6 +2,7 @@ package objecttree import ( "context" + "fmt" "github.com/anytypeio/any-sync/commonspace/object/accountdata" "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" @@ -64,7 +65,7 @@ func prepareContext( treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id) if additionalChanges != nil { payload := additionalChanges(changeCreator) - err := treeStorage.TransactionAdd(payload.RawChanges, payload.NewHeads) + err := treeStorage.AddRawChangesSetHeads(payload.RawChanges, payload.NewHeads) require.NoError(t, err) } objTree, err := objTreeBuilder(treeStorage, aclList) @@ -421,6 +422,99 @@ func TestObjectTree(t *testing.T) { }) }) + t.Run("rollback when add to storage returns error", func(t *testing.T) { + ctx := prepareTreeContext(t, aclList) + changeCreator := ctx.changeCreator + objTree := ctx.objTree + store := ctx.treeStorage.(*treestorage.InMemoryTreeStorage) + addErr := fmt.Errorf("error saving") + store.SetReturnErrorOnAdd(addErr) + + rawChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"), + } + payload := RawChangesPayload{ + NewHeads: []string{"1"}, + RawChanges: rawChanges, + } + _, err := objTree.AddRawChanges(context.Background(), payload) + require.Error(t, err, addErr) + require.Equal(t, "0", objTree.Root().Id) + }) + + t.Run("their heads before common snapshot", func(t *testing.T) { + // checking that adding old changes did not affect the tree + ctx := prepareTreeContext(t, aclList) + changeCreator := ctx.changeCreator + objTree := ctx.objTree + + rawChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"), + changeCreator.CreateRaw("2", aclList.Head().Id, "1", false, "1"), + changeCreator.CreateRaw("3", aclList.Head().Id, "1", true, "2"), + changeCreator.CreateRaw("4", aclList.Head().Id, "1", false, "2"), + changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"), + changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"), + } + payload := RawChangesPayload{ + NewHeads: []string{rawChanges[len(rawChanges)-1].Id}, + RawChanges: rawChanges, + } + _, err := objTree.AddRawChanges(context.Background(), payload) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, "6", objTree.Root().Id) + + rawChangesPrevious := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"), + } + payload = RawChangesPayload{ + NewHeads: []string{"1"}, + RawChanges: rawChangesPrevious, + } + _, err = objTree.AddRawChanges(context.Background(), payload) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, "6", objTree.Root().Id) + }) + + t.Run("stored changes will not break the pipeline if heads were not updated", func(t *testing.T) { + ctx := prepareTreeContext(t, aclList) + changeCreator := ctx.changeCreator + objTree := ctx.objTree + store := ctx.treeStorage.(*treestorage.InMemoryTreeStorage) + + rawChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"), + } + payload := RawChangesPayload{ + NewHeads: []string{rawChanges[len(rawChanges)-1].Id}, + RawChanges: rawChanges, + } + _, err := objTree.AddRawChanges(context.Background(), payload) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, "1", objTree.Root().Id) + + // creating changes to save in the storage + // to imitate the condition where all changes are in the storage + // but the head was not updated + storageChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("2", aclList.Head().Id, "1", false, "1"), + changeCreator.CreateRaw("3", aclList.Head().Id, "1", true, "2"), + changeCreator.CreateRaw("4", aclList.Head().Id, "1", false, "2"), + changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"), + changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"), + } + store.AddRawChangesSetHeads(storageChanges, []string{"1"}) + + // updating with subset of those changes to see that everything will still work + payload = RawChangesPayload{ + NewHeads: []string{"6"}, + RawChanges: storageChanges, + } + _, err = objTree.AddRawChanges(context.Background(), payload) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, "6", objTree.Root().Id) + }) + t.Run("changes from tree after common snapshot complex", func(t *testing.T) { ctx := prepareTreeContext(t, aclList) changeCreator := ctx.changeCreator @@ -654,7 +748,7 @@ func TestObjectTree(t *testing.T) { changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"), changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"), } - deps.treeStorage.TransactionAdd(rawChanges, []string{"6"}) + deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"}) hTree, err := buildHistoryTree(deps, HistoryTreeParams{ BeforeId: "6", IncludeBeforeId: false, @@ -686,7 +780,7 @@ func TestObjectTree(t *testing.T) { changeCreator.CreateRaw("5", aclList.Head().Id, "1", true, "3", "4"), changeCreator.CreateRaw("6", aclList.Head().Id, "5", false, "5"), } - deps.treeStorage.TransactionAdd(rawChanges, []string{"6"}) + deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"}) hTree, err := buildHistoryTree(deps, HistoryTreeParams{ BuildFullTree: true, }) @@ -716,7 +810,7 @@ func TestObjectTree(t *testing.T) { changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"), changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"), } - deps.treeStorage.TransactionAdd(rawChanges, []string{"6"}) + deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"}) hTree, err := buildHistoryTree(deps, HistoryTreeParams{ BeforeId: "6", IncludeBeforeId: true, diff --git a/commonspace/object/tree/objecttree/treebuilder.go b/commonspace/object/tree/objecttree/treebuilder.go index 43e23042..0abb136b 100644 --- a/commonspace/object/tree/objecttree/treebuilder.go +++ b/commonspace/object/tree/objecttree/treebuilder.go @@ -121,8 +121,8 @@ func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error) if err != nil { return } - tb.tree.AddFast(ch) changes := tb.dfs(heads, breakpoint) + tb.tree.AddFast(ch) tb.tree.AddFast(changes...) return } diff --git a/commonspace/object/tree/synctree/syncprotocol_test.go b/commonspace/object/tree/synctree/syncprotocol_test.go index 6cac042a..a23554c6 100644 --- a/commonspace/object/tree/synctree/syncprotocol_test.go +++ b/commonspace/object/tree/synctree/syncprotocol_test.go @@ -112,7 +112,7 @@ func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot } // generating initial tree initialRes := genChanges(changeCreator, params) - err = storage.TransactionAdd(initialRes.changes, initialRes.heads) + err = storage.AddRawChangesSetHeads(initialRes.changes, initialRes.heads) require.NoError(t, err) deps := fixtureDeps{ aclList: aclList, @@ -183,3 +183,179 @@ func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot } } } + +func TestTreeStorageHasExtraChanges(t *testing.T) { + var ( + rnd = rand.New(rand.NewSource(time.Now().Unix())) + levels = 20 + perLevel = 40 + ) + + // simulating cases where one peer has some extra changes saved in storage + // and checking that this will not break the sync + t.Run("tree storage has extra simple", func(t *testing.T) { + testTreeStorageHasExtra(t, levels, perLevel, false, func() bool { + return false + }) + testTreeStorageHasExtra(t, levels, perLevel, false, func() bool { + return rnd.Intn(10) > 5 + }) + testTreeStorageHasExtra(t, levels, perLevel, true, func() bool { + return false + }) + testTreeStorageHasExtra(t, levels, perLevel, true, func() bool { + return rnd.Intn(10) > 5 + }) + }) + t.Run("tree storage has extra three parts", func(t *testing.T) { + testTreeStorageHasExtraThreeParts(t, levels, perLevel, false, func() bool { + return false + }) + testTreeStorageHasExtraThreeParts(t, levels, perLevel, false, func() bool { + return rnd.Intn(10) > 5 + }) + testTreeStorageHasExtraThreeParts(t, levels, perLevel, true, func() bool { + return false + }) + testTreeStorageHasExtraThreeParts(t, levels, perLevel, true, func() bool { + return rnd.Intn(10) > 5 + }) + }) +} + +func testTreeStorageHasExtra(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) { + treeId := "treeId" + spaceId := "spaceId" + keys, err := accountdata.NewRandom() + require.NoError(t, err) + aclList, err := list.NewTestDerivedAcl(spaceId, keys) + storage := createStorage(treeId, aclList) + changeCreator := objecttree.NewMockChangeCreator() + builder := objecttree.BuildTestableTree + if hasData { + builder = objecttree.BuildEmptyDataTestableTree + } + params := genParams{ + prefix: "peer1", + aclId: aclList.Id(), + startIdx: 0, + levels: levels, + perLevel: perLevel, + snapshotId: treeId, + prevHeads: []string{treeId}, + isSnapshot: isSnapshot, + hasData: hasData, + } + deps := fixtureDeps{ + aclList: aclList, + initStorage: storage.(*treestorage.InMemoryTreeStorage), + connectionMap: map[string][]string{ + "peer1": []string{"peer2"}, + "peer2": []string{"peer1"}, + }, + treeBuilder: builder, + } + fx := newProtocolFixture(t, spaceId, deps) + + // generating initial tree + initialRes := genChanges(changeCreator, params) + fx.run(t) + + // adding some changes to store, but without updating heads + store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage) + oldHeads, _ := store.Heads() + store.AddRawChangesSetHeads(initialRes.changes, oldHeads) + + // sending those changes to other peer + fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: initialRes.heads, + RawChanges: initialRes.changes, + }) + time.Sleep(50 * time.Millisecond) + + // here we want that the saved changes in storage should not affect the sync protocol + firstHeads := fx.handlers["peer1"].tree().Heads() + secondHeads := fx.handlers["peer2"].tree().Heads() + require.True(t, slice.UnsortedEquals(firstHeads, secondHeads)) + require.True(t, slice.UnsortedEquals(initialRes.heads, firstHeads)) +} + +func testTreeStorageHasExtraThreeParts(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) { + treeId := "treeId" + spaceId := "spaceId" + keys, err := accountdata.NewRandom() + require.NoError(t, err) + aclList, err := list.NewTestDerivedAcl(spaceId, keys) + storage := createStorage(treeId, aclList) + changeCreator := objecttree.NewMockChangeCreator() + builder := objecttree.BuildTestableTree + if hasData { + builder = objecttree.BuildEmptyDataTestableTree + } + params := genParams{ + prefix: "peer1", + aclId: aclList.Id(), + startIdx: 0, + levels: levels, + perLevel: perLevel, + snapshotId: treeId, + prevHeads: []string{treeId}, + isSnapshot: isSnapshot, + hasData: hasData, + } + deps := fixtureDeps{ + aclList: aclList, + initStorage: storage.(*treestorage.InMemoryTreeStorage), + connectionMap: map[string][]string{ + "peer1": []string{"peer2"}, + "peer2": []string{"peer1"}, + }, + treeBuilder: builder, + } + fx := newProtocolFixture(t, spaceId, deps) + + // generating parts + firstPart := genChanges(changeCreator, params) + params.startIdx = levels + params.snapshotId = firstPart.snapshotId + params.prevHeads = firstPart.heads + secondPart := genChanges(changeCreator, params) + params.startIdx = levels * 2 + params.snapshotId = secondPart.snapshotId + params.prevHeads = secondPart.heads + thirdPart := genChanges(changeCreator, params) + + // adding part1 to first peer and saving part2 and part3 in its storage + res, _ := fx.handlers["peer1"].tree().AddRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: firstPart.heads, + RawChanges: firstPart.changes, + }) + require.True(t, slice.UnsortedEquals(res.Heads, firstPart.heads)) + store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage) + oldHeads, _ := store.Heads() + store.AddRawChangesSetHeads(secondPart.changes, oldHeads) + store.AddRawChangesSetHeads(thirdPart.changes, oldHeads) + + var peer2Initial []*treechangeproto.RawTreeChangeWithId + peer2Initial = append(peer2Initial, firstPart.changes...) + peer2Initial = append(peer2Initial, secondPart.changes...) + + // adding part1 and part2 to second peer + res, _ = fx.handlers["peer2"].tree().AddRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: secondPart.heads, + RawChanges: peer2Initial, + }) + require.True(t, slice.UnsortedEquals(res.Heads, secondPart.heads)) + fx.run(t) + + // sending part3 changes to other peer + fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: thirdPart.heads, + RawChanges: thirdPart.changes, + }) + time.Sleep(50 * time.Millisecond) + firstHeads := fx.handlers["peer1"].tree().Heads() + secondHeads := fx.handlers["peer2"].tree().Heads() + require.True(t, slice.UnsortedEquals(firstHeads, secondHeads)) + require.True(t, slice.UnsortedEquals(thirdPart.heads, firstHeads)) +} diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 203d3cac..683691d2 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -76,12 +76,15 @@ type BuildDeps struct { } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - remoteGetter := treeRemoteGetter{treeId: id, deps: deps} - deps.TreeStorage, err = remoteGetter.getTree(ctx) + var ( + remoteGetter = treeRemoteGetter{treeId: id, deps: deps} + isRemote bool + ) + deps.TreeStorage, isRemote, err = remoteGetter.getTree(ctx) if err != nil { return } - return buildSyncTree(ctx, true, deps) + return buildSyncTree(ctx, isRemote, deps) } func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, deps BuildDeps) (t SyncTree, err error) { @@ -92,7 +95,7 @@ func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePaylo return buildSyncTree(ctx, true, deps) } -func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t SyncTree, err error) { +func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t SyncTree, err error) { objTree, err := deps.BuildObjectTree(deps.TreeStorage, deps.AclList) if err != nil { return @@ -113,7 +116,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy syncTree.afterBuild() syncTree.Unlock() - if isFirstBuild { + if sendUpdate { headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // send to everybody, because everybody should know that the node or client got new tree syncTree.syncClient.Broadcast(ctx, headUpdate) diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index dab63e92..7584e7fd 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -93,7 +93,7 @@ func (s *syncTreeHandler) handleHeadUpdate( defer func() { if err != nil { - log.With(zap.Error(err)).Debug("head update finished with error") + log.ErrorCtx(ctx, "head update finished with error", zap.Error(err)) } else if fullRequest != nil { cnt := fullRequest.Content.GetFullSyncRequest() log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes))) @@ -168,7 +168,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( defer func() { if err != nil { - log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error") + log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err)) s.syncClient.SendWithReply(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId) return } else if fullResponse != nil { @@ -212,7 +212,7 @@ func (s *syncTreeHandler) handleFullSyncResponse( defer func() { if err != nil { - log.With(zap.Error(err)).DebugCtx(ctx, "full sync response failed") + log.ErrorCtx(ctx, "full sync response failed", zap.Error(err)) } else { log.DebugCtx(ctx, "full sync response succeeded") } diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go index 7c401a5e..e9b8323e 100644 --- a/commonspace/object/tree/synctree/treeremotegetter.go +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -90,12 +90,11 @@ Loop: } } -func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) { +func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) { treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId) if err == nil { return } - if err != nil && err != treestorage.ErrUnknownTreeId { return } @@ -109,6 +108,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage. return } + isRemote = true resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync) if err != nil { return @@ -139,5 +139,6 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage. return } // now we are sure that we can save it to the storage - return t.deps.SpaceStorage.CreateTreeStorage(payload) + treeStorage, err = t.deps.SpaceStorage.CreateTreeStorage(payload) + return } diff --git a/commonspace/object/tree/treestorage/inmemory.go b/commonspace/object/tree/treestorage/inmemory.go index af40561d..a3f3aaa3 100644 --- a/commonspace/object/tree/treestorage/inmemory.go +++ b/commonspace/object/tree/treestorage/inmemory.go @@ -13,13 +13,21 @@ type InMemoryTreeStorage struct { root *treechangeproto.RawTreeChangeWithId heads []string Changes map[string]*treechangeproto.RawTreeChangeWithId + addErr error sync.RWMutex } -func (t *InMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error { +func (t *InMemoryTreeStorage) SetReturnErrorOnAdd(err error) { + t.addErr = err +} + +func (t *InMemoryTreeStorage) AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error { t.RLock() defer t.RUnlock() + if t.addErr != nil { + return t.addErr + } for _, ch := range changes { t.Changes[ch.Id] = ch diff --git a/commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go b/commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go index 2fe6029f..be338599 100644 --- a/commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go +++ b/commonspace/object/tree/treestorage/mock_treestorage/mock_treestorage.go @@ -49,6 +49,20 @@ func (mr *MockTreeStorageMockRecorder) AddRawChange(arg0 interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChange", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChange), arg0) } +// AddRawChangesSetHeads mocks base method. +func (m *MockTreeStorage) AddRawChangesSetHeads(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddRawChangesSetHeads", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddRawChangesSetHeads indicates an expected call of AddRawChangesSetHeads. +func (mr *MockTreeStorageMockRecorder) AddRawChangesSetHeads(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesSetHeads", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChangesSetHeads), arg0, arg1) +} + // Delete mocks base method. func (m *MockTreeStorage) Delete() error { m.ctrl.T.Helper() @@ -150,17 +164,3 @@ func (mr *MockTreeStorageMockRecorder) SetHeads(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeads", reflect.TypeOf((*MockTreeStorage)(nil).SetHeads), arg0) } - -// TransactionAdd mocks base method. -func (m *MockTreeStorage) TransactionAdd(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TransactionAdd", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// TransactionAdd indicates an expected call of TransactionAdd. -func (mr *MockTreeStorageMockRecorder) TransactionAdd(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionAdd", reflect.TypeOf((*MockTreeStorage)(nil).TransactionAdd), arg0, arg1) -} diff --git a/commonspace/object/tree/treestorage/treestorage.go b/commonspace/object/tree/treestorage/treestorage.go index 14fbad78..db566f86 100644 --- a/commonspace/object/tree/treestorage/treestorage.go +++ b/commonspace/object/tree/treestorage/treestorage.go @@ -31,7 +31,7 @@ type TreeStorage interface { Heads() ([]string, error) SetHeads(heads []string) error AddRawChange(change *treechangeproto.RawTreeChangeWithId) error - TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error + AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error) HasChange(ctx context.Context, id string) (bool, error) diff --git a/commonspace/space.go b/commonspace/space.go index 1d7b3ff8..acd58a22 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -385,6 +385,9 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) _ = s.handleQueue.CloseThread(threadId) }() } + if hm.PeerCtx == nil { + hm.PeerCtx = ctx + } err = s.handleQueue.Add(ctx, threadId, hm) if err == mb.ErrOverflowed { log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))