diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index c58cb58e..76a3cae9 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -2,6 +2,7 @@ package headsync import ( "context" + "fmt" "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree" @@ -91,7 +92,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error { return err } for _, p := range peers { - if err := d.syncWithPeer(ctx, p); err != nil { + if err = d.syncWithPeer(ctx, p); err != nil { d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) } } @@ -110,7 +111,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) err = rpcerr.Unwrap(err) if err != nil && err != spacesyncproto.ErrSpaceMissing { d.syncStatus.SetNodesOnline(p.Id(), false) - return err + return fmt.Errorf("diff error: %v", err) } d.syncStatus.SetNodesOnline(p.Id(), true) @@ -148,7 +149,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - syncTree.Ping() + _ = syncTree.Ping(ctx) } } diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index 2a29cd0c..d18fea0e 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -9,10 +9,10 @@ import ( type SyncAcl struct { list.AclList synchandler.SyncHandler - streamPool objectsync.StreamPool + streamPool objectsync.MessagePool } -func NewSyncAcl(aclList list.AclList, streamPool objectsync.StreamPool) *SyncAcl { +func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl { return &SyncAcl{ AclList: aclList, SyncHandler: nil, diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 6fad9dd5..8218aa2e 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -40,32 +40,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder { return m.recorder } -// BroadcastAsync mocks base method. -func (m *MockSyncClient) BroadcastAsync(arg0 *treechangeproto.TreeSyncMessage) error { +// Broadcast mocks base method. +func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsync", arg0) + ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// BroadcastAsync indicates an expected call of BroadcastAsync. -func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.Call { +// Broadcast indicates an expected call of Broadcast. +func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1) } // BroadcastAsyncOrSendResponsible mocks base method. -func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *treechangeproto.TreeSyncMessage) error { +func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0) + ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. -func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call { +func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1) } // CreateFullSyncRequest mocks base method. @@ -126,18 +126,18 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest)) } -// SendAsync mocks base method. -func (m *MockSyncClient) SendAsync(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) error { +// SendWithReply mocks base method. +func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1 string, arg2 *treechangeproto.TreeSyncMessage, arg3 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendAsync", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } -// SendAsync indicates an expected call of SendAsync. -func (mr *MockSyncClientMockRecorder) SendAsync(arg0, arg1, arg2 interface{}) *gomock.Call { +// SendWithReply indicates an expected call of SendWithReply. +func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAsync", reflect.TypeOf((*MockSyncClient)(nil).SendAsync), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3) } // MockSyncTree is a mock of SyncTree interface. @@ -395,17 +395,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { } // Ping mocks base method. -func (m *MockSyncTree) Ping() error { +func (m *MockSyncTree) Ping(arg0 context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ping") + ret := m.ctrl.Call(m, "Ping", arg0) ret0, _ := ret[0].(error) return ret0 } // Ping indicates an expected call of Ping. -func (mr *MockSyncTreeMockRecorder) Ping() *gomock.Call { +func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0) } // RLock mocks base method. diff --git a/commonspace/object/tree/synctree/queuedclient.go b/commonspace/object/tree/synctree/queuedclient.go index 5af9d95e..49ea3922 100644 --- a/commonspace/object/tree/synctree/queuedclient.go +++ b/commonspace/object/tree/synctree/queuedclient.go @@ -1,6 +1,7 @@ package synctree import ( + "context" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/objectsync" ) @@ -17,20 +18,20 @@ func newQueuedClient(client SyncClient, queue objectsync.ActionQueue) SyncClient } } -func (q *queuedClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { +func (q *queuedClient) Broadcast(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { return q.queue.Send(func() error { - return q.SyncClient.BroadcastAsync(message) + return q.SyncClient.Broadcast(ctx, message) }) } -func (q *queuedClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { +func (q *queuedClient) SendWithReply(ctx context.Context, peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { return q.queue.Send(func() error { - return q.SyncClient.SendAsync(peerId, message, replyId) + return q.SyncClient.SendWithReply(ctx, peerId, message, replyId) }) } -func (q *queuedClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { +func (q *queuedClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { return q.queue.Send(func() error { - return q.SyncClient.BroadcastAsyncOrSendResponsible(message) + return q.SyncClient.BroadcastAsyncOrSendResponsible(ctx, message) }) } diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index a610e2b5..5c7f2e2f 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -2,6 +2,7 @@ package synctree import ( + "context" "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/objectsync" @@ -11,72 +12,61 @@ import ( type SyncClient interface { RequestFactory - BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) - BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) - SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) + Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) + BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) + SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) } type syncClient struct { - objectsync.StreamPool + objectsync.MessagePool RequestFactory spaceId string connector confconnector.ConfConnector configuration nodeconf.Configuration - - checker objectsync.StreamChecker } func newSyncClient( spaceId string, - pool objectsync.StreamPool, + pool objectsync.MessagePool, factory RequestFactory, - configuration nodeconf.Configuration, - checker objectsync.StreamChecker) SyncClient { + configuration nodeconf.Configuration) SyncClient { return &syncClient{ - StreamPool: pool, + MessagePool: pool, RequestFactory: factory, configuration: configuration, - checker: checker, spaceId: spaceId, } } -func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") +func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) { + objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") if err != nil { return } - s.checker.CheckResponsiblePeers() - return s.StreamPool.BroadcastAsync(objMsg) + return s.MessagePool.Broadcast(ctx, objMsg) } -func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { - err = s.checker.CheckPeerConnection(peerId) +func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) { + objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, replyId) if err != nil { return } - - objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId) - if err != nil { - return - } - return s.StreamPool.SendAsync([]string{peerId}, objMsg) + return s.MessagePool.SendPeer(ctx, peerId, objMsg) } -func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") +func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { + objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "") if err != nil { return } if s.configuration.IsResponsible(s.spaceId) { - s.checker.CheckResponsiblePeers() - return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg) + return s.MessagePool.SendResponsible(ctx, objMsg) } - return s.BroadcastAsync(message) + return s.Broadcast(ctx, message) } -func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { +func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { payload, err := message.Marshal() if err != nil { return @@ -84,7 +74,8 @@ func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId s objMsg = &spacesyncproto.ObjectSyncMessage{ ReplyId: replyId, Payload: payload, - ObjectId: id, + ObjectId: objectId, + SpaceId: spaceId, } return } diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 23dc5298..2b7e27f6 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -30,15 +30,10 @@ type HeadNotifiable interface { UpdateHeads(id string, heads []string) } -type ListenerSetter interface { - SetListener(listener updatelistener.UpdateListener) -} - type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler - ListenerSetter - Ping() (err error) + Ping(ctx context.Context) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -78,34 +73,24 @@ func newWrappedSyncClient( factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.Configuration) SyncClient { - syncClient := newSyncClient(spaceId, objectSync.StreamPool(), factory, configuration, objectSync.StreamChecker()) + syncClient := newSyncClient(spaceId, objectSync.MessagePool(), factory, configuration) return newQueuedClient(syncClient, objectSync.ActionQueue()) } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { - streamChecker := deps.ObjectSync.StreamChecker() peerId, err := peer.CtxPeerId(ctx) if err != nil { - streamChecker.CheckResponsiblePeers() - peerId, err = streamChecker.FirstResponsiblePeer() - if err != nil { - return - } + peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] } newTreeRequest := GetRequestFactory().CreateNewTreeRequest() - objMsg, err := marshallTreeMessage(newTreeRequest, id, "") + objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "") if err != nil { return } - err = deps.ObjectSync.StreamChecker().CheckPeerConnection(peerId) - if err != nil { - return - } - - resp, err := deps.ObjectSync.StreamPool().SendSync(peerId, objMsg) + resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) if err != nil { return } @@ -218,16 +203,13 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy if isFirstBuild { headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // send to everybody, because everybody should know that the node or client got new tree - syncTree.syncClient.BroadcastAsync(headUpdate) + if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil { + log.Error("broadcast error", zap.Error(e)) + } } return } -func (s *syncTree) SetListener(listener updatelistener.UpdateListener) { - // this should be called under lock - s.listener = listener -} - func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) { if err = s.checkAlive(); err != nil { return @@ -255,7 +237,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh } s.syncStatus.HeadsChange(s.Id(), res.Heads) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - err = s.syncClient.BroadcastAsync(headUpdate) + err = s.syncClient.Broadcast(ctx, headUpdate) return } @@ -282,7 +264,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree. s.notifiable.UpdateHeads(s.Id(), res.Heads) } headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - err = s.syncClient.BroadcastAsync(headUpdate) + err = s.syncClient.Broadcast(ctx, headUpdate) } return } @@ -324,11 +306,11 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) Ping() (err error) { +func (s *syncTree) Ping(ctx context.Context) (err error) { s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) + return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate) } func (s *syncTree) afterBuild() { diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 1bfa7b34..11dbb235 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -111,7 +111,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendAsync(senderId, fullRequest, replyId) + return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId) } if s.alreadyHasHeads(objTree, update.Heads) { @@ -135,7 +135,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendAsync(senderId, fullRequest, replyId) + return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId) } func (s *syncTreeHandler) handleFullSyncRequest( @@ -159,7 +159,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( if err != nil { log.With(zap.Error(err)).Debug("full sync request finished with error") - s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) + s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId) return } else if fullResponse != nil { log.Debug("full sync response sent") @@ -180,7 +180,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( return } - return s.syncClient.SendAsync(senderId, fullResponse, replyId) + return s.syncClient.SendWithReply(ctx, senderId, fullResponse, replyId) } func (s *syncTreeHandler) handleFullSyncResponse( diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go new file mode 100644 index 00000000..fc629e5e --- /dev/null +++ b/commonspace/objectsync/msgpool.go @@ -0,0 +1,120 @@ +package objectsync + +import ( + "context" + "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "go.uber.org/zap" + "strconv" + "strings" + "sync" + "sync/atomic" +) + +type StreamManager interface { + SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) + SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) + Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) +} + +// MessagePool can be made generic to work with different streams +type MessagePool interface { + synchandler.SyncHandler + StreamManager + SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) +} + +type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) + +type responseWaiter struct { + ch chan *spacesyncproto.ObjectSyncMessage +} + +type messagePool struct { + sync.Mutex + StreamManager + messageHandler MessageHandler + waiters map[string]responseWaiter + waitersMx sync.Mutex + counter atomic.Uint64 + queue ActionQueue +} + +func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { + s := &messagePool{ + StreamManager: streamManager, + messageHandler: messageHandler, + waiters: make(map[string]responseWaiter), + queue: NewDefaultActionQueue(), + } + return s +} + +func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + newCounter := s.counter.Add(1) + msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter) + + s.waitersMx.Lock() + waiter := responseWaiter{ + ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), + } + s.waiters[msg.ReplyId] = waiter + s.waitersMx.Unlock() + + err = s.SendPeer(ctx, peerId, msg) + if err != nil { + return + } + select { + case <-ctx.Done(): + s.waitersMx.Lock() + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + + log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting") + err = ctx.Err() + case reply = <-waiter.ch: + // success + } + return +} + +func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + if msg.ReplyId != "" { + // we got reply, send it to waiter + if s.stopWaiter(msg) { + return + } + log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") + return + } + return s.queue.Send(func() error { + if e := s.messageHandler(ctx, senderId, msg); e != nil { + log.Info("handle message error", zap.Error(e)) + } + return nil + }) +} + +func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.ReplyId] + if exists { + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + waiter.ch <- msg + return true + } + s.waitersMx.Unlock() + return false +} + +func genReplyKey(peerId, treeId string, counter uint64) string { + b := &strings.Builder{} + b.WriteString(peerId) + b.WriteString(".") + b.WriteString(treeId) + b.WriteString(".") + b.WriteString(strconv.FormatUint(counter, 36)) + return b.String() +} diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 5eb87b1e..d9c9cc27 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -5,7 +5,6 @@ import ( "context" "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/ocache" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -18,19 +17,17 @@ var log = logger.NewNamed("commonspace.objectsync") type ObjectSync interface { ocache.ObjectLastUsage synchandler.SyncHandler - StreamPool() StreamPool - StreamChecker() StreamChecker + MessagePool() MessagePool ActionQueue() ActionQueue - Init() + Init(getter syncobjectgetter.SyncObjectGetter) Close() (err error) } type objectSync struct { spaceId string - streamPool StreamPool - checker StreamChecker + streamPool MessagePool objectGetter syncobjectgetter.SyncObjectGetter actionQueue ActionQueue @@ -38,28 +35,14 @@ type objectSync struct { cancelSync context.CancelFunc } -func NewObjectSync( - spaceId string, - confConnector confconnector.ConfConnector, - objectGetter syncobjectgetter.SyncObjectGetter) (objectSync ObjectSync) { - streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { +func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync ObjectSync) { + msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { return objectSync.HandleMessage(ctx, senderId, message) }) - clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) - syncLog := log.With(zap.String("id", spaceId)) syncCtx, cancel := context.WithCancel(context.Background()) - checker := NewStreamChecker( - spaceId, - confConnector, - streamPool, - clientFactory, - syncCtx, - syncLog) objectSync = newObjectSync( spaceId, - streamPool, - checker, - objectGetter, + msgPool, syncCtx, cancel) return @@ -67,36 +50,33 @@ func NewObjectSync( func newObjectSync( spaceId string, - streamPool StreamPool, - checker StreamChecker, - objectGetter syncobjectgetter.SyncObjectGetter, + streamPool MessagePool, syncCtx context.Context, cancel context.CancelFunc, ) *objectSync { return &objectSync{ - objectGetter: objectGetter, - streamPool: streamPool, - spaceId: spaceId, - checker: checker, - syncCtx: syncCtx, - cancelSync: cancel, - actionQueue: NewDefaultActionQueue(), + streamPool: streamPool, + spaceId: spaceId, + syncCtx: syncCtx, + cancelSync: cancel, + actionQueue: NewDefaultActionQueue(), } } -func (s *objectSync) Init() { +func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) { + s.objectGetter = objectGetter s.actionQueue.Run() - go s.checker.CheckResponsiblePeers() } func (s *objectSync) Close() (err error) { s.actionQueue.Close() s.cancelSync() - return s.streamPool.Close() + return } func (s *objectSync) LastUsage() time.Time { - return s.streamPool.LastUsage() + // TODO: [che] + return time.Now() } func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { @@ -108,14 +88,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message return obj.HandleMessage(ctx, senderId, message) } -func (s *objectSync) StreamPool() StreamPool { +func (s *objectSync) MessagePool() MessagePool { return s.streamPool } -func (s *objectSync) StreamChecker() StreamChecker { - return s.checker -} - func (s *objectSync) ActionQueue() ActionQueue { return s.actionQueue } diff --git a/commonspace/objectsync/streamchecker.go b/commonspace/objectsync/streamchecker.go deleted file mode 100644 index 5bf53e5e..00000000 --- a/commonspace/objectsync/streamchecker.go +++ /dev/null @@ -1,146 +0,0 @@ -package objectsync - -import ( - "context" - "fmt" - "github.com/anytypeio/any-sync/commonspace/confconnector" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/rpc/rpcerr" - "go.uber.org/atomic" - "go.uber.org/zap" - "golang.org/x/exp/slices" - "time" -) - -type StreamChecker interface { - CheckResponsiblePeers() - CheckPeerConnection(peerId string) (err error) - FirstResponsiblePeer() (peerId string, err error) -} - -type streamChecker struct { - spaceId string - connector confconnector.ConfConnector - streamPool StreamPool - clientFactory spacesyncproto.ClientFactory - log *zap.Logger - syncCtx context.Context - lastCheck *atomic.Time -} - -const streamCheckerInterval = time.Second * 5 - -func NewStreamChecker( - spaceId string, - connector confconnector.ConfConnector, - streamPool StreamPool, - clientFactory spacesyncproto.ClientFactory, - syncCtx context.Context, - log *zap.Logger) StreamChecker { - return &streamChecker{ - spaceId: spaceId, - connector: connector, - streamPool: streamPool, - clientFactory: clientFactory, - log: log, - syncCtx: syncCtx, - lastCheck: atomic.NewTime(time.Time{}), - } -} - -func (s *streamChecker) CheckResponsiblePeers() { - lastCheck := s.lastCheck.Load() - now := time.Now() - if lastCheck.Add(streamCheckerInterval).After(now) { - return - } - s.lastCheck.Store(now) - - var ( - activeNodeIds []string - configuration = s.connector.Configuration() - ) - nodeIds := configuration.NodeIds(s.spaceId) - for _, nodeId := range nodeIds { - if s.streamPool.HasActiveStream(nodeId) { - s.log.Debug("has active stream for", zap.String("id", nodeId)) - activeNodeIds = append(activeNodeIds, nodeId) - continue - } - } - s.log.Debug("total streams", zap.Int("total", len(activeNodeIds))) - newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds) - if err != nil { - s.log.Error("failed to dial peers", zap.Error(err)) - return - } - - for _, p := range newPeers { - err := s.createStream(p) - if err != nil { - log.With(zap.Error(err)).Error("failed to create stream") - continue - } - s.log.Debug("reading stream for", zap.String("id", p.Id())) - } - return -} - -func (s *streamChecker) CheckPeerConnection(peerId string) (err error) { - if s.streamPool.HasActiveStream(peerId) { - return - } - - var ( - configuration = s.connector.Configuration() - pool = s.connector.Pool() - ) - nodeIds := configuration.NodeIds(s.spaceId) - // we don't know the address of the peer - if !slices.Contains(nodeIds, peerId) { - err = fmt.Errorf("don't know the address of peer %s", peerId) - return - } - - newPeer, err := pool.Dial(s.syncCtx, peerId) - if err != nil { - return - } - return s.createStream(newPeer) -} - -func (s *streamChecker) createStream(p peer.Peer) (err error) { - stream, err := s.clientFactory.Client(p).ObjectSyncStream(s.syncCtx) - if err != nil { - // so here probably the request is failed because there is no such space, - // but diffService should handle such cases by sending pushSpace - err = fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(err)) - return - } - - // sending empty message for the server to understand from which space is it coming - err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) - if err != nil { - err = fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(err)) - return - } - err = s.streamPool.AddAndReadStreamAsync(stream) - if err != nil { - err = fmt.Errorf("failed to read from stream async: %w", err) - return - } - return -} - -func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) { - nodeIds := s.connector.Configuration().NodeIds(s.spaceId) - for _, nodeId := range nodeIds { - if s.streamPool.HasActiveStream(nodeId) { - peerId = nodeId - return - } - } - err = fmt.Errorf("no responsible peers are connected") - return -} diff --git a/commonspace/objectsync/streampool.go b/commonspace/objectsync/streampool.go deleted file mode 100644 index 2948dee8..00000000 --- a/commonspace/objectsync/streampool.go +++ /dev/null @@ -1,332 +0,0 @@ -package objectsync - -import ( - "context" - "errors" - "fmt" - "github.com/anytypeio/any-sync/app/ocache" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/net/peer" - "go.uber.org/zap" - "sync" - "sync/atomic" - "time" -) - -var ErrEmptyPeer = errors.New("don't have such a peer") -var ErrStreamClosed = errors.New("stream is already closed") - -var maxStreamReaders = 10 -var syncWaitPeriod = 2 * time.Second - -var ErrSyncTimeout = errors.New("too long wait on sync receive") - -// StreamPool can be made generic to work with different streams -type StreamPool interface { - ocache.ObjectLastUsage - AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) - AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) - - SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) - SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) - BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) - - HasActiveStream(peerId string) bool - Close() (err error) -} - -type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) - -type responseWaiter struct { - ch chan *spacesyncproto.ObjectSyncMessage -} - -type streamPool struct { - sync.Mutex - peerStreams map[string]spacesyncproto.ObjectSyncStream - messageHandler MessageHandler - wg *sync.WaitGroup - waiters map[string]responseWaiter - waitersMx sync.Mutex - counter atomic.Uint64 - lastUsage atomic.Int64 -} - -func newStreamPool(messageHandler MessageHandler) StreamPool { - s := &streamPool{ - peerStreams: make(map[string]spacesyncproto.ObjectSyncStream), - messageHandler: messageHandler, - waiters: make(map[string]responseWaiter), - wg: &sync.WaitGroup{}, - } - s.lastUsage.Store(time.Now().Unix()) - return s -} - -func (s *streamPool) LastUsage() time.Time { - return time.Unix(s.lastUsage.Load(), 0) -} - -func (s *streamPool) HasActiveStream(peerId string) (res bool) { - s.Lock() - defer s.Unlock() - _, err := s.getOrDeleteStream(peerId) - return err == nil -} - -func (s *streamPool) SendSync( - peerId string, - msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - newCounter := s.counter.Add(1) - msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter) - - s.waitersMx.Lock() - waiter := responseWaiter{ - ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), - } - s.waiters[msg.ReplyId] = waiter - s.waitersMx.Unlock() - - err = s.SendAsync([]string{peerId}, msg) - if err != nil { - return - } - delay := time.NewTimer(syncWaitPeriod) - select { - case <-delay.C: - s.waitersMx.Lock() - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - - log.With(zap.String("replyId", msg.ReplyId)).Error("time elapsed when waiting") - err = ErrSyncTimeout - case reply = <-waiter.ch: - if !delay.Stop() { - <-delay.C - } - } - return -} - -func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) { - s.lastUsage.Store(time.Now().Unix()) - getStreams := func() (streams []spacesyncproto.ObjectSyncStream) { - for _, pId := range peers { - stream, err := s.getOrDeleteStream(pId) - if err != nil { - continue - } - streams = append(streams, stream) - } - return streams - } - - s.Lock() - streams := getStreams() - s.Unlock() - - log.With(zap.String("objectId", message.ObjectId), zap.Int("existing peers len", len(streams)), zap.Strings("wanted peers", peers)). - Debug("sending message to peers") - for _, stream := range streams { - err = stream.Send(message) - if err != nil { - log.Debug("error sending message to stream", zap.Error(err)) - } - } - if len(peers) != 1 { - err = nil - } - return err -} - -func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) { - stream, exists := s.peerStreams[id] - if !exists { - err = ErrEmptyPeer - return - } - - select { - case <-stream.Context().Done(): - delete(s.peerStreams, id) - err = ErrStreamClosed - default: - } - - return -} - -func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) { - s.Lock() - defer s.Unlock() -Loop: - for id, stream := range s.peerStreams { - select { - case <-stream.Context().Done(): - delete(s.peerStreams, id) - continue Loop - default: - break - } - log.With(zap.String("id", id)).Debug("getting peer stream") - streams = append(streams, stream) - } - - return -} - -func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { - streams := s.getAllStreams() - log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))). - Debug("broadcasting message to peers") - for _, stream := range streams { - if err = stream.Send(message); err != nil { - log.Debug("error sending message to stream", zap.Error(err)) - } - } - - return nil -} - -func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) { - peerId, err := s.addStream(stream) - if err != nil { - return - } - go s.readPeerLoop(peerId, stream) - return -} - -func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) { - peerId, err := s.addStream(stream) - if err != nil { - return - } - return s.readPeerLoop(peerId, stream) -} - -func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) { - s.Lock() - peerId, err = peer.CtxPeerId(stream.Context()) - if err != nil { - s.Unlock() - return - } - log.With(zap.String("peer id", peerId)).Debug("adding stream") - - if oldStream, ok := s.peerStreams[peerId]; ok { - s.Unlock() - oldStream.Close() - s.Lock() - log.With(zap.String("peer id", peerId)).Debug("closed old stream before adding") - } - - s.peerStreams[peerId] = stream - s.wg.Add(1) - s.Unlock() - return -} - -func (s *streamPool) Close() (err error) { - s.Lock() - wg := s.wg - s.Unlock() - streams := s.getAllStreams() - - log.Debug("closing streams on lock") - for _, stream := range streams { - stream.Close() - } - log.Debug("closed streams") - - if wg != nil { - wg.Wait() - } - return nil -} - -func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { - var ( - log = log.With(zap.String("peerId", peerId)) - queue = NewDefaultActionQueue() - ) - queue.Run() - - defer func() { - log.Debug("stopped reading stream from peer") - s.removePeer(peerId, stream) - queue.Close() - s.wg.Done() - }() - - log.Debug("started reading stream from peer") - - stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool { - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - if exists { - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg - return true - } - s.waitersMx.Unlock() - return false - } - - process := func(msg *spacesyncproto.ObjectSyncMessage) error { - log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) - log.Debug("getting message with reply id") - err = s.messageHandler(stream.Context(), peerId, msg) - if err != nil { - log.With(zap.Error(err)).Debug("message handling failed") - } - return nil - } - - for { - select { - case <-stream.Context().Done(): - return - default: - } - var msg *spacesyncproto.ObjectSyncMessage - msg, err = stream.Recv() - s.lastUsage.Store(time.Now().Unix()) - if err != nil { - stream.Close() - return - } - - if msg.ReplyId != "" { - // then we can send it directly to waiters without adding to queue or starting a reader - if stopWaiter(msg) { - continue - } - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") - } - - queue.Send(func() error { - return process(msg) - }) - } -} - -func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { - s.Lock() - defer s.Unlock() - mapStream, ok := s.peerStreams[peerId] - if !ok { - return ErrEmptyPeer - } - - // it can be the case that the stream was already replaced - if mapStream == stream { - delete(s.peerStreams, peerId) - } - return -} - -func genStreamPoolKey(peerId, treeId string, counter uint64) string { - return fmt.Sprintf("%s.%s.%d", peerId, treeId, counter) -} diff --git a/commonspace/objectsync/streampool_test.go b/commonspace/objectsync/streampool_test.go deleted file mode 100644 index 056d11de..00000000 --- a/commonspace/objectsync/streampool_test.go +++ /dev/null @@ -1,299 +0,0 @@ -package objectsync - -import ( - "context" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/rpc/rpctest" - "github.com/stretchr/testify/require" - "testing" - "time" -) - -type testServer struct { - stream chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream - releaseStream chan error - watchErrOnce bool -} - -func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { - panic("implement me") -} - -func (t *testServer) SpacePush(ctx context.Context, request *spacesyncproto.SpacePushRequest) (*spacesyncproto.SpacePushResponse, error) { - panic("implement me") -} - -func (t *testServer) SpacePull(ctx context.Context, request *spacesyncproto.SpacePullRequest) (*spacesyncproto.SpacePullResponse, error) { - panic("implement me") -} - -func (t *testServer) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error { - t.stream <- stream - return <-t.releaseStream -} - -func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream { - select { - case <-time.After(time.Second * 5): - test.Fatalf("waiteStream timeout") - case st := <-t.stream: - return st - } - return nil -} - -type fixture struct { - testServer *testServer - drpcTS *rpctest.TesServer - client spacesyncproto.DRPCSpaceSyncClient - clientStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream - serverStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream - pool *streamPool - clientId string - serverId string -} - -func newFixture(t *testing.T, clientId, serverId string, handler MessageHandler) *fixture { - fx := &fixture{ - testServer: &testServer{}, - drpcTS: rpctest.NewTestServer(), - clientId: clientId, - serverId: serverId, - } - fx.testServer.stream = make(chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream, 1) - require.NoError(t, spacesyncproto.DRPCRegisterSpaceSync(fx.drpcTS.Mux, fx.testServer)) - fx.client = spacesyncproto.NewDRPCSpaceSyncClient(fx.drpcTS.Dial(peer.CtxWithPeerId(context.Background(), clientId))) - - var err error - fx.clientStream, err = fx.client.ObjectSyncStream(peer.CtxWithPeerId(context.Background(), serverId)) - require.NoError(t, err) - fx.serverStream = fx.testServer.waitStream(t) - fx.pool = newStreamPool(handler).(*streamPool) - - return fx -} - -func (fx *fixture) run(t *testing.T) chan error { - waitCh := make(chan error) - go func() { - err := fx.pool.AddAndReadStreamSync(fx.clientStream) - waitCh <- err - }() - - time.Sleep(time.Millisecond * 10) - fx.pool.Lock() - require.Equal(t, fx.pool.peerStreams[fx.serverId], fx.clientStream) - fx.pool.Unlock() - - return waitCh -} - -func TestStreamPool_AddAndReadStreamAsync(t *testing.T) { - remId := "remoteId" - - t.Run("client close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - - err := fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) - t.Run("server close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - - err := fx.serverStream.Close() - require.NoError(t, err) - - err = <-waitCh - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_Close(t *testing.T) { - remId := "remoteId" - - t.Run("close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - fx.run(t) - fx.pool.Close() - select { - case <-fx.clientStream.Context().Done(): - break - case <-time.After(time.Millisecond * 100): - t.Fatal("context should be closed") - } - }) -} - -func TestStreamPool_ReceiveMessage(t *testing.T) { - remId := "remoteId" - t.Run("pool receive message from server", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - recvChan := make(chan struct{}) - fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - require.Equal(t, msg, message) - recvChan <- struct{}{} - return nil - }) - waitCh := fx.run(t) - - err := fx.serverStream.Send(msg) - require.NoError(t, err) - <-recvChan - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_HasActiveStream(t *testing.T) { - remId := "remoteId" - t.Run("pool has active stream", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - require.True(t, fx.pool.HasActiveStream(remId)) - - err := fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) - t.Run("pool has no active stream", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - err := fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - require.Error(t, err) - require.False(t, fx.pool.HasActiveStream(remId)) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_SendAsync(t *testing.T) { - remId := "remoteId" - t.Run("pool send async to server", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - recvChan := make(chan struct{}) - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg, message) - recvChan <- struct{}{} - }() - waitCh := fx.run(t) - - err := fx.pool.SendAsync([]string{remId}, msg) - require.NoError(t, err) - <-recvChan - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_SendSync(t *testing.T) { - remId := "remoteId" - t.Run("pool send sync to server", func(t *testing.T) { - objectId := "objectId" - payload := []byte("payload") - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg.ObjectId, message.ObjectId) - require.NotEmpty(t, message.ReplyId) - message.Payload = payload - err = fx.serverStream.Send(message) - require.NoError(t, err) - }() - waitCh := fx.run(t) - res, err := fx.pool.SendSync(remId, msg) - require.NoError(t, err) - require.Equal(t, payload, res.Payload) - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) - - t.Run("pool send sync timeout", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - syncWaitPeriod = time.Millisecond * 30 - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg.ObjectId, message.ObjectId) - require.NotEmpty(t, message.ReplyId) - }() - waitCh := fx.run(t) - _, err := fx.pool.SendSync(remId, msg) - require.Equal(t, ErrSyncTimeout, err) - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_BroadcastAsync(t *testing.T) { - remId := "remoteId" - t.Run("pool broadcast async to server", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - recvChan := make(chan struct{}) - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg, message) - recvChan <- struct{}{} - }() - waitCh := fx.run(t) - - err := fx.pool.BroadcastAsync(msg) - require.NoError(t, err) - <-recvChan - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} diff --git a/commonspace/rpchandler.go b/commonspace/rpchandler.go deleted file mode 100644 index 4bb33cd9..00000000 --- a/commonspace/rpchandler.go +++ /dev/null @@ -1,24 +0,0 @@ -package commonspace - -import ( - "context" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" -) - -type RpcHandler interface { - HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) - Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error -} - -type rpcHandler struct { - s *space -} - -func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { - return r.s.HeadSync().HandleRangeRequest(ctx, req) -} - -func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) (err error) { - // TODO: if needed we can launch full sync here - return r.s.ObjectSync().StreamPool().AddAndReadStreamSync(stream) -} diff --git a/commonspace/space.go b/commonspace/space.go index 0cde3a0a..9dda2ece 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -14,6 +14,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" + "github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" @@ -80,16 +81,14 @@ type Space interface { DebugAllHeads() []headsync.TreeHeads Description() (SpaceDescription, error) - SpaceSyncRpc() RpcHandler - DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) DeleteTree(ctx context.Context, id string) (err error) - BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) HeadSync() headsync.HeadSync + ObjectSync() objectsync.ObjectSync SyncStatus() syncstatus.StatusUpdater Storage() spacestorage.SpaceStorage @@ -101,13 +100,11 @@ type space struct { mu sync.RWMutex header *spacesyncproto.RawSpaceHeaderWithId - rpc *rpcHandler - objectSync objectsync.ObjectSync headSync headsync.HeadSync syncStatus syncstatus.StatusUpdater storage spacestorage.SpaceStorage - cache *commonGetter + cache treegetter.TreeGetter account accountservice.Service aclList *syncacl.SyncAcl configuration nodeconf.Configuration @@ -161,7 +158,6 @@ func (s *space) Init(ctx context.Context) (err error) { return } s.header = header - s.rpc = &rpcHandler{s: s} initialIds, err := s.storage.StoredIds() if err != nil { return @@ -174,8 +170,7 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } - s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.StreamPool()) - s.cache.AddObject(s.aclList) + s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool()) deletionState := deletionstate.NewDeletionState(s.storage) deps := settings.Deps{ @@ -196,8 +191,9 @@ func (s *space) Init(ctx context.Context) (err error) { DeletionState: deletionState, } s.settingsObject = settings.NewSettingsObject(deps, s.id) - s.cache.AddObject(s.settingsObject) - s.objectSync.Init() + + objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject) + s.objectSync.Init(objectGetter) s.headSync.Init(initialIds, deletionState) err = s.settingsObject.Init(ctx) if err != nil { @@ -208,10 +204,6 @@ func (s *space) Init(ctx context.Context) (err error) { return nil } -func (s *space) SpaceSyncRpc() RpcHandler { - return s.rpc -} - func (s *space) ObjectSync() objectsync.ObjectSync { return s.objectSync } @@ -297,11 +289,6 @@ type BuildTreeOpts struct { WaitTreeRemoteSync bool } -type HistoryTreeOpts struct { - BeforeId string - Include bool -} - func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) { if s.isClosed.Load() { err = ErrSpaceClosed @@ -323,24 +310,6 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } -func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { - if s.isClosed.Load() { - err = ErrSpaceClosed - return - } - - params := objecttree.HistoryTreeParams{ - AclList: s.aclList, - BeforeId: opts.BeforeId, - IncludeBeforeId: opts.Include, - } - params.TreeStorage, err = s.storage.TreeStorage(id) - if err != nil { - return - } - return objecttree.BuildHistoryTree(params) -} - func (s *space) DeleteTree(ctx context.Context, id string) (err error) { return s.settingsObject.DeleteObject(id) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 7a1bbc0c..52e44a4a 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -13,6 +13,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "github.com/anytypeio/any-sync/commonspace/streammanager" "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" @@ -39,12 +40,13 @@ type SpaceService interface { } type spaceService struct { - config Config - account accountservice.Service - configurationService nodeconf.Service - storageProvider spacestorage.SpaceStorageProvider - treeGetter treegetter.TreeGetter - pool pool.Pool + config Config + account accountservice.Service + configurationService nodeconf.Service + storageProvider spacestorage.SpaceStorageProvider + streamManagerProvider streammanager.StreamManagerProvider + treeGetter treegetter.TreeGetter + pool pool.Pool } func (s *spaceService) Init(a *app.App) (err error) { @@ -53,6 +55,7 @@ func (s *spaceService) Init(a *app.App) (err error) { s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) + s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider) s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -115,7 +118,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) - getter := newCommonGetter(st.Id(), s.treeGetter) + syncStatus := syncstatus.NewNoOpSyncStatus() // this will work only for clients, not the best solution, but... if !lastConfiguration.IsResponsible(st.Id()) { @@ -123,14 +126,21 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) } - headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, getter, syncStatus, log) - objectSync := objectsync.NewObjectSync(id, confConnector, getter) + // TODO: [che] remove *5 + headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log) + + streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id) + if err != nil { + return nil, err + } + + objectSync := objectsync.NewObjectSync(streamManager, id) sp := &space{ id: id, objectSync: objectSync, headSync: headSync, syncStatus: syncStatus, - cache: getter, + cache: s.treeGetter, account: s.account, configuration: lastConfiguration, storage: st, diff --git a/commonspace/streammanager/streammanager.go b/commonspace/streammanager/streammanager.go new file mode 100644 index 00000000..5fb15b4c --- /dev/null +++ b/commonspace/streammanager/streammanager.go @@ -0,0 +1,14 @@ +package streammanager + +import ( + "context" + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/commonspace/objectsync" +) + +const CName = "common.commonspace.streammanager" + +type StreamManagerProvider interface { + app.Component + NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error) +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 4b129949..41e9307a 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -1,6 +1,7 @@ package streampool import ( + "fmt" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -17,6 +18,9 @@ type stream struct { } func (sr *stream) write(msg drpc.Message) (err error) { + defer func() { + sr.l.Debug("write", zap.String("msg", msg.(fmt.Stringer).String()), zap.Error(err)) + }() if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { sr.l.Info("stream write error", zap.Error(err)) sr.streamClose() @@ -24,7 +28,7 @@ func (sr *stream) write(msg drpc.Message) (err error) { return err } -func (sr *stream) readLoop() { +func (sr *stream) readLoop() error { defer func() { sr.streamClose() }() @@ -32,7 +36,12 @@ func (sr *stream) readLoop() { msg := sr.pool.handler.NewReadMessage() if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil { sr.l.Info("msg receive error", zap.Error(err)) - return + return err + } + sr.l.Debug("read msg", zap.String("msg", msg.(fmt.Stringer).String())) + if err := sr.pool.handler.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil { + sr.l.Info("msg handle error", zap.Error(err)) + return err } } } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 572c29c3..5957e056 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -1,7 +1,9 @@ package streampool import ( + "fmt" "github.com/anytypeio/any-sync/net/peer" + "github.com/anytypeio/any-sync/net/pool" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -21,10 +23,14 @@ type StreamHandler interface { // StreamPool keeps and read streams type StreamPool interface { - // AddStream adds new incoming stream into the pool + // AddStream adds new outgoing stream into the pool AddStream(peerId string, stream drpc.Stream, tags ...string) + // ReadStream adds new incoming stream and synchronously read it + ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error) // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) + // SendById sends a message to given peerIds. Works only if stream exists + SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) // Broadcast sends a message to all peers with given tags. Works async. Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) // Close closes all streams @@ -42,7 +48,19 @@ type streamPool struct { lastStreamId uint32 } +func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { + st := s.addStream(peerId, drpcStream, tags...) + return st.readLoop() +} + func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) { + st := s.addStream(peerId, drpcStream, tags...) + go func() { + _ = st.readLoop() + }() +} + +func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream { s.mu.Lock() defer s.mu.Unlock() s.lastStreamId++ @@ -60,7 +78,8 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } - go st.readLoop() + st.l.Debug("stream added", zap.Strings("tags", st.tags)) + return st } func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { @@ -75,6 +94,30 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P return s.exec.Add(ctx, funcs...) } +func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) { + s.mu.Lock() + var streams []*stream + for _, peerId := range peerIds { + for _, streamId := range s.streamIdsByPeer[peerId] { + streams = append(streams, s.streams[streamId]) + } + } + s.mu.Unlock() + log.Debug("sendById", zap.String("msg", msg.(fmt.Stringer).String()), zap.Int("streams", len(streams))) + var funcs []func() + for _, st := range streams { + funcs = append(funcs, func() { + if e := st.write(msg); e != nil { + log.Debug("sendById write error", zap.Error(e)) + } + }) + } + if len(funcs) == 0 { + return pool.ErrUnableToConnect + } + return s.exec.Add(ctx, funcs...) +} + func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) { // get all streams relates to peer streams, err := s.getStreams(ctx, p) @@ -164,6 +207,9 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st } }) } + if len(funcs) == 0 { + return + } return s.exec.Add(ctx, funcs...) } @@ -195,6 +241,7 @@ func (s *streamPool) removeStream(streamId uint32) { } delete(s.streams, streamId) + st.l.Debug("stream removed", zap.Strings("tags", st.tags)) } func (s *streamPool) Close() (err error) {