From b2578a19baf9133409c046d9957cd6887dd59380 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 13:16:07 +0300 Subject: [PATCH 1/5] async dial --- net/streampool/streampool.go | 23 +++++++++++++++-------- net/streampool/streampool_test.go | 12 +++++++++--- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 0a2b4419..a846ef51 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -21,6 +21,9 @@ type StreamHandler interface { NewReadMessage() drpc.Message } +// PeerGetter should dial or return cached peers +type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error) + // StreamPool keeps and read streams type StreamPool interface { // AddStream adds new outgoing stream into the pool @@ -28,7 +31,7 @@ type StreamPool interface { // ReadStream adds new incoming stream and synchronously read it ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error) // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. - Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) + Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error) // SendById sends a message to given peerIds. Works only if stream exists SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) // Broadcast sends a message to all peers with given tags. Works async. @@ -95,7 +98,7 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st return st } -func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { +func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) { var sendOneFunc = func(sp peer.Peer) func() { return func() { if e := s.sendOne(ctx, sp, msg); e != nil { @@ -105,13 +108,17 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P } } } - - for _, p := range peers { - if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { - return + return s.exec.Add(ctx, func() { + peers, dialErr := peerGetter(ctx) + if dialErr != nil { + log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) } - } - return + for _, p := range peers { + if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { + return + } + } + }) } func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) { diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index 671b2aa2..c67377cc 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -66,7 +66,9 @@ func TestStreamPool_AddStream(t *testing.T) { defer s1.Close() fx.AddStream("p1", s1, "space1", "common") - require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, p1)) + require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) { + return []peer.Peer{p1}, nil + })) var msg *testservice.StreamMessage select { case msg = <-fx.tsh.receiveCh: @@ -85,7 +87,9 @@ func TestStreamPool_Send(t *testing.T) { p, err := fx.tp.Dial(ctx, "p1") require.NoError(t, err) - require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p)) + require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, func(ctx context.Context) (peers []peer.Peer, err error) { + return []peer.Peer{p}, nil + })) var msg *testservice.StreamMessage select { @@ -107,7 +111,9 @@ func TestStreamPool_Send(t *testing.T) { var numMsgs = 5 for i := 0; i < numMsgs; i++ { - go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p)) + go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, func(ctx context.Context) (peers []peer.Peer, err error) { + return []peer.Peer{p}, nil + })) } var msgs []*testservice.StreamMessage From 78067eaa4c4656310591fe957c07035b4673e889 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 16:43:52 +0300 Subject: [PATCH 2/5] dial pool --- commonspace/space.go | 9 ++++++++- net/streampool/sendpool.go | 16 ++++++++-------- net/streampool/streampool.go | 12 ++++-------- net/streampool/streampoolservice.go | 3 ++- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/commonspace/space.go b/commonspace/space.go index b4ab1da3..378490d3 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -27,6 +27,7 @@ import ( "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" "github.com/anytypeio/any-sync/util/multiqueue" "github.com/anytypeio/any-sync/util/slice" + "github.com/cheggaaa/mb/v3" "github.com/zeebo/errs" "go.uber.org/zap" "strconv" @@ -359,7 +360,13 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) _ = s.handleQueue.CloseThread(threadId) }() } - return s.handleQueue.Add(ctx, threadId, hm) + err = s.handleQueue.Add(ctx, threadId, hm) + if err == mb.ErrOverflowed { + log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId)) + // skip overflowed error + return nil + } + return } func (s *space) handleMessage(msg HandleMessage) { diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index 899da5c5..5bf075e7 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -6,11 +6,11 @@ import ( "go.uber.org/zap" ) -// newStreamSender creates new sendPool +// newExecPool creates new execPool // workers - how many processes will execute tasks // maxSize - limit for queue size -func newStreamSender(workers, maxSize int) *sendPool { - ss := &sendPool{ +func newExecPool(workers, maxSize int) *execPool { + ss := &execPool{ batch: mb.New[func()](maxSize), } for i := 0; i < workers; i++ { @@ -19,16 +19,16 @@ func newStreamSender(workers, maxSize int) *sendPool { return ss } -// sendPool needed for parallel execution of the incoming send tasks -type sendPool struct { +// execPool needed for parallel execution of the incoming send tasks +type execPool struct { batch *mb.MB[func()] } -func (ss *sendPool) Add(ctx context.Context, f ...func()) (err error) { +func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) { return ss.batch.Add(ctx, f...) } -func (ss *sendPool) sendLoop() { +func (ss *execPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) if err != nil { @@ -39,6 +39,6 @@ func (ss *sendPool) sendLoop() { } } -func (ss *sendPool) Close() (err error) { +func (ss *execPool) Close() (err error) { return ss.batch.Close() } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index a846ef51..fb53cd69 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -50,8 +50,9 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - exec *sendPool - mu sync.RWMutex + exec *execPool + dial *execPool + mu sync.Mutex lastStreamId uint32 } @@ -59,11 +60,6 @@ type openingProcess struct { ch chan struct{} err error } -type handleMessage struct { - ctx context.Context - msg drpc.Message - peerId string -} func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { st := s.addStream(peerId, drpcStream, tags...) @@ -108,7 +104,7 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter Peer } } } - return s.exec.Add(ctx, func() { + return s.dial.Add(ctx, func() { peers, dialErr := peerGetter(ctx) if dialErr != nil { log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index e4d5965f..fe6c9ce2 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -28,7 +28,8 @@ func (s *service) NewStreamPool(h StreamHandler) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - exec: newStreamSender(10, 100), + exec: newExecPool(10, 100), + dial: newExecPool(4, 100), } return sp } From 9dc3c807fb79da0de52529c603b50ebf372ecdeb Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 17:03:31 +0300 Subject: [PATCH 3/5] stream config --- net/streampool/streampool_test.go | 7 ++++++- net/streampool/streampoolservice.go | 19 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index c67377cc..2ab15d26 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -200,7 +200,12 @@ func newFixture(t *testing.T) *fixture { require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh)) fx.tp = rpctest.NewTestPool().WithServer(ts) fx.th = &testHandler{} - fx.StreamPool = New().NewStreamPool(fx.th) + fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{ + SendQueueWorkers: 4, + SendQueueSize: 10, + DialQueueWorkers: 1, + DialQueueSize: 10, + }) return fx } diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index fe6c9ce2..25070497 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -13,23 +13,34 @@ func New() Service { return new(service) } +type StreamConfig struct { + // SendQueueWorkers how many workers will write message to streams + SendQueueWorkers int + // SendQueueSize size of the queue for write + SendQueueSize int + // DialQueueWorkers how many workers will dial to peers + DialQueueWorkers int + // DialQueueSize size of the dial queue + DialQueueSize int +} + type Service interface { - NewStreamPool(h StreamHandler) StreamPool + NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool app.Component } type service struct { } -func (s *service) NewStreamPool(h StreamHandler) StreamPool { +func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { sp := &streamPool{ handler: h, streamIdsByPeer: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - exec: newExecPool(10, 100), - dial: newExecPool(4, 100), + exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize), + dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), } return sp } From d04d1f3a3b39f8ac275d1543f5244a72cb9676d4 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 18:40:25 +0300 Subject: [PATCH 4/5] streampool: close dial queue --- net/streampool/streampool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index fb53cd69..8319e972 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -332,6 +332,9 @@ func (s *streamPool) removeStream(streamId uint32) { } func (s *streamPool) Close() (err error) { + if e := s.dial.Close(); e != nil { + log.Warn("dial queue close error", zap.Error(e)) + } return s.exec.Close() } From 73762e460a6474e08d19988da4c04484575c1081 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 19:18:53 +0300 Subject: [PATCH 5/5] streammanger -> peermanager, remove confconnector --- commonspace/confconnector/confconnector.go | 85 ---------------- .../mock_confconnector/mock_confconnector.go | 96 ------------------- commonspace/headsync/diffsyncer.go | 10 +- commonspace/headsync/diffsyncer_test.go | 22 ++--- commonspace/headsync/headsync.go | 6 +- .../object/tree/synctree/syncclient.go | 2 - commonspace/objectsync/msgpool.go | 21 ++-- commonspace/objectsync/objectsync.go | 5 +- .../mock_peermanager/mock_peermanager.go | 94 ++++++++++++++++++ commonspace/peermanager/peermanager.go | 27 ++++++ commonspace/spaceservice.go | 27 +++--- commonspace/streammanager/streammanager.go | 14 --- 12 files changed, 163 insertions(+), 246 deletions(-) delete mode 100644 commonspace/confconnector/confconnector.go delete mode 100644 commonspace/confconnector/mock_confconnector/mock_confconnector.go create mode 100644 commonspace/peermanager/mock_peermanager/mock_peermanager.go create mode 100644 commonspace/peermanager/peermanager.go delete mode 100644 commonspace/streammanager/streammanager.go diff --git a/commonspace/confconnector/confconnector.go b/commonspace/confconnector/confconnector.go deleted file mode 100644 index 0a259c90..00000000 --- a/commonspace/confconnector/confconnector.go +++ /dev/null @@ -1,85 +0,0 @@ -//go:generate mockgen -destination mock_confconnector/mock_confconnector.go github.com/anytypeio/any-sync/commonspace/confconnector ConfConnector -package confconnector - -import ( - "context" - "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/pool" - "github.com/anytypeio/any-sync/nodeconf" - "golang.org/x/exp/slices" -) - -type ConfConnector interface { - Configuration() nodeconf.Configuration - Pool() pool.Pool - GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) - DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) -} - -type confConnector struct { - conf nodeconf.Configuration - pool pool.Pool -} - -func NewConfConnector(conf nodeconf.Configuration, pool pool.Pool) ConfConnector { - return &confConnector{conf: conf, pool: pool} -} - -func (s *confConnector) Configuration() nodeconf.Configuration { - return s.conf -} - -func (s *confConnector) Pool() pool.Pool { - return s.pool -} - -func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { - return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get) -} - -func (s *confConnector) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) { - return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial) -} - -func (s *confConnector) connectOneOrMany( - ctx context.Context, - spaceId string, - activeNodeIds []string, - connectOne func(context.Context, string) (peer.Peer, error)) (peers []peer.Peer, err error) { - var ( - inactiveNodeIds []string - allNodes = s.conf.NodeIds(spaceId) - ) - for _, id := range allNodes { - if !slices.Contains(activeNodeIds, id) { - inactiveNodeIds = append(inactiveNodeIds, id) - } - } - - if s.conf.IsResponsible(spaceId) { - for _, id := range inactiveNodeIds { - var p peer.Peer - p, err = connectOne(ctx, id) - if err != nil { - continue - } - peers = append(peers, p) - } - } else if len(activeNodeIds) == 0 { - // that means that all connected ids - var p peer.Peer - p, err = s.pool.GetOneOf(ctx, allNodes) - if err != nil { - return - } - - // if we are dialling someone, we want to dial to the same peer which we cached - // thus communication through streams and through diff will go to the same node - p, err = connectOne(ctx, p.Id()) - if err != nil { - return - } - peers = []peer.Peer{p} - } - return -} diff --git a/commonspace/confconnector/mock_confconnector/mock_confconnector.go b/commonspace/confconnector/mock_confconnector/mock_confconnector.go deleted file mode 100644 index facacfb3..00000000 --- a/commonspace/confconnector/mock_confconnector/mock_confconnector.go +++ /dev/null @@ -1,96 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anytypeio/any-sync/commonspace/confconnector (interfaces: ConfConnector) - -// Package mock_confconnector is a generated GoMock package. -package mock_confconnector - -import ( - context "context" - reflect "reflect" - - peer "github.com/anytypeio/any-sync/net/peer" - pool "github.com/anytypeio/any-sync/net/pool" - nodeconf "github.com/anytypeio/any-sync/nodeconf" - gomock "github.com/golang/mock/gomock" -) - -// MockConfConnector is a mock of ConfConnector interface. -type MockConfConnector struct { - ctrl *gomock.Controller - recorder *MockConfConnectorMockRecorder -} - -// MockConfConnectorMockRecorder is the mock recorder for MockConfConnector. -type MockConfConnectorMockRecorder struct { - mock *MockConfConnector -} - -// NewMockConfConnector creates a new mock instance. -func NewMockConfConnector(ctrl *gomock.Controller) *MockConfConnector { - mock := &MockConfConnector{ctrl: ctrl} - mock.recorder = &MockConfConnectorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockConfConnector) EXPECT() *MockConfConnectorMockRecorder { - return m.recorder -} - -// Configuration mocks base method. -func (m *MockConfConnector) Configuration() nodeconf.Configuration { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Configuration") - ret0, _ := ret[0].(nodeconf.Configuration) - return ret0 -} - -// Configuration indicates an expected call of Configuration. -func (mr *MockConfConnectorMockRecorder) Configuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configuration", reflect.TypeOf((*MockConfConnector)(nil).Configuration)) -} - -// DialInactiveResponsiblePeers mocks base method. -func (m *MockConfConnector) DialInactiveResponsiblePeers(arg0 context.Context, arg1 string, arg2 []string) ([]peer.Peer, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DialInactiveResponsiblePeers", arg0, arg1, arg2) - ret0, _ := ret[0].([]peer.Peer) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DialInactiveResponsiblePeers indicates an expected call of DialInactiveResponsiblePeers. -func (mr *MockConfConnectorMockRecorder) DialInactiveResponsiblePeers(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialInactiveResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).DialInactiveResponsiblePeers), arg0, arg1, arg2) -} - -// GetResponsiblePeers mocks base method. -func (m *MockConfConnector) GetResponsiblePeers(arg0 context.Context, arg1 string) ([]peer.Peer, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0, arg1) - ret0, _ := ret[0].([]peer.Peer) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetResponsiblePeers indicates an expected call of GetResponsiblePeers. -func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1) -} - -// Pool mocks base method. -func (m *MockConfConnector) Pool() pool.Pool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Pool") - ret0, _ := ret[0].(pool.Pool) - return ret0 -} - -// Pool indicates an expected call of Pool. -func (mr *MockConfConnectorMockRecorder) Pool() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pool", reflect.TypeOf((*MockConfConnector)(nil).Pool)) -} diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 9665702a..9db4d3db 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/treegetter" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -28,7 +28,7 @@ type DiffSyncer interface { func newDiffSyncer( spaceId string, diff ldiff.Diff, - confConnector confconnector.ConfConnector, + peerManager peermanager.PeerManager, cache treegetter.TreeGetter, storage spacestorage.SpaceStorage, clientFactory spacesyncproto.ClientFactory, @@ -39,7 +39,7 @@ func newDiffSyncer( spaceId: spaceId, cache: cache, storage: storage, - confConnector: confConnector, + peerManager: peerManager, clientFactory: clientFactory, log: log, syncStatus: syncStatus, @@ -49,7 +49,7 @@ func newDiffSyncer( type diffSyncer struct { spaceId string diff ldiff.Diff - confConnector confconnector.ConfConnector + peerManager peermanager.PeerManager cache treegetter.TreeGetter storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory @@ -88,7 +88,7 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) { func (d *diffSyncer) Sync(ctx context.Context) error { st := time.Now() // diffing with responsible peers according to configuration - peers, err := d.confConnector.GetResponsiblePeers(ctx, d.spaceId) + peers, err := d.peerManager.GetResponsiblePeers(ctx) if err != nil { return err } diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 51192e9f..19049ae1 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -6,12 +6,12 @@ import ( "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff/mock_ldiff" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector/mock_confconnector" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" mock_treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage/mock_treestorage" "github.com/anytypeio/any-sync/commonspace/object/treegetter/mock_treegetter" + "github.com/anytypeio/any-sync/commonspace/peermanager/mock_peermanager" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate/mock_deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage/mock_spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -99,7 +99,7 @@ func TestDiffSyncer_Sync(t *testing.T) { defer ctrl.Finish() diffMock := mock_ldiff.NewMockDiff(ctrl) - connectorMock := mock_confconnector.NewMockConfConnector(ctrl) + peerManagerMock := mock_peermanager.NewMockPeerManager(ctrl) cacheMock := mock_treegetter.NewMockTreeGetter(ctrl) stMock := mock_spacestorage.NewMockSpaceStorage(ctrl) clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) @@ -110,13 +110,13 @@ func TestDiffSyncer_Sync(t *testing.T) { spaceId := "spaceId" aclRootId := "aclRootId" l := logger.NewNamed(spaceId) - diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l) + diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l) delState.EXPECT().AddObserver(gomock.Any()) diffSyncer.Init(delState) t.Run("diff syncer sync", func(t *testing.T) { - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). @@ -131,8 +131,8 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync conf error", func(t *testing.T) { - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return(nil, fmt.Errorf("some error")) require.Error(t, diffSyncer.Sync(ctx)) @@ -173,8 +173,8 @@ func TestDiffSyncer_Sync(t *testing.T) { spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} spaceSettingsId := "spaceSettingsId" - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). @@ -197,8 +197,8 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync other error", func(t *testing.T) { - connectorMock.EXPECT(). - GetResponsiblePeers(gomock.Any(), spaceId). + peerManagerMock.EXPECT(). + GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 50a18a99..7ab3b255 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -5,8 +5,8 @@ import ( "context" "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/treegetter" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -49,7 +49,7 @@ func NewHeadSync( spaceId string, syncPeriod int, storage spacestorage.SpaceStorage, - confConnector confconnector.ConfConnector, + peerManager peermanager.PeerManager, cache treegetter.TreeGetter, syncStatus syncstatus.StatusUpdater, log logger.CtxLogger) HeadSync { @@ -57,7 +57,7 @@ func NewHeadSync( diff := ldiff.New(16, 16) l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) - syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l) + syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l) return &headSync{ diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index 5c7f2e2f..e858f190 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -3,7 +3,6 @@ package synctree import ( "context" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -21,7 +20,6 @@ type syncClient struct { objectsync.MessagePool RequestFactory spaceId string - connector confconnector.ConfConnector configuration nodeconf.Configuration } diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index a12dbb68..47ba14c4 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "go.uber.org/zap" "strconv" @@ -14,17 +15,11 @@ import ( "time" ) -type StreamManager interface { - SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) - SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) - Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) -} - // MessagePool can be made generic to work with different streams type MessagePool interface { ocache.ObjectLastUsage synchandler.SyncHandler - StreamManager + peermanager.PeerManager SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) } @@ -36,7 +31,7 @@ type responseWaiter struct { type messagePool struct { sync.Mutex - StreamManager + peermanager.PeerManager messageHandler MessageHandler waiters map[string]responseWaiter waitersMx sync.Mutex @@ -44,9 +39,9 @@ type messagePool struct { lastUsage atomic.Int64 } -func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { +func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageHandler) MessagePool { s := &messagePool{ - StreamManager: streamManager, + PeerManager: peerManager, messageHandler: messageHandler, waiters: make(map[string]responseWaiter), } @@ -88,16 +83,16 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - return s.StreamManager.SendPeer(ctx, peerId, msg) + return s.PeerManager.SendPeer(ctx, peerId, msg) } func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - return s.StreamManager.SendResponsible(ctx, msg) + return s.PeerManager.SendResponsible(ctx, msg) } func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - return s.StreamManager.Broadcast(ctx, msg) + return s.PeerManager.Broadcast(ctx, msg) } func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 11c79a4a..5184f797 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -6,6 +6,7 @@ import ( "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "go.uber.org/zap" "time" @@ -34,7 +35,7 @@ type objectSync struct { func NewObjectSync( spaceId string, - streamManager StreamManager, + peerManager peermanager.PeerManager, objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync { syncCtx, cancel := context.WithCancel(context.Background()) os := newObjectSync( @@ -42,7 +43,7 @@ func NewObjectSync( objectGetter, syncCtx, cancel) - msgPool := newMessagePool(streamManager, os.handleMessage) + msgPool := newMessagePool(peerManager, os.handleMessage) os.messagePool = msgPool return os } diff --git a/commonspace/peermanager/mock_peermanager/mock_peermanager.go b/commonspace/peermanager/mock_peermanager/mock_peermanager.go new file mode 100644 index 00000000..ae97fe7b --- /dev/null +++ b/commonspace/peermanager/mock_peermanager/mock_peermanager.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anytypeio/any-sync/commonspace/peermanager (interfaces: PeerManager) + +// Package mock_peermanager is a generated GoMock package. +package mock_peermanager + +import ( + context "context" + reflect "reflect" + + spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + peer "github.com/anytypeio/any-sync/net/peer" + gomock "github.com/golang/mock/gomock" +) + +// MockPeerManager is a mock of PeerManager interface. +type MockPeerManager struct { + ctrl *gomock.Controller + recorder *MockPeerManagerMockRecorder +} + +// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager. +type MockPeerManagerMockRecorder struct { + mock *MockPeerManager +} + +// NewMockPeerManager creates a new mock instance. +func NewMockPeerManager(ctrl *gomock.Controller) *MockPeerManager { + mock := &MockPeerManager{ctrl: ctrl} + mock.recorder = &MockPeerManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder { + return m.recorder +} + +// Broadcast mocks base method. +func (m *MockPeerManager) Broadcast(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Broadcast indicates an expected call of Broadcast. +func (mr *MockPeerManagerMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0, arg1) +} + +// GetResponsiblePeers mocks base method. +func (m *MockPeerManager) GetResponsiblePeers(arg0 context.Context) ([]peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetResponsiblePeers", arg0) + ret0, _ := ret[0].([]peer.Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetResponsiblePeers indicates an expected call of GetResponsiblePeers. +func (mr *MockPeerManagerMockRecorder) GetResponsiblePeers(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockPeerManager)(nil).GetResponsiblePeers), arg0) +} + +// SendPeer mocks base method. +func (m *MockPeerManager) SendPeer(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendPeer", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendPeer indicates an expected call of SendPeer. +func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2) +} + +// SendResponsible mocks base method. +func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendResponsible indicates an expected call of SendResponsible. +func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1) +} diff --git a/commonspace/peermanager/peermanager.go b/commonspace/peermanager/peermanager.go new file mode 100644 index 00000000..373c3a51 --- /dev/null +++ b/commonspace/peermanager/peermanager.go @@ -0,0 +1,27 @@ +//go:generate mockgen -destination mock_peermanager/mock_peermanager.go github.com/anytypeio/any-sync/commonspace/peermanager PeerManager +package peermanager + +import ( + "context" + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "github.com/anytypeio/any-sync/net/peer" +) + +const CName = "common.commonspace.peermanager" + +type PeerManager interface { + // SendPeer sends a message to a stream by peerId + SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) + // SendResponsible sends a message to responsible peers streams + SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) + // Broadcast sends a message to all subscribed peers + Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) + // GetResponsiblePeers dials or gets from cache responsible peers to unary operations + GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) +} + +type PeerManagerProvider interface { + app.Component + NewPeerManager(ctx context.Context, spaceId string) (sm PeerManager, err error) +} diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 242c8efb..80e48ff0 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -5,15 +5,14 @@ import ( "github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/headsync" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/objectsync" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/commonspace/streammanager" "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" @@ -40,13 +39,13 @@ type SpaceService interface { } type spaceService struct { - config Config - account accountservice.Service - configurationService nodeconf.Service - storageProvider spacestorage.SpaceStorageProvider - streamManagerProvider streammanager.StreamManagerProvider - treeGetter treegetter.TreeGetter - pool pool.Pool + config Config + account accountservice.Service + configurationService nodeconf.Service + storageProvider spacestorage.SpaceStorageProvider + peermanagerProvider peermanager.PeerManagerProvider + treeGetter treegetter.TreeGetter + pool pool.Pool } func (s *spaceService) Init(a *app.App) (err error) { @@ -55,7 +54,7 @@ func (s *spaceService) Init(a *app.App) (err error) { s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) - s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider) + s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider) s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -117,7 +116,6 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { } lastConfiguration := s.configurationService.GetLast() - confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) getter := newCommonGetter(st.Id(), s.treeGetter) syncStatus := syncstatus.NewNoOpSyncStatus() // this will work only for clients, not the best solution, but... @@ -126,14 +124,13 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) } - headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, getter, syncStatus, log) - - streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id) + peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id) if err != nil { return nil, err } - objectSync := objectsync.NewObjectSync(id, streamManager, getter) + headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, peerManager, getter, syncStatus, log) + objectSync := objectsync.NewObjectSync(id, peerManager, getter) sp := &space{ id: id, objectSync: objectSync, diff --git a/commonspace/streammanager/streammanager.go b/commonspace/streammanager/streammanager.go deleted file mode 100644 index 5fb15b4c..00000000 --- a/commonspace/streammanager/streammanager.go +++ /dev/null @@ -1,14 +0,0 @@ -package streammanager - -import ( - "context" - "github.com/anytypeio/any-sync/app" - "github.com/anytypeio/any-sync/commonspace/objectsync" -) - -const CName = "common.commonspace.streammanager" - -type StreamManagerProvider interface { - app.Component - NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error) -}