Add comments and fix streampool test
This commit is contained in:
parent
6f13df03a6
commit
fcbc6af479
@ -19,5 +19,6 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
|
||||
}
|
||||
|
||||
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
|
||||
// TODO: if needed we can launch full sync here
|
||||
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream)
|
||||
}
|
||||
|
||||
@ -122,39 +122,16 @@ func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
|
||||
func TestStreamPool_Close(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
|
||||
t.Run("client close", func(t *testing.T) {
|
||||
t.Run("close", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
fx.run(t)
|
||||
var events []string
|
||||
recvChan := make(chan struct{})
|
||||
go func() {
|
||||
fx.pool.Close()
|
||||
events = append(events, "pool_close")
|
||||
recvChan <- struct{}{}
|
||||
}()
|
||||
time.Sleep(50 * time.Millisecond) //err = <-waitCh
|
||||
events = append(events, "stream_close")
|
||||
err := fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
<-recvChan
|
||||
require.Equal(t, []string{"stream_close", "pool_close"}, events)
|
||||
})
|
||||
t.Run("server close", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
fx.run(t)
|
||||
var events []string
|
||||
recvChan := make(chan struct{})
|
||||
go func() {
|
||||
fx.pool.Close()
|
||||
events = append(events, "pool_close")
|
||||
recvChan <- struct{}{}
|
||||
}()
|
||||
time.Sleep(50 * time.Millisecond) //err = <-waitCh
|
||||
events = append(events, "stream_close")
|
||||
err := fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
<-recvChan
|
||||
require.Equal(t, []string{"stream_close", "pool_close"}, events)
|
||||
fx.pool.Close()
|
||||
select {
|
||||
case <-fx.clientStream.Context().Done():
|
||||
break
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
t.Fatal("context should be closed")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -313,8 +313,8 @@ func (s *syncTree) checkAlive() (err error) {
|
||||
|
||||
func (s *syncTree) Ping() (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
||||
s.Unlock()
|
||||
return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
|
||||
}
|
||||
|
||||
|
||||
@ -35,6 +35,7 @@ func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchand
|
||||
}
|
||||
|
||||
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
// TODO: when implementing sync status check msg heads before sending into queue
|
||||
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
||||
if err != nil {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user