Change sync client message pool interactions and request-reply
This commit is contained in:
parent
52816f0f44
commit
fed0eee243
@ -55,16 +55,14 @@ type syncTree struct {
|
||||
|
||||
var log = logger.NewNamed("common.commonspace.synctree")
|
||||
|
||||
var createSyncClient = objectsync.NewSyncClient
|
||||
|
||||
type ResponsiblePeersGetter interface {
|
||||
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
|
||||
}
|
||||
|
||||
type BuildDeps struct {
|
||||
SpaceId string
|
||||
ObjectSync objectsync.ObjectSync
|
||||
Configuration nodeconf.Configuration
|
||||
SyncClient objectsync.SyncClient
|
||||
Configuration nodeconf.NodeConf
|
||||
HeadNotifiable HeadNotifiable
|
||||
Listener updatelistener.UpdateListener
|
||||
AclList list.AclList
|
||||
@ -99,10 +97,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
syncClient := createSyncClient(
|
||||
deps.SpaceId,
|
||||
deps.ObjectSync.MessagePool(),
|
||||
objectsync.GetRequestFactory())
|
||||
syncClient := deps.SyncClient
|
||||
syncTree := &syncTree{
|
||||
ObjectTree: objTree,
|
||||
syncClient: syncClient,
|
||||
|
||||
@ -48,12 +48,7 @@ func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err e
|
||||
|
||||
func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
newTreeRequest := objectsync.GetRequestFactory().CreateNewTreeRequest()
|
||||
objMsg, err := objectsync.MarshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
|
||||
resp, err := t.deps.SyncClient.SendSync(ctx, peerId, t.treeId, newTreeRequest)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -95,7 +95,7 @@ type testSyncHandler struct {
|
||||
// createSyncHandler creates a sync handler when a tree is already created
|
||||
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler {
|
||||
factory := objectsync.GetRequestFactory()
|
||||
syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory)
|
||||
syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
|
||||
netTree := &broadcastTree{
|
||||
ObjectTree: objTree,
|
||||
SyncClient: syncClient,
|
||||
@ -107,7 +107,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
|
||||
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
|
||||
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *testSyncHandler {
|
||||
factory := objectsync.GetRequestFactory()
|
||||
syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory)
|
||||
syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
|
||||
|
||||
batcher := mb.New[protocolMsg](0)
|
||||
return &testSyncHandler{
|
||||
@ -173,11 +173,11 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re
|
||||
return h.manager().Broadcast(context.Background(), objMsg)
|
||||
}
|
||||
|
||||
func (h *testSyncHandler) manager() *testPeerManager {
|
||||
func (h *testSyncHandler) manager() *testMessagePool {
|
||||
if h.SyncHandler != nil {
|
||||
return h.SyncHandler.(*syncTreeHandler).syncClient.PeerManager().(*testPeerManager)
|
||||
return h.SyncHandler.(*syncTreeHandler).syncClient.MessagePool().(*testMessagePool)
|
||||
}
|
||||
return h.syncClient.PeerManager().(*testPeerManager)
|
||||
return h.syncClient.MessagePool().(*testMessagePool)
|
||||
}
|
||||
|
||||
func (h *testSyncHandler) tree() *broadcastTree {
|
||||
@ -218,22 +218,22 @@ func (h *testSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGr
|
||||
}()
|
||||
}
|
||||
|
||||
// testPeerManager captures all other handlers and sends messages to them
|
||||
type testPeerManager struct {
|
||||
// testMessagePool captures all other handlers and sends messages to them
|
||||
type testMessagePool struct {
|
||||
peerId string
|
||||
handlers map[string]*testSyncHandler
|
||||
log *messageLog
|
||||
}
|
||||
|
||||
func newTestPeerManager(peerId string, log *messageLog) *testPeerManager {
|
||||
return &testPeerManager{handlers: map[string]*testSyncHandler{}, peerId: peerId, log: log}
|
||||
func newTestMessagePool(peerId string, log *messageLog) *testMessagePool {
|
||||
return &testMessagePool{handlers: map[string]*testSyncHandler{}, peerId: peerId, log: log}
|
||||
}
|
||||
|
||||
func (m *testPeerManager) addHandler(peerId string, handler *testSyncHandler) {
|
||||
func (m *testMessagePool) addHandler(peerId string, handler *testSyncHandler) {
|
||||
m.handlers[peerId] = handler
|
||||
}
|
||||
|
||||
func (m *testPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
func (m *testMessagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
pMsg := protocolMsg{
|
||||
msg: msg,
|
||||
senderId: m.peerId,
|
||||
@ -243,7 +243,7 @@ func (m *testPeerManager) SendPeer(ctx context.Context, peerId string, msg *spac
|
||||
return m.handlers[peerId].send(context.Background(), pMsg)
|
||||
}
|
||||
|
||||
func (m *testPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
func (m *testMessagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
for _, handler := range m.handlers {
|
||||
pMsg := protocolMsg{
|
||||
msg: msg,
|
||||
@ -256,7 +256,19 @@ func (m *testPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.Obj
|
||||
return
|
||||
}
|
||||
|
||||
func (m *testPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
func (m *testMessagePool) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
panic("should not be called")
|
||||
}
|
||||
|
||||
func (m *testMessagePool) LastUsage() time.Time {
|
||||
panic("should not be called")
|
||||
}
|
||||
|
||||
func (m *testMessagePool) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
panic("should not be called")
|
||||
}
|
||||
|
||||
func (m *testMessagePool) SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
panic("should not be called")
|
||||
}
|
||||
|
||||
|
||||
@ -10,7 +10,8 @@ import (
|
||||
|
||||
objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
||||
treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
peermanager "github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||
objectsync "github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||
spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
@ -109,18 +110,33 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest))
|
||||
}
|
||||
|
||||
// PeerManager mocks base method.
|
||||
func (m *MockSyncClient) PeerManager() peermanager.PeerManager {
|
||||
// MessagePool mocks base method.
|
||||
func (m *MockSyncClient) MessagePool() objectsync.MessagePool {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "PeerManager")
|
||||
ret0, _ := ret[0].(peermanager.PeerManager)
|
||||
ret := m.ctrl.Call(m, "MessagePool")
|
||||
ret0, _ := ret[0].(objectsync.MessagePool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// PeerManager indicates an expected call of PeerManager.
|
||||
func (mr *MockSyncClientMockRecorder) PeerManager() *gomock.Call {
|
||||
// MessagePool indicates an expected call of MessagePool.
|
||||
func (mr *MockSyncClientMockRecorder) MessagePool() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerManager", reflect.TypeOf((*MockSyncClient)(nil).PeerManager))
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MessagePool", reflect.TypeOf((*MockSyncClient)(nil).MessagePool))
|
||||
}
|
||||
|
||||
// SendSync mocks base method.
|
||||
func (m *MockSyncClient) SendSync(arg0 context.Context, arg1, arg2 string, arg3 *treechangeproto.TreeSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SendSync", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// SendSync indicates an expected call of SendSync.
|
||||
func (mr *MockSyncClientMockRecorder) SendSync(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendSync", reflect.TypeOf((*MockSyncClient)(nil).SendSync), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// SendWithReply mocks base method.
|
||||
|
||||
@ -24,7 +24,7 @@ var log = logger.NewNamed("common.commonspace.objectsync")
|
||||
type ObjectSync interface {
|
||||
LastUsage
|
||||
synchandler.SyncHandler
|
||||
MessagePool() MessagePool
|
||||
SyncClient() SyncClient
|
||||
|
||||
Close() (err error)
|
||||
}
|
||||
@ -59,9 +59,9 @@ func NewObjectSync(
|
||||
cancelSync: cancel,
|
||||
spaceIsDeleted: spaceIsDeleted,
|
||||
configuration: configuration,
|
||||
syncClient: NewSyncClient(spaceId, peerManager, GetRequestFactory()),
|
||||
}
|
||||
os.messagePool = newMessagePool(peerManager, os.handleMessage)
|
||||
os.syncClient = NewSyncClient(spaceId, os.messagePool, GetRequestFactory())
|
||||
return os
|
||||
}
|
||||
|
||||
@ -79,7 +79,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
|
||||
}
|
||||
|
||||
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId))
|
||||
log := log.With(
|
||||
zap.String("objectId", msg.ObjectId),
|
||||
zap.String("requestId", msg.RequestId),
|
||||
zap.String("replyId", msg.ReplyId))
|
||||
if s.spaceIsDeleted.Load() {
|
||||
log = log.With(zap.Bool("isDeleted", true))
|
||||
// preventing sync with other clients if they are not just syncing the settings tree
|
||||
@ -97,11 +100,11 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
||||
treeMsg := &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(msg.Payload, treeMsg)
|
||||
if err != nil {
|
||||
return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.ReplyId)
|
||||
return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId)
|
||||
}
|
||||
// this means that we don't have the tree locally and therefore can't return it
|
||||
if s.isEmptyFullSyncRequest(treeMsg) {
|
||||
return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.ReplyId)
|
||||
return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId)
|
||||
}
|
||||
}
|
||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||
@ -112,8 +115,8 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
||||
return obj.HandleMessage(ctx, senderId, msg)
|
||||
}
|
||||
|
||||
func (s *objectSync) MessagePool() MessagePool {
|
||||
return s.messagePool
|
||||
func (s *objectSync) SyncClient() SyncClient {
|
||||
return s.syncClient
|
||||
}
|
||||
|
||||
func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) (err error) {
|
||||
@ -122,7 +125,7 @@ func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncprot
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.ReplyId)
|
||||
return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId)
|
||||
}
|
||||
|
||||
func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) (err error) {
|
||||
|
||||
@ -3,7 +3,6 @@ package objectsync
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
)
|
||||
|
||||
@ -11,21 +10,22 @@ type SyncClient interface {
|
||||
RequestFactory
|
||||
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
|
||||
SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
|
||||
PeerManager() peermanager.PeerManager
|
||||
SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
||||
MessagePool() MessagePool
|
||||
}
|
||||
|
||||
type syncClient struct {
|
||||
RequestFactory
|
||||
spaceId string
|
||||
peerManager peermanager.PeerManager
|
||||
messagePool MessagePool
|
||||
}
|
||||
|
||||
func NewSyncClient(
|
||||
spaceId string,
|
||||
peerManager peermanager.PeerManager,
|
||||
messagePool MessagePool,
|
||||
factory RequestFactory) SyncClient {
|
||||
return &syncClient{
|
||||
peerManager: peerManager,
|
||||
messagePool: messagePool,
|
||||
RequestFactory: factory,
|
||||
spaceId: spaceId,
|
||||
}
|
||||
@ -36,7 +36,15 @@ func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyn
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.peerManager.Broadcast(ctx, objMsg)
|
||||
return s.messagePool.Broadcast(ctx, objMsg)
|
||||
}
|
||||
|
||||
func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.messagePool.SendSync(ctx, peerId, objMsg)
|
||||
}
|
||||
|
||||
func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||
@ -44,11 +52,11 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string,
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.peerManager.SendPeer(ctx, peerId, objMsg)
|
||||
return s.messagePool.SendPeer(ctx, peerId, objMsg)
|
||||
}
|
||||
|
||||
func (s *syncClient) PeerManager() peermanager.PeerManager {
|
||||
return s.peerManager
|
||||
func (s *syncClient) MessagePool() MessagePool {
|
||||
return s.messagePool
|
||||
}
|
||||
|
||||
func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
|
||||
@ -179,7 +179,7 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool())
|
||||
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.SyncClient().MessagePool())
|
||||
s.treeManager.AddObject(s.aclList)
|
||||
|
||||
deletionState := settingsstate.NewObjectDeletionState(s.storage)
|
||||
@ -284,7 +284,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
|
||||
}
|
||||
deps := synctree.BuildDeps{
|
||||
SpaceId: s.id,
|
||||
ObjectSync: s.objectSync,
|
||||
SyncClient: s.objectSync.SyncClient(),
|
||||
Configuration: s.configuration,
|
||||
HeadNotifiable: s.headSync,
|
||||
Listener: listener,
|
||||
@ -322,7 +322,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
|
||||
|
||||
deps := synctree.BuildDeps{
|
||||
SpaceId: s.id,
|
||||
ObjectSync: s.objectSync,
|
||||
SyncClient: s.objectSync.SyncClient(),
|
||||
Configuration: s.configuration,
|
||||
HeadNotifiable: s.headSync,
|
||||
Listener: opts.Listener,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user