Add comments and fix streampool test
This commit is contained in:
parent
0c85336e82
commit
3b320087f3
@ -19,5 +19,6 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
|
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
|
||||||
|
// TODO: if needed we can launch full sync here
|
||||||
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream)
|
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -122,39 +122,16 @@ func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
|
|||||||
func TestStreamPool_Close(t *testing.T) {
|
func TestStreamPool_Close(t *testing.T) {
|
||||||
remId := "remoteId"
|
remId := "remoteId"
|
||||||
|
|
||||||
t.Run("client close", func(t *testing.T) {
|
t.Run("close", func(t *testing.T) {
|
||||||
fx := newFixture(t, "", remId, nil)
|
fx := newFixture(t, "", remId, nil)
|
||||||
fx.run(t)
|
fx.run(t)
|
||||||
var events []string
|
fx.pool.Close()
|
||||||
recvChan := make(chan struct{})
|
select {
|
||||||
go func() {
|
case <-fx.clientStream.Context().Done():
|
||||||
fx.pool.Close()
|
break
|
||||||
events = append(events, "pool_close")
|
case <-time.After(time.Millisecond * 100):
|
||||||
recvChan <- struct{}{}
|
t.Fatal("context should be closed")
|
||||||
}()
|
}
|
||||||
time.Sleep(50 * time.Millisecond) //err = <-waitCh
|
|
||||||
events = append(events, "stream_close")
|
|
||||||
err := fx.clientStream.Close()
|
|
||||||
require.NoError(t, err)
|
|
||||||
<-recvChan
|
|
||||||
require.Equal(t, []string{"stream_close", "pool_close"}, events)
|
|
||||||
})
|
|
||||||
t.Run("server close", func(t *testing.T) {
|
|
||||||
fx := newFixture(t, "", remId, nil)
|
|
||||||
fx.run(t)
|
|
||||||
var events []string
|
|
||||||
recvChan := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
fx.pool.Close()
|
|
||||||
events = append(events, "pool_close")
|
|
||||||
recvChan <- struct{}{}
|
|
||||||
}()
|
|
||||||
time.Sleep(50 * time.Millisecond) //err = <-waitCh
|
|
||||||
events = append(events, "stream_close")
|
|
||||||
err := fx.clientStream.Close()
|
|
||||||
require.NoError(t, err)
|
|
||||||
<-recvChan
|
|
||||||
require.Equal(t, []string{"stream_close", "pool_close"}, events)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -313,8 +313,8 @@ func (s *syncTree) checkAlive() (err error) {
|
|||||||
|
|
||||||
func (s *syncTree) Ping() (err error) {
|
func (s *syncTree) Ping() (err error) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
||||||
s.Unlock()
|
|
||||||
return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
|
return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -35,6 +35,7 @@ func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchand
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
|
// TODO: when implementing sync status check msg heads before sending into queue
|
||||||
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
||||||
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user