Merge branch 'main' of github.com:anytypeio/any-sync into GO-802-metrics

This commit is contained in:
Sergey Cherepanov 2023-05-01 12:28:41 +02:00
commit 5fdfb6c8e4
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
13 changed files with 213 additions and 85 deletions

View File

@ -52,18 +52,6 @@ func (c *nonVerifiableChangeBuilder) Marshall(ch *Change) (raw *treechangeproto.
return c.ChangeBuilder.Marshall(ch) return c.ChangeBuilder.Marshall(ch)
} }
type emptyDataChangeBuilder struct {
ChangeBuilder
}
func (c *emptyDataChangeBuilder) Build(payload BuilderContent) (ch *Change, raw *treechangeproto.RawTreeChangeWithId, err error) {
panic("should not be called")
}
func (c *emptyDataChangeBuilder) Marshall(ch *Change) (raw *treechangeproto.RawTreeChangeWithId, err error) {
panic("should not be called")
}
type ChangeBuilder interface { type ChangeBuilder interface {
Unmarshall(rawIdChange *treechangeproto.RawTreeChangeWithId, verify bool) (ch *Change, err error) Unmarshall(rawIdChange *treechangeproto.RawTreeChangeWithId, verify bool) (ch *Change, err error)
Build(payload BuilderContent) (ch *Change, raw *treechangeproto.RawTreeChangeWithId, err error) Build(payload BuilderContent) (ch *Change, raw *treechangeproto.RawTreeChangeWithId, err error)
@ -79,18 +67,6 @@ type changeBuilder struct {
newChange newChangeFunc newChange newChangeFunc
} }
func NewEmptyDataBuilder(keys crypto.KeyStorage, rootChange *treechangeproto.RawTreeChangeWithId) ChangeBuilder {
return &emptyDataChangeBuilder{&changeBuilder{
rootChange: rootChange,
keys: keys,
newChange: func(id string, identity crypto.PubKey, ch *treechangeproto.TreeChange, signature []byte) *Change {
c := NewChange(id, identity, ch, nil)
c.Data = nil
return c
},
}}
}
func NewChangeBuilder(keys crypto.KeyStorage, rootChange *treechangeproto.RawTreeChangeWithId) ChangeBuilder { func NewChangeBuilder(keys crypto.KeyStorage, rootChange *treechangeproto.RawTreeChangeWithId) ChangeBuilder {
return &changeBuilder{keys: keys, rootChange: rootChange, newChange: NewChange} return &changeBuilder{keys: keys, rootChange: rootChange, newChange: NewChange}
} }

View File

@ -473,6 +473,11 @@ func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesA
var added []*treechangeproto.RawTreeChangeWithId var added []*treechangeproto.RawTreeChangeWithId
added, err = getAddedChanges(treeChangesAdded) added, err = getAddedChanges(treeChangesAdded)
if !ot.treeBuilder.keepInMemoryData {
for _, ch := range treeChangesAdded {
ch.Data = nil
}
}
if err != nil { if err != nil {
return return
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"testing" "testing"
@ -27,7 +28,7 @@ func prepareAclList(t *testing.T) list.AclList {
return aclList return aclList
} }
func prepareTreeDeps(aclList list.AclList) (*MockChangeCreator, objectTreeDeps) { func prepareHistoryTreeDeps(aclList list.AclList) (*MockChangeCreator, objectTreeDeps) {
changeCreator := NewMockChangeCreator() changeCreator := NewMockChangeCreator()
treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id) treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id)
root, _ := treeStorage.Root() root, _ := treeStorage.Root()
@ -36,7 +37,7 @@ func prepareTreeDeps(aclList list.AclList) (*MockChangeCreator, objectTreeDeps)
} }
deps := objectTreeDeps{ deps := objectTreeDeps{
changeBuilder: changeBuilder, changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(treeStorage, changeBuilder), treeBuilder: newTreeBuilder(true, treeStorage, changeBuilder),
treeStorage: treeStorage, treeStorage: treeStorage,
rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder), rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder),
validator: &noOpTreeValidator{}, validator: &noOpTreeValidator{},
@ -46,9 +47,26 @@ func prepareTreeDeps(aclList list.AclList) (*MockChangeCreator, objectTreeDeps)
} }
func prepareTreeContext(t *testing.T, aclList list.AclList) testTreeContext { func prepareTreeContext(t *testing.T, aclList list.AclList) testTreeContext {
return prepareContext(t, aclList, BuildTestableTree, nil)
}
func prepareEmptyDataTreeContext(t *testing.T, aclList list.AclList, additionalChanges func(changeCreator *MockChangeCreator) RawChangesPayload) testTreeContext {
return prepareContext(t, aclList, BuildEmptyDataTestableTree, additionalChanges)
}
func prepareContext(
t *testing.T,
aclList list.AclList,
objTreeBuilder BuildObjectTreeFunc,
additionalChanges func(changeCreator *MockChangeCreator) RawChangesPayload) testTreeContext {
changeCreator := NewMockChangeCreator() changeCreator := NewMockChangeCreator()
treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id) treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id)
objTree, err := BuildTestableTree(aclList, treeStorage) if additionalChanges != nil {
payload := additionalChanges(changeCreator)
err := treeStorage.TransactionAdd(payload.RawChanges, payload.NewHeads)
require.NoError(t, err)
}
objTree, err := objTreeBuilder(treeStorage, aclList)
require.NoError(t, err, "building tree should be without error") require.NoError(t, err, "building tree should be without error")
// check tree iterate // check tree iterate
@ -58,7 +76,9 @@ func prepareTreeContext(t *testing.T, aclList list.AclList) testTreeContext {
return true return true
}) })
require.NoError(t, err, "iterate should be without error") require.NoError(t, err, "iterate should be without error")
assert.Equal(t, []string{"0"}, iterChangesId) if additionalChanges == nil {
assert.Equal(t, []string{"0"}, iterChangesId)
}
return testTreeContext{ return testTreeContext{
aclList: aclList, aclList: aclList,
treeStorage: treeStorage, treeStorage: treeStorage,
@ -266,6 +286,90 @@ func TestObjectTree(t *testing.T) {
assert.Equal(t, true, objTree.(*objectTree).snapshotPathIsActual()) assert.Equal(t, true, objTree.(*objectTree).snapshotPathIsActual())
}) })
t.Run("test empty data tree", func(t *testing.T) {
t.Run("empty tree add", func(t *testing.T) {
ctx := prepareEmptyDataTreeContext(t, aclList, nil)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
rawChangesFirst := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRawWithData("1", aclList.Head().Id, "0", false, []byte("1"), "0"),
changeCreator.CreateRawWithData("2", aclList.Head().Id, "0", false, []byte("2"), "1"),
changeCreator.CreateRawWithData("3", aclList.Head().Id, "0", false, []byte("3"), "2"),
}
rawChangesSecond := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRawWithData("4", aclList.Head().Id, "0", false, []byte("4"), "2"),
changeCreator.CreateRawWithData("5", aclList.Head().Id, "0", false, []byte("5"), "1"),
changeCreator.CreateRawWithData("6", aclList.Head().Id, "0", false, []byte("6"), "3", "4", "5"),
}
// making them to be saved in unattached
_, err := objTree.AddRawChanges(context.Background(), RawChangesPayload{
NewHeads: []string{"6"},
RawChanges: rawChangesSecond,
})
require.NoError(t, err, "adding changes should be without error")
// attaching them
res, err := objTree.AddRawChanges(context.Background(), RawChangesPayload{
NewHeads: []string{"3"},
RawChanges: rawChangesFirst,
})
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "0", objTree.Root().Id)
require.Equal(t, []string{"6"}, objTree.Heads())
require.Equal(t, 6, len(res.Added))
// checking that added changes still have data
for _, ch := range res.Added {
unmarshallRaw := &treechangeproto.RawTreeChange{}
proto.Unmarshal(ch.RawChange, unmarshallRaw)
treeCh := &treechangeproto.TreeChange{}
proto.Unmarshal(unmarshallRaw.Payload, treeCh)
require.Equal(t, ch.Id, string(treeCh.ChangesData))
}
// checking that the tree doesn't have data in memory
err = objTree.IterateRoot(nil, func(change *Change) bool {
if change.Id == "0" {
return true
}
require.Nil(t, change.Data)
return true
})
})
t.Run("empty tree load", func(t *testing.T) {
ctx := prepareEmptyDataTreeContext(t, aclList, func(changeCreator *MockChangeCreator) RawChangesPayload {
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRawWithData("1", aclList.Head().Id, "0", false, []byte("1"), "0"),
changeCreator.CreateRawWithData("2", aclList.Head().Id, "0", false, []byte("2"), "1"),
changeCreator.CreateRawWithData("3", aclList.Head().Id, "0", false, []byte("3"), "2"),
changeCreator.CreateRawWithData("4", aclList.Head().Id, "0", false, []byte("4"), "2"),
changeCreator.CreateRawWithData("5", aclList.Head().Id, "0", false, []byte("5"), "1"),
changeCreator.CreateRawWithData("6", aclList.Head().Id, "0", false, []byte("6"), "3", "4", "5"),
}
return RawChangesPayload{NewHeads: []string{"6"}, RawChanges: rawChanges}
})
ctx.objTree.IterateRoot(nil, func(change *Change) bool {
if change.Id == "0" {
return true
}
require.Nil(t, change.Data)
return true
})
rawChanges, err := ctx.objTree.ChangesAfterCommonSnapshot([]string{"0"}, []string{"6"})
require.NoError(t, err)
for _, ch := range rawChanges {
unmarshallRaw := &treechangeproto.RawTreeChange{}
proto.Unmarshal(ch.RawChange, unmarshallRaw)
treeCh := &treechangeproto.TreeChange{}
proto.Unmarshal(unmarshallRaw.Payload, treeCh)
require.Equal(t, ch.Id, string(treeCh.ChangesData))
}
})
})
t.Run("changes from tree after common snapshot complex", func(t *testing.T) { t.Run("changes from tree after common snapshot complex", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList) ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator changeCreator := ctx.changeCreator
@ -489,7 +593,7 @@ func TestObjectTree(t *testing.T) {
}) })
t.Run("test history tree not include", func(t *testing.T) { t.Run("test history tree not include", func(t *testing.T) {
changeCreator, deps := prepareTreeDeps(aclList) changeCreator, deps := prepareHistoryTreeDeps(aclList)
rawChanges := []*treechangeproto.RawTreeChangeWithId{ rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", false, "0"), changeCreator.CreateRaw("1", aclList.Head().Id, "0", false, "0"),
@ -520,7 +624,7 @@ func TestObjectTree(t *testing.T) {
}) })
t.Run("test history tree include", func(t *testing.T) { t.Run("test history tree include", func(t *testing.T) {
changeCreator, deps := prepareTreeDeps(aclList) changeCreator, deps := prepareHistoryTreeDeps(aclList)
rawChanges := []*treechangeproto.RawTreeChangeWithId{ rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", false, "0"), changeCreator.CreateRaw("1", aclList.Head().Id, "0", false, "0"),
@ -551,7 +655,7 @@ func TestObjectTree(t *testing.T) {
}) })
t.Run("test history tree root", func(t *testing.T) { t.Run("test history tree root", func(t *testing.T) {
_, deps := prepareTreeDeps(aclList) _, deps := prepareHistoryTreeDeps(aclList)
hTree, err := buildHistoryTree(deps, HistoryTreeParams{ hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "0", BeforeId: "0",
IncludeBeforeId: true, IncludeBeforeId: true,

View File

@ -42,7 +42,7 @@ func verifiableTreeDeps(
treeStorage treestorage.TreeStorage, treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps { aclList list.AclList) objectTreeDeps {
changeBuilder := NewChangeBuilder(crypto.NewKeyStorage(), rootChange) changeBuilder := NewChangeBuilder(crypto.NewKeyStorage(), rootChange)
treeBuilder := newTreeBuilder(treeStorage, changeBuilder) treeBuilder := newTreeBuilder(true, treeStorage, changeBuilder)
return objectTreeDeps{ return objectTreeDeps{
changeBuilder: changeBuilder, changeBuilder: changeBuilder,
treeBuilder: treeBuilder, treeBuilder: treeBuilder,
@ -57,8 +57,8 @@ func emptyDataTreeDeps(
rootChange *treechangeproto.RawTreeChangeWithId, rootChange *treechangeproto.RawTreeChangeWithId,
treeStorage treestorage.TreeStorage, treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps { aclList list.AclList) objectTreeDeps {
changeBuilder := NewEmptyDataBuilder(crypto.NewKeyStorage(), rootChange) changeBuilder := NewChangeBuilder(crypto.NewKeyStorage(), rootChange)
treeBuilder := newTreeBuilder(treeStorage, changeBuilder) treeBuilder := newTreeBuilder(false, treeStorage, changeBuilder)
return objectTreeDeps{ return objectTreeDeps{
changeBuilder: changeBuilder, changeBuilder: changeBuilder,
treeBuilder: treeBuilder, treeBuilder: treeBuilder,
@ -74,7 +74,7 @@ func nonVerifiableTreeDeps(
treeStorage treestorage.TreeStorage, treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps { aclList list.AclList) objectTreeDeps {
changeBuilder := &nonVerifiableChangeBuilder{NewChangeBuilder(newMockKeyStorage(), rootChange)} changeBuilder := &nonVerifiableChangeBuilder{NewChangeBuilder(newMockKeyStorage(), rootChange)}
treeBuilder := newTreeBuilder(treeStorage, changeBuilder) treeBuilder := newTreeBuilder(true, treeStorage, changeBuilder)
return objectTreeDeps{ return objectTreeDeps{
changeBuilder: changeBuilder, changeBuilder: changeBuilder,
treeBuilder: treeBuilder, treeBuilder: treeBuilder,
@ -107,14 +107,14 @@ func BuildEmptyDataObjectTree(treeStorage treestorage.TreeStorage, aclList list.
return buildObjectTree(deps) return buildObjectTree(deps)
} }
func BuildTestableTree(aclList list.AclList, treeStorage treestorage.TreeStorage) (ObjectTree, error) { func BuildTestableTree(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error) {
root, _ := treeStorage.Root() root, _ := treeStorage.Root()
changeBuilder := &nonVerifiableChangeBuilder{ changeBuilder := &nonVerifiableChangeBuilder{
ChangeBuilder: NewChangeBuilder(newMockKeyStorage(), root), ChangeBuilder: NewChangeBuilder(newMockKeyStorage(), root),
} }
deps := objectTreeDeps{ deps := objectTreeDeps{
changeBuilder: changeBuilder, changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(treeStorage, changeBuilder), treeBuilder: newTreeBuilder(true, treeStorage, changeBuilder),
treeStorage: treeStorage, treeStorage: treeStorage,
rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder), rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder),
validator: &noOpTreeValidator{}, validator: &noOpTreeValidator{},
@ -124,14 +124,14 @@ func BuildTestableTree(aclList list.AclList, treeStorage treestorage.TreeStorage
return buildObjectTree(deps) return buildObjectTree(deps)
} }
func BuildEmptyDataTestableTree(aclList list.AclList, treeStorage treestorage.TreeStorage) (ObjectTree, error) { func BuildEmptyDataTestableTree(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error) {
root, _ := treeStorage.Root() root, _ := treeStorage.Root()
changeBuilder := &nonVerifiableChangeBuilder{ changeBuilder := &nonVerifiableChangeBuilder{
ChangeBuilder: NewEmptyDataBuilder(newMockKeyStorage(), root), ChangeBuilder: NewChangeBuilder(newMockKeyStorage(), root),
} }
deps := objectTreeDeps{ deps := objectTreeDeps{
changeBuilder: changeBuilder, changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(treeStorage, changeBuilder), treeBuilder: newTreeBuilder(false, treeStorage, changeBuilder),
treeStorage: treeStorage, treeStorage: treeStorage,
rawChangeLoader: newStorageLoader(treeStorage, changeBuilder), rawChangeLoader: newStorageLoader(treeStorage, changeBuilder),
validator: &noOpTreeValidator{}, validator: &noOpTreeValidator{},

View File

@ -89,11 +89,15 @@ func (c *MockChangeCreator) CreateRoot(id, aclId string) *treechangeproto.RawTre
} }
func (c *MockChangeCreator) CreateRaw(id, aclId, snapshotId string, isSnapshot bool, prevIds ...string) *treechangeproto.RawTreeChangeWithId { func (c *MockChangeCreator) CreateRaw(id, aclId, snapshotId string, isSnapshot bool, prevIds ...string) *treechangeproto.RawTreeChangeWithId {
return c.CreateRawWithData(id, aclId, snapshotId, isSnapshot, nil, prevIds...)
}
func (c *MockChangeCreator) CreateRawWithData(id, aclId, snapshotId string, isSnapshot bool, data []byte, prevIds ...string) *treechangeproto.RawTreeChangeWithId {
aclChange := &treechangeproto.TreeChange{ aclChange := &treechangeproto.TreeChange{
TreeHeadIds: prevIds, TreeHeadIds: prevIds,
AclHeadId: aclId, AclHeadId: aclId,
SnapshotBaseId: snapshotId, SnapshotBaseId: snapshotId,
ChangesData: nil, ChangesData: data,
IsSnapshot: isSnapshot, IsSnapshot: isSnapshot,
} }
res, _ := aclChange.Marshal() res, _ := aclChange.Marshal()

View File

@ -21,18 +21,20 @@ type treeBuilder struct {
treeStorage treestorage.TreeStorage treeStorage treestorage.TreeStorage
builder ChangeBuilder builder ChangeBuilder
cache map[string]*Change cache map[string]*Change
tree *Tree tree *Tree
keepInMemoryData bool
// buffers // buffers
idStack []string idStack []string
loadBuffer []*Change loadBuffer []*Change
} }
func newTreeBuilder(storage treestorage.TreeStorage, builder ChangeBuilder) *treeBuilder { func newTreeBuilder(keepData bool, storage treestorage.TreeStorage, builder ChangeBuilder) *treeBuilder {
return &treeBuilder{ return &treeBuilder{
treeStorage: storage, treeStorage: storage,
builder: builder, builder: builder,
keepInMemoryData: keepData,
} }
} }
@ -163,6 +165,9 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !tb.keepInMemoryData {
ch.Data = nil
}
tb.cache[id] = ch tb.cache[id] = ch
return ch, nil return ch, nil

View File

@ -8,6 +8,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/util/slice" "github.com/anytypeio/any-sync/util/slice"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"math/rand" "math/rand"
"testing" "testing"
@ -30,7 +31,8 @@ func TestEmptyClientGetsFullHistory(t *testing.T) {
"peer1": []string{"peer2"}, "peer1": []string{"peer2"},
"peer2": []string{"peer1"}, "peer2": []string{"peer1"},
}, },
emptyTrees: []string{"peer2"}, emptyTrees: []string{"peer2"},
treeBuilder: objecttree.BuildTestableTree,
} }
fx := newProtocolFixture(t, spaceId, deps) fx := newProtocolFixture(t, spaceId, deps)
fx.run(t) fx.run(t)
@ -60,6 +62,11 @@ func TestEmptyClientGetsFullHistory(t *testing.T) {
} }
func TestTreeFuzzyMerge(t *testing.T) { func TestTreeFuzzyMerge(t *testing.T) {
testTreeFuzzyMerge(t, true)
testTreeFuzzyMerge(t, false)
}
func testTreeFuzzyMerge(t *testing.T, withData bool) {
var ( var (
rnd = rand.New(rand.NewSource(time.Now().Unix())) rnd = rand.New(rand.NewSource(time.Now().Unix()))
levels = 20 levels = 20
@ -67,20 +74,20 @@ func TestTreeFuzzyMerge(t *testing.T) {
rounds = 10 rounds = 10
) )
for i := 0; i < rounds; i++ { for i := 0; i < rounds; i++ {
testTreeMerge(t, levels, perLevel, func() bool { testTreeMerge(t, levels, perLevel, withData, func() bool {
return true return true
}) })
testTreeMerge(t, levels, perLevel, func() bool { testTreeMerge(t, levels, perLevel, withData, func() bool {
return false return false
}) })
testTreeMerge(t, levels, perLevel, func() bool { testTreeMerge(t, levels, perLevel, withData, func() bool {
return rnd.Intn(10) > 8 return rnd.Intn(10) > 8
}) })
levels += 2 levels += 2
} }
} }
func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) { func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
treeId := "treeId" treeId := "treeId"
spaceId := "spaceId" spaceId := "spaceId"
keys, err := accountdata.NewRandom() keys, err := accountdata.NewRandom()
@ -88,15 +95,20 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
aclList, err := list.NewTestDerivedAcl(spaceId, keys) aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList) storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator() changeCreator := objecttree.NewMockChangeCreator()
builder := objecttree.BuildTestableTree
if hasData {
builder = objecttree.BuildEmptyDataTestableTree
}
params := genParams{ params := genParams{
prefix: "peer1", prefix: "peer1",
aclId: aclList.Id(), aclId: aclList.Id(),
startIdx: 0, startIdx: 0,
levels: levels, levels: levels,
perLevel: perlevel, perLevel: perLevel,
snapshotId: treeId, snapshotId: treeId,
prevHeads: []string{treeId}, prevHeads: []string{treeId},
isSnapshot: isSnapshot, isSnapshot: isSnapshot,
hasData: hasData,
} }
// generating initial tree // generating initial tree
initialRes := genChanges(changeCreator, params) initialRes := genChanges(changeCreator, params)
@ -110,7 +122,8 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
"peer2": []string{"node1"}, "peer2": []string{"node1"},
"node1": []string{"peer1", "peer2"}, "node1": []string{"peer1", "peer2"},
}, },
emptyTrees: []string{"peer2", "node1"}, emptyTrees: []string{"peer2", "node1"},
treeBuilder: builder,
} }
fx := newProtocolFixture(t, spaceId, deps) fx := newProtocolFixture(t, spaceId, deps)
fx.run(t) fx.run(t)
@ -127,11 +140,12 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
aclId: aclList.Id(), aclId: aclList.Id(),
startIdx: levels, startIdx: levels,
levels: levels, levels: levels,
perLevel: perlevel, perLevel: perLevel,
snapshotId: initialRes.snapshotId, snapshotId: initialRes.snapshotId,
prevHeads: initialRes.heads, prevHeads: initialRes.heads,
isSnapshot: isSnapshot, isSnapshot: isSnapshot,
hasData: hasData,
} }
// generating different additions to the tree for different peers // generating different additions to the tree for different peers
peer1Res := genChanges(changeCreator, params) peer1Res := genChanges(changeCreator, params)
@ -156,4 +170,16 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
firstStorage := firstTree.Storage().(*treestorage.InMemoryTreeStorage) firstStorage := firstTree.Storage().(*treestorage.InMemoryTreeStorage)
secondStorage := secondTree.Storage().(*treestorage.InMemoryTreeStorage) secondStorage := secondTree.Storage().(*treestorage.InMemoryTreeStorage)
require.True(t, firstStorage.Equal(secondStorage)) require.True(t, firstStorage.Equal(secondStorage))
if hasData {
for _, ch := range secondStorage.Changes {
if ch.Id == treeId {
continue
}
unmarshallRaw := &treechangeproto.RawTreeChange{}
proto.Unmarshal(ch.RawChange, unmarshallRaw)
treeCh := &treechangeproto.TreeChange{}
proto.Unmarshal(unmarshallRaw.Payload, treeCh)
require.Equal(t, ch.Id, string(treeCh.ChangesData))
}
}
} }

View File

@ -116,9 +116,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
if isFirstBuild { if isFirstBuild {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// send to everybody, because everybody should know that the node or client got new tree // send to everybody, because everybody should know that the node or client got new tree
if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil { syncTree.syncClient.Broadcast(ctx, headUpdate)
log.ErrorCtx(ctx, "broadcast error", zap.Error(e))
}
} }
return return
} }
@ -155,7 +153,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
} }
s.syncStatus.HeadsChange(s.Id(), res.Heads) s.syncStatus.HeadsChange(s.Id(), res.Heads)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.Broadcast(ctx, headUpdate) s.syncClient.Broadcast(ctx, headUpdate)
return return
} }
@ -182,7 +180,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
s.notifiable.UpdateHeads(s.Id(), res.Heads) s.notifiable.UpdateHeads(s.Id(), res.Heads)
} }
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.Broadcast(ctx, headUpdate) s.syncClient.Broadcast(ctx, headUpdate)
} }
return return
} }

View File

@ -73,7 +73,7 @@ func Test_BuildSyncTree(t *testing.T) {
updateListenerMock.EXPECT().Update(tr) updateListenerMock.EXPECT().Update(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddRawChanges(ctx, payload) res, err := tr.AddRawChanges(ctx, payload)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedRes, res) require.Equal(t, expectedRes, res)
@ -95,7 +95,7 @@ func Test_BuildSyncTree(t *testing.T) {
updateListenerMock.EXPECT().Rebuild(tr) updateListenerMock.EXPECT().Rebuild(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddRawChanges(ctx, payload) res, err := tr.AddRawChanges(ctx, payload)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedRes, res) require.Equal(t, expectedRes, res)
@ -133,7 +133,7 @@ func Test_BuildSyncTree(t *testing.T) {
Return(expectedRes, nil) Return(expectedRes, nil)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddContent(ctx, content) res, err := tr.AddContent(ctx, content)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedRes, res) require.Equal(t, expectedRes, res)

View File

@ -90,6 +90,7 @@ type testSyncHandler struct {
aclList list.AclList aclList list.AclList
log *messageLog log *messageLog
syncClient objectsync.SyncClient syncClient objectsync.SyncClient
builder objecttree.BuildObjectTreeFunc
} }
// createSyncHandler creates a sync handler when a tree is already created // createSyncHandler creates a sync handler when a tree is already created
@ -105,7 +106,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
} }
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) // createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *testSyncHandler { func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler {
factory := objectsync.NewRequestFactory() factory := objectsync.NewRequestFactory()
syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
@ -116,6 +117,7 @@ func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *m
aclList: aclList, aclList: aclList,
log: log, log: log,
syncClient: syncClient, syncClient: syncClient,
builder: builder,
} }
} }
@ -148,7 +150,7 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re
} }
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse() fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil) treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil)
tree, err := createTestTree(h.aclList, treeStorage) tree, err := h.builder(treeStorage, h.aclList)
if err != nil { if err != nil {
return return
} }
@ -296,7 +298,7 @@ func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage
} }
func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) { func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) {
return objecttree.BuildEmptyDataTestableTree(aclList, storage) return objecttree.BuildEmptyDataTestableTree(storage, aclList)
} }
type fixtureDeps struct { type fixtureDeps struct {
@ -304,6 +306,7 @@ type fixtureDeps struct {
initStorage *treestorage.InMemoryTreeStorage initStorage *treestorage.InMemoryTreeStorage
connectionMap map[string][]string connectionMap map[string][]string
emptyTrees []string emptyTrees []string
treeBuilder objecttree.BuildObjectTreeFunc
} }
// protocolFixture is the test environment for sync protocol tests // protocolFixture is the test environment for sync protocol tests
@ -326,10 +329,10 @@ func newProtocolFixture(t *testing.T, spaceId string, deps fixtureDeps) *protoco
for peerId := range deps.connectionMap { for peerId := range deps.connectionMap {
var handler *testSyncHandler var handler *testSyncHandler
if slices.Contains(deps.emptyTrees, peerId) { if slices.Contains(deps.emptyTrees, peerId) {
handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log) handler = createEmptySyncHandler(peerId, spaceId, deps.treeBuilder, deps.aclList, log)
} else { } else {
stCopy := deps.initStorage.Copy() stCopy := deps.initStorage.Copy()
testTree, err := createTestTree(deps.aclList, stCopy) testTree, err := deps.treeBuilder(stCopy, deps.aclList)
require.NoError(t, err) require.NoError(t, err)
handler = createSyncHandler(peerId, spaceId, testTree, log) handler = createSyncHandler(peerId, spaceId, testTree, log)
} }
@ -373,6 +376,7 @@ type genParams struct {
snapshotId string snapshotId string
prevHeads []string prevHeads []string
isSnapshot func() bool isSnapshot func() bool
hasData bool
} }
// genResult is the result of genChanges // genResult is the result of genChanges
@ -399,7 +403,11 @@ func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res ge
) )
newChange := func(isSnapshot bool, idx int, prevIds []string) string { newChange := func(isSnapshot bool, idx int, prevIds []string) string {
newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, idx) newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, idx)
newCh := creator.CreateRaw(newId, params.aclId, snapshotId, isSnapshot, prevIds...) var data []byte
if params.hasData {
data = []byte(newId)
}
newCh := creator.CreateRawWithData(newId, params.aclId, snapshotId, isSnapshot, data, prevIds...)
res.changes = append(res.changes, newCh) res.changes = append(res.changes, newCh)
return newId return newId
} }

View File

@ -12,7 +12,7 @@ type InMemoryTreeStorage struct {
id string id string
root *treechangeproto.RawTreeChangeWithId root *treechangeproto.RawTreeChangeWithId
heads []string heads []string
changes map[string]*treechangeproto.RawTreeChangeWithId Changes map[string]*treechangeproto.RawTreeChangeWithId
sync.RWMutex sync.RWMutex
} }
@ -22,7 +22,7 @@ func (t *InMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeC
defer t.RUnlock() defer t.RUnlock()
for _, ch := range changes { for _, ch := range changes {
t.changes[ch.Id] = ch t.Changes[ch.Id] = ch
} }
t.heads = append(t.heads[:0], heads...) t.heads = append(t.heads[:0], heads...)
return nil return nil
@ -42,13 +42,13 @@ func NewInMemoryTreeStorage(
id: root.Id, id: root.Id,
root: root, root: root,
heads: append([]string(nil), heads...), heads: append([]string(nil), heads...),
changes: allChanges, Changes: allChanges,
RWMutex: sync.RWMutex{}, RWMutex: sync.RWMutex{},
}, nil }, nil
} }
func (t *InMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) { func (t *InMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) {
_, exists := t.changes[id] _, exists := t.Changes[id]
return exists, nil return exists, nil
} }
@ -81,14 +81,14 @@ func (t *InMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChange
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
// TODO: better to do deep copy // TODO: better to do deep copy
t.changes[change.Id] = change t.Changes[change.Id] = change
return nil return nil
} }
func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) { func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
if res, exists := t.changes[changeId]; exists { if res, exists := t.Changes[changeId]; exists {
return res, nil return res, nil
} }
return nil, fmt.Errorf("could not get change with id: %s", changeId) return nil, fmt.Errorf("could not get change with id: %s", changeId)
@ -100,7 +100,7 @@ func (t *InMemoryTreeStorage) Delete() error {
func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage { func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage {
var changes []*treechangeproto.RawTreeChangeWithId var changes []*treechangeproto.RawTreeChangeWithId
for _, ch := range t.changes { for _, ch := range t.Changes {
changes = append(changes, ch) changes = append(changes, ch)
} }
other, _ := NewInMemoryTreeStorage(t.root, t.heads, changes) other, _ := NewInMemoryTreeStorage(t.root, t.heads, changes)
@ -111,11 +111,11 @@ func (t *InMemoryTreeStorage) Equal(other *InMemoryTreeStorage) bool {
if !slice.UnsortedEquals(t.heads, other.heads) { if !slice.UnsortedEquals(t.heads, other.heads) {
return false return false
} }
if len(t.changes) != len(other.changes) { if len(t.Changes) != len(other.Changes) {
return false return false
} }
for k, v := range t.changes { for k, v := range t.Changes {
if otherV, exists := other.changes[k]; exists { if otherV, exists := other.Changes[k]; exists {
if otherV.Id == v.Id { if otherV.Id == v.Id {
continue continue
} }

View File

@ -39,11 +39,9 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
} }
// Broadcast mocks base method. // Broadcast mocks base method.
func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) m.ctrl.Call(m, "Broadcast", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
} }
// Broadcast indicates an expected call of Broadcast. // Broadcast indicates an expected call of Broadcast.

View File

@ -4,11 +4,12 @@ import (
"context" "context"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap"
) )
type SyncClient interface { type SyncClient interface {
RequestFactory RequestFactory
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage)
SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
MessagePool() MessagePool MessagePool() MessagePool
@ -31,12 +32,15 @@ func NewSyncClient(
} }
} }
func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
if err != nil { if err != nil {
return return
} }
return s.messagePool.Broadcast(ctx, objMsg) err = s.messagePool.Broadcast(ctx, objMsg)
if err != nil {
log.DebugCtx(ctx, "broadcast error", zap.Error(err))
}
} }
func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {