Fix stream checker and tests

This commit is contained in:
mcrakhman 2022-12-09 19:55:57 +01:00 committed by Mikhail Iudin
parent 8a4b58969b
commit 30a8194638
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
5 changed files with 32 additions and 13 deletions

View File

@ -109,7 +109,7 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast() lastConfiguration := s.configurationService.GetLast()
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log) diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log)
syncService := syncservice.NewSyncService(id, confConnector) syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod)
sp := &space{ sp := &space{
id: id, id: id,
syncService: syncService, syncService: syncService,

View File

@ -69,8 +69,12 @@ func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) {
s.log.Error("failed to send first message to stream", zap.Error(err)) s.log.Error("failed to send first message to stream", zap.Error(err))
continue continue
} }
err = s.streamPool.AddAndReadStreamAsync(stream)
if err != nil {
s.log.Error("failed to read from stream async", zap.Error(err))
continue
}
s.log.Debug("reading stream for", zap.String("id", p.Id())) s.log.Debug("reading stream for", zap.String("id", p.Id()))
s.streamPool.AddAndReadStreamAsync(stream)
} }
return return
} }

View File

@ -24,7 +24,7 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive")
type StreamPool interface { type StreamPool interface {
ocache.ObjectLastUsage ocache.ObjectLastUsage
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error)
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
@ -183,23 +183,34 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (
return nil return nil
} }
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) { func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) {
go s.AddAndReadStreamSync(stream) peerId, err := s.addStream(stream)
if err != nil {
return
}
go s.readPeerLoop(peerId, stream)
return
} }
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) { func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) {
s.Lock() peerId, err := s.addStream(stream)
peerId, err := peer.CtxPeerId(stream.Context()) if err != nil {
return
}
return s.readPeerLoop(peerId, stream)
}
func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) {
s.Lock()
defer s.Unlock()
peerId, err = peer.CtxPeerId(stream.Context())
if err != nil { if err != nil {
s.Unlock()
return return
} }
s.peerStreams[peerId] = stream s.peerStreams[peerId] = stream
s.wg.Add(1) s.wg.Add(1)
s.Unlock() return
log.With("peerId", peerId).Debug("reading stream from peer")
return s.readPeerLoop(peerId, stream)
} }
func (s *streamPool) Close() (err error) { func (s *streamPool) Close() (err error) {
@ -213,6 +224,7 @@ func (s *streamPool) Close() (err error) {
} }
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
log.With("peerId", peerId).Debug("reading stream from peer")
defer s.wg.Done() defer s.wg.Done()
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ { for i := 0; i < maxSimultaneousOperationsPerStream; i++ {

View File

@ -38,7 +38,8 @@ type syncService struct {
func NewSyncService( func NewSyncService(
spaceId string, spaceId string,
confConnector nodeconf.ConfConnector) (syncService SyncService) { confConnector nodeconf.ConfConnector,
periodicSeconds int) (syncService SyncService) {
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncService.HandleMessage(ctx, senderId, message) return syncService.HandleMessage(ctx, senderId, message)
}) })
@ -50,7 +51,7 @@ func NewSyncService(
streamPool, streamPool,
clientFactory, clientFactory,
syncLog) syncLog)
periodicSync := periodicsync.NewPeriodicSync(respPeersStreamCheckInterval, checker.CheckResponsiblePeers, syncLog) periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, checker.CheckResponsiblePeers, syncLog)
syncService = newSyncService( syncService = newSyncService(
spaceId, spaceId,
streamPool, streamPool,

View File

@ -65,6 +65,7 @@ func Test_DeriveSyncTree(t *testing.T) {
Payload: expectedPayload, Payload: expectedPayload,
SpaceStorage: spaceStorageMock, SpaceStorage: spaceStorageMock,
} }
objTreeMock.EXPECT().ID().Return("id")
_, err := DeriveSyncTree(ctx, deps) _, err := DeriveSyncTree(ctx, deps)
require.NoError(t, err) require.NoError(t, err)
@ -92,6 +93,7 @@ func Test_CreateSyncTree(t *testing.T) {
headUpdate := &treechangeproto.TreeSyncMessage{} headUpdate := &treechangeproto.TreeSyncMessage{}
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
objTreeMock.EXPECT().ID().Return("id")
deps := CreateDeps{ deps := CreateDeps{
AclList: aclListMock, AclList: aclListMock,
SpaceId: spaceId, SpaceId: spaceId,