Add sync protocol tests for empty data tree
This commit is contained in:
parent
95cb364316
commit
61454b7405
@ -8,6 +8,7 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
|
||||
"github.com/anytypeio/any-sync/util/slice"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
"math/rand"
|
||||
"testing"
|
||||
@ -30,7 +31,8 @@ func TestEmptyClientGetsFullHistory(t *testing.T) {
|
||||
"peer1": []string{"peer2"},
|
||||
"peer2": []string{"peer1"},
|
||||
},
|
||||
emptyTrees: []string{"peer2"},
|
||||
emptyTrees: []string{"peer2"},
|
||||
treeBuilder: objecttree.BuildTestableTree,
|
||||
}
|
||||
fx := newProtocolFixture(t, spaceId, deps)
|
||||
fx.run(t)
|
||||
@ -60,23 +62,32 @@ func TestEmptyClientGetsFullHistory(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTreeFuzzyMerge(t *testing.T) {
|
||||
testTreeFuzzyMerge(t, true)
|
||||
testTreeFuzzyMerge(t, false)
|
||||
}
|
||||
|
||||
func testTreeFuzzyMerge(t *testing.T, withData bool) {
|
||||
var (
|
||||
rnd = rand.New(rand.NewSource(time.Now().Unix()))
|
||||
levels = 20
|
||||
perLevel = 20
|
||||
rounds = 10
|
||||
)
|
||||
testTreeMerge(t, levels, perLevel, func() bool {
|
||||
return true
|
||||
})
|
||||
testTreeMerge(t, levels, perLevel, func() bool {
|
||||
return false
|
||||
})
|
||||
testTreeMerge(t, levels, perLevel, func() bool {
|
||||
return rnd.Intn(10) > 8
|
||||
})
|
||||
for i := 0; i < rounds; i++ {
|
||||
testTreeMerge(t, levels, perLevel, withData, func() bool {
|
||||
return true
|
||||
})
|
||||
testTreeMerge(t, levels, perLevel, withData, func() bool {
|
||||
return false
|
||||
})
|
||||
testTreeMerge(t, levels, perLevel, withData, func() bool {
|
||||
return rnd.Intn(10) > 8
|
||||
})
|
||||
levels += 2
|
||||
}
|
||||
}
|
||||
|
||||
func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
|
||||
treeId := "treeId"
|
||||
spaceId := "spaceId"
|
||||
keys, err := accountdata.NewRandom()
|
||||
@ -84,15 +95,20 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
|
||||
storage := createStorage(treeId, aclList)
|
||||
changeCreator := objecttree.NewMockChangeCreator()
|
||||
builder := objecttree.BuildTestableTree
|
||||
if hasData {
|
||||
builder = objecttree.BuildEmptyDataTestableTree
|
||||
}
|
||||
params := genParams{
|
||||
prefix: "peer1",
|
||||
aclId: aclList.Id(),
|
||||
startIdx: 0,
|
||||
levels: levels,
|
||||
perLevel: perlevel,
|
||||
perLevel: perLevel,
|
||||
snapshotId: treeId,
|
||||
prevHeads: []string{treeId},
|
||||
isSnapshot: isSnapshot,
|
||||
hasData: hasData,
|
||||
}
|
||||
// generating initial tree
|
||||
initialRes := genChanges(changeCreator, params)
|
||||
@ -106,7 +122,8 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
"peer2": []string{"node1"},
|
||||
"node1": []string{"peer1", "peer2"},
|
||||
},
|
||||
emptyTrees: []string{"peer2", "node1"},
|
||||
emptyTrees: []string{"peer2", "node1"},
|
||||
treeBuilder: builder,
|
||||
}
|
||||
fx := newProtocolFixture(t, spaceId, deps)
|
||||
fx.run(t)
|
||||
@ -114,7 +131,7 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
NewHeads: initialRes.heads,
|
||||
RawChanges: initialRes.changes,
|
||||
})
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
firstHeads := fx.handlers["peer1"].tree().Heads()
|
||||
secondHeads := fx.handlers["peer2"].tree().Heads()
|
||||
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
|
||||
@ -123,11 +140,12 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
aclId: aclList.Id(),
|
||||
startIdx: levels,
|
||||
levels: levels,
|
||||
perLevel: perlevel,
|
||||
perLevel: perLevel,
|
||||
|
||||
snapshotId: initialRes.snapshotId,
|
||||
prevHeads: initialRes.heads,
|
||||
isSnapshot: isSnapshot,
|
||||
hasData: hasData,
|
||||
}
|
||||
// generating different additions to the tree for different peers
|
||||
peer1Res := genChanges(changeCreator, params)
|
||||
@ -141,7 +159,7 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
NewHeads: peer2Res.heads,
|
||||
RawChanges: peer2Res.changes,
|
||||
})
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
fx.stop()
|
||||
firstTree := fx.handlers["peer1"].tree()
|
||||
secondTree := fx.handlers["peer2"].tree()
|
||||
@ -152,4 +170,16 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
|
||||
firstStorage := firstTree.Storage().(*treestorage.InMemoryTreeStorage)
|
||||
secondStorage := secondTree.Storage().(*treestorage.InMemoryTreeStorage)
|
||||
require.True(t, firstStorage.Equal(secondStorage))
|
||||
if hasData {
|
||||
for _, ch := range secondStorage.Changes {
|
||||
if ch.Id == treeId {
|
||||
continue
|
||||
}
|
||||
unmarshallRaw := &treechangeproto.RawTreeChange{}
|
||||
proto.Unmarshal(ch.RawChange, unmarshallRaw)
|
||||
treeCh := &treechangeproto.TreeChange{}
|
||||
proto.Unmarshal(unmarshallRaw.Payload, treeCh)
|
||||
require.Equal(t, ch.Id, string(treeCh.ChangesData))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,6 +90,7 @@ type testSyncHandler struct {
|
||||
aclList list.AclList
|
||||
log *messageLog
|
||||
syncClient objectsync.SyncClient
|
||||
builder objecttree.BuildObjectTreeFunc
|
||||
}
|
||||
|
||||
// createSyncHandler creates a sync handler when a tree is already created
|
||||
@ -105,7 +106,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
|
||||
}
|
||||
|
||||
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
|
||||
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *testSyncHandler {
|
||||
func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler {
|
||||
factory := objectsync.NewRequestFactory()
|
||||
syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
|
||||
|
||||
@ -116,6 +117,7 @@ func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *m
|
||||
aclList: aclList,
|
||||
log: log,
|
||||
syncClient: syncClient,
|
||||
builder: builder,
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,7 +150,7 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re
|
||||
}
|
||||
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
|
||||
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil)
|
||||
tree, err := createTestTree(h.aclList, treeStorage)
|
||||
tree, err := h.builder(treeStorage, h.aclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -304,6 +306,7 @@ type fixtureDeps struct {
|
||||
initStorage *treestorage.InMemoryTreeStorage
|
||||
connectionMap map[string][]string
|
||||
emptyTrees []string
|
||||
treeBuilder objecttree.BuildObjectTreeFunc
|
||||
}
|
||||
|
||||
// protocolFixture is the test environment for sync protocol tests
|
||||
@ -326,10 +329,10 @@ func newProtocolFixture(t *testing.T, spaceId string, deps fixtureDeps) *protoco
|
||||
for peerId := range deps.connectionMap {
|
||||
var handler *testSyncHandler
|
||||
if slices.Contains(deps.emptyTrees, peerId) {
|
||||
handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log)
|
||||
handler = createEmptySyncHandler(peerId, spaceId, deps.treeBuilder, deps.aclList, log)
|
||||
} else {
|
||||
stCopy := deps.initStorage.Copy()
|
||||
testTree, err := createTestTree(deps.aclList, stCopy)
|
||||
testTree, err := deps.treeBuilder(stCopy, deps.aclList)
|
||||
require.NoError(t, err)
|
||||
handler = createSyncHandler(peerId, spaceId, testTree, log)
|
||||
}
|
||||
@ -373,6 +376,7 @@ type genParams struct {
|
||||
snapshotId string
|
||||
prevHeads []string
|
||||
isSnapshot func() bool
|
||||
hasData bool
|
||||
}
|
||||
|
||||
// genResult is the result of genChanges
|
||||
@ -399,7 +403,11 @@ func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res ge
|
||||
)
|
||||
newChange := func(isSnapshot bool, idx int, prevIds []string) string {
|
||||
newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, idx)
|
||||
newCh := creator.CreateRaw(newId, params.aclId, snapshotId, isSnapshot, prevIds...)
|
||||
var data []byte
|
||||
if params.hasData {
|
||||
data = []byte(newId)
|
||||
}
|
||||
newCh := creator.CreateRawWithData(newId, params.aclId, snapshotId, isSnapshot, data, prevIds...)
|
||||
res.changes = append(res.changes, newCh)
|
||||
return newId
|
||||
}
|
||||
|
||||
@ -12,7 +12,7 @@ type InMemoryTreeStorage struct {
|
||||
id string
|
||||
root *treechangeproto.RawTreeChangeWithId
|
||||
heads []string
|
||||
changes map[string]*treechangeproto.RawTreeChangeWithId
|
||||
Changes map[string]*treechangeproto.RawTreeChangeWithId
|
||||
|
||||
sync.RWMutex
|
||||
}
|
||||
@ -22,7 +22,7 @@ func (t *InMemoryTreeStorage) TransactionAdd(changes []*treechangeproto.RawTreeC
|
||||
defer t.RUnlock()
|
||||
|
||||
for _, ch := range changes {
|
||||
t.changes[ch.Id] = ch
|
||||
t.Changes[ch.Id] = ch
|
||||
}
|
||||
t.heads = append(t.heads[:0], heads...)
|
||||
return nil
|
||||
@ -42,13 +42,13 @@ func NewInMemoryTreeStorage(
|
||||
id: root.Id,
|
||||
root: root,
|
||||
heads: append([]string(nil), heads...),
|
||||
changes: allChanges,
|
||||
Changes: allChanges,
|
||||
RWMutex: sync.RWMutex{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *InMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, error) {
|
||||
_, exists := t.changes[id]
|
||||
_, exists := t.Changes[id]
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
@ -81,14 +81,14 @@ func (t *InMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChange
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
// TODO: better to do deep copy
|
||||
t.changes[change.Id] = change
|
||||
t.Changes[change.Id] = change
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
if res, exists := t.changes[changeId]; exists {
|
||||
if res, exists := t.Changes[changeId]; exists {
|
||||
return res, nil
|
||||
}
|
||||
return nil, fmt.Errorf("could not get change with id: %s", changeId)
|
||||
@ -100,7 +100,7 @@ func (t *InMemoryTreeStorage) Delete() error {
|
||||
|
||||
func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage {
|
||||
var changes []*treechangeproto.RawTreeChangeWithId
|
||||
for _, ch := range t.changes {
|
||||
for _, ch := range t.Changes {
|
||||
changes = append(changes, ch)
|
||||
}
|
||||
other, _ := NewInMemoryTreeStorage(t.root, t.heads, changes)
|
||||
@ -111,11 +111,11 @@ func (t *InMemoryTreeStorage) Equal(other *InMemoryTreeStorage) bool {
|
||||
if !slice.UnsortedEquals(t.heads, other.heads) {
|
||||
return false
|
||||
}
|
||||
if len(t.changes) != len(other.changes) {
|
||||
if len(t.Changes) != len(other.Changes) {
|
||||
return false
|
||||
}
|
||||
for k, v := range t.changes {
|
||||
if otherV, exists := other.changes[k]; exists {
|
||||
for k, v := range t.Changes {
|
||||
if otherV, exists := other.Changes[k]; exists {
|
||||
if otherV.Id == v.Id {
|
||||
continue
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user