Add test with random trees

This commit is contained in:
mcrakhman 2023-04-17 21:49:51 +02:00
parent 673925df84
commit 125d8b4626
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 518 additions and 391 deletions

View File

@ -8,325 +8,14 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/util/slice"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/exp/slices" "math/rand"
"sync"
"testing" "testing"
"time" "time"
) )
type processMsg struct { func TestEmptyClientGetsFullHistory(t *testing.T) {
msg *spacesyncproto.ObjectSyncMessage
senderId string
receiverId string
userMsg *objecttree.RawChangesPayload
}
type msgDescription struct {
name string
from string
to string
heads []string
changes []*treechangeproto.RawTreeChangeWithId
}
func (p *processMsg) description() (descr msgDescription) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(p.msg.Payload, unmarshalled)
if err != nil {
panic(err)
}
descr = msgDescription{
from: p.senderId,
to: p.receiverId,
}
switch {
case unmarshalled.GetContent().GetHeadUpdate() != nil:
cnt := unmarshalled.GetContent().GetHeadUpdate()
descr.name = "HeadUpdate"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetHeadUpdate().Changes
case unmarshalled.GetContent().GetFullSyncRequest() != nil:
cnt := unmarshalled.GetContent().GetFullSyncRequest()
descr.name = "FullSyncRequest"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetFullSyncRequest().Changes
case unmarshalled.GetContent().GetFullSyncResponse() != nil:
cnt := unmarshalled.GetContent().GetFullSyncResponse()
descr.name = "FullSyncResponse"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetFullSyncResponse().Changes
}
return
}
type messageLog struct {
batcher *mb.MB[processMsg]
}
func newMessageLog() *messageLog {
return &messageLog{batcher: mb.New[processMsg](0)}
}
func (m *messageLog) addMessage(msg processMsg) {
m.batcher.Add(context.Background(), msg)
}
type processSyncHandler struct {
synchandler.SyncHandler
batcher *mb.MB[processMsg]
peerId string
aclList list.AclList
log *messageLog
syncClient SyncClient
}
func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
if p.SyncHandler != nil {
return p.SyncHandler.HandleMessage(ctx, senderId, request)
}
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
if unmarshalled.Content.GetFullSyncResponse() == nil {
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
var objMsg *spacesyncproto.ObjectSyncMessage
objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil {
return
}
return p.manager().SendPeer(context.Background(), senderId, objMsg)
}
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, fullSyncResponse.Heads, fullSyncResponse.Changes)
tree, err := createTestTree(p.aclList, treeStorage)
if err != nil {
return
}
netTree := &broadcastTree{
ObjectTree: tree,
SyncClient: p.syncClient,
}
p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus())
return
}
func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler {
batcher := mb.New[processMsg](0)
return &processSyncHandler{
SyncHandler: syncHandler,
batcher: batcher,
peerId: peerId,
}
}
func (p *processSyncHandler) manager() *processPeerManager {
if p.SyncHandler != nil {
return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager)
}
return p.syncClient.(*syncClient).PeerManager.(*processPeerManager)
}
func (p *processSyncHandler) tree() *broadcastTree {
return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree)
}
func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) {
return p.batcher.Add(ctx, msg)
}
func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) {
p.batcher.Add(ctx, processMsg{userMsg: &changes})
}
func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
res, err := p.batcher.WaitOne(ctx)
if err != nil {
return
}
if res.userMsg != nil {
p.tree().Lock()
userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg)
require.NoError(t, err)
fmt.Println("user add result", userRes.Heads)
p.tree().Unlock()
continue
}
err = p.HandleMessage(ctx, res.senderId, res.msg)
if err != nil {
fmt.Println("error handling message", err.Error())
continue
}
}
}()
}
type processPeerManager struct {
peerId string
handlers map[string]*processSyncHandler
log *messageLog
}
func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager {
return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log}
}
func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) {
m.handlers[peerId] = handler
}
func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
pMsg := processMsg{
msg: msg,
senderId: m.peerId,
receiverId: peerId,
}
m.log.addMessage(pMsg)
return m.handlers[peerId].send(context.Background(), pMsg)
}
func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
for _, handler := range m.handlers {
pMsg := processMsg{
msg: msg,
senderId: m.peerId,
receiverId: handler.peerId,
}
m.log.addMessage(pMsg)
handler.send(context.Background(), pMsg)
}
return
}
func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
panic("should not be called")
}
type broadcastTree struct {
objecttree.ObjectTree
SyncClient
}
func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {
res, err := b.ObjectTree.AddRawChanges(ctx, changes)
if err != nil {
return objecttree.AddResult{}, err
}
upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added)
b.SyncClient.Broadcast(ctx, upd)
return res, nil
}
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler {
factory := GetRequestFactory()
syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
netTree := &broadcastTree{
ObjectTree: objTree,
SyncClient: syncClient,
}
handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus())
return newProcessSyncHandler(peerId, handler)
}
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler {
factory := GetRequestFactory()
syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
batcher := mb.New[processMsg](0)
return &processSyncHandler{
batcher: batcher,
peerId: peerId,
aclList: aclList,
log: log,
syncClient: syncClient,
}
}
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
changeCreator := objecttree.NewMockChangeCreator()
st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id)
return st
}
func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) {
return objecttree.BuildTestableTree(aclList, storage)
}
type fixtureDeps struct {
aclList list.AclList
initStorage *treestorage.InMemoryTreeStorage
connectionMap map[string][]string
emptyTrees []string
}
type processFixture struct {
handlers map[string]*processSyncHandler
log *messageLog
wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture {
var (
handlers = map[string]*processSyncHandler{}
log = newMessageLog()
wg = sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background())
)
for peerId := range deps.connectionMap {
var handler *processSyncHandler
if slices.Contains(deps.emptyTrees, peerId) {
handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log)
} else {
stCopy := deps.initStorage.Copy()
testTree, err := createTestTree(deps.aclList, stCopy)
require.NoError(t, err)
handler = createSyncHandler(peerId, spaceId, testTree, log)
}
handlers[peerId] = handler
}
for peerId, connectionMap := range deps.connectionMap {
handler := handlers[peerId]
manager := handler.manager()
for _, connectionId := range connectionMap {
manager.addHandler(connectionId, handlers[connectionId])
}
}
return &processFixture{
handlers: handlers,
log: log,
wg: &wg,
ctx: ctx,
cancel: cancel,
}
}
func (p *processFixture) run(t *testing.T) {
for _, handler := range p.handlers {
handler.run(p.ctx, t, p.wg)
}
}
func (p *processFixture) stop() {
p.cancel()
p.wg.Wait()
}
func TestSend_EmptyClientGetsFullHistory(t *testing.T) {
treeId := "treeId" treeId := "treeId"
spaceId := "spaceId" spaceId := "spaceId"
keys, err := accountdata.NewRandom() keys, err := accountdata.NewRandom()
@ -356,70 +45,22 @@ func TestSend_EmptyClientGetsFullHistory(t *testing.T) {
fx.stop() fx.stop()
firstHeads := fx.handlers["peer1"].tree().Heads() firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads() secondHeads := fx.handlers["peer2"].tree().Heads()
slices.Sort(firstHeads) require.True(t, slice.SortedEquals(firstHeads, secondHeads))
slices.Sort(secondHeads)
require.Equal(t, firstHeads, secondHeads)
require.Equal(t, []string{"1"}, firstHeads) require.Equal(t, []string{"1"}, firstHeads)
logMsgs := fx.log.batcher.GetAll() logMsgs := fx.log.batcher.GetAll()
var fullResponseMsg *processMsg var fullResponseMsg msgDescription
for _, msg := range logMsgs { for _, msg := range logMsgs {
descr := msg.description() descr := msg.description()
if descr.name == "FullSyncResponse" { if descr.name == "FullSyncResponse" {
fullResponseMsg = &msg fullResponseMsg = descr
} }
} }
require.NotNil(t, fullResponseMsg)
// that means that we got not only the last snapshot, but also the first one // that means that we got not only the last snapshot, but also the first one
require.Len(t, fullResponseMsg.description().changes, 2) require.Len(t, fullResponseMsg.changes, 2)
} }
func TestSimple_TwoPeers(t *testing.T) { func TestRandomTreeMerge(t *testing.T) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
require.NoError(t, err)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
deps := fixtureDeps{
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
}
fx := newProcessFixture(t, spaceId, deps)
fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: nil,
RawChanges: []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Id(), treeId, false, treeId),
},
})
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: nil,
RawChanges: []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("2", aclList.Id(), treeId, false, treeId),
},
})
time.Sleep(100 * time.Millisecond)
fx.stop()
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
slices.Sort(firstHeads)
slices.Sort(secondHeads)
require.Equal(t, firstHeads, secondHeads)
require.Equal(t, []string{"1", "2"}, firstHeads)
logMsgs := fx.log.batcher.GetAll()
for _, msg := range logMsgs {
fmt.Println(msg.description())
}
}
func TestSimple_ThreePeers(t *testing.T) {
treeId := "treeId" treeId := "treeId"
spaceId := "spaceId" spaceId := "spaceId"
keys, err := accountdata.NewRandom() keys, err := accountdata.NewRandom()
@ -427,6 +68,21 @@ func TestSimple_ThreePeers(t *testing.T) {
aclList, err := list.NewTestDerivedAcl(spaceId, keys) aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList) storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator() changeCreator := objecttree.NewMockChangeCreator()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
params := genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: 0,
levels: 10,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: func() bool {
return rnd.Intn(100) > 80
},
}
initialRes := genChanges(changeCreator, params)
err = storage.TransactionAdd(initialRes.changes, initialRes.heads)
require.NoError(t, err)
deps := fixtureDeps{ deps := fixtureDeps{
aclList: aclList, aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage), initStorage: storage.(*treestorage.InMemoryTreeStorage),
@ -435,31 +91,44 @@ func TestSimple_ThreePeers(t *testing.T) {
"peer2": []string{"node1"}, "peer2": []string{"node1"},
"node1": []string{"peer1", "peer2"}, "node1": []string{"peer1", "peer2"},
}, },
emptyTrees: []string{"peer2", "node1"},
} }
fx := newProcessFixture(t, spaceId, deps) fx := newProcessFixture(t, spaceId, deps)
fx.run(t) fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: nil, NewHeads: initialRes.heads,
RawChanges: []*treechangeproto.RawTreeChangeWithId{ RawChanges: initialRes.changes,
changeCreator.CreateRaw("1", aclList.Id(), treeId, false, treeId),
},
}) })
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ time.Sleep(1000 * time.Millisecond)
NewHeads: nil,
RawChanges: []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("2", aclList.Id(), treeId, false, treeId),
},
})
time.Sleep(100 * time.Millisecond)
fx.stop()
firstHeads := fx.handlers["peer1"].tree().Heads() firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads() secondHeads := fx.handlers["peer2"].tree().Heads()
slices.Sort(firstHeads) require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
slices.Sort(secondHeads) params = genParams{
require.Equal(t, firstHeads, secondHeads) prefix: "peer1",
require.Equal(t, []string{"1", "2"}, firstHeads) aclId: aclList.Id(),
logMsgs := fx.log.batcher.GetAll() startIdx: 11,
for _, msg := range logMsgs { levels: 10,
fmt.Println(msg.description()) snapshotId: initialRes.snapshotId,
prevHeads: initialRes.heads,
isSnapshot: func() bool {
return rnd.Intn(100) > 80
},
} }
peer1Res := genChanges(changeCreator, params)
params.prefix = "peer2"
peer2Res := genChanges(changeCreator, params)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: peer1Res.heads,
RawChanges: peer1Res.changes,
})
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: peer2Res.heads,
RawChanges: peer2Res.changes,
})
time.Sleep(1000 * time.Millisecond)
fx.stop()
firstHeads = fx.handlers["peer1"].tree().Heads()
secondHeads = fx.handlers["peer2"].tree().Heads()
fmt.Println(firstHeads)
fmt.Println(secondHeads)
} }

View File

@ -83,13 +83,19 @@ func (s *syncTreeHandler) handleHeadUpdate(
objTree = s.objTree objTree = s.objTree
) )
log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()), zap.String("spaceId", s.spaceId)) log := log.With(
zap.Strings("update heads", update.Heads),
zap.String("treeId", objTree.Id()),
zap.String("spaceId", s.spaceId),
zap.Int("len(update changes)", len(update.Changes)))
log.DebugCtx(ctx, "received head update message") log.DebugCtx(ctx, "received head update message")
defer func() { defer func() {
if err != nil { if err != nil {
log.With(zap.Error(err)).Debug("head update finished with error") log.With(zap.Error(err)).Debug("head update finished with error")
} else if fullRequest != nil { } else if fullRequest != nil {
cnt := fullRequest.Content.GetFullSyncRequest()
log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes)))
log.DebugCtx(ctx, "sending full sync request") log.DebugCtx(ctx, "sending full sync request")
} else { } else {
if !isEmptyUpdate { if !isEmptyUpdate {
@ -151,19 +157,21 @@ func (s *syncTreeHandler) handleFullSyncRequest(
) )
log := log.With(zap.String("senderId", senderId), log := log.With(zap.String("senderId", senderId),
zap.Strings("heads", request.Heads), zap.Strings("request heads", request.Heads),
zap.String("treeId", s.objTree.Id()), zap.String("treeId", s.objTree.Id()),
zap.String("replyId", replyId), zap.String("replyId", replyId),
zap.String("spaceId", s.spaceId)) zap.String("spaceId", s.spaceId),
zap.Int("len(request changes)", len(request.Changes)))
log.DebugCtx(ctx, "received full sync request message") log.DebugCtx(ctx, "received full sync request message")
defer func() { defer func() {
if err != nil { if err != nil {
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error") log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId) s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId)
return return
} else if fullResponse != nil { } else if fullResponse != nil {
cnt := fullResponse.Content.GetFullSyncResponse()
log = log.With(zap.Strings("response heads", cnt.Heads), zap.Int("len(response changes)", len(cnt.Changes)))
log.DebugCtx(ctx, "full sync response sent") log.DebugCtx(ctx, "full sync response sent")
} }
}() }()
@ -192,7 +200,11 @@ func (s *syncTreeHandler) handleFullSyncResponse(
var ( var (
objTree = s.objTree objTree = s.objTree
) )
log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()), zap.String("spaceId", s.spaceId)) log := log.With(
zap.Strings("heads", response.Heads),
zap.String("treeId", s.objTree.Id()),
zap.String("spaceId", s.spaceId),
zap.Int("len(changes)", len(response.Changes)))
log.DebugCtx(ctx, "received full sync response message") log.DebugCtx(ctx, "received full sync response message")
defer func() { defer func() {

View File

@ -0,0 +1,446 @@
package synctree
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/object/accountdata"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"math/rand"
"sync"
"testing"
"time"
)
type processMsg struct {
msg *spacesyncproto.ObjectSyncMessage
senderId string
receiverId string
userMsg *objecttree.RawChangesPayload
}
type msgDescription struct {
name string
from string
to string
heads []string
changes []*treechangeproto.RawTreeChangeWithId
}
func (p *processMsg) description() (descr msgDescription) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(p.msg.Payload, unmarshalled)
if err != nil {
panic(err)
}
descr = msgDescription{
from: p.senderId,
to: p.receiverId,
}
switch {
case unmarshalled.GetContent().GetHeadUpdate() != nil:
cnt := unmarshalled.GetContent().GetHeadUpdate()
descr.name = "HeadUpdate"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetHeadUpdate().Changes
case unmarshalled.GetContent().GetFullSyncRequest() != nil:
cnt := unmarshalled.GetContent().GetFullSyncRequest()
descr.name = "FullSyncRequest"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetFullSyncRequest().Changes
case unmarshalled.GetContent().GetFullSyncResponse() != nil:
cnt := unmarshalled.GetContent().GetFullSyncResponse()
descr.name = "FullSyncResponse"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetFullSyncResponse().Changes
}
return
}
type messageLog struct {
batcher *mb.MB[processMsg]
}
func newMessageLog() *messageLog {
return &messageLog{batcher: mb.New[processMsg](0)}
}
func (m *messageLog) addMessage(msg processMsg) {
m.batcher.Add(context.Background(), msg)
}
type processSyncHandler struct {
synchandler.SyncHandler
batcher *mb.MB[processMsg]
peerId string
aclList list.AclList
log *messageLog
syncClient SyncClient
}
func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
if p.SyncHandler != nil {
return p.SyncHandler.HandleMessage(ctx, senderId, request)
}
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
if unmarshalled.Content.GetFullSyncResponse() == nil {
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
var objMsg *spacesyncproto.ObjectSyncMessage
objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil {
return
}
return p.manager().SendPeer(context.Background(), senderId, objMsg)
}
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil)
tree, err := createTestTree(p.aclList, treeStorage)
if err != nil {
return
}
netTree := &broadcastTree{
ObjectTree: tree,
SyncClient: p.syncClient,
}
res, err := netTree.AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: fullSyncResponse.Heads,
RawChanges: fullSyncResponse.Changes,
})
if err != nil {
return
}
p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus())
var objMsg *spacesyncproto.ObjectSyncMessage
newTreeRequest := GetRequestFactory().CreateHeadUpdate(netTree, res.Added)
objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil {
return
}
return p.manager().Broadcast(context.Background(), objMsg)
}
func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler {
batcher := mb.New[processMsg](0)
return &processSyncHandler{
SyncHandler: syncHandler,
batcher: batcher,
peerId: peerId,
}
}
func (p *processSyncHandler) manager() *processPeerManager {
if p.SyncHandler != nil {
return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager)
}
return p.syncClient.(*syncClient).PeerManager.(*processPeerManager)
}
func (p *processSyncHandler) tree() *broadcastTree {
return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree)
}
func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) {
return p.batcher.Add(ctx, msg)
}
func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) {
p.batcher.Add(ctx, processMsg{userMsg: &changes})
}
func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
res, err := p.batcher.WaitOne(ctx)
if err != nil {
return
}
if res.userMsg != nil {
p.tree().Lock()
userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg)
require.NoError(t, err)
fmt.Println("user add result", userRes.Heads)
p.tree().Unlock()
continue
}
err = p.HandleMessage(ctx, res.senderId, res.msg)
if err != nil {
fmt.Println("error handling message", err.Error())
continue
}
}
}()
}
type processPeerManager struct {
peerId string
handlers map[string]*processSyncHandler
log *messageLog
}
func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager {
return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log}
}
func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) {
m.handlers[peerId] = handler
}
func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
pMsg := processMsg{
msg: msg,
senderId: m.peerId,
receiverId: peerId,
}
m.log.addMessage(pMsg)
return m.handlers[peerId].send(context.Background(), pMsg)
}
func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
for _, handler := range m.handlers {
pMsg := processMsg{
msg: msg,
senderId: m.peerId,
receiverId: handler.peerId,
}
m.log.addMessage(pMsg)
handler.send(context.Background(), pMsg)
}
return
}
func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
panic("should not be called")
}
type broadcastTree struct {
objecttree.ObjectTree
SyncClient
}
func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {
res, err := b.ObjectTree.AddRawChanges(ctx, changes)
if err != nil {
return objecttree.AddResult{}, err
}
upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added)
b.SyncClient.Broadcast(ctx, upd)
return res, nil
}
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler {
factory := GetRequestFactory()
syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
netTree := &broadcastTree{
ObjectTree: objTree,
SyncClient: syncClient,
}
handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus())
return newProcessSyncHandler(peerId, handler)
}
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler {
factory := GetRequestFactory()
syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
batcher := mb.New[processMsg](0)
return &processSyncHandler{
batcher: batcher,
peerId: peerId,
aclList: aclList,
log: log,
syncClient: syncClient,
}
}
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
changeCreator := objecttree.NewMockChangeCreator()
st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id)
return st
}
func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) {
return objecttree.BuildTestableTree(aclList, storage)
}
type fixtureDeps struct {
aclList list.AclList
initStorage *treestorage.InMemoryTreeStorage
connectionMap map[string][]string
emptyTrees []string
}
type processFixture struct {
handlers map[string]*processSyncHandler
log *messageLog
wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture {
var (
handlers = map[string]*processSyncHandler{}
log = newMessageLog()
wg = sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background())
)
for peerId := range deps.connectionMap {
var handler *processSyncHandler
if slices.Contains(deps.emptyTrees, peerId) {
handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log)
} else {
stCopy := deps.initStorage.Copy()
testTree, err := createTestTree(deps.aclList, stCopy)
require.NoError(t, err)
handler = createSyncHandler(peerId, spaceId, testTree, log)
}
handlers[peerId] = handler
}
for peerId, connectionMap := range deps.connectionMap {
handler := handlers[peerId]
manager := handler.manager()
for _, connectionId := range connectionMap {
manager.addHandler(connectionId, handlers[connectionId])
}
}
return &processFixture{
handlers: handlers,
log: log,
wg: &wg,
ctx: ctx,
cancel: cancel,
}
}
func (p *processFixture) run(t *testing.T) {
for _, handler := range p.handlers {
handler.run(p.ctx, t, p.wg)
}
}
func (p *processFixture) stop() {
p.cancel()
p.wg.Wait()
}
type genParams struct {
prefix string
aclId string
startIdx int
levels int
snapshotId string
prevHeads []string
isSnapshot func() bool
}
type genResult struct {
changes []*treechangeproto.RawTreeChangeWithId
heads []string
snapshotId string
}
func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res genResult) {
src := rand.NewSource(time.Now().Unix())
rnd := rand.New(src)
var (
prevHeads []string
snapshotId = params.snapshotId
)
prevHeads = append(prevHeads, params.prevHeads...)
for i := 0; i < params.levels; i++ {
if params.isSnapshot() {
newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, 0)
newCh := creator.CreateRaw(newId, params.aclId, snapshotId, true, prevHeads...)
res.changes = append(res.changes, newCh)
prevHeads = []string{newId}
snapshotId = newId
continue
}
perLevel := rnd.Intn(10)
if perLevel == 0 {
perLevel = 1
}
var (
newHeads []string
usedIds = map[string]struct{}{}
)
for j := 0; j < perLevel; j++ {
// if we didn't connect with all prev ones
prevConns := rnd.Intn(len(prevHeads))
if prevConns == 0 {
prevConns = 1
}
rnd.Shuffle(len(prevHeads), func(i, j int) {
prevHeads[i], prevHeads[j] = prevHeads[j], prevHeads[i]
})
if j == perLevel-1 && len(usedIds) != len(prevHeads) {
var unusedIds []string
for _, id := range prevHeads {
if _, exists := usedIds[id]; !exists {
unusedIds = append(unusedIds, id)
}
}
prevHeads = unusedIds
prevConns = len(prevHeads)
}
var prevChId []string
for k := 0; k < prevConns; k++ {
prevChId = append(prevChId, prevHeads[k])
usedIds[prevHeads[k]] = struct{}{}
}
newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, j)
newCh := creator.CreateRaw(newId, params.aclId, snapshotId, false, prevChId...)
res.changes = append(res.changes, newCh)
newHeads = append(newHeads, newId)
}
prevHeads = newHeads
}
res.heads = prevHeads
res.snapshotId = snapshotId
return
}
func TestGenChanges(t *testing.T) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList)
creator := objecttree.NewMockChangeCreator()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
params := genParams{
prefix: "peerId",
aclId: aclList.Id(),
startIdx: 0,
levels: 10,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: func() bool {
return rnd.Intn(100) > 80
},
}
res := genChanges(creator, params)
storage.TransactionAdd(res.changes, res.heads)
tr, err := createTestTree(aclList, storage)
require.NoError(t, err)
fmt.Println(tr.Debug(objecttree.NoOpDescriptionParser))
}