any-sync/commonspace/object/tree/synctree/syncclient_test.go
2023-03-14 10:33:59 +01:00

177 lines
5.0 KiB
Go

package synctree
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/acl/testutils/acllistbuilder"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/cheggaaa/mb/v3"
"sync"
)
// processMsg is a sync message travelling between two in-process peers,
// tagged with the id of the peer that sent it and the id of the peer it is
// addressed to.
type processMsg struct {
	msg                  *spacesyncproto.ObjectSyncMessage
	senderId, receiverId string
}
// messageLog collects the messages exchanged between peers; entries are
// appended through addMessage.
type messageLog struct {
// batcher is the queue that addMessage appends each logged message to.
batcher *mb.MB[processMsg]
}
// newMessageLog returns an empty messageLog backed by a fresh queue created
// with size 0 (presumably "unbounded" in cheggaaa/mb — confirm against its docs).
func newMessageLog() *messageLog {
	log := &messageLog{}
	log.batcher = mb.New[processMsg](0)
	return log
}
// addMessage appends msg to the log. The queue's error result is discarded,
// so callers are never told if the message could not be recorded.
func (m *messageLog) addMessage(msg processMsg) {
	ctx := context.Background()
	_ = m.batcher.Add(ctx, msg)
}
// processSyncHandler wraps a synchandler.SyncHandler with a message queue:
// send enqueues messages and the goroutine started by run passes them to the
// wrapped handler one at a time.
type processSyncHandler struct {
// SyncHandler is the wrapped handler that run calls for each queued message.
synchandler.SyncHandler
// batcher is the queue of messages waiting to be handled.
batcher *mb.MB[processMsg]
// peerId is the id of the peer this handler stands for.
peerId string
}
// newProcessSyncHandler wraps syncHandler for the peer identified by peerId
// and gives it its own, initially empty, message queue.
func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler {
	return &processSyncHandler{
		SyncHandler: syncHandler,
		batcher:     mb.New[processMsg](0),
		peerId:      peerId,
	}
}
// manager digs the processPeerManager out of the wrapped handler. It relies
// on the concrete types used by this test harness (syncTreeHandler ->
// syncClient -> processPeerManager) and panics if any of the type assertions
// does not hold.
func (p *processSyncHandler) manager() *processPeerManager {
	treeHandler := p.SyncHandler.(*syncTreeHandler)
	client := treeHandler.syncClient.(*syncClient)
	return client.PeerManager.(*processPeerManager)
}
// send enqueues msg for this handler and returns whatever error the queue
// reports for the add.
func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) {
	err = p.batcher.Add(ctx, msg)
	return err
}
// run starts the goroutine that drains this handler's queue and feeds each
// message to the wrapped SyncHandler. The goroutine is registered on wg before
// it starts and exits as soon as waiting on the queue returns an error (for
// example because ctx is done). Errors from handling a single message are
// printed and do not stop the loop.
func (p *processSyncHandler) run(ctx context.Context, wg *sync.WaitGroup) {
	wg.Add(1)
	loop := func() {
		defer wg.Done()
		for {
			next, err := p.batcher.WaitOne(ctx)
			if err != nil {
				return
			}
			if handleErr := p.SyncHandler.HandleMessage(ctx, next.senderId, next.msg); handleErr != nil {
				fmt.Println("error handling message", handleErr.Error())
			}
		}
	}
	go loop()
}
// processPeerManager delivers messages on behalf of one peer by placing them
// directly on the queues of the other peers' handlers, recording every
// message in a messageLog on the way.
type processPeerManager struct {
// peerId is the id of the peer this manager sends as (used as senderId).
peerId string
// handlers maps a receiving peer id to that peer's handler.
handlers map[string]*processSyncHandler
// log receives a copy of every message passed to SendPeer or Broadcast.
log *messageLog
}
// newProcessPeerManager creates a manager that sends as peerId, records
// outgoing messages in log, and starts with no registered handlers.
func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager {
	manager := &processPeerManager{
		peerId: peerId,
		log:    log,
	}
	manager.handlers = make(map[string]*processSyncHandler)
	return manager
}
// addHandler registers handler as the destination for messages addressed to
// peerId, replacing any handler previously registered under that id.
func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) {
	// A manager built as a zero value (&processPeerManager{}) has a nil map;
	// assigning into it would panic, so create the map on first use.
	if m.handlers == nil {
		m.handlers = make(map[string]*processSyncHandler)
	}
	m.handlers[peerId] = handler
}
// SendPeer delivers msg to the handler registered for peerId, recording it in
// the message log first.
//
// It returns an error if no handler is registered for peerId, or if the
// handler's queue rejects the message. The message is enqueued with
// context.Background() rather than ctx, so the caller's ctx does not affect
// the enqueue.
func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
	// Looking up an unknown peer yields a nil handler; calling send on it
	// would dereference nil, so report the problem to the caller instead.
	handler, ok := m.handlers[peerId]
	if !ok || handler == nil {
		return fmt.Errorf("no handler registered for peer %q", peerId)
	}
	pMsg := processMsg{
		msg:        msg,
		senderId:   m.peerId,
		receiverId: peerId,
	}
	m.log.addMessage(pMsg)
	return handler.send(context.Background(), pMsg)
}
// Broadcast delivers msg to every registered handler, recording one log entry
// per receiver.
//
// Delivery is attempted for all handlers even if some fail; the first error
// returned by a handler's queue is reported to the caller. Messages are
// enqueued with context.Background() rather than ctx, so the caller's ctx
// does not affect the enqueue.
func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
	for _, handler := range m.handlers {
		pMsg := processMsg{
			msg:        msg,
			senderId:   m.peerId,
			receiverId: handler.peerId,
		}
		m.log.addMessage(pMsg)
		// Keep going so the remaining peers still get the message, but
		// remember the first failure instead of silently dropping it.
		if sendErr := handler.send(context.Background(), pMsg); sendErr != nil && err == nil {
			err = sendErr
		}
	}
	return err
}
// GetResponsiblePeers is not supported by this in-process manager: it always
// panics, signalling that the code under test was not expected to call it.
func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
	const reason = "should not be called"
	panic(reason)
}
// broadcastTree combines an ObjectTree with a SyncClient so that changes
// added through its AddRawChanges are also broadcast as a head update.
type broadcastTree struct {
// ObjectTree is the underlying tree that changes are applied to.
objecttree.ObjectTree
// SyncClient builds and broadcasts the head update after a successful add.
SyncClient
}
// AddRawChanges applies changes to the underlying ObjectTree and, when that
// succeeds, broadcasts a head update describing the added changes.
//
// If the underlying tree rejects the changes, an empty AddResult and the
// tree's error are returned and nothing is broadcast. The broadcast call's
// outcome does not affect the returned result.
func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {
	added, addErr := b.ObjectTree.AddRawChanges(ctx, changes)
	if addErr != nil {
		return objecttree.AddResult{}, addErr
	}
	b.SyncClient.Broadcast(ctx, b.SyncClient.CreateHeadUpdate(b.ObjectTree, added.Added))
	return added, nil
}
// createSyncHandler wires up a complete in-process handler for one peer:
// a processPeerManager (sending as peerId and recording into log), a sync
// client on top of it, a broadcastTree around objTree, and a sync tree handler
// with a no-op sync status, all wrapped in a processSyncHandler.
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler {
	requestFactory := GetRequestFactory()
	peerManager := newProcessPeerManager(peerId, log)
	client := newSyncClient(spaceId, peerManager, requestFactory)
	tree := &broadcastTree{ObjectTree: objTree, SyncClient: client}
	treeHandler := newSyncTreeHandler(spaceId, tree, client, syncstatus.NewNoOpSyncStatus())
	return newProcessSyncHandler(peerId, treeHandler)
}
// createAclList builds an ACL list from the "userjoinexample.yml" test
// storage. It returns the storage-creation error, if any, otherwise whatever
// list.BuildAclList returns.
func createAclList() (list.AclList, error) {
	storage, err := acllistbuilder.NewListStorageWithTestName("userjoinexample.yml")
	if err == nil {
		return list.BuildAclList(storage)
	}
	return nil, err
}
// createStorage returns a new tree storage for treeId, created by a mock
// change creator and tied to the id of aclList's current head.
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
	return objecttree.NewMockChangeCreator().CreateNewTreeStorage(treeId, aclList.Head().Id)
}
// createTestTree builds a testable object tree over storage using aclList,
// passing through the result of objecttree.BuildTestableTree unchanged.
func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) {
	tree, err := objecttree.BuildTestableTree(aclList, storage)
	return tree, err
}
// processFixture groups the pieces of a multi-peer sync test.
// NOTE(review): nothing in the visible code uses this type yet, so the field
// descriptions below are inferred from their types — confirm once it is wired up.
type processFixture struct {
// handlers presumably holds one handler per simulated peer.
handlers []*processSyncHandler
// log is presumably the message log shared by all peers.
log *messageLog
// wg presumably tracks the goroutines started via processSyncHandler.run.
wg *sync.WaitGroup
// ctx and cancel presumably control the lifetime of those goroutines.
ctx context.Context
cancel context.CancelFunc
}
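// TestSyncProtocol is currently disabled.
// NOTE(review): the createSyncHandler call below no longer matches the helper's
// signature, which takes (peerId, spaceId, objTree, log); update the arguments
// before re-enabling this test.
//func TestSyncProtocol(t *testing.T) {
//	aclList, err := createAclList()
//	require.NoError(t, err)
//	treeId := "treeId"
//	spaceId := "spaceId"
//	storage := createStorage(treeId, aclList)
//	testTree, err := createTestTree(aclList, storage)
//	require.NoError(t, err)
//	peerManager := &processPeerManager{}
//	_ = createSyncHandler(spaceId, testTree, peerManager)
//}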