diff --git a/common/commonspace/syncservice/streampool_test.go b/common/commonspace/syncservice/streampool_test.go index 7540e02f..47ed4ee0 100644 --- a/common/commonspace/syncservice/streampool_test.go +++ b/common/commonspace/syncservice/streampool_test.go @@ -7,26 +7,10 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" - "storj.io/drpc" "testing" "time" ) -type testPeer struct { - id string - drpc.Conn -} - -func (t testPeer) Id() string { - return t.id -} - -func (t testPeer) LastUsage() time.Time { - return time.Now() -} - -func (t testPeer) UpdateLastUsage() {} - type testServer struct { stream chan spacesyncproto.DRPCSpace_StreamStream addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error @@ -79,8 +63,7 @@ func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler) fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1) require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer)) clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId) - p := &testPeer{id: localId.String(), Conn: fx.drpcTS.DialWrapConn(nil, clientWrapper)} - fx.client = spacesyncproto.NewDRPCSpaceClient(p) + fx.client = spacesyncproto.NewDRPCSpaceClient(fx.drpcTS.DialWrapConn(nil, clientWrapper)) var err error fx.clientStream, err = fx.client.Stream(context.Background()) diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 40149b78..38de40f8 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -104,7 +104,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { stream, err := s.clientFactory.Client(peer).Stream(ctx) if err != nil { err = rpcerr.Unwrap(err) - log.With("spaceId", s.spaceId).Errorf("failed to open clientStream: %v", err) + log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err) // so here probably the request is failed because there is no such space, // but diffService should handle such cases by sending pushSpace continue @@ -113,7 +113,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) if err != nil { err = rpcerr.Unwrap(err) - log.With("spaceId", s.spaceId).Errorf("failed to send first message to clientStream: %v", err) + log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) continue } s.streamPool.AddAndReadStreamAsync(stream)