Further updates to sync logic

This commit is contained in:
mcrakhman 2022-10-02 16:11:47 +02:00
parent 20234269e9
commit 274573df28
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
10 changed files with 303 additions and 123 deletions

View File

@ -19,5 +19,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
}
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream)
return r.s.SyncService().SyncClient().AddAndReadStreamSync(stream)
}

View File

@ -90,11 +90,11 @@ func (s *space) DiffService() diffservice.DiffService {
}
func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) {
return synctree.DeriveSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage)
return synctree.DeriveSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage)
}
func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) {
return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage)
return synctree.CreateSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage)
}
func (s *space) BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) {
@ -104,10 +104,9 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
if err != nil {
return nil, err
}
return s.syncService.StreamPool().SendSync(
return s.syncService.SyncClient().SendSync(
peerId,
spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, ""),
s.syncService.SyncClient().CreateNewTreeRequest(id),
)
}
@ -142,7 +141,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
return
}
}
return synctree.BuildSyncTree(ctx, s.syncService, store.(treestorage.TreeStorage), listener, s.aclList)
return synctree.BuildSyncTree(ctx, s.syncService.SyncClient(), store.(treestorage.TreeStorage), listener, s.aclList)
}
func (s *space) Close() error {

View File

@ -8,6 +8,8 @@ import (
reflect "reflect"
spacesyncproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
treechangeproto "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
gomock "github.com/golang/mock/gomock"
)
@ -34,6 +36,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
return m.recorder
}
// AddAndReadStreamAsync mocks base method.
func (m *MockSyncClient) AddAndReadStreamAsync(arg0 spacesyncproto.DRPCSpace_StreamStream) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "AddAndReadStreamAsync", arg0)
}
// AddAndReadStreamAsync indicates an expected call of AddAndReadStreamAsync.
func (mr *MockSyncClientMockRecorder) AddAndReadStreamAsync(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAndReadStreamAsync", reflect.TypeOf((*MockSyncClient)(nil).AddAndReadStreamAsync), arg0)
}
// AddAndReadStreamSync mocks base method.
func (m *MockSyncClient) AddAndReadStreamSync(arg0 spacesyncproto.DRPCSpace_StreamStream) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddAndReadStreamSync", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// AddAndReadStreamSync indicates an expected call of AddAndReadStreamSync.
func (mr *MockSyncClientMockRecorder) AddAndReadStreamSync(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAndReadStreamSync", reflect.TypeOf((*MockSyncClient)(nil).AddAndReadStreamSync), arg0)
}
// BroadcastAsync mocks base method.
func (m *MockSyncClient) BroadcastAsync(arg0 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
@ -48,8 +76,108 @@ func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0)
}
// BroadcastAsyncOrSendResponsible mocks base method.
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible.
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0)
}
// Close mocks base method.
func (m *MockSyncClient) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockSyncClientMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSyncClient)(nil).Close))
}
// CreateFullSyncRequest mocks base method.
func (m *MockSyncClient) CreateFullSyncRequest(arg0 tree.ObjectTree, arg1, arg2 []string, arg3 string) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncRequest", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncRequest indicates an expected call of CreateFullSyncRequest.
func (mr *MockSyncClientMockRecorder) CreateFullSyncRequest(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncRequest), arg0, arg1, arg2, arg3)
}
// CreateFullSyncResponse mocks base method.
func (m *MockSyncClient) CreateFullSyncResponse(arg0 tree.ObjectTree, arg1, arg2 []string, arg3 string) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncResponse", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncResponse indicates an expected call of CreateFullSyncResponse.
func (mr *MockSyncClientMockRecorder) CreateFullSyncResponse(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncResponse", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncResponse), arg0, arg1, arg2, arg3)
}
// CreateHeadUpdate mocks base method.
func (m *MockSyncClient) CreateHeadUpdate(arg0 tree.ObjectTree, arg1 []*treechangeproto.RawTreeChangeWithId) *spacesyncproto.ObjectSyncMessage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateHeadUpdate", arg0, arg1)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
return ret0
}
// CreateHeadUpdate indicates an expected call of CreateHeadUpdate.
func (mr *MockSyncClientMockRecorder) CreateHeadUpdate(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateHeadUpdate", reflect.TypeOf((*MockSyncClient)(nil).CreateHeadUpdate), arg0, arg1)
}
// CreateNewTreeRequest mocks base method.
func (m *MockSyncClient) CreateNewTreeRequest(arg0 string) *spacesyncproto.ObjectSyncMessage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateNewTreeRequest", arg0)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
return ret0
}
// CreateNewTreeRequest indicates an expected call of CreateNewTreeRequest.
func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest), arg0)
}
// HasActiveStream mocks base method.
func (m *MockSyncClient) HasActiveStream(arg0 string) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasActiveStream", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// HasActiveStream indicates an expected call of HasActiveStream.
func (mr *MockSyncClientMockRecorder) HasActiveStream(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasActiveStream", reflect.TypeOf((*MockSyncClient)(nil).HasActiveStream), arg0)
}
// SendAsync mocks base method.
func (m *MockSyncClient) SendAsync(arg0 string, arg1 *spacesyncproto.ObjectSyncMessage) error {
func (m *MockSyncClient) SendAsync(arg0 []string, arg1 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendAsync", arg0, arg1)
ret0, _ := ret[0].(error)

View File

@ -1,6 +1,7 @@
package syncservice
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
@ -8,8 +9,10 @@ import (
)
type RequestFactory interface {
FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error)
FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error)
CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage)
CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage)
CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error)
CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error)
}
func newRequestFactory() RequestFactory {
@ -18,11 +21,22 @@ func newRequestFactory() RequestFactory {
type requestFactory struct{}
func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
func (r *requestFactory) CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage) {
return spacesyncproto.WrapHeadUpdate(&spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
Changes: added,
SnapshotPath: t.SnapshotPath(),
}, t.Header(), t.ID(), "")
}
func (r *requestFactory) CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage) {
return spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, "")
}
func (r *requestFactory) CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
req := &spacesyncproto.ObjectFullSyncRequest{}
if t == nil {
msg = spacesyncproto.WrapFullRequest(req, t.Header(), t.ID(), trackingId)
return
return nil, fmt.Errorf("tree should not be empty")
}
req.Heads = t.Heads()
@ -39,7 +53,7 @@ func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSna
return
}
func (r *requestFactory) FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
func (r *requestFactory) CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
resp := &spacesyncproto.ObjectFullSyncResponse{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),

View File

@ -18,16 +18,16 @@ const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams
type StreamPool interface {
SyncClient
Sender
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
HasActiveStream(peerId string) bool
Close() (err error)
}
type SyncClient interface {
type Sender interface {
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error)
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
}
@ -56,6 +56,8 @@ func newStreamPool(messageHandler MessageHandler) StreamPool {
}
func (s *streamPool) HasActiveStream(peerId string) (res bool) {
s.Lock()
defer s.Unlock()
_, err := s.getOrDeleteStream(peerId)
return err == nil
}
@ -73,7 +75,7 @@ func (s *streamPool) SendSync(
s.waiters[msg.TrackingId] = waiter
s.waitersMx.Unlock()
err = s.SendAsync(peerId, msg)
err = s.SendAsync([]string{peerId}, msg)
if err != nil {
return
}
@ -82,18 +84,31 @@ func (s *streamPool) SendSync(
return
}
func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
stream, err := s.getOrDeleteStream(peerId)
if err != nil {
return
func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
getStreams := func() (streams []spacesyncproto.SpaceStream) {
for _, pId := range peers {
stream, err := s.getOrDeleteStream(pId)
if err != nil {
continue
}
streams = append(streams, stream)
}
return streams
}
return stream.Send(message)
s.Lock()
streams := getStreams()
s.Unlock()
for _, s := range streams {
if len(peers) == 1 {
err = s.Send(message)
}
}
return err
}
func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) {
s.Lock()
defer s.Unlock()
stream, exists := s.peerStreams[id]
if !exists {
err = ErrEmptyPeer

View File

@ -0,0 +1,49 @@
package syncservice
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
)
type SyncClient interface {
StreamPool
RequestFactory
BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error)
}
type syncClient struct {
StreamPool
RequestFactory
spaceId string
notifiable HeadNotifiable
configuration nodeconf.Configuration
}
func newSyncClient(spaceId string, pool StreamPool, notifiable HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
return &syncClient{
StreamPool: pool,
RequestFactory: factory,
notifiable: notifiable,
configuration: configuration,
spaceId: spaceId,
}
}
func (s *syncClient) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
s.notifyIfNeeded(message)
return s.BroadcastAsync(message)
}
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error) {
if s.configuration.IsResponsible(s.spaceId) {
return s.SendAsync(s.configuration.NodeIds(s.spaceId), message)
}
return s.BroadcastAsync(message)
}
func (s *syncClient) notifyIfNeeded(message *spacesyncproto.ObjectSyncMessage) {
if message.GetContent().GetHeadUpdate() != nil {
update := message.GetContent().GetHeadUpdate()
s.notifiable.UpdateHeads(message.TreeId, update.Heads)
}
}

View File

@ -1,4 +1,3 @@
//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice SyncClient
package syncservice
import (
@ -13,19 +12,17 @@ type syncHandler struct {
spaceId string
treeCache cache.TreeCache
syncClient SyncClient
factory RequestFactory
}
type SyncHandler interface {
HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
}
func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient, factory RequestFactory) *syncHandler {
func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
return &syncHandler{
spaceId: spaceId,
treeCache: treeCache,
syncClient: syncClient,
factory: factory,
}
}
@ -48,7 +45,10 @@ func (s *syncHandler) handleHeadUpdate(
update *spacesyncproto.ObjectHeadUpdate,
msg *spacesyncproto.ObjectSyncMessage) (err error) {
var fullRequest *spacesyncproto.ObjectSyncMessage
var (
fullRequest *spacesyncproto.ObjectSyncMessage
isEmptyUpdate = len(update.Changes) == 0
)
res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
if err != nil {
return
@ -60,7 +60,14 @@ func (s *syncHandler) handleHeadUpdate(
defer res.Release()
defer objTree.Unlock()
if s.alreadyHaveHeads(objTree, update.Heads) {
// isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
// we need to sync in any case
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
return err
}
if s.alreadyHasHeads(objTree, update.Heads) {
return nil
}
@ -69,19 +76,16 @@ func (s *syncHandler) handleHeadUpdate(
return err
}
if s.alreadyHaveHeads(objTree, update.Heads) {
if s.alreadyHasHeads(objTree, update.Heads) {
return nil
}
fullRequest, err = s.factory.FullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
if err != nil {
return err
}
return nil
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
return err
}()
if fullRequest != nil {
return s.syncClient.SendAsync(senderId, fullRequest)
return s.syncClient.SendAsync([]string{senderId}, fullRequest)
}
return
}
@ -97,7 +101,7 @@ func (s *syncHandler) handleFullSyncRequest(
)
defer func() {
if err != nil {
s.syncClient.SendAsync(senderId, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId))
s.syncClient.SendAsync([]string{senderId}, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId))
}
}()
@ -116,21 +120,21 @@ func (s *syncHandler) handleFullSyncRequest(
header = objTree.Header()
}
if !s.alreadyHaveHeads(objTree, request.Heads) {
if !s.alreadyHasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, request.Changes...)
if err != nil {
return err
}
}
fullResponse, err = s.factory.FullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId)
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId)
return err
}()
if err != nil {
return
}
return s.syncClient.SendAsync(senderId, fullResponse)
return s.syncClient.SendAsync([]string{senderId}, fullResponse)
}
func (s *syncHandler) handleFullSyncResponse(
@ -149,7 +153,7 @@ func (s *syncHandler) handleFullSyncResponse(
defer res.Release()
defer objTree.Unlock()
if s.alreadyHaveHeads(objTree, response.Heads) {
if s.alreadyHasHeads(objTree, response.Heads) {
return nil
}
@ -160,6 +164,6 @@ func (s *syncHandler) handleFullSyncResponse(
return
}
func (s *syncHandler) alreadyHaveHeads(t tree.ObjectTree, heads []string) bool {
func (s *syncHandler) alreadyHasHeads(t tree.ObjectTree, heads []string) bool {
return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...)
}

View File

@ -48,9 +48,12 @@ func TestSyncHandler_HandleMessage(t *testing.T) {
Release: func() {},
TreeContainer: treeContainer{objectTreeMock},
}, nil)
objectTreeMock.EXPECT().Lock()
objectTreeMock.EXPECT().Heads().Return([]string{"h2"})
objectTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})).
objectTreeMock.EXPECT().
Lock()
objectTreeMock.EXPECT().
Heads().Return([]string{"h2"})
objectTreeMock.EXPECT().
AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})).
Return(tree.AddResult{}, nil)
objectTreeMock.EXPECT().Unlock()
err := syncHandler.HandleMessage(ctx, senderId, msg)

View File

@ -1,3 +1,4 @@
//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice SyncClient
package syncservice
import (
@ -7,19 +8,13 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"time"
)
var log = logger.NewNamed("syncservice").Sugar()
type SyncService interface {
NotifyHeadUpdate(
ctx context.Context,
treeId string,
root *treechangeproto.RawTreeChangeWithId,
update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool
SyncClient() SyncClient
Init()
Close() (err error)
@ -34,11 +29,9 @@ const respPeersStreamCheckInterval = time.Second * 10
type syncService struct {
spaceId string
syncHandler SyncHandler
streamPool StreamPool
headNotifiable HeadNotifiable
configuration nodeconf.Configuration
clientFactory spacesyncproto.ClientFactory
syncClient SyncClient
configuration nodeconf.Configuration
clientFactory spacesyncproto.ClientFactory
streamLoopCtx context.Context
stopStreamLoop context.CancelFunc
@ -47,30 +40,26 @@ type syncService struct {
func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService {
var syncHandler SyncHandler
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
pool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message)
})
syncHandler = newSyncHandler(spaceId, cache, streamPool, newRequestFactory())
factory := newRequestFactory()
syncClient := newSyncClient(spaceId, pool, headNotifiable, factory, configuration)
syncHandler = newSyncHandler(spaceId, cache, syncClient)
return newSyncService(
spaceId,
headNotifiable,
syncHandler,
streamPool,
syncClient,
spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient),
configuration)
}
func newSyncService(
spaceId string,
headNotifiable HeadNotifiable,
syncHandler SyncHandler,
streamPool StreamPool,
syncClient SyncClient,
clientFactory spacesyncproto.ClientFactory,
configuration nodeconf.Configuration) *syncService {
return &syncService{
syncHandler: syncHandler,
streamPool: streamPool,
headNotifiable: headNotifiable,
syncClient: syncClient,
configuration: configuration,
clientFactory: clientFactory,
spaceId: spaceId,
@ -86,16 +75,7 @@ func (s *syncService) Init() {
func (s *syncService) Close() (err error) {
s.stopStreamLoop()
<-s.streamLoopDone
return s.streamPool.Close()
}
func (s *syncService) NotifyHeadUpdate(
ctx context.Context,
treeId string,
header *treechangeproto.RawTreeChangeWithId,
update *spacesyncproto.ObjectHeadUpdate) (err error) {
s.headNotifiable.UpdateHeads(treeId, update.Heads)
return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId, ""))
return s.syncClient.Close()
}
func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
@ -106,7 +86,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
return
}
for _, peer := range respPeers {
if s.streamPool.HasActiveStream(peer.Id()) {
if s.syncClient.HasActiveStream(peer.Id()) {
continue
}
stream, err := s.clientFactory.Client(peer).Stream(ctx)
@ -124,7 +104,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
continue
}
s.streamPool.AddAndReadStreamAsync(stream)
s.syncClient.AddAndReadStreamAsync(stream)
}
}
@ -141,6 +121,6 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
}
}
func (s *syncService) StreamPool() StreamPool {
return s.streamPool
func (s *syncService) SyncClient() SyncClient {
return s.syncClient
}

View File

@ -2,7 +2,6 @@ package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
@ -14,14 +13,14 @@ import (
// SyncTree sends head updates to sync service and also sends new changes to update listener
type SyncTree struct {
tree.ObjectTree
syncService syncservice.SyncService
listener updatelistener.UpdateListener
syncClient syncservice.SyncClient
listener updatelistener.UpdateListener
}
func DeriveSyncTree(
ctx context.Context,
payload tree.ObjectTreeCreatePayload,
syncService syncservice.SyncService,
syncClient syncservice.SyncClient,
listener updatelistener.UpdateListener,
aclList list.ACLList,
createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {
@ -30,22 +29,20 @@ func DeriveSyncTree(
return
}
t = &SyncTree{
ObjectTree: t,
syncService: syncService,
listener: listener,
ObjectTree: t,
syncClient: syncClient,
listener: listener,
}
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
})
headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate)
return
}
func CreateSyncTree(
ctx context.Context,
payload tree.ObjectTreeCreatePayload,
syncService syncservice.SyncService,
syncClient syncservice.SyncClient,
listener updatelistener.UpdateListener,
aclList list.ACLList,
createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {
@ -54,30 +51,28 @@ func CreateSyncTree(
return
}
t = &SyncTree{
ObjectTree: t,
syncService: syncService,
listener: listener,
ObjectTree: t,
syncClient: syncClient,
listener: listener,
}
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
})
headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate)
return
}
func BuildSyncTree(
ctx context.Context,
syncService syncservice.SyncService,
syncClient syncservice.SyncClient,
treeStorage storage.TreeStorage,
listener updatelistener.UpdateListener,
aclList list.ACLList) (t tree.ObjectTree, err error) {
return buildSyncTree(ctx, syncService, treeStorage, listener, aclList)
return buildSyncTree(ctx, syncClient, treeStorage, listener, aclList)
}
func buildSyncTree(
ctx context.Context,
syncService syncservice.SyncService,
syncClient syncservice.SyncClient,
treeStorage storage.TreeStorage,
listener updatelistener.UpdateListener,
aclList list.ACLList) (t tree.ObjectTree, err error) {
@ -86,15 +81,14 @@ func buildSyncTree(
return
}
t = &SyncTree{
ObjectTree: t,
syncService: syncService,
listener: listener,
ObjectTree: t,
syncClient: syncClient,
listener: listener,
}
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
})
headUpdate := syncClient.CreateHeadUpdate(t, nil)
// here we will have different behaviour based on who is sending this update
err = syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
return
}
@ -103,11 +97,8 @@ func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if err != nil {
return
}
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: res.Heads,
Changes: res.Added,
SnapshotPath: s.SnapshotPath(),
})
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
return
}
@ -125,11 +116,8 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot
s.listener.Rebuild(s)
}
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: res.Heads,
Changes: res.Added,
SnapshotPath: s.SnapshotPath(),
})
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
return
}