Merge pull request #94 from anytypeio/sync-protocol-no-transaction-add

This commit is contained in:
Mikhail Rakhmanov 2023-05-11 18:50:58 +02:00 committed by GitHub
commit 56c1d65e88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 347 additions and 77 deletions

View File

@ -154,24 +154,21 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) {
for _, tId := range trees {
log := d.log.With(zap.String("treeId", tId))
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
if err != nil {
d.log.InfoCtx(ctx, "can't load tree", zap.Error(err))
log.WarnCtx(ctx, "can't load tree", zap.Error(err))
continue
}
syncTree, ok := tree.(synctree.SyncTree)
if !ok {
d.log.InfoCtx(ctx, "not a sync tree", zap.String("objectId", tId))
log.WarnCtx(ctx, "not a sync tree")
continue
}
// the idea why we call it directly is that if we try to get it from cache
// it may be already there (i.e. loaded)
// and build func will not be called, thus we won't sync the tree
// therefore we just do it manually
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
d.log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err), zap.String("treeId", tId))
log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err))
} else {
d.log.DebugCtx(ctx, "success synctree.SyncWithPeer", zap.String("treeId", tId))
log.DebugCtx(ctx, "success synctree.SyncWithPeer")
}
}
}

View File

@ -116,13 +116,25 @@ type objectTree struct {
}
func (ot *objectTree) rebuildFromStorage(theirHeads []string, newChanges []*Change) (err error) {
oldTree := ot.tree
ot.treeBuilder.Reset()
ot.tree, err = ot.treeBuilder.Build(theirHeads, newChanges)
if err != nil {
return
}
// in case there are new heads
if theirHeads != nil && oldTree != nil {
// checking that old root is still in tree
rootCh, rootExists := ot.tree.attached[oldTree.RootId()]
// checking the case where theirHeads were actually below prevHeads
// so if we did load some extra data in the tree, let's reduce it to old root
if slice.UnsortedEquals(oldTree.headIds, ot.tree.headIds) && rootExists && ot.tree.RootId() != oldTree.RootId() {
ot.tree.makeRootAndRemove(rootCh)
}
}
// during building the tree we may have marked some changes as possible roots,
// but obviously they are not roots, because of the way how we construct the tree
ot.tree.clearPossibleRoots()
@ -187,7 +199,7 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont
panic(err)
}
err = ot.treeStorage.TransactionAdd([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
err = ot.treeStorage.AddRawChangesSetHeads([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
if err != nil {
return
}
@ -284,7 +296,11 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang
addResult.Mode = Rebuild
}
err = ot.treeStorage.TransactionAdd(addResult.Added, addResult.Heads)
err = ot.treeStorage.AddRawChangesSetHeads(addResult.Added, addResult.Heads)
if err != nil {
// rolling back all changes made to inmemory state
ot.rebuildFromStorage(nil, nil)
}
return
}
@ -347,7 +363,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang
}
// checks if we need to go to database
isOldSnapshot := func(ch *Change) bool {
snapshotNotInTree := func(ch *Change) bool {
if ch.SnapshotId == ot.tree.RootId() {
return false
}
@ -362,26 +378,12 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang
shouldRebuildFromStorage := false
// checking if we have some changes with different snapshot and then rebuilding
for idx, ch := range ot.newChangesBuf {
if isOldSnapshot(ch) {
var exists bool
// checking if it exists in the storage, if yes, then at some point it was added to the tree
// thus we don't need to look at this change
exists, err = ot.treeStorage.HasChange(ctx, ch.Id)
if err != nil {
return
}
if exists {
// marking as nil to delete after
ot.newChangesBuf[idx] = nil
continue
}
// we haven't seen the change, and it refers to old snapshot, so we should rebuild
for _, ch := range ot.newChangesBuf {
if snapshotNotInTree(ch) {
shouldRebuildFromStorage = true
break
}
}
// discarding all previously seen changes
ot.newChangesBuf = slice.DiscardFromSlice(ot.newChangesBuf, func(ch *Change) bool { return ch == nil })
if shouldRebuildFromStorage {
err = ot.rebuildFromStorage(changesPayload.NewHeads, ot.newChangesBuf)
@ -549,22 +551,8 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate
}
func (ot *objectTree) HasChanges(chs ...string) bool {
hasChange := func(s string) bool {
_, attachedExists := ot.tree.attached[s]
if attachedExists {
return attachedExists
}
has, err := ot.treeStorage.HasChange(context.Background(), s)
if err != nil {
return false
}
return has
}
for _, ch := range chs {
if !hasChange(ch) {
if _, attachedExists := ot.tree.attached[ch]; !attachedExists {
return false
}
}

View File

@ -2,6 +2,7 @@ package objecttree
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/object/accountdata"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
@ -64,7 +65,7 @@ func prepareContext(
treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id)
if additionalChanges != nil {
payload := additionalChanges(changeCreator)
err := treeStorage.TransactionAdd(payload.RawChanges, payload.NewHeads)
err := treeStorage.AddRawChangesSetHeads(payload.RawChanges, payload.NewHeads)
require.NoError(t, err)
}
objTree, err := objTreeBuilder(treeStorage, aclList)
@ -421,6 +422,99 @@ func TestObjectTree(t *testing.T) {
})
})
t.Run("rollback when add to storage returns error", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
store := ctx.treeStorage.(*treestorage.InMemoryTreeStorage)
addErr := fmt.Errorf("error saving")
store.SetReturnErrorOnAdd(addErr)
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"),
}
payload := RawChangesPayload{
NewHeads: []string{"1"},
RawChanges: rawChanges,
}
_, err := objTree.AddRawChanges(context.Background(), payload)
require.Error(t, err, addErr)
require.Equal(t, "0", objTree.Root().Id)
})
t.Run("their heads before common snapshot", func(t *testing.T) {
// checking that adding old changes did not affect the tree
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"),
changeCreator.CreateRaw("2", aclList.Head().Id, "1", false, "1"),
changeCreator.CreateRaw("3", aclList.Head().Id, "1", true, "2"),
changeCreator.CreateRaw("4", aclList.Head().Id, "1", false, "2"),
changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"),
}
payload := RawChangesPayload{
NewHeads: []string{rawChanges[len(rawChanges)-1].Id},
RawChanges: rawChanges,
}
_, err := objTree.AddRawChanges(context.Background(), payload)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "6", objTree.Root().Id)
rawChangesPrevious := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"),
}
payload = RawChangesPayload{
NewHeads: []string{"1"},
RawChanges: rawChangesPrevious,
}
_, err = objTree.AddRawChanges(context.Background(), payload)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "6", objTree.Root().Id)
})
t.Run("stored changes will not break the pipeline if heads were not updated", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
store := ctx.treeStorage.(*treestorage.InMemoryTreeStorage)
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"),
}
payload := RawChangesPayload{
NewHeads: []string{rawChanges[len(rawChanges)-1].Id},
RawChanges: rawChanges,
}
_, err := objTree.AddRawChanges(context.Background(), payload)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "1", objTree.Root().Id)
// creating changes to save in the storage
// to imitate the condition where all changes are in the storage
// but the head was not updated
storageChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("2", aclList.Head().Id, "1", false, "1"),
changeCreator.CreateRaw("3", aclList.Head().Id, "1", true, "2"),
changeCreator.CreateRaw("4", aclList.Head().Id, "1", false, "2"),
changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"),
}
store.AddRawChangesSetHeads(storageChanges, []string{"1"})
// updating with subset of those changes to see that everything will still work
payload = RawChangesPayload{
NewHeads: []string{"6"},
RawChanges: storageChanges,
}
_, err = objTree.AddRawChanges(context.Background(), payload)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "6", objTree.Root().Id)
})
t.Run("changes from tree after common snapshot complex", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
@ -654,7 +748,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
}
deps.treeStorage.TransactionAdd(rawChanges, []string{"6"})
deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "6",
IncludeBeforeId: false,
@ -686,7 +780,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "1", true, "3", "4"),
changeCreator.CreateRaw("6", aclList.Head().Id, "5", false, "5"),
}
deps.treeStorage.TransactionAdd(rawChanges, []string{"6"})
deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BuildFullTree: true,
})
@ -716,7 +810,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
}
deps.treeStorage.TransactionAdd(rawChanges, []string{"6"})
deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "6",
IncludeBeforeId: true,

View File

@ -121,8 +121,8 @@ func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error)
if err != nil {
return
}
tb.tree.AddFast(ch)
changes := tb.dfs(heads, breakpoint)
tb.tree.AddFast(ch)
tb.tree.AddFast(changes...)
return
}

View File

@ -112,7 +112,7 @@ func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot
}
// generating initial tree
initialRes := genChanges(changeCreator, params)
err = storage.TransactionAdd(initialRes.changes, initialRes.heads)
err = storage.AddRawChangesSetHeads(initialRes.changes, initialRes.heads)
require.NoError(t, err)
deps := fixtureDeps{
aclList: aclList,
@ -183,3 +183,179 @@ func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot
}
}
}
func TestTreeStorageHasExtraChanges(t *testing.T) {
var (
rnd = rand.New(rand.NewSource(time.Now().Unix()))
levels = 20
perLevel = 40
)
// simulating cases where one peer has some extra changes saved in storage
// and checking that this will not break the sync
t.Run("tree storage has extra simple", func(t *testing.T) {
testTreeStorageHasExtra(t, levels, perLevel, false, func() bool {
return false
})
testTreeStorageHasExtra(t, levels, perLevel, false, func() bool {
return rnd.Intn(10) > 5
})
testTreeStorageHasExtra(t, levels, perLevel, true, func() bool {
return false
})
testTreeStorageHasExtra(t, levels, perLevel, true, func() bool {
return rnd.Intn(10) > 5
})
})
t.Run("tree storage has extra three parts", func(t *testing.T) {
testTreeStorageHasExtraThreeParts(t, levels, perLevel, false, func() bool {
return false
})
testTreeStorageHasExtraThreeParts(t, levels, perLevel, false, func() bool {
return rnd.Intn(10) > 5
})
testTreeStorageHasExtraThreeParts(t, levels, perLevel, true, func() bool {
return false
})
testTreeStorageHasExtraThreeParts(t, levels, perLevel, true, func() bool {
return rnd.Intn(10) > 5
})
})
}
func testTreeStorageHasExtra(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
builder := objecttree.BuildTestableTree
if hasData {
builder = objecttree.BuildEmptyDataTestableTree
}
params := genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: 0,
levels: levels,
perLevel: perLevel,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: isSnapshot,
hasData: hasData,
}
deps := fixtureDeps{
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
treeBuilder: builder,
}
fx := newProtocolFixture(t, spaceId, deps)
// generating initial tree
initialRes := genChanges(changeCreator, params)
fx.run(t)
// adding some changes to store, but without updating heads
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
oldHeads, _ := store.Heads()
store.AddRawChangesSetHeads(initialRes.changes, oldHeads)
// sending those changes to other peer
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: initialRes.heads,
RawChanges: initialRes.changes,
})
time.Sleep(50 * time.Millisecond)
// here we want that the saved changes in storage should not affect the sync protocol
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
require.True(t, slice.UnsortedEquals(initialRes.heads, firstHeads))
}
func testTreeStorageHasExtraThreeParts(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
builder := objecttree.BuildTestableTree
if hasData {
builder = objecttree.BuildEmptyDataTestableTree
}
params := genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: 0,
levels: levels,
perLevel: perLevel,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: isSnapshot,
hasData: hasData,
}
deps := fixtureDeps{
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
treeBuilder: builder,
}
fx := newProtocolFixture(t, spaceId, deps)
// generating parts
firstPart := genChanges(changeCreator, params)
params.startIdx = levels
params.snapshotId = firstPart.snapshotId
params.prevHeads = firstPart.heads
secondPart := genChanges(changeCreator, params)
params.startIdx = levels * 2
params.snapshotId = secondPart.snapshotId
params.prevHeads = secondPart.heads
thirdPart := genChanges(changeCreator, params)
// adding part1 to first peer and saving part2 and part3 in its storage
res, _ := fx.handlers["peer1"].tree().AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: firstPart.heads,
RawChanges: firstPart.changes,
})
require.True(t, slice.UnsortedEquals(res.Heads, firstPart.heads))
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
oldHeads, _ := store.Heads()
store.AddRawChangesSetHeads(secondPart.changes, oldHeads)
store.AddRawChangesSetHeads(thirdPart.changes, oldHeads)
var peer2Initial []*treechangeproto.RawTreeChangeWithId
peer2Initial = append(peer2Initial, firstPart.changes...)
peer2Initial = append(peer2Initial, secondPart.changes...)
// adding part1 and part2 to second peer
res, _ = fx.handlers["peer2"].tree().AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: secondPart.heads,
RawChanges: peer2Initial,
})
require.True(t, slice.UnsortedEquals(res.Heads, secondPart.heads))
fx.run(t)
// sending part3 changes to other peer
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: thirdPart.heads,
RawChanges: thirdPart.changes,
})
time.Sleep(50 * time.Millisecond)
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
require.True(t, slice.UnsortedEquals(thirdPart.heads, firstHeads))
}

View File

@ -76,12 +76,15 @@ type BuildDeps struct {
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
remoteGetter := treeRemoteGetter{treeId: id, deps: deps}
deps.TreeStorage, err = remoteGetter.getTree(ctx)
var (
remoteGetter = treeRemoteGetter{treeId: id, deps: deps}
isRemote bool
)
deps.TreeStorage, isRemote, err = remoteGetter.getTree(ctx)
if err != nil {
return
}
return buildSyncTree(ctx, true, deps)
return buildSyncTree(ctx, isRemote, deps)
}
func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, deps BuildDeps) (t SyncTree, err error) {
@ -92,7 +95,7 @@ func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePaylo
return buildSyncTree(ctx, true, deps)
}
func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t SyncTree, err error) {
func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t SyncTree, err error) {
objTree, err := deps.BuildObjectTree(deps.TreeStorage, deps.AclList)
if err != nil {
return
@ -113,7 +116,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
syncTree.afterBuild()
syncTree.Unlock()
if isFirstBuild {
if sendUpdate {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// send to everybody, because everybody should know that the node or client got new tree
syncTree.syncClient.Broadcast(ctx, headUpdate)

View File

@ -93,7 +93,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
defer func() {
if err != nil {
log.With(zap.Error(err)).Debug("head update finished with error")
log.ErrorCtx(ctx, "head update finished with error", zap.Error(err))
} else if fullRequest != nil {
cnt := fullRequest.Content.GetFullSyncRequest()
log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes)))
@ -168,7 +168,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
defer func() {
if err != nil {
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err))
s.syncClient.SendWithReply(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId)
return
} else if fullResponse != nil {
@ -212,7 +212,7 @@ func (s *syncTreeHandler) handleFullSyncResponse(
defer func() {
if err != nil {
log.With(zap.Error(err)).DebugCtx(ctx, "full sync response failed")
log.ErrorCtx(ctx, "full sync response failed", zap.Error(err))
} else {
log.DebugCtx(ctx, "full sync response succeeded")
}

View File

@ -90,12 +90,11 @@ Loop:
}
}
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) {
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) {
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
if err == nil {
return
}
if err != nil && err != treestorage.ErrUnknownTreeId {
return
}
@ -109,6 +108,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
return
}
isRemote = true
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync)
if err != nil {
return
@ -139,5 +139,6 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
return
}
// now we are sure that we can save it to the storage
return t.deps.SpaceStorage.CreateTreeStorage(payload)
treeStorage, err = t.deps.SpaceStorage.CreateTreeStorage(payload)
return
}

View File

@ -13,13 +13,21 @@ type InMemoryTreeStorage struct {
root *treechangeproto.RawTreeChangeWithId
heads []string
Changes map[string]*treechangeproto.RawTreeChangeWithId
addErr error
sync.RWMutex
}
func (t *InMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
func (t *InMemoryTreeStorage) SetReturnErrorOnAdd(err error) {
t.addErr = err
}
func (t *InMemoryTreeStorage) AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
t.RLock()
defer t.RUnlock()
if t.addErr != nil {
return t.addErr
}
for _, ch := range changes {
t.Changes[ch.Id] = ch

View File

@ -49,6 +49,20 @@ func (mr *MockTreeStorageMockRecorder) AddRawChange(arg0 interface{}) *gomock.Ca
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChange", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChange), arg0)
}
// AddRawChangesSetHeads mocks base method.
func (m *MockTreeStorage) AddRawChangesSetHeads(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRawChangesSetHeads", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// AddRawChangesSetHeads indicates an expected call of AddRawChangesSetHeads.
func (mr *MockTreeStorageMockRecorder) AddRawChangesSetHeads(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesSetHeads", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChangesSetHeads), arg0, arg1)
}
// Delete mocks base method.
func (m *MockTreeStorage) Delete() error {
m.ctrl.T.Helper()
@ -150,17 +164,3 @@ func (mr *MockTreeStorageMockRecorder) SetHeads(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeads", reflect.TypeOf((*MockTreeStorage)(nil).SetHeads), arg0)
}
// TransactionAdd mocks base method.
func (m *MockTreeStorage) TransactionAdd(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TransactionAdd", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// TransactionAdd indicates an expected call of TransactionAdd.
func (mr *MockTreeStorageMockRecorder) TransactionAdd(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionAdd", reflect.TypeOf((*MockTreeStorage)(nil).TransactionAdd), arg0, arg1)
}

View File

@ -31,7 +31,7 @@ type TreeStorage interface {
Heads() ([]string, error)
SetHeads(heads []string) error
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
TransactionAdd(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
HasChange(ctx context.Context, id string) (bool, error)

View File

@ -385,6 +385,9 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
_ = s.handleQueue.CloseThread(threadId)
}()
}
if hm.PeerCtx == nil {
hm.PeerCtx = ctx
}
err = s.handleQueue.Add(ctx, threadId, hm)
if err == mb.ErrOverflowed {
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))