diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index dbc5b7aa..73698856 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -19,5 +19,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR } func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { - return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream) + return r.s.SyncService().SyncClient().AddAndReadStreamSync(stream) } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 46099da3..8111895a 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -90,11 +90,11 @@ func (s *space) DiffService() diffservice.DiffService { } func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) { - return synctree.DeriveSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage) + return synctree.DeriveSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage) } func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) { - return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage) + return synctree.CreateSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage) } func (s *space) BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) { @@ -104,10 +104,9 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene if err != nil { return nil, err } - - return s.syncService.StreamPool().SendSync( + return s.syncService.SyncClient().SendSync( peerId, - spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, ""), + s.syncService.SyncClient().CreateNewTreeRequest(id), ) } @@ -142,7 +141,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene return } } - return synctree.BuildSyncTree(ctx, s.syncService, store.(treestorage.TreeStorage), listener, s.aclList) + return synctree.BuildSyncTree(ctx, s.syncService.SyncClient(), store.(treestorage.TreeStorage), listener, s.aclList) } func (s *space) Close() error { diff --git a/common/commonspace/syncservice/mock_syncservice/mock_syncservice.go b/common/commonspace/syncservice/mock_syncservice/mock_syncservice.go index 8438ab64..6f1a272b 100644 --- a/common/commonspace/syncservice/mock_syncservice/mock_syncservice.go +++ b/common/commonspace/syncservice/mock_syncservice/mock_syncservice.go @@ -8,6 +8,8 @@ import ( reflect "reflect" spacesyncproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + tree "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" + treechangeproto "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" gomock "github.com/golang/mock/gomock" ) @@ -34,6 +36,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder { return m.recorder } +// AddAndReadStreamAsync mocks base method. +func (m *MockSyncClient) AddAndReadStreamAsync(arg0 spacesyncproto.DRPCSpace_StreamStream) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddAndReadStreamAsync", arg0) +} + +// AddAndReadStreamAsync indicates an expected call of AddAndReadStreamAsync. +func (mr *MockSyncClientMockRecorder) AddAndReadStreamAsync(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAndReadStreamAsync", reflect.TypeOf((*MockSyncClient)(nil).AddAndReadStreamAsync), arg0) +} + +// AddAndReadStreamSync mocks base method. +func (m *MockSyncClient) AddAndReadStreamSync(arg0 spacesyncproto.DRPCSpace_StreamStream) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddAndReadStreamSync", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddAndReadStreamSync indicates an expected call of AddAndReadStreamSync. +func (mr *MockSyncClientMockRecorder) AddAndReadStreamSync(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAndReadStreamSync", reflect.TypeOf((*MockSyncClient)(nil).AddAndReadStreamSync), arg0) +} + // BroadcastAsync mocks base method. func (m *MockSyncClient) BroadcastAsync(arg0 *spacesyncproto.ObjectSyncMessage) error { m.ctrl.T.Helper() @@ -48,8 +76,108 @@ func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0) } +// BroadcastAsyncOrSendResponsible mocks base method. +func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *spacesyncproto.ObjectSyncMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. +func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0) +} + +// Close mocks base method. +func (m *MockSyncClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSyncClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSyncClient)(nil).Close)) +} + +// CreateFullSyncRequest mocks base method. +func (m *MockSyncClient) CreateFullSyncRequest(arg0 tree.ObjectTree, arg1, arg2 []string, arg3 string) (*spacesyncproto.ObjectSyncMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateFullSyncRequest", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateFullSyncRequest indicates an expected call of CreateFullSyncRequest. +func (mr *MockSyncClientMockRecorder) CreateFullSyncRequest(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncRequest), arg0, arg1, arg2, arg3) +} + +// CreateFullSyncResponse mocks base method. +func (m *MockSyncClient) CreateFullSyncResponse(arg0 tree.ObjectTree, arg1, arg2 []string, arg3 string) (*spacesyncproto.ObjectSyncMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateFullSyncResponse", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateFullSyncResponse indicates an expected call of CreateFullSyncResponse. +func (mr *MockSyncClientMockRecorder) CreateFullSyncResponse(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncResponse", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncResponse), arg0, arg1, arg2, arg3) +} + +// CreateHeadUpdate mocks base method. +func (m *MockSyncClient) CreateHeadUpdate(arg0 tree.ObjectTree, arg1 []*treechangeproto.RawTreeChangeWithId) *spacesyncproto.ObjectSyncMessage { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateHeadUpdate", arg0, arg1) + ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage) + return ret0 +} + +// CreateHeadUpdate indicates an expected call of CreateHeadUpdate. +func (mr *MockSyncClientMockRecorder) CreateHeadUpdate(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateHeadUpdate", reflect.TypeOf((*MockSyncClient)(nil).CreateHeadUpdate), arg0, arg1) +} + +// CreateNewTreeRequest mocks base method. +func (m *MockSyncClient) CreateNewTreeRequest(arg0 string) *spacesyncproto.ObjectSyncMessage { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateNewTreeRequest", arg0) + ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage) + return ret0 +} + +// CreateNewTreeRequest indicates an expected call of CreateNewTreeRequest. +func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest), arg0) +} + +// HasActiveStream mocks base method. +func (m *MockSyncClient) HasActiveStream(arg0 string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasActiveStream", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// HasActiveStream indicates an expected call of HasActiveStream. +func (mr *MockSyncClientMockRecorder) HasActiveStream(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasActiveStream", reflect.TypeOf((*MockSyncClient)(nil).HasActiveStream), arg0) +} + // SendAsync mocks base method. -func (m *MockSyncClient) SendAsync(arg0 string, arg1 *spacesyncproto.ObjectSyncMessage) error { +func (m *MockSyncClient) SendAsync(arg0 []string, arg1 *spacesyncproto.ObjectSyncMessage) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendAsync", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/common/commonspace/syncservice/requestfactory.go b/common/commonspace/syncservice/requestfactory.go index ab19bc6a..1237653b 100644 --- a/common/commonspace/syncservice/requestfactory.go +++ b/common/commonspace/syncservice/requestfactory.go @@ -1,6 +1,7 @@ package syncservice import ( + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" @@ -8,8 +9,10 @@ import ( ) type RequestFactory interface { - FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error) - FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error) + CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage) + CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage) + CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error) + CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error) } func newRequestFactory() RequestFactory { @@ -18,11 +21,22 @@ func newRequestFactory() RequestFactory { type requestFactory struct{} -func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) { +func (r *requestFactory) CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage) { + return spacesyncproto.WrapHeadUpdate(&spacesyncproto.ObjectHeadUpdate{ + Heads: t.Heads(), + Changes: added, + SnapshotPath: t.SnapshotPath(), + }, t.Header(), t.ID(), "") +} + +func (r *requestFactory) CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage) { + return spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, "") +} + +func (r *requestFactory) CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) { req := &spacesyncproto.ObjectFullSyncRequest{} if t == nil { - msg = spacesyncproto.WrapFullRequest(req, t.Header(), t.ID(), trackingId) - return + return nil, fmt.Errorf("tree should not be empty") } req.Heads = t.Heads() @@ -39,7 +53,7 @@ func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSna return } -func (r *requestFactory) FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) { +func (r *requestFactory) CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) { resp := &spacesyncproto.ObjectFullSyncResponse{ Heads: t.Heads(), SnapshotPath: t.SnapshotPath(), diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 5eb97170..bf457374 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -18,16 +18,16 @@ const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { - SyncClient + Sender AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) HasActiveStream(peerId string) bool Close() (err error) } -type SyncClient interface { +type Sender interface { SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) - SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) + SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) } @@ -56,6 +56,8 @@ func newStreamPool(messageHandler MessageHandler) StreamPool { } func (s *streamPool) HasActiveStream(peerId string) (res bool) { + s.Lock() + defer s.Unlock() _, err := s.getOrDeleteStream(peerId) return err == nil } @@ -73,7 +75,7 @@ func (s *streamPool) SendSync( s.waiters[msg.TrackingId] = waiter s.waitersMx.Unlock() - err = s.SendAsync(peerId, msg) + err = s.SendAsync([]string{peerId}, msg) if err != nil { return } @@ -82,18 +84,31 @@ func (s *streamPool) SendSync( return } -func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - stream, err := s.getOrDeleteStream(peerId) - if err != nil { - return +func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) { + getStreams := func() (streams []spacesyncproto.SpaceStream) { + for _, pId := range peers { + stream, err := s.getOrDeleteStream(pId) + if err != nil { + continue + } + streams = append(streams, stream) + } + return streams } - return stream.Send(message) + s.Lock() + streams := getStreams() + s.Unlock() + + for _, s := range streams { + if len(peers) == 1 { + err = s.Send(message) + } + } + return err } func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) { - s.Lock() - defer s.Unlock() stream, exists := s.peerStreams[id] if !exists { err = ErrEmptyPeer diff --git a/common/commonspace/syncservice/syncclient.go b/common/commonspace/syncservice/syncclient.go new file mode 100644 index 00000000..12c25164 --- /dev/null +++ b/common/commonspace/syncservice/syncclient.go @@ -0,0 +1,49 @@ +package syncservice + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" +) + +type SyncClient interface { + StreamPool + RequestFactory + BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error) +} + +type syncClient struct { + StreamPool + RequestFactory + spaceId string + notifiable HeadNotifiable + configuration nodeconf.Configuration +} + +func newSyncClient(spaceId string, pool StreamPool, notifiable HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient { + return &syncClient{ + StreamPool: pool, + RequestFactory: factory, + notifiable: notifiable, + configuration: configuration, + spaceId: spaceId, + } +} + +func (s *syncClient) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { + s.notifyIfNeeded(message) + return s.BroadcastAsync(message) +} + +func (s *syncClient) BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error) { + if s.configuration.IsResponsible(s.spaceId) { + return s.SendAsync(s.configuration.NodeIds(s.spaceId), message) + } + return s.BroadcastAsync(message) +} + +func (s *syncClient) notifyIfNeeded(message *spacesyncproto.ObjectSyncMessage) { + if message.GetContent().GetHeadUpdate() != nil { + update := message.GetContent().GetHeadUpdate() + s.notifiable.UpdateHeads(message.TreeId, update.Heads) + } +} diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index 16ce2177..d0edf03c 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -1,4 +1,3 @@ -//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice SyncClient package syncservice import ( @@ -13,19 +12,17 @@ type syncHandler struct { spaceId string treeCache cache.TreeCache syncClient SyncClient - factory RequestFactory } type SyncHandler interface { HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) } -func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient, factory RequestFactory) *syncHandler { +func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient) *syncHandler { return &syncHandler{ spaceId: spaceId, treeCache: treeCache, syncClient: syncClient, - factory: factory, } } @@ -48,7 +45,10 @@ func (s *syncHandler) handleHeadUpdate( update *spacesyncproto.ObjectHeadUpdate, msg *spacesyncproto.ObjectSyncMessage) (err error) { - var fullRequest *spacesyncproto.ObjectSyncMessage + var ( + fullRequest *spacesyncproto.ObjectSyncMessage + isEmptyUpdate = len(update.Changes) == 0 + ) res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) if err != nil { return @@ -60,7 +60,14 @@ func (s *syncHandler) handleHeadUpdate( defer res.Release() defer objTree.Unlock() - if s.alreadyHaveHeads(objTree, update.Heads) { + // isEmptyUpdate is sent when the tree is brought up from cache + if isEmptyUpdate { + // we need to sync in any case + fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId) + return err + } + + if s.alreadyHasHeads(objTree, update.Heads) { return nil } @@ -69,19 +76,16 @@ func (s *syncHandler) handleHeadUpdate( return err } - if s.alreadyHaveHeads(objTree, update.Heads) { + if s.alreadyHasHeads(objTree, update.Heads) { return nil } - fullRequest, err = s.factory.FullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId) - if err != nil { - return err - } - return nil + fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId) + return err }() if fullRequest != nil { - return s.syncClient.SendAsync(senderId, fullRequest) + return s.syncClient.SendAsync([]string{senderId}, fullRequest) } return } @@ -97,7 +101,7 @@ func (s *syncHandler) handleFullSyncRequest( ) defer func() { if err != nil { - s.syncClient.SendAsync(senderId, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId)) + s.syncClient.SendAsync([]string{senderId}, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId)) } }() @@ -116,21 +120,21 @@ func (s *syncHandler) handleFullSyncRequest( header = objTree.Header() } - if !s.alreadyHaveHeads(objTree, request.Heads) { + if !s.alreadyHasHeads(objTree, request.Heads) { _, err = objTree.AddRawChanges(ctx, request.Changes...) if err != nil { return err } } - fullResponse, err = s.factory.FullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId) + fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId) return err }() if err != nil { return } - return s.syncClient.SendAsync(senderId, fullResponse) + return s.syncClient.SendAsync([]string{senderId}, fullResponse) } func (s *syncHandler) handleFullSyncResponse( @@ -149,7 +153,7 @@ func (s *syncHandler) handleFullSyncResponse( defer res.Release() defer objTree.Unlock() - if s.alreadyHaveHeads(objTree, response.Heads) { + if s.alreadyHasHeads(objTree, response.Heads) { return nil } @@ -160,6 +164,6 @@ func (s *syncHandler) handleFullSyncResponse( return } -func (s *syncHandler) alreadyHaveHeads(t tree.ObjectTree, heads []string) bool { +func (s *syncHandler) alreadyHasHeads(t tree.ObjectTree, heads []string) bool { return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...) } diff --git a/common/commonspace/syncservice/synchandler_test.go b/common/commonspace/syncservice/synchandler_test.go index dc145017..e8fa2b63 100644 --- a/common/commonspace/syncservice/synchandler_test.go +++ b/common/commonspace/syncservice/synchandler_test.go @@ -48,9 +48,12 @@ func TestSyncHandler_HandleMessage(t *testing.T) { Release: func() {}, TreeContainer: treeContainer{objectTreeMock}, }, nil) - objectTreeMock.EXPECT().Lock() - objectTreeMock.EXPECT().Heads().Return([]string{"h2"}) - objectTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})). + objectTreeMock.EXPECT(). + Lock() + objectTreeMock.EXPECT(). + Heads().Return([]string{"h2"}) + objectTreeMock.EXPECT(). + AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})). Return(tree.AddResult{}, nil) objectTreeMock.EXPECT().Unlock() err := syncHandler.HandleMessage(ctx, senderId, msg) diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index c0a461e1..d065832a 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice SyncClient package syncservice import ( @@ -7,19 +8,13 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" "time" ) var log = logger.NewNamed("syncservice").Sugar() type SyncService interface { - NotifyHeadUpdate( - ctx context.Context, - treeId string, - root *treechangeproto.RawTreeChangeWithId, - update *spacesyncproto.ObjectHeadUpdate) (err error) - StreamPool() StreamPool + SyncClient() SyncClient Init() Close() (err error) @@ -34,11 +29,9 @@ const respPeersStreamCheckInterval = time.Second * 10 type syncService struct { spaceId string - syncHandler SyncHandler - streamPool StreamPool - headNotifiable HeadNotifiable - configuration nodeconf.Configuration - clientFactory spacesyncproto.ClientFactory + syncClient SyncClient + configuration nodeconf.Configuration + clientFactory spacesyncproto.ClientFactory streamLoopCtx context.Context stopStreamLoop context.CancelFunc @@ -47,30 +40,26 @@ type syncService struct { func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService { var syncHandler SyncHandler - streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + pool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { return syncHandler.HandleMessage(ctx, senderId, message) }) - syncHandler = newSyncHandler(spaceId, cache, streamPool, newRequestFactory()) + factory := newRequestFactory() + syncClient := newSyncClient(spaceId, pool, headNotifiable, factory, configuration) + syncHandler = newSyncHandler(spaceId, cache, syncClient) return newSyncService( spaceId, - headNotifiable, - syncHandler, - streamPool, + syncClient, spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient), configuration) } func newSyncService( spaceId string, - headNotifiable HeadNotifiable, - syncHandler SyncHandler, - streamPool StreamPool, + syncClient SyncClient, clientFactory spacesyncproto.ClientFactory, configuration nodeconf.Configuration) *syncService { return &syncService{ - syncHandler: syncHandler, - streamPool: streamPool, - headNotifiable: headNotifiable, + syncClient: syncClient, configuration: configuration, clientFactory: clientFactory, spaceId: spaceId, @@ -86,16 +75,7 @@ func (s *syncService) Init() { func (s *syncService) Close() (err error) { s.stopStreamLoop() <-s.streamLoopDone - return s.streamPool.Close() -} - -func (s *syncService) NotifyHeadUpdate( - ctx context.Context, - treeId string, - header *treechangeproto.RawTreeChangeWithId, - update *spacesyncproto.ObjectHeadUpdate) (err error) { - s.headNotifiable.UpdateHeads(treeId, update.Heads) - return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId, "")) + return s.syncClient.Close() } func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { @@ -106,7 +86,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { return } for _, peer := range respPeers { - if s.streamPool.HasActiveStream(peer.Id()) { + if s.syncClient.HasActiveStream(peer.Id()) { continue } stream, err := s.clientFactory.Client(peer).Stream(ctx) @@ -124,7 +104,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) continue } - s.streamPool.AddAndReadStreamAsync(stream) + s.syncClient.AddAndReadStreamAsync(stream) } } @@ -141,6 +121,6 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { } } -func (s *syncService) StreamPool() StreamPool { - return s.streamPool +func (s *syncService) SyncClient() SyncClient { + return s.syncClient } diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index f3075825..99c6014d 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -2,7 +2,6 @@ package synctree import ( "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" @@ -14,14 +13,14 @@ import ( // SyncTree sends head updates to sync service and also sends new changes to update listener type SyncTree struct { tree.ObjectTree - syncService syncservice.SyncService - listener updatelistener.UpdateListener + syncClient syncservice.SyncClient + listener updatelistener.UpdateListener } func DeriveSyncTree( ctx context.Context, payload tree.ObjectTreeCreatePayload, - syncService syncservice.SyncService, + syncClient syncservice.SyncClient, listener updatelistener.UpdateListener, aclList list.ACLList, createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { @@ -30,22 +29,20 @@ func DeriveSyncTree( return } t = &SyncTree{ - ObjectTree: t, - syncService: syncService, - listener: listener, + ObjectTree: t, + syncClient: syncClient, + listener: listener, } - err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ - Heads: t.Heads(), - SnapshotPath: t.SnapshotPath(), - }) + headUpdate := syncClient.CreateHeadUpdate(t, nil) + err = syncClient.BroadcastAsync(headUpdate) return } func CreateSyncTree( ctx context.Context, payload tree.ObjectTreeCreatePayload, - syncService syncservice.SyncService, + syncClient syncservice.SyncClient, listener updatelistener.UpdateListener, aclList list.ACLList, createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { @@ -54,30 +51,28 @@ func CreateSyncTree( return } t = &SyncTree{ - ObjectTree: t, - syncService: syncService, - listener: listener, + ObjectTree: t, + syncClient: syncClient, + listener: listener, } - err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ - Heads: t.Heads(), - SnapshotPath: t.SnapshotPath(), - }) + headUpdate := syncClient.CreateHeadUpdate(t, nil) + err = syncClient.BroadcastAsync(headUpdate) return } func BuildSyncTree( ctx context.Context, - syncService syncservice.SyncService, + syncClient syncservice.SyncClient, treeStorage storage.TreeStorage, listener updatelistener.UpdateListener, aclList list.ACLList) (t tree.ObjectTree, err error) { - return buildSyncTree(ctx, syncService, treeStorage, listener, aclList) + return buildSyncTree(ctx, syncClient, treeStorage, listener, aclList) } func buildSyncTree( ctx context.Context, - syncService syncservice.SyncService, + syncClient syncservice.SyncClient, treeStorage storage.TreeStorage, listener updatelistener.UpdateListener, aclList list.ACLList) (t tree.ObjectTree, err error) { @@ -86,15 +81,14 @@ func buildSyncTree( return } t = &SyncTree{ - ObjectTree: t, - syncService: syncService, - listener: listener, + ObjectTree: t, + syncClient: syncClient, + listener: listener, } - err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ - Heads: t.Heads(), - SnapshotPath: t.SnapshotPath(), - }) + headUpdate := syncClient.CreateHeadUpdate(t, nil) + // here we will have different behaviour based on who is sending this update + err = syncClient.BroadcastAsyncOrSendResponsible(headUpdate) return } @@ -103,11 +97,8 @@ func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeCo if err != nil { return } - err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ - Heads: res.Heads, - Changes: res.Added, - SnapshotPath: s.SnapshotPath(), - }) + headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) + err = s.syncClient.BroadcastAsync(headUpdate) return } @@ -125,11 +116,8 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot s.listener.Rebuild(s) } - err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ - Heads: res.Heads, - Changes: res.Added, - SnapshotPath: s.SnapshotPath(), - }) + headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) + err = s.syncClient.BroadcastAsync(headUpdate) return }