diff --git a/commonspace/confconnector/confconnector.go b/commonspace/confconnector/confconnector.go deleted file mode 100644 index 0a259c90..00000000 --- a/commonspace/confconnector/confconnector.go +++ /dev/null @@ -1,85 +0,0 @@ -//go:generate mockgen -destination mock_confconnector/mock_confconnector.go github.com/anytypeio/any-sync/commonspace/confconnector ConfConnector -package confconnector - -import ( - "context" - "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/pool" - "github.com/anytypeio/any-sync/nodeconf" - "golang.org/x/exp/slices" -) - -type ConfConnector interface { - Configuration() nodeconf.Configuration - Pool() pool.Pool - GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) - DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) -} - -type confConnector struct { - conf nodeconf.Configuration - pool pool.Pool -} - -func NewConfConnector(conf nodeconf.Configuration, pool pool.Pool) ConfConnector { - return &confConnector{conf: conf, pool: pool} -} - -func (s *confConnector) Configuration() nodeconf.Configuration { - return s.conf -} - -func (s *confConnector) Pool() pool.Pool { - return s.pool -} - -func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { - return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get) -} - -func (s *confConnector) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) { - return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial) -} - -func (s *confConnector) connectOneOrMany( - ctx context.Context, - spaceId string, - activeNodeIds []string, - connectOne func(context.Context, string) (peer.Peer, error)) (peers []peer.Peer, err error) { - var ( - inactiveNodeIds []string - allNodes = s.conf.NodeIds(spaceId) - ) - for _, id := range allNodes { - if !slices.Contains(activeNodeIds, id) { - inactiveNodeIds = append(inactiveNodeIds, id) - } - } - - if s.conf.IsResponsible(spaceId) { - for _, id := range inactiveNodeIds { - var p peer.Peer - p, err = connectOne(ctx, id) - if err != nil { - continue - } - peers = append(peers, p) - } - } else if len(activeNodeIds) == 0 { - // that means that all connected ids - var p peer.Peer - p, err = s.pool.GetOneOf(ctx, allNodes) - if err != nil { - return - } - - // if we are dialling someone, we want to dial to the same peer which we cached - // thus communication through streams and through diff will go to the same node - p, err = connectOne(ctx, p.Id()) - if err != nil { - return - } - peers = []peer.Peer{p} - } - return -} diff --git a/commonspace/confconnector/mock_confconnector/mock_confconnector.go b/commonspace/confconnector/mock_confconnector/mock_confconnector.go deleted file mode 100644 index facacfb3..00000000 --- a/commonspace/confconnector/mock_confconnector/mock_confconnector.go +++ /dev/null @@ -1,96 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anytypeio/any-sync/commonspace/confconnector (interfaces: ConfConnector) - -// Package mock_confconnector is a generated GoMock package. -package mock_confconnector - -import ( - context "context" - reflect "reflect" - - peer "github.com/anytypeio/any-sync/net/peer" - pool "github.com/anytypeio/any-sync/net/pool" - nodeconf "github.com/anytypeio/any-sync/nodeconf" - gomock "github.com/golang/mock/gomock" -) - -// MockConfConnector is a mock of ConfConnector interface. -type MockConfConnector struct { - ctrl *gomock.Controller - recorder *MockConfConnectorMockRecorder -} - -// MockConfConnectorMockRecorder is the mock recorder for MockConfConnector. -type MockConfConnectorMockRecorder struct { - mock *MockConfConnector -} - -// NewMockConfConnector creates a new mock instance. -func NewMockConfConnector(ctrl *gomock.Controller) *MockConfConnector { - mock := &MockConfConnector{ctrl: ctrl} - mock.recorder = &MockConfConnectorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder { - return m.recorder -} - -// Configuration mocks base method. -func (m *MockConfConnector) Configuration() nodeconf.Configuration { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Configuration") - ret0, _ := ret[0].(nodeconf.Configuration) - return ret0 -} - -// Configuration indicates an expected call of Configuration. -func (mr *MockConfConnectorMockRecorder) Configuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configuration", reflect.TypeOf((*MockConfConnector)(nil).Configuration)) -} - -// DialInactiveResponsiblePeers mocks base method. -func (m *MockConfConnector) DialInactiveResponsiblePeers(arg0 context.Context, arg1 string, arg2 []string) ([]peer.Peer, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DialInactiveResponsiblePeers", arg0, arg1, arg2) - ret0, _ := ret[0].([]peer.Peer) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DialInactiveResponsiblePeers indicates an expected call of DialInactiveResponsiblePeers. -func (mr *MockConfConnectorMockRecorder) DialInactiveResponsiblePeers(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialInactiveResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialInactiveResponsiblePeers), arg0, arg1, arg2) -} - -// GetResponsiblePeers mocks base method. -func (m *MockConfConnector) GetResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0, arg1) - ret0, _ := ret[0].([]peer.Peer) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetResponsiblePeers indicates an expected call of GetResponsiblePeers. -func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1) -} - -// Pool mocks base method. -func (m *MockConfConnector) Pool() pool.Pool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Pool") - ret0, _ := ret[0].(pool.Pool) - return ret0 -} - -// Pool indicates an expected call of Pool. -func (mr *MockConfConnectorMockRecorder) Pool() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pool", reflect.TypeOf((*MockConfConnector)(nil).Pool)) -} diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 9665702a..9db4d3db 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/treegetter" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -28,7 +28,7 @@ type DiffSyncer interface { func newDiffSyncer( spaceId string, diff ldiff.Diff, - confConnector confconnector.ConfConnector, + peerManager peermanager.PeerManager, cache treegetter.TreeGetter, storage spacestorage.SpaceStorage, clientFactory spacesyncproto.ClientFactory, @@ -39,7 +39,7 @@ func newDiffSyncer( spaceId: spaceId, cache: cache, storage: storage, - confConnector: confConnector, + peerManager: peerManager, clientFactory: clientFactory, log: log, syncStatus: syncStatus, @@ -49,7 +49,7 @@ func newDiffSyncer( type diffSyncer struct { spaceId string diff ldiff.Diff - confConnector confconnector.ConfConnector + peerManager peermanager.PeerManager cache treegetter.TreeGetter storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory @@ -88,7 +88,7 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) { func (d *diffSyncer) Sync(ctx context.Context) error { st := time.Now() // diffing with responsible peers according to configuration - peers, err := d.confConnector.GetResponsiblePeers(ctx, d.spaceId) + peers, err := d.peerManager.GetResponsiblePeers(ctx) if err != nil { return err } diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 51192e9f..19049ae1 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -6,12 +6,12 @@ import ( "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff/mock_ldiff" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector/mock_confconnector" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage" "github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter" + "github.com/anytypeio/any-sync/commonspace/peermanager/mock_peermanager" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate/mock_deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage/mock_spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -99,7 +99,7 @@ func TestDiffSyncer_Sync(t *testing.T) { defer ctrl.Finish() diffMock := mock_ldiff.NewMockDiff(ctrl) - connectorMock := mock_confconnector.NewMockConfConnector(ctrl) + peerManagerMock := mock_peermanager.NewMockPeerManager(ctrl) cacheMock := mock_treegetter.NewMockTreeGetter(ctrl) stMock := mock_spacestorage.NewMockSpaceStorage(ctrl) clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) @@ -110,13 +110,13 @@ func TestDiffSyncer_Sync(t *testing.T) { spaceId := "spaceId" aclRootId := "aclRootId" l := logger.NewNamed(spaceId) - diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l) + diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l) delState.EXPECT().AddObserver(gomock.Any()) diffSyncer.Init(delState) t.Run("diff syncer sync", func(t *testing.T) { - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). @@ -131,8 +131,8 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync conf error", func(t *testing.T) { - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return(nil, fmt.Errorf("some error")) require.Error(t, diffSyncer.Sync(ctx)) @@ -173,8 +173,8 @@ func TestDiffSyncer_Sync(t *testing.T) { spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} spaceSettingsId := "spaceSettingsId" - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). @@ -197,8 +197,8 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync other error", func(t *testing.T) { - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 50a18a99..7ab3b255 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -5,8 +5,8 @@ import ( "context" "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/treegetter" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -49,7 +49,7 @@ func NewHeadSync( spaceId string, syncPeriod int, storage spacestorage.SpaceStorage, - confConnector confconnector.ConfConnector, + peerManager peermanager.PeerManager, cache treegetter.TreeGetter, syncStatus syncstatus.StatusUpdater, log logger.CtxLogger) HeadSync { @@ -57,7 +57,7 @@ func NewHeadSync( diff := ldiff.New(16, 16) l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) - syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l) + syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l) return &headSync{ diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index 5c7f2e2f..e858f190 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -3,7 +3,6 @@ package synctree import ( "context" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -21,7 +20,6 @@ type syncClient struct { objectsync.MessagePool RequestFactory spaceId string - connector confconnector.ConfConnector configuration nodeconf.Configuration } diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index a12dbb68..47ba14c4 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "go.uber.org/zap" "strconv" @@ -14,17 +15,11 @@ import ( "time" ) -type StreamManager interface { - SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) - SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) - Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) -} - // MessagePool can be made generic to work with different streams type MessagePool interface { ocache.ObjectLastUsage synchandler.SyncHandler - StreamManager + peermanager.PeerManager SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) } @@ -36,7 +31,7 @@ type responseWaiter struct { type messagePool struct { sync.Mutex - StreamManager + peermanager.PeerManager messageHandler MessageHandler waiters map[string]responseWaiter waitersMx sync.Mutex @@ -44,9 +39,9 @@ type messagePool struct { lastUsage atomic.Int64 } -func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { +func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageHandler) MessagePool { s := &messagePool{ - StreamManager: streamManager, + PeerManager: peerManager, messageHandler: messageHandler, waiters: make(map[string]responseWaiter), } @@ -88,16 +83,16 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - return s.StreamManager.SendPeer(ctx, peerId, msg) + return s.PeerManager.SendPeer(ctx, peerId, msg) } func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - return s.StreamManager.SendResponsible(ctx, msg) + return s.PeerManager.SendResponsible(ctx, msg) } func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - return s.StreamManager.Broadcast(ctx, msg) + return s.PeerManager.Broadcast(ctx, msg) } func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 11c79a4a..5184f797 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -6,6 +6,7 @@ import ( "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "go.uber.org/zap" "time" @@ -34,7 +35,7 @@ type objectSync struct { func NewObjectSync( spaceId string, - streamManager StreamManager, + peerManager peermanager.PeerManager, objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync { syncCtx, cancel := context.WithCancel(context.Background()) os := newObjectSync( @@ -42,7 +43,7 @@ func NewObjectSync( objectGetter, syncCtx, cancel) - msgPool := newMessagePool(streamManager, os.handleMessage) + msgPool := newMessagePool(peerManager, os.handleMessage) os.messagePool = msgPool return os } diff --git a/commonspace/peermanager/mock_peermanager/mock_peermanager.go b/commonspace/peermanager/mock_peermanager/mock_peermanager.go new file mode 100644 index 00000000..ae97fe7b --- /dev/null +++ b/commonspace/peermanager/mock_peermanager/mock_peermanager.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anytypeio/any-sync/commonspace/peermanager (interfaces: PeerManager) + +// Package mock_peermanager is a generated GoMock package. +package mock_peermanager + +import ( + context "context" + reflect "reflect" + + spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + peer "github.com/anytypeio/any-sync/net/peer" + gomock "github.com/golang/mock/gomock" +) + +// MockPeerManager is a mock of PeerManager interface. +type MockPeerManager struct { + ctrl *gomock.Controller + recorder *MockPeerManagerMockRecorder +} + +// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager. +type MockPeerManagerMockRecorder struct { + mock *MockPeerManager +} + +// NewMockPeerManager creates a new mock instance. +func NewMockPeerManager(ctrl *gomock.Controller) *MockPeerManager { + mock := &MockPeerManager{ctrl: ctrl} + mock.recorder = &MockPeerManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder { + return m.recorder +} + +// Broadcast mocks base method. +func (m *MockPeerManager) Broadcast(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Broadcast indicates an expected call of Broadcast. +func (mr *MockPeerManagerMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0, arg1) +} + +// GetResponsiblePeers mocks base method. +func (m *MockPeerManager) GetResponsiblePeers(arg0 context.Context) ([]peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0) + ret0, _ := ret[0].([]peer.Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetResponsiblePeers indicates an expected call of GetResponsiblePeers. +func (mr *MockPeerManagerMockRecorder) GetResponsiblePeers(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockPeerManager)(nil).GetResponsiblePeers), arg0) +} + +// SendPeer mocks base method. +func (m *MockPeerManager) SendPeer(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendPeer", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendPeer indicates an expected call of SendPeer. +func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2) +} + +// SendResponsible mocks base method. +func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendResponsible indicates an expected call of SendResponsible. +func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1) +} diff --git a/commonspace/peermanager/peermanager.go b/commonspace/peermanager/peermanager.go new file mode 100644 index 00000000..373c3a51 --- /dev/null +++ b/commonspace/peermanager/peermanager.go @@ -0,0 +1,27 @@ +//go:generate mockgen -destination mock_peermanager/mock_peermanager.go github.com/anytypeio/any-sync/commonspace/peermanager PeerManager +package peermanager + +import ( + "context" + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "github.com/anytypeio/any-sync/net/peer" +) + +const CName = "common.commonspace.peermanager" + +type PeerManager interface { + // SendPeer sends a message to a stream by peerId + SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) + // SendResponsible sends a message to responsible peers streams + SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) + // Broadcast sends a message to all subscribed peers + Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) + // GetResponsiblePeers dials or gets from cache responsible peers to unary operations + GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) +} + +type PeerManagerProvider interface { + app.Component + NewPeerManager(ctx context.Context, spaceId string) (sm PeerManager, err error) +} diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 3799ae24..80e48ff0 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -5,15 +5,14 @@ import ( "github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/headsync" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/objectsync" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/commonspace/streammanager" "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" @@ -40,13 +39,13 @@ type SpaceService interface { } type spaceService struct { - config Config - account accountservice.Service - configurationService nodeconf.Service - storageProvider spacestorage.SpaceStorageProvider - streamManagerProvider streammanager.StreamManagerProvider - treeGetter treegetter.TreeGetter - pool pool.Pool + config Config + account accountservice.Service + configurationService nodeconf.Service + storageProvider spacestorage.SpaceStorageProvider + peermanagerProvider peermanager.PeerManagerProvider + treeGetter treegetter.TreeGetter + pool pool.Pool } func (s *spaceService) Init(a *app.App) (err error) { @@ -55,7 +54,7 @@ func (s *spaceService) Init(a *app.App) (err error) { s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) - s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider) + s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider) s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -117,8 +116,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { } lastConfiguration := s.configurationService.GetLast() - confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) - + getter := newCommonGetter(st.Id(), s.treeGetter) syncStatus := syncstatus.NewNoOpSyncStatus() // this will work only for clients, not the best solution, but... if !lastConfiguration.IsResponsible(st.Id()) { @@ -126,21 +124,19 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) } - // TODO: [che] remove *5 - headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log) - - streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id) + peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id) if err != nil { return nil, err } - objectSync := objectsync.NewObjectSync(streamManager, id) + headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, peerManager, getter, syncStatus, log) + objectSync := objectsync.NewObjectSync(id, peerManager, getter) sp := &space{ id: id, objectSync: objectSync, headSync: headSync, syncStatus: syncStatus, - cache: s.treeGetter, + cache: getter, account: s.account, configuration: lastConfiguration, storage: st, diff --git a/commonspace/streammanager/streammanager.go b/commonspace/streammanager/streammanager.go deleted file mode 100644 index 5fb15b4c..00000000 --- a/commonspace/streammanager/streammanager.go +++ /dev/null @@ -1,14 +0,0 @@ -package streammanager - -import ( - "context" - "github.com/anytypeio/any-sync/app" - "github.com/anytypeio/any-sync/commonspace/objectsync" -) - -const CName = "common.commonspace.streammanager" - -type StreamManagerProvider interface { - app.Component - NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error) -}