Add empty client logic

This commit is contained in:
mcrakhman 2023-04-13 21:32:39 +02:00 committed by Mikhail Iudin
parent b82c7da159
commit 4a1a95a7b9
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0

View File

@ -3,8 +3,8 @@ package synctree
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/anytypeio/any-sync/commonspace/object/accountdata"
"github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/acl/testutils/acllistbuilder"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
@ -76,17 +76,59 @@ func (m *messageLog) addMessage(msg processMsg) {
type processSyncHandler struct { type processSyncHandler struct {
synchandler.SyncHandler synchandler.SyncHandler
batcher *mb.MB[processMsg] batcher *mb.MB[processMsg]
peerId string peerId string
aclList list.AclList
log *messageLog
syncClient SyncClient
}
func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
if p.SyncHandler != nil {
return p.SyncHandler.HandleMessage(ctx, senderId, request)
}
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
if unmarshalled.Content.GetFullSyncResponse() == nil {
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
var objMsg *spacesyncproto.ObjectSyncMessage
objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil {
return
}
return p.manager().SendPeer(context.Background(), senderId, objMsg)
}
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, fullSyncResponse.Heads, fullSyncResponse.Changes)
tree, err := createTestTree(p.aclList, treeStorage)
if err != nil {
return
}
netTree := &broadcastTree{
ObjectTree: tree,
SyncClient: p.syncClient,
}
p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus())
return
} }
func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler {
batcher := mb.New[processMsg](0) batcher := mb.New[processMsg](0)
return &processSyncHandler{syncHandler, batcher, peerId} return &processSyncHandler{
SyncHandler: syncHandler,
batcher: batcher,
peerId: peerId,
}
} }
func (p *processSyncHandler) manager() *processPeerManager { func (p *processSyncHandler) manager() *processPeerManager {
return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) if p.SyncHandler != nil {
return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager)
}
return p.syncClient.(*syncClient).PeerManager.(*processPeerManager)
} }
func (p *processSyncHandler) tree() *broadcastTree { func (p *processSyncHandler) tree() *broadcastTree {
@ -118,7 +160,7 @@ func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.Wai
p.tree().Unlock() p.tree().Unlock()
continue continue
} }
err = p.SyncHandler.HandleMessage(ctx, res.senderId, res.msg) err = p.HandleMessage(ctx, res.senderId, res.msg)
if err != nil { if err != nil {
fmt.Println("error handling message", err.Error()) fmt.Println("error handling message", err.Error())
continue continue
@ -194,12 +236,18 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
return newProcessSyncHandler(peerId, handler) return newProcessSyncHandler(peerId, handler)
} }
func createAclList() (list.AclList, error) { func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler {
st, err := acllistbuilder.NewListStorageWithTestName("userjoinexample.yml") factory := GetRequestFactory()
if err != nil { syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
return nil, err
batcher := mb.New[processMsg](0)
return &processSyncHandler{
batcher: batcher,
peerId: peerId,
aclList: aclList,
log: log,
syncClient: syncClient,
} }
return list.BuildAclList(st)
} }
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage { func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
@ -216,6 +264,7 @@ type fixtureDeps struct {
aclList list.AclList aclList list.AclList
initStorage *treestorage.InMemoryTreeStorage initStorage *treestorage.InMemoryTreeStorage
connectionMap map[string][]string connectionMap map[string][]string
emptyTrees []string
} }
type processFixture struct { type processFixture struct {
@ -235,10 +284,15 @@ func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processF
) )
for peerId := range deps.connectionMap { for peerId := range deps.connectionMap {
stCopy := deps.initStorage.Copy() var handler *processSyncHandler
testTree, err := createTestTree(deps.aclList, stCopy) if slices.Contains(deps.emptyTrees, peerId) {
require.NoError(t, err) handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log)
handler := createSyncHandler(peerId, spaceId, testTree, log) } else {
stCopy := deps.initStorage.Copy()
testTree, err := createTestTree(deps.aclList, stCopy)
require.NoError(t, err)
handler = createSyncHandler(peerId, spaceId, testTree, log)
}
handlers[peerId] = handler handlers[peerId] = handler
} }
for peerId, connectionMap := range deps.connectionMap { for peerId, connectionMap := range deps.connectionMap {
@ -268,11 +322,53 @@ func (p *processFixture) stop() {
p.wg.Wait() p.wg.Wait()
} }
func TestSimple_TwoPeers(t *testing.T) { func TestSend_EmptyClient(t *testing.T) {
aclList, err := createAclList()
require.NoError(t, err)
treeId := "treeId" treeId := "treeId"
spaceId := "spaceId" spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
require.NoError(t, err)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
deps := fixtureDeps{
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
emptyTrees: []string{"peer2"},
}
fx := newProcessFixture(t, spaceId, deps)
fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: nil,
RawChanges: []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Id(), treeId, false, treeId),
},
})
time.Sleep(100 * time.Millisecond)
fx.stop()
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
slices.Sort(firstHeads)
slices.Sort(secondHeads)
require.Equal(t, firstHeads, secondHeads)
require.Equal(t, []string{"1"}, firstHeads)
logMsgs := fx.log.batcher.GetAll()
for _, msg := range logMsgs {
fmt.Println(msg.description())
}
}
func TestSimple_TwoPeers(t *testing.T) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
require.NoError(t, err)
storage := createStorage(treeId, aclList) storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator() changeCreator := objecttree.NewMockChangeCreator()
deps := fixtureDeps{ deps := fixtureDeps{
@ -312,10 +408,11 @@ func TestSimple_TwoPeers(t *testing.T) {
} }
func TestSimple_ThreePeers(t *testing.T) { func TestSimple_ThreePeers(t *testing.T) {
aclList, err := createAclList()
require.NoError(t, err)
treeId := "treeId" treeId := "treeId"
spaceId := "spaceId" spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList) storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator() changeCreator := objecttree.NewMockChangeCreator()
deps := fixtureDeps{ deps := fixtureDeps{