From 30a81946383257fd2359f87ded47ecf1b4d854df Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 9 Dec 2022 19:55:57 +0100 Subject: [PATCH] Fix stream checker and tests --- common/commonspace/service.go | 2 +- .../commonspace/syncservice/streamchecker.go | 6 +++- common/commonspace/syncservice/streampool.go | 30 +++++++++++++------ common/commonspace/syncservice/syncservice.go | 5 ++-- common/commonspace/synctree/synctree_test.go | 2 ++ 5 files changed, 32 insertions(+), 13 deletions(-) diff --git a/common/commonspace/service.go b/common/commonspace/service.go index ba3dabd2..6b279140 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -109,7 +109,7 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log) - syncService := syncservice.NewSyncService(id, confConnector) + syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod) sp := &space{ id: id, syncService: syncService, diff --git a/common/commonspace/syncservice/streamchecker.go b/common/commonspace/syncservice/streamchecker.go index ffc6b371..cbc152dd 100644 --- a/common/commonspace/syncservice/streamchecker.go +++ b/common/commonspace/syncservice/streamchecker.go @@ -69,8 +69,12 @@ func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) { s.log.Error("failed to send first message to stream", zap.Error(err)) continue } + err = s.streamPool.AddAndReadStreamAsync(stream) + if err != nil { + s.log.Error("failed to read from stream async", zap.Error(err)) + continue + } s.log.Debug("reading stream for", zap.String("id", p.Id())) - s.streamPool.AddAndReadStreamAsync(stream) } return } diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index a2bbe4ce..578d3b59 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -24,7 +24,7 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive") type StreamPool interface { ocache.ObjectLastUsage AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) - AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) + AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) @@ -183,23 +183,34 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) ( return nil } -func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) { - go s.AddAndReadStreamSync(stream) +func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) { + peerId, err := s.addStream(stream) + if err != nil { + return + } + go s.readPeerLoop(peerId, stream) + return } func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) { - s.Lock() - peerId, err := peer.CtxPeerId(stream.Context()) + peerId, err := s.addStream(stream) + if err != nil { + return + } + return s.readPeerLoop(peerId, stream) +} + +func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) { + s.Lock() + defer s.Unlock() + peerId, err = peer.CtxPeerId(stream.Context()) if err != nil { - s.Unlock() return } s.peerStreams[peerId] = stream s.wg.Add(1) - s.Unlock() - log.With("peerId", peerId).Debug("reading stream from peer") - return s.readPeerLoop(peerId, stream) + return } func (s *streamPool) Close() (err error) { @@ -213,6 +224,7 @@ func (s *streamPool) Close() (err error) { } func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { + log.With("peerId", peerId).Debug("reading stream from peer") defer s.wg.Done() limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) for i := 0; i < maxSimultaneousOperationsPerStream; i++ { diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 5328fbbc..18d613c9 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -38,7 +38,8 @@ type syncService struct { func NewSyncService( spaceId string, - confConnector nodeconf.ConfConnector) (syncService SyncService) { + confConnector nodeconf.ConfConnector, + periodicSeconds int) (syncService SyncService) { streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { return syncService.HandleMessage(ctx, senderId, message) }) @@ -50,7 +51,7 @@ func NewSyncService( streamPool, clientFactory, syncLog) - periodicSync := periodicsync.NewPeriodicSync(respPeersStreamCheckInterval, checker.CheckResponsiblePeers, syncLog) + periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, checker.CheckResponsiblePeers, syncLog) syncService = newSyncService( spaceId, streamPool, diff --git a/common/commonspace/synctree/synctree_test.go b/common/commonspace/synctree/synctree_test.go index 8d6511c3..05343493 100644 --- a/common/commonspace/synctree/synctree_test.go +++ b/common/commonspace/synctree/synctree_test.go @@ -65,6 +65,7 @@ func Test_DeriveSyncTree(t *testing.T) { Payload: expectedPayload, SpaceStorage: spaceStorageMock, } + objTreeMock.EXPECT().ID().Return("id") _, err := DeriveSyncTree(ctx, deps) require.NoError(t, err) @@ -92,6 +93,7 @@ func Test_CreateSyncTree(t *testing.T) { headUpdate := &treechangeproto.TreeSyncMessage{} syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate) syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil) + objTreeMock.EXPECT().ID().Return("id") deps := CreateDeps{ AclList: aclListMock, SpaceId: spaceId,