Fix stream checker and tests

This commit is contained in:
mcrakhman 2022-12-09 19:55:57 +01:00
parent eb078d311a
commit 5eaa3bfe44
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
5 changed files with 32 additions and 13 deletions

View File

@ -109,7 +109,7 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast()
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log)
syncService := syncservice.NewSyncService(id, confConnector)
syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod)
sp := &space{
id: id,
syncService: syncService,

View File

@ -69,8 +69,12 @@ func (s *streamChecker) CheckResponsiblePeers(ctx context.Context) (err error) {
s.log.Error("failed to send first message to stream", zap.Error(err))
continue
}
err = s.streamPool.AddAndReadStreamAsync(stream)
if err != nil {
s.log.Error("failed to read from stream async", zap.Error(err))
continue
}
s.log.Debug("reading stream for", zap.String("id", p.Id()))
s.streamPool.AddAndReadStreamAsync(stream)
}
return
}

View File

@ -24,7 +24,7 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive")
type StreamPool interface {
ocache.ObjectLastUsage
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error)
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
@ -183,23 +183,34 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (
return nil
}
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) {
go s.AddAndReadStreamSync(stream)
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) {
peerId, err := s.addStream(stream)
if err != nil {
return
}
go s.readPeerLoop(peerId, stream)
return
}
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) {
s.Lock()
peerId, err := peer.CtxPeerId(stream.Context())
peerId, err := s.addStream(stream)
if err != nil {
return
}
return s.readPeerLoop(peerId, stream)
}
func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) {
s.Lock()
defer s.Unlock()
peerId, err = peer.CtxPeerId(stream.Context())
if err != nil {
s.Unlock()
return
}
s.peerStreams[peerId] = stream
s.wg.Add(1)
s.Unlock()
log.With("peerId", peerId).Debug("reading stream from peer")
return s.readPeerLoop(peerId, stream)
return
}
func (s *streamPool) Close() (err error) {
@ -213,6 +224,7 @@ func (s *streamPool) Close() (err error) {
}
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
log.With("peerId", peerId).Debug("reading stream from peer")
defer s.wg.Done()
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {

View File

@ -38,7 +38,8 @@ type syncService struct {
func NewSyncService(
spaceId string,
confConnector nodeconf.ConfConnector) (syncService SyncService) {
confConnector nodeconf.ConfConnector,
periodicSeconds int) (syncService SyncService) {
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncService.HandleMessage(ctx, senderId, message)
})
@ -50,7 +51,7 @@ func NewSyncService(
streamPool,
clientFactory,
syncLog)
periodicSync := periodicsync.NewPeriodicSync(respPeersStreamCheckInterval, checker.CheckResponsiblePeers, syncLog)
periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, checker.CheckResponsiblePeers, syncLog)
syncService = newSyncService(
spaceId,
streamPool,

View File

@ -65,6 +65,7 @@ func Test_DeriveSyncTree(t *testing.T) {
Payload: expectedPayload,
SpaceStorage: spaceStorageMock,
}
objTreeMock.EXPECT().ID().Return("id")
_, err := DeriveSyncTree(ctx, deps)
require.NoError(t, err)
@ -92,6 +93,7 @@ func Test_CreateSyncTree(t *testing.T) {
headUpdate := &treechangeproto.TreeSyncMessage{}
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
objTreeMock.EXPECT().ID().Return("id")
deps := CreateDeps{
AclList: aclListMock,
SpaceId: spaceId,