From fcbc6af4790eafad4401f79de9e0479cf9978ed3 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 16 Dec 2022 21:02:34 +0100 Subject: [PATCH] Add comments and fix streampool test --- common/commonspace/rpchandler.go | 1 + .../syncservice/streampool_test.go | 39 ++++--------------- common/commonspace/synctree/synctree.go | 2 +- .../commonspace/synctree/synctreehandler.go | 1 + 4 files changed, 11 insertions(+), 32 deletions(-) diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index dbc5b7aa..c8b1dbd4 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -19,5 +19,6 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR } func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { + // TODO: if needed we can launch full sync here return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream) } diff --git a/common/commonspace/syncservice/streampool_test.go b/common/commonspace/syncservice/streampool_test.go index 2d2ff512..3fd7f746 100644 --- a/common/commonspace/syncservice/streampool_test.go +++ b/common/commonspace/syncservice/streampool_test.go @@ -122,39 +122,16 @@ func TestStreamPool_AddAndReadStreamAsync(t *testing.T) { func TestStreamPool_Close(t *testing.T) { remId := "remoteId" - t.Run("client close", func(t *testing.T) { + t.Run("close", func(t *testing.T) { fx := newFixture(t, "", remId, nil) fx.run(t) - var events []string - recvChan := make(chan struct{}) - go func() { - fx.pool.Close() - events = append(events, "pool_close") - recvChan <- struct{}{} - }() - time.Sleep(50 * time.Millisecond) //err = <-waitCh - events = append(events, "stream_close") - err := fx.clientStream.Close() - require.NoError(t, err) - <-recvChan - require.Equal(t, []string{"stream_close", "pool_close"}, events) - }) - t.Run("server close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - fx.run(t) - var events []string - recvChan := make(chan struct{}) - go func() { - fx.pool.Close() - events = append(events, "pool_close") - recvChan <- struct{}{} - }() - time.Sleep(50 * time.Millisecond) //err = <-waitCh - events = append(events, "stream_close") - err := fx.clientStream.Close() - require.NoError(t, err) - <-recvChan - require.Equal(t, []string{"stream_close", "pool_close"}, events) + fx.pool.Close() + select { + case <-fx.clientStream.Context().Done(): + break + case <-time.After(time.Millisecond * 100): + t.Fatal("context should be closed") + } }) } diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 64c9f809..221e71e3 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -313,8 +313,8 @@ func (s *syncTree) checkAlive() (err error) { func (s *syncTree) Ping() (err error) { s.Lock() + defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - s.Unlock() return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) } diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index f0bfe1fd..7990ab59 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -35,6 +35,7 @@ func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchand } func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + // TODO: when implementing sync status check msg heads before sending into queue unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, unmarshalled) if err != nil {