Further updates to sync logic

This commit is contained in:
mcrakhman 2022-10-02 16:11:47 +02:00 committed by Mikhail Iudin
parent 9e95f21cbd
commit 84356efa86
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
10 changed files with 303 additions and 123 deletions

View File

@ -19,5 +19,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
} }
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream) return r.s.SyncService().SyncClient().AddAndReadStreamSync(stream)
} }

View File

@ -90,11 +90,11 @@ func (s *space) DiffService() diffservice.DiffService {
} }
func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) { func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) {
return synctree.DeriveSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage) return synctree.DeriveSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage)
} }
func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) { func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error) {
return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, s.aclList, s.storage.CreateTreeStorage) return synctree.CreateSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage)
} }
func (s *space) BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) { func (s *space) BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) {
@ -104,10 +104,9 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s.syncService.SyncClient().SendSync(
return s.syncService.StreamPool().SendSync(
peerId, peerId,
spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, ""), s.syncService.SyncClient().CreateNewTreeRequest(id),
) )
} }
@ -142,7 +141,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
return return
} }
} }
return synctree.BuildSyncTree(ctx, s.syncService, store.(treestorage.TreeStorage), listener, s.aclList) return synctree.BuildSyncTree(ctx, s.syncService.SyncClient(), store.(treestorage.TreeStorage), listener, s.aclList)
} }
func (s *space) Close() error { func (s *space) Close() error {

View File

@ -8,6 +8,8 @@ import (
reflect "reflect" reflect "reflect"
spacesyncproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" spacesyncproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
treechangeproto "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
) )
@ -34,6 +36,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
return m.recorder return m.recorder
} }
// AddAndReadStreamAsync mocks base method.
func (m *MockSyncClient) AddAndReadStreamAsync(arg0 spacesyncproto.DRPCSpace_StreamStream) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "AddAndReadStreamAsync", arg0)
}
// AddAndReadStreamAsync indicates an expected call of AddAndReadStreamAsync.
func (mr *MockSyncClientMockRecorder) AddAndReadStreamAsync(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAndReadStreamAsync", reflect.TypeOf((*MockSyncClient)(nil).AddAndReadStreamAsync), arg0)
}
// AddAndReadStreamSync mocks base method.
func (m *MockSyncClient) AddAndReadStreamSync(arg0 spacesyncproto.DRPCSpace_StreamStream) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddAndReadStreamSync", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// AddAndReadStreamSync indicates an expected call of AddAndReadStreamSync.
func (mr *MockSyncClientMockRecorder) AddAndReadStreamSync(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddAndReadStreamSync", reflect.TypeOf((*MockSyncClient)(nil).AddAndReadStreamSync), arg0)
}
// BroadcastAsync mocks base method. // BroadcastAsync mocks base method.
func (m *MockSyncClient) BroadcastAsync(arg0 *spacesyncproto.ObjectSyncMessage) error { func (m *MockSyncClient) BroadcastAsync(arg0 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -48,8 +76,108 @@ func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0)
} }
// BroadcastAsyncOrSendResponsible mocks base method.
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible.
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0)
}
// Close mocks base method.
func (m *MockSyncClient) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockSyncClientMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSyncClient)(nil).Close))
}
// CreateFullSyncRequest mocks base method.
func (m *MockSyncClient) CreateFullSyncRequest(arg0 tree.ObjectTree, arg1, arg2 []string, arg3 string) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncRequest", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncRequest indicates an expected call of CreateFullSyncRequest.
func (mr *MockSyncClientMockRecorder) CreateFullSyncRequest(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncRequest), arg0, arg1, arg2, arg3)
}
// CreateFullSyncResponse mocks base method.
func (m *MockSyncClient) CreateFullSyncResponse(arg0 tree.ObjectTree, arg1, arg2 []string, arg3 string) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncResponse", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncResponse indicates an expected call of CreateFullSyncResponse.
func (mr *MockSyncClientMockRecorder) CreateFullSyncResponse(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncResponse", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncResponse), arg0, arg1, arg2, arg3)
}
// CreateHeadUpdate mocks base method.
func (m *MockSyncClient) CreateHeadUpdate(arg0 tree.ObjectTree, arg1 []*treechangeproto.RawTreeChangeWithId) *spacesyncproto.ObjectSyncMessage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateHeadUpdate", arg0, arg1)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
return ret0
}
// CreateHeadUpdate indicates an expected call of CreateHeadUpdate.
func (mr *MockSyncClientMockRecorder) CreateHeadUpdate(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateHeadUpdate", reflect.TypeOf((*MockSyncClient)(nil).CreateHeadUpdate), arg0, arg1)
}
// CreateNewTreeRequest mocks base method.
func (m *MockSyncClient) CreateNewTreeRequest(arg0 string) *spacesyncproto.ObjectSyncMessage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateNewTreeRequest", arg0)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
return ret0
}
// CreateNewTreeRequest indicates an expected call of CreateNewTreeRequest.
func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest), arg0)
}
// HasActiveStream mocks base method.
func (m *MockSyncClient) HasActiveStream(arg0 string) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasActiveStream", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// HasActiveStream indicates an expected call of HasActiveStream.
func (mr *MockSyncClientMockRecorder) HasActiveStream(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasActiveStream", reflect.TypeOf((*MockSyncClient)(nil).HasActiveStream), arg0)
}
// SendAsync mocks base method. // SendAsync mocks base method.
func (m *MockSyncClient) SendAsync(arg0 string, arg1 *spacesyncproto.ObjectSyncMessage) error { func (m *MockSyncClient) SendAsync(arg0 []string, arg1 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendAsync", arg0, arg1) ret := m.ctrl.Call(m, "SendAsync", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)

View File

@ -1,6 +1,7 @@
package syncservice package syncservice
import ( import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
@ -8,8 +9,10 @@ import (
) )
type RequestFactory interface { type RequestFactory interface {
FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error) CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage)
FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error) CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage)
CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (req *spacesyncproto.ObjectSyncMessage, err error)
CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error)
} }
func newRequestFactory() RequestFactory { func newRequestFactory() RequestFactory {
@ -18,11 +21,22 @@ func newRequestFactory() RequestFactory {
type requestFactory struct{} type requestFactory struct{}
func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) { func (r *requestFactory) CreateHeadUpdate(t tree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *spacesyncproto.ObjectSyncMessage) {
return spacesyncproto.WrapHeadUpdate(&spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
Changes: added,
SnapshotPath: t.SnapshotPath(),
}, t.Header(), t.ID(), "")
}
func (r *requestFactory) CreateNewTreeRequest(id string) (msg *spacesyncproto.ObjectSyncMessage) {
return spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, "")
}
func (r *requestFactory) CreateFullSyncRequest(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
req := &spacesyncproto.ObjectFullSyncRequest{} req := &spacesyncproto.ObjectFullSyncRequest{}
if t == nil { if t == nil {
msg = spacesyncproto.WrapFullRequest(req, t.Header(), t.ID(), trackingId) return nil, fmt.Errorf("tree should not be empty")
return
} }
req.Heads = t.Heads() req.Heads = t.Heads()
@ -39,7 +53,7 @@ func (r *requestFactory) FullSyncRequest(t tree.ObjectTree, theirHeads, theirSna
return return
} }
func (r *requestFactory) FullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) { func (r *requestFactory) CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (msg *spacesyncproto.ObjectSyncMessage, err error) {
resp := &spacesyncproto.ObjectFullSyncResponse{ resp := &spacesyncproto.ObjectFullSyncResponse{
Heads: t.Heads(), Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(), SnapshotPath: t.SnapshotPath(),

View File

@ -18,16 +18,16 @@ const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams // StreamPool can be made generic to work with different streams
type StreamPool interface { type StreamPool interface {
SyncClient Sender
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
HasActiveStream(peerId string) bool HasActiveStream(peerId string) bool
Close() (err error) Close() (err error)
} }
type SyncClient interface { type Sender interface {
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
} }
@ -56,6 +56,8 @@ func newStreamPool(messageHandler MessageHandler) StreamPool {
} }
func (s *streamPool) HasActiveStream(peerId string) (res bool) { func (s *streamPool) HasActiveStream(peerId string) (res bool) {
s.Lock()
defer s.Unlock()
_, err := s.getOrDeleteStream(peerId) _, err := s.getOrDeleteStream(peerId)
return err == nil return err == nil
} }
@ -73,7 +75,7 @@ func (s *streamPool) SendSync(
s.waiters[msg.TrackingId] = waiter s.waiters[msg.TrackingId] = waiter
s.waitersMx.Unlock() s.waitersMx.Unlock()
err = s.SendAsync(peerId, msg) err = s.SendAsync([]string{peerId}, msg)
if err != nil { if err != nil {
return return
} }
@ -82,18 +84,31 @@ func (s *streamPool) SendSync(
return return
} }
func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) { func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
stream, err := s.getOrDeleteStream(peerId) getStreams := func() (streams []spacesyncproto.SpaceStream) {
for _, pId := range peers {
stream, err := s.getOrDeleteStream(pId)
if err != nil { if err != nil {
return continue
}
streams = append(streams, stream)
}
return streams
} }
return stream.Send(message) s.Lock()
streams := getStreams()
s.Unlock()
for _, s := range streams {
if len(peers) == 1 {
err = s.Send(message)
}
}
return err
} }
func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) { func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) {
s.Lock()
defer s.Unlock()
stream, exists := s.peerStreams[id] stream, exists := s.peerStreams[id]
if !exists { if !exists {
err = ErrEmptyPeer err = ErrEmptyPeer

View File

@ -0,0 +1,49 @@
package syncservice
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
)
type SyncClient interface {
StreamPool
RequestFactory
BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error)
}
type syncClient struct {
StreamPool
RequestFactory
spaceId string
notifiable HeadNotifiable
configuration nodeconf.Configuration
}
func newSyncClient(spaceId string, pool StreamPool, notifiable HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient {
return &syncClient{
StreamPool: pool,
RequestFactory: factory,
notifiable: notifiable,
configuration: configuration,
spaceId: spaceId,
}
}
func (s *syncClient) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
s.notifyIfNeeded(message)
return s.BroadcastAsync(message)
}
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error) {
if s.configuration.IsResponsible(s.spaceId) {
return s.SendAsync(s.configuration.NodeIds(s.spaceId), message)
}
return s.BroadcastAsync(message)
}
func (s *syncClient) notifyIfNeeded(message *spacesyncproto.ObjectSyncMessage) {
if message.GetContent().GetHeadUpdate() != nil {
update := message.GetContent().GetHeadUpdate()
s.notifiable.UpdateHeads(message.TreeId, update.Heads)
}
}

View File

@ -1,4 +1,3 @@
//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice SyncClient
package syncservice package syncservice
import ( import (
@ -13,19 +12,17 @@ type syncHandler struct {
spaceId string spaceId string
treeCache cache.TreeCache treeCache cache.TreeCache
syncClient SyncClient syncClient SyncClient
factory RequestFactory
} }
type SyncHandler interface { type SyncHandler interface {
HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
} }
func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient, factory RequestFactory) *syncHandler { func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
return &syncHandler{ return &syncHandler{
spaceId: spaceId, spaceId: spaceId,
treeCache: treeCache, treeCache: treeCache,
syncClient: syncClient, syncClient: syncClient,
factory: factory,
} }
} }
@ -48,7 +45,10 @@ func (s *syncHandler) handleHeadUpdate(
update *spacesyncproto.ObjectHeadUpdate, update *spacesyncproto.ObjectHeadUpdate,
msg *spacesyncproto.ObjectSyncMessage) (err error) { msg *spacesyncproto.ObjectSyncMessage) (err error) {
var fullRequest *spacesyncproto.ObjectSyncMessage var (
fullRequest *spacesyncproto.ObjectSyncMessage
isEmptyUpdate = len(update.Changes) == 0
)
res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
if err != nil { if err != nil {
return return
@ -60,7 +60,14 @@ func (s *syncHandler) handleHeadUpdate(
defer res.Release() defer res.Release()
defer objTree.Unlock() defer objTree.Unlock()
if s.alreadyHaveHeads(objTree, update.Heads) { // isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
// we need to sync in any case
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
return err
}
if s.alreadyHasHeads(objTree, update.Heads) {
return nil return nil
} }
@ -69,19 +76,16 @@ func (s *syncHandler) handleHeadUpdate(
return err return err
} }
if s.alreadyHaveHeads(objTree, update.Heads) { if s.alreadyHasHeads(objTree, update.Heads) {
return nil return nil
} }
fullRequest, err = s.factory.FullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId) fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath, msg.TrackingId)
if err != nil {
return err return err
}
return nil
}() }()
if fullRequest != nil { if fullRequest != nil {
return s.syncClient.SendAsync(senderId, fullRequest) return s.syncClient.SendAsync([]string{senderId}, fullRequest)
} }
return return
} }
@ -97,7 +101,7 @@ func (s *syncHandler) handleFullSyncRequest(
) )
defer func() { defer func() {
if err != nil { if err != nil {
s.syncClient.SendAsync(senderId, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId)) s.syncClient.SendAsync([]string{senderId}, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId))
} }
}() }()
@ -116,21 +120,21 @@ func (s *syncHandler) handleFullSyncRequest(
header = objTree.Header() header = objTree.Header()
} }
if !s.alreadyHaveHeads(objTree, request.Heads) { if !s.alreadyHasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, request.Changes...) _, err = objTree.AddRawChanges(ctx, request.Changes...)
if err != nil { if err != nil {
return err return err
} }
} }
fullResponse, err = s.factory.FullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId) fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath, msg.TrackingId)
return err return err
}() }()
if err != nil { if err != nil {
return return
} }
return s.syncClient.SendAsync(senderId, fullResponse) return s.syncClient.SendAsync([]string{senderId}, fullResponse)
} }
func (s *syncHandler) handleFullSyncResponse( func (s *syncHandler) handleFullSyncResponse(
@ -149,7 +153,7 @@ func (s *syncHandler) handleFullSyncResponse(
defer res.Release() defer res.Release()
defer objTree.Unlock() defer objTree.Unlock()
if s.alreadyHaveHeads(objTree, response.Heads) { if s.alreadyHasHeads(objTree, response.Heads) {
return nil return nil
} }
@ -160,6 +164,6 @@ func (s *syncHandler) handleFullSyncResponse(
return return
} }
func (s *syncHandler) alreadyHaveHeads(t tree.ObjectTree, heads []string) bool { func (s *syncHandler) alreadyHasHeads(t tree.ObjectTree, heads []string) bool {
return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...) return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...)
} }

View File

@ -48,9 +48,12 @@ func TestSyncHandler_HandleMessage(t *testing.T) {
Release: func() {}, Release: func() {},
TreeContainer: treeContainer{objectTreeMock}, TreeContainer: treeContainer{objectTreeMock},
}, nil) }, nil)
objectTreeMock.EXPECT().Lock() objectTreeMock.EXPECT().
objectTreeMock.EXPECT().Heads().Return([]string{"h2"}) Lock()
objectTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})). objectTreeMock.EXPECT().
Heads().Return([]string{"h2"})
objectTreeMock.EXPECT().
AddRawChanges(gomock.Any(), gomock.Eq([]*treechangeproto.RawTreeChangeWithId{chWithId})).
Return(tree.AddResult{}, nil) Return(tree.AddResult{}, nil)
objectTreeMock.EXPECT().Unlock() objectTreeMock.EXPECT().Unlock()
err := syncHandler.HandleMessage(ctx, senderId, msg) err := syncHandler.HandleMessage(ctx, senderId, msg)

View File

@ -1,3 +1,4 @@
//go:generate mockgen -destination mock_syncservice/mock_syncservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice SyncClient
package syncservice package syncservice
import ( import (
@ -7,19 +8,13 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"time" "time"
) )
var log = logger.NewNamed("syncservice").Sugar() var log = logger.NewNamed("syncservice").Sugar()
type SyncService interface { type SyncService interface {
NotifyHeadUpdate( SyncClient() SyncClient
ctx context.Context,
treeId string,
root *treechangeproto.RawTreeChangeWithId,
update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool
Init() Init()
Close() (err error) Close() (err error)
@ -34,9 +29,7 @@ const respPeersStreamCheckInterval = time.Second * 10
type syncService struct { type syncService struct {
spaceId string spaceId string
syncHandler SyncHandler syncClient SyncClient
streamPool StreamPool
headNotifiable HeadNotifiable
configuration nodeconf.Configuration configuration nodeconf.Configuration
clientFactory spacesyncproto.ClientFactory clientFactory spacesyncproto.ClientFactory
@ -47,30 +40,26 @@ type syncService struct {
func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService { func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService {
var syncHandler SyncHandler var syncHandler SyncHandler
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { pool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message) return syncHandler.HandleMessage(ctx, senderId, message)
}) })
syncHandler = newSyncHandler(spaceId, cache, streamPool, newRequestFactory()) factory := newRequestFactory()
syncClient := newSyncClient(spaceId, pool, headNotifiable, factory, configuration)
syncHandler = newSyncHandler(spaceId, cache, syncClient)
return newSyncService( return newSyncService(
spaceId, spaceId,
headNotifiable, syncClient,
syncHandler,
streamPool,
spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient), spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient),
configuration) configuration)
} }
func newSyncService( func newSyncService(
spaceId string, spaceId string,
headNotifiable HeadNotifiable, syncClient SyncClient,
syncHandler SyncHandler,
streamPool StreamPool,
clientFactory spacesyncproto.ClientFactory, clientFactory spacesyncproto.ClientFactory,
configuration nodeconf.Configuration) *syncService { configuration nodeconf.Configuration) *syncService {
return &syncService{ return &syncService{
syncHandler: syncHandler, syncClient: syncClient,
streamPool: streamPool,
headNotifiable: headNotifiable,
configuration: configuration, configuration: configuration,
clientFactory: clientFactory, clientFactory: clientFactory,
spaceId: spaceId, spaceId: spaceId,
@ -86,16 +75,7 @@ func (s *syncService) Init() {
func (s *syncService) Close() (err error) { func (s *syncService) Close() (err error) {
s.stopStreamLoop() s.stopStreamLoop()
<-s.streamLoopDone <-s.streamLoopDone
return s.streamPool.Close() return s.syncClient.Close()
}
func (s *syncService) NotifyHeadUpdate(
ctx context.Context,
treeId string,
header *treechangeproto.RawTreeChangeWithId,
update *spacesyncproto.ObjectHeadUpdate) (err error) {
s.headNotifiable.UpdateHeads(treeId, update.Heads)
return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId, ""))
} }
func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
@ -106,7 +86,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
return return
} }
for _, peer := range respPeers { for _, peer := range respPeers {
if s.streamPool.HasActiveStream(peer.Id()) { if s.syncClient.HasActiveStream(peer.Id()) {
continue continue
} }
stream, err := s.clientFactory.Client(peer).Stream(ctx) stream, err := s.clientFactory.Client(peer).Stream(ctx)
@ -124,7 +104,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
continue continue
} }
s.streamPool.AddAndReadStreamAsync(stream) s.syncClient.AddAndReadStreamAsync(stream)
} }
} }
@ -141,6 +121,6 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
} }
} }
func (s *syncService) StreamPool() StreamPool { func (s *syncService) SyncClient() SyncClient {
return s.streamPool return s.syncClient
} }

View File

@ -2,7 +2,6 @@ package synctree
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
@ -14,14 +13,14 @@ import (
// SyncTree sends head updates to sync service and also sends new changes to update listener // SyncTree sends head updates to sync service and also sends new changes to update listener
type SyncTree struct { type SyncTree struct {
tree.ObjectTree tree.ObjectTree
syncService syncservice.SyncService syncClient syncservice.SyncClient
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
} }
func DeriveSyncTree( func DeriveSyncTree(
ctx context.Context, ctx context.Context,
payload tree.ObjectTreeCreatePayload, payload tree.ObjectTreeCreatePayload,
syncService syncservice.SyncService, syncClient syncservice.SyncClient,
listener updatelistener.UpdateListener, listener updatelistener.UpdateListener,
aclList list.ACLList, aclList list.ACLList,
createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {
@ -31,21 +30,19 @@ func DeriveSyncTree(
} }
t = &SyncTree{ t = &SyncTree{
ObjectTree: t, ObjectTree: t,
syncService: syncService, syncClient: syncClient,
listener: listener, listener: listener,
} }
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ headUpdate := syncClient.CreateHeadUpdate(t, nil)
Heads: t.Heads(), err = syncClient.BroadcastAsync(headUpdate)
SnapshotPath: t.SnapshotPath(),
})
return return
} }
func CreateSyncTree( func CreateSyncTree(
ctx context.Context, ctx context.Context,
payload tree.ObjectTreeCreatePayload, payload tree.ObjectTreeCreatePayload,
syncService syncservice.SyncService, syncClient syncservice.SyncClient,
listener updatelistener.UpdateListener, listener updatelistener.UpdateListener,
aclList list.ACLList, aclList list.ACLList,
createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {
@ -55,29 +52,27 @@ func CreateSyncTree(
} }
t = &SyncTree{ t = &SyncTree{
ObjectTree: t, ObjectTree: t,
syncService: syncService, syncClient: syncClient,
listener: listener, listener: listener,
} }
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ headUpdate := syncClient.CreateHeadUpdate(t, nil)
Heads: t.Heads(), err = syncClient.BroadcastAsync(headUpdate)
SnapshotPath: t.SnapshotPath(),
})
return return
} }
func BuildSyncTree( func BuildSyncTree(
ctx context.Context, ctx context.Context,
syncService syncservice.SyncService, syncClient syncservice.SyncClient,
treeStorage storage.TreeStorage, treeStorage storage.TreeStorage,
listener updatelistener.UpdateListener, listener updatelistener.UpdateListener,
aclList list.ACLList) (t tree.ObjectTree, err error) { aclList list.ACLList) (t tree.ObjectTree, err error) {
return buildSyncTree(ctx, syncService, treeStorage, listener, aclList) return buildSyncTree(ctx, syncClient, treeStorage, listener, aclList)
} }
func buildSyncTree( func buildSyncTree(
ctx context.Context, ctx context.Context,
syncService syncservice.SyncService, syncClient syncservice.SyncClient,
treeStorage storage.TreeStorage, treeStorage storage.TreeStorage,
listener updatelistener.UpdateListener, listener updatelistener.UpdateListener,
aclList list.ACLList) (t tree.ObjectTree, err error) { aclList list.ACLList) (t tree.ObjectTree, err error) {
@ -87,14 +82,13 @@ func buildSyncTree(
} }
t = &SyncTree{ t = &SyncTree{
ObjectTree: t, ObjectTree: t,
syncService: syncService, syncClient: syncClient,
listener: listener, listener: listener,
} }
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{ headUpdate := syncClient.CreateHeadUpdate(t, nil)
Heads: t.Heads(), // here we will have different behaviour based on who is sending this update
SnapshotPath: t.SnapshotPath(), err = syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
})
return return
} }
@ -103,11 +97,8 @@ func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if err != nil { if err != nil {
return return
} }
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
Heads: res.Heads, err = s.syncClient.BroadcastAsync(headUpdate)
Changes: res.Added,
SnapshotPath: s.SnapshotPath(),
})
return return
} }
@ -125,11 +116,8 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot
s.listener.Rebuild(s) s.listener.Rebuild(s)
} }
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{ headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
Heads: res.Heads, err = s.syncClient.BroadcastAsync(headUpdate)
Changes: res.Added,
SnapshotPath: s.SnapshotPath(),
})
return return
} }