WIP rearrange components

This commit is contained in:
mcrakhman 2023-06-03 15:57:55 +02:00
parent a89a325d6c
commit 748681d765
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
29 changed files with 338 additions and 375 deletions

View File

@ -125,14 +125,10 @@ func (app *App) ChildApp() *App {
func (app *App) Register(s Component) *App { func (app *App) Register(s Component) *App {
app.mu.Lock() app.mu.Lock()
defer app.mu.Unlock() defer app.mu.Unlock()
current := app for _, es := range app.components {
for current != nil { if s.Name() == es.Name() {
for _, es := range current.components { panic(fmt.Errorf("component '%s' already registered", s.Name()))
if s.Name() == es.Name() {
panic(fmt.Errorf("component '%s' already registered", s.Name()))
}
} }
current = current.parent
} }
app.components = append(app.components, s) app.components = append(app.components, s)
return app return app

View File

@ -48,6 +48,11 @@ func TestAppServiceRegistry(t *testing.T) {
names := app.ComponentNames() names := app.ComponentNames()
assert.Equal(t, names, []string{"x1", "c1", "r1", "s1"}) assert.Equal(t, names, []string{"x1", "c1", "r1", "s1"})
}) })
t.Run("Child override", func(t *testing.T) {
app := app.ChildApp()
app.Register(newTestService(testTypeRunnable, "s1", nil, nil))
_ = app.MustComponent("s1").(*testRunnable)
})
} }
func TestAppStart(t *testing.T) { func TestAppStart(t *testing.T) {

View File

@ -35,7 +35,7 @@ type objectDeletionState struct {
} }
func (st *objectDeletionState) Init(a *app.App) (err error) { func (st *objectDeletionState) Init(a *app.App) (err error) {
st.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) st.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
return nil return nil
} }

View File

@ -73,12 +73,12 @@ func (h *headSync) Init(a *app.App) (err error) {
h.syncPeriod = cfg.GetSpace().SyncPeriod h.syncPeriod = cfg.GetSpace().SyncPeriod
h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
h.log = log.With(zap.String("spaceId", h.spaceId)) h.log = log.With(zap.String("spaceId", h.spaceId))
h.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
h.diff = ldiff.New(16, 16) h.diff = ldiff.New(16, 16)
h.peerManager = a.MustComponent(peermanager.ManagerName).(peermanager.PeerManager) h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider)
h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider)
h.treeManager = app.MustComponent[treemanager.TreeManager](a) h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState) h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
h.syncer = newDiffSyncer(h) h.syncer = newDiffSyncer(h)
sync := func(ctx context.Context) (err error) { sync := func(ctx context.Context) (err error) {

View File

@ -19,12 +19,16 @@ type SyncAcl struct {
list.AclList list.AclList
} }
func (s *SyncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
return nil, nil
}
func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
return nil return nil
} }
func (s *SyncAcl) Init(a *app.App) (err error) { func (s *SyncAcl) Init(a *app.App) (err error) {
storage := a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) storage := a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
aclStorage, err := storage.AclStorage() aclStorage, err := storage.AclStorage()
if err != nil { if err != nil {
return err return err

View File

@ -1,4 +1,4 @@
package syncclient package synctree
import ( import (
"fmt" "fmt"

View File

@ -1,23 +1,15 @@
package syncclient package synctree
import ( import (
"context" "context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/requestsender" "github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/streamsender"
"go.uber.org/zap" "go.uber.org/zap"
) )
const CName = "common.objectsync.syncclient"
var log = logger.NewNamed(CName)
type SyncClient interface { type SyncClient interface {
app.Component
RequestFactory RequestFactory
Broadcast(msg *treechangeproto.TreeSyncMessage) Broadcast(msg *treechangeproto.TreeSyncMessage)
SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error)
@ -27,35 +19,25 @@ type SyncClient interface {
type syncClient struct { type syncClient struct {
RequestFactory RequestFactory
spaceId string spaceId string
requestSender requestsender.RequestSender requestManager requestmanager.RequestManager
streamSender streamsender.StreamSender peerManager peermanager.PeerManager
} }
func New() SyncClient { func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient {
return &syncClient{ return &syncClient{
RequestFactory: &requestFactory{}, RequestFactory: &requestFactory{},
spaceId: spaceId,
requestManager: requestManager,
peerManager: peerManager,
} }
} }
func (s *syncClient) Init(a *app.App) (err error) {
sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
s.spaceId = sharedState.SpaceId
s.requestSender = a.MustComponent(requestsender.CName).(requestsender.RequestSender)
s.streamSender = a.MustComponent(streamsender.CName).(streamsender.StreamSender)
return nil
}
func (s *syncClient) Name() (name string) {
return CName
}
func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) { func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
if err != nil { if err != nil {
return return
} }
err = s.streamSender.Broadcast(objMsg) err = s.peerManager.Broadcast(context.Background(), objMsg)
if err != nil { if err != nil {
log.Debug("broadcast error", zap.Error(err)) log.Debug("broadcast error", zap.Error(err))
} }
@ -66,7 +48,7 @@ func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.Tr
if err != nil { if err != nil {
return return
} }
return s.streamSender.SendPeer(peerId, objMsg) return s.peerManager.SendPeer(context.Background(), peerId, objMsg)
} }
func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
@ -74,7 +56,7 @@ func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, m
if err != nil { if err != nil {
return return
} }
return s.requestSender.SendRequest(ctx, peerId, objMsg) return s.requestManager.SendRequest(ctx, peerId, objMsg)
} }
func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
@ -82,7 +64,7 @@ func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.
if err != nil { if err != nil {
return return
} }
return s.requestSender.QueueRequest(peerId, objMsg) return s.requestManager.QueueRequest(peerId, objMsg)
} }
func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {

View File

@ -0,0 +1,152 @@
package synctree
import (
"context"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/util/slice"
"go.uber.org/zap"
)
type TreeSyncProtocol interface {
HeadUpdate(ctx context.Context, senderId string, update *treechangeproto.TreeHeadUpdate) (request *treechangeproto.TreeSyncMessage, err error)
FullSyncRequest(ctx context.Context, senderId string, request *treechangeproto.TreeFullSyncRequest) (response *treechangeproto.TreeSyncMessage, err error)
FullSyncResponse(ctx context.Context, senderId string, response *treechangeproto.TreeFullSyncResponse) (err error)
}
type treeSyncProtocol struct {
log logger.CtxLogger
spaceId string
objTree objecttree.ObjectTree
reqFactory RequestFactory
}
func newTreeSyncProtocol(spaceId string, objTree objecttree.ObjectTree, reqFactory RequestFactory) *treeSyncProtocol {
return &treeSyncProtocol{
log: log.With(zap.String("spaceId", spaceId), zap.String("treeId", objTree.Id())),
spaceId: spaceId,
objTree: objTree,
reqFactory: reqFactory,
}
}
func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, update *treechangeproto.TreeHeadUpdate) (fullRequest *treechangeproto.TreeSyncMessage, err error) {
var (
isEmptyUpdate = len(update.Changes) == 0
objTree = t.objTree
)
log := t.log.With(
zap.String("senderId", senderId),
zap.Strings("update heads", update.Heads),
zap.Int("len(update changes)", len(update.Changes)))
log.DebugCtx(ctx, "received head update message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "head update finished with error", zap.Error(err))
} else if fullRequest != nil {
cnt := fullRequest.Content.GetFullSyncRequest()
log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes)))
log.DebugCtx(ctx, "returning full sync request")
} else {
if !isEmptyUpdate {
log.DebugCtx(ctx, "head update finished correctly")
}
}
}()
// isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
if headEquals {
return
}
// we need to sync in any case
fullRequest, err = t.reqFactory.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
return
}
if t.alreadyHasHeads(objTree, update.Heads) {
return
}
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: update.Heads,
RawChanges: update.Changes,
})
if err != nil {
return
}
if t.alreadyHasHeads(objTree, update.Heads) {
return
}
fullRequest, err = t.reqFactory.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
return
}
func (t *treeSyncProtocol) FullSyncRequest(ctx context.Context, senderId string, request *treechangeproto.TreeFullSyncRequest) (fullResponse *treechangeproto.TreeSyncMessage, err error) {
var (
objTree = t.objTree
)
log := t.log.With(zap.String("senderId", senderId),
zap.Strings("request heads", request.Heads),
zap.Int("len(request changes)", len(request.Changes)))
log.DebugCtx(ctx, "received full sync request message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err))
} else if fullResponse != nil {
cnt := fullResponse.Content.GetFullSyncResponse()
log = log.With(zap.Strings("response heads", cnt.Heads), zap.Int("len(response changes)", len(cnt.Changes)))
log.DebugCtx(ctx, "full sync response sent")
}
}()
if len(request.Changes) != 0 && !t.alreadyHasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: request.Heads,
RawChanges: request.Changes,
})
if err != nil {
return
}
}
fullResponse, err = t.reqFactory.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
return
}
func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string, response *treechangeproto.TreeFullSyncResponse) (err error) {
var (
objTree = t.objTree
)
log := log.With(
zap.Strings("heads", response.Heads),
zap.Int("len(changes)", len(response.Changes)))
log.DebugCtx(ctx, "received full sync response message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "full sync response failed", zap.Error(err))
} else {
log.DebugCtx(ctx, "full sync response succeeded")
}
}()
if t.alreadyHasHeads(objTree, response.Heads) {
return
}
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: response.Heads,
RawChanges: response.Changes,
})
return
}
func (t *treeSyncProtocol) alreadyHasHeads(ot objecttree.ObjectTree, heads []string) bool {
return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...)
}

View File

@ -4,7 +4,6 @@ package synctree
import ( import (
"context" "context"
"errors" "errors"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"time" "time"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
@ -44,7 +43,7 @@ type SyncTree interface {
type syncTree struct { type syncTree struct {
objecttree.ObjectTree objecttree.ObjectTree
synchandler.SyncHandler synchandler.SyncHandler
syncClient syncclient.SyncClient syncClient SyncClient
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
notifiable HeadNotifiable notifiable HeadNotifiable
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
@ -61,7 +60,7 @@ type ResponsiblePeersGetter interface {
type BuildDeps struct { type BuildDeps struct {
SpaceId string SpaceId string
SyncClient syncclient.SyncClient SyncClient SyncClient
Configuration nodeconf.NodeConf Configuration nodeconf.NodeConf
HeadNotifiable HeadNotifiable HeadNotifiable HeadNotifiable
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener

View File

@ -9,7 +9,6 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
@ -19,7 +18,7 @@ import (
type syncTreeMatcher struct { type syncTreeMatcher struct {
objTree objecttree.ObjectTree objTree objecttree.ObjectTree
client syncclient.SyncClient client SyncClient
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
} }
@ -35,8 +34,8 @@ func (s syncTreeMatcher) String() string {
return "" return ""
} }
func syncClientFuncCreator(client syncclient.SyncClient) func(spaceId string, factory syncclient.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) syncclient.SyncClient { func syncClientFuncCreator(client SyncClient) func(spaceId string, factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) SyncClient {
return func(spaceId string, factory syncclient.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) syncclient.SyncClient { return func(spaceId string, factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) SyncClient {
return client return client
} }
} }

View File

@ -2,40 +2,64 @@ package synctree
import ( import (
"context" "context"
"errors"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"sync" "sync"
) )
var (
ErrMessageIsRequest = errors.New("message is request")
ErrMessageIsNotRequest = errors.New("message is not request")
)
type syncTreeHandler struct { type syncTreeHandler struct {
objTree objecttree.ObjectTree objTree objecttree.ObjectTree
syncClient syncclient.SyncClient syncClient SyncClient
syncStatus syncstatus.StatusUpdater syncProtocol TreeSyncProtocol
handlerLock sync.Mutex syncStatus syncstatus.StatusUpdater
spaceId string handlerLock sync.Mutex
queue ReceiveQueue spaceId string
queue ReceiveQueue
} }
const maxQueueSize = 5 const maxQueueSize = 5
// TODO: Make sync and async message handling func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient syncclient.SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
return &syncTreeHandler{ return &syncTreeHandler{
objTree: objTree, objTree: objTree,
syncClient: syncClient, syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient),
syncStatus: syncStatus, syncClient: syncClient,
spaceId: spaceId, syncStatus: syncStatus,
queue: newReceiveQueue(maxQueueSize), spaceId: spaceId,
queue: newReceiveQueue(maxQueueSize),
} }
} }
func (s *syncTreeHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest()
if fullSyncRequest == nil {
err = ErrMessageIsNotRequest
return
}
s.syncStatus.HeadsReceive(senderId, request.ObjectId, treechangeproto.GetHeads(unmarshalled))
treeResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest)
if err != nil {
return
}
response, err = MarshallTreeMessage(treeResp, s.spaceId, request.ObjectId, "")
return
}
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{} unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled) err = proto.Unmarshal(msg.Payload, unmarshalled)
@ -55,181 +79,27 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) { func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) {
s.objTree.Lock() s.objTree.Lock()
defer s.objTree.Unlock() defer s.objTree.Unlock()
msg, replyId, err := s.queue.GetMessage(senderId) msg, _, err := s.queue.GetMessage(senderId)
if err != nil { if err != nil {
return return
} }
defer s.queue.ClearQueue(senderId) defer s.queue.ClearQueue(senderId)
treeId := s.objTree.Id()
content := msg.GetContent() content := msg.GetContent()
switch { switch {
case content.GetHeadUpdate() != nil: case content.GetHeadUpdate() != nil:
return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), replyId) var syncReq *treechangeproto.TreeSyncMessage
syncReq, err = s.syncProtocol.HeadUpdate(ctx, senderId, content.GetHeadUpdate())
if err != nil {
return
}
return s.syncClient.QueueRequest(senderId, treeId, syncReq)
case content.GetFullSyncRequest() != nil: case content.GetFullSyncRequest() != nil:
return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), replyId) return ErrMessageIsRequest
case content.GetFullSyncResponse() != nil: case content.GetFullSyncResponse() != nil:
return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse()) return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
} }
return return
} }
func (s *syncTreeHandler) handleHeadUpdate(
ctx context.Context,
senderId string,
update *treechangeproto.TreeHeadUpdate,
replyId string) (err error) {
var (
fullRequest *treechangeproto.TreeSyncMessage
isEmptyUpdate = len(update.Changes) == 0
objTree = s.objTree
treeId = objTree.Id()
)
log := log.With(
zap.Strings("update heads", update.Heads),
zap.String("treeId", treeId),
zap.String("spaceId", s.spaceId),
zap.Int("len(update changes)", len(update.Changes)))
log.DebugCtx(ctx, "received head update message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "head update finished with error", zap.Error(err))
} else if fullRequest != nil {
cnt := fullRequest.Content.GetFullSyncRequest()
log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes)))
log.DebugCtx(ctx, "sending full sync request")
} else {
if !isEmptyUpdate {
log.DebugCtx(ctx, "head update finished correctly")
}
}
}()
// isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
if headEquals {
return
}
// we need to sync in any case
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
if err != nil {
return
}
return s.syncClient.QueueRequest(senderId, treeId, fullRequest)
}
if s.alreadyHasHeads(objTree, update.Heads) {
return
}
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: update.Heads,
RawChanges: update.Changes,
})
if err != nil {
return
}
if s.alreadyHasHeads(objTree, update.Heads) {
return
}
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
if err != nil {
return
}
return s.syncClient.QueueRequest(senderId, treeId, fullRequest)
}
func (s *syncTreeHandler) handleFullSyncRequest(
ctx context.Context,
senderId string,
request *treechangeproto.TreeFullSyncRequest,
replyId string) (err error) {
var (
fullResponse *treechangeproto.TreeSyncMessage
header = s.objTree.Header()
objTree = s.objTree
treeId = s.objTree.Id()
)
log := log.With(zap.String("senderId", senderId),
zap.Strings("request heads", request.Heads),
zap.String("treeId", treeId),
zap.String("replyId", replyId),
zap.String("spaceId", s.spaceId),
zap.Int("len(request changes)", len(request.Changes)))
log.DebugCtx(ctx, "received full sync request message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err))
s.syncClient.QueueRequest(senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header))
return
} else if fullResponse != nil {
cnt := fullResponse.Content.GetFullSyncResponse()
log = log.With(zap.Strings("response heads", cnt.Heads), zap.Int("len(response changes)", len(cnt.Changes)))
log.DebugCtx(ctx, "full sync response sent")
}
}()
if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: request.Heads,
RawChanges: request.Changes,
})
if err != nil {
return
}
}
fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath)
if err != nil {
return
}
return s.syncClient.QueueRequest(senderId, treeId, fullResponse)
}
func (s *syncTreeHandler) handleFullSyncResponse(
ctx context.Context,
senderId string,
response *treechangeproto.TreeFullSyncResponse) (err error) {
var (
objTree = s.objTree
treeId = s.objTree.Id()
)
log := log.With(
zap.Strings("heads", response.Heads),
zap.String("treeId", treeId),
zap.String("spaceId", s.spaceId),
zap.Int("len(changes)", len(response.Changes)))
log.DebugCtx(ctx, "received full sync response message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "full sync response failed", zap.Error(err))
} else {
log.DebugCtx(ctx, "full sync response succeeded")
}
}()
if s.alreadyHasHeads(objTree, response.Heads) {
return
}
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: response.Heads,
RawChanges: response.Changes,
})
return
}
func (s *syncTreeHandler) alreadyHasHeads(t objecttree.ObjectTree, heads []string) bool {
return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...)
}

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"sync" "sync"
"testing" "testing"
@ -110,7 +109,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).Times(2) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).Times(2)
@ -139,7 +138,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes() fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes()
@ -172,7 +171,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes() fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes()
@ -193,7 +192,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes() fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes()
@ -218,7 +217,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes() fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes()
@ -251,7 +250,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT().Header().Return(nil) fx.objectTreeMock.EXPECT().Header().Return(nil)
@ -284,7 +283,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT(). fx.objectTreeMock.EXPECT().
Id().AnyTimes().Return(treeId) Id().AnyTimes().Return(treeId)
@ -313,7 +312,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg.RequestId = replyId objectMsg.RequestId = replyId
fx.objectTreeMock.EXPECT(). fx.objectTreeMock.EXPECT().
@ -340,7 +339,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.objectTreeMock.EXPECT(). fx.objectTreeMock.EXPECT().
Id().AnyTimes().Return(treeId) Id().AnyTimes().Return(treeId)
@ -381,7 +380,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId)
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT(). fx.objectTreeMock.EXPECT().
@ -414,7 +413,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) {
SnapshotPath: []string{"h1"}, SnapshotPath: []string{"h1"},
} }
treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId)
objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId)
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId)
fx.objectTreeMock.EXPECT(). fx.objectTreeMock.EXPECT().

View File

@ -7,7 +7,6 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
@ -89,14 +88,14 @@ type testSyncHandler struct {
peerId string peerId string
aclList list.AclList aclList list.AclList
log *messageLog log *messageLog
syncClient syncclient.SyncClient syncClient SyncClient
builder objecttree.BuildObjectTreeFunc builder objecttree.BuildObjectTreeFunc
} }
// createSyncHandler creates a sync handler when a tree is already created // createSyncHandler creates a sync handler when a tree is already created
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler { func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler {
factory := syncclient.NewRequestFactory() factory := NewRequestFactory()
syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory) syncClient := NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
netTree := &broadcastTree{ netTree := &broadcastTree{
ObjectTree: objTree, ObjectTree: objTree,
SyncClient: syncClient, SyncClient: syncClient,
@ -107,8 +106,8 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) // createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler { func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler {
factory := syncclient.NewRequestFactory() factory := NewRequestFactory()
syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory) syncClient := NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
batcher := mb.New[protocolMsg](0) batcher := mb.New[protocolMsg](0)
return &testSyncHandler{ return &testSyncHandler{
@ -140,9 +139,9 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re
return return
} }
if unmarshalled.Content.GetFullSyncResponse() == nil { if unmarshalled.Content.GetFullSyncResponse() == nil {
newTreeRequest := syncclient.NewRequestFactory().CreateNewTreeRequest() newTreeRequest := NewRequestFactory().CreateNewTreeRequest()
var objMsg *spacesyncproto.ObjectSyncMessage var objMsg *spacesyncproto.ObjectSyncMessage
objMsg, err = syncclient.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") objMsg, err = MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil { if err != nil {
return return
} }
@ -167,8 +166,8 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re
} }
h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus()) h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus())
var objMsg *spacesyncproto.ObjectSyncMessage var objMsg *spacesyncproto.ObjectSyncMessage
newTreeRequest := syncclient.NewRequestFactory().CreateHeadUpdate(netTree, res.Added) newTreeRequest := NewRequestFactory().CreateHeadUpdate(netTree, res.Added)
objMsg, err = syncclient.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") objMsg, err = MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil { if err != nil {
return return
} }
@ -278,7 +277,7 @@ func (m *testMessagePool) SendSync(ctx context.Context, peerId string, message *
// it is a simplified version of SyncTree which is easier to use in the test environment // it is a simplified version of SyncTree which is easier to use in the test environment
type broadcastTree struct { type broadcastTree struct {
objecttree.ObjectTree objecttree.ObjectTree
syncclient.SyncClient SyncClient
} }
func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) { func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {

View File

@ -17,8 +17,6 @@ var (
ErrSpaceClosed = errors.New("space is closed") ErrSpaceClosed = errors.New("space is closed")
) )
const CName = "common.commonspace.objectmanager"
type ObjectManager interface { type ObjectManager interface {
treemanager.TreeManager treemanager.TreeManager
AddObject(object syncobjectgetter.SyncObject) AddObject(object syncobjectgetter.SyncObject)
@ -62,7 +60,7 @@ func (o *objectManager) AddObject(object syncobjectgetter.SyncObject) {
} }
func (o *objectManager) Name() string { func (o *objectManager) Name() string {
return CName return treemanager.CName
} }
func (o *objectManager) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { func (o *objectManager) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
@ -30,6 +29,7 @@ var log = logger.NewNamed(CName)
type ObjectSync interface { type ObjectSync interface {
LastUsage() time.Time LastUsage() time.Time
HandleMessage(ctx context.Context, hm HandleMessage) (err error) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
CloseThread(id string) (err error) CloseThread(id string) (err error)
app.ComponentRunnable app.ComponentRunnable
} }
@ -56,7 +56,6 @@ func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
type objectSync struct { type objectSync struct {
spaceId string spaceId string
syncClient syncclient.SyncClient
objectGetter syncobjectgetter.SyncObjectGetter objectGetter syncobjectgetter.SyncObjectGetter
configuration nodeconf.NodeConf configuration nodeconf.NodeConf
spaceStorage spacestorage.SpaceStorage spaceStorage spacestorage.SpaceStorage
@ -67,8 +66,7 @@ type objectSync struct {
} }
func (s *objectSync) Init(a *app.App) (err error) { func (s *objectSync) Init(a *app.App) (err error) {
s.syncClient = a.MustComponent(syncclient.CName).(syncclient.SyncClient) s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.spaceStorage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
s.objectGetter = app.MustComponent[syncobjectgetter.SyncObjectGetter](a) s.objectGetter = app.MustComponent[syncobjectgetter.SyncObjectGetter](a)
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
@ -103,6 +101,10 @@ func (s *objectSync) LastUsage() time.Time {
return time.Time{} return time.Time{}
} }
func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
}
func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
threadId := hm.Message.ObjectId threadId := hm.Message.ObjectId
hm.ReceiveTime = time.Now() hm.ReceiveTime = time.Now()

View File

@ -6,5 +6,6 @@ import (
) )
type SyncHandler interface { type SyncHandler interface {
HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error)
} }

View File

@ -14,8 +14,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
@ -58,10 +58,11 @@ func New() TreeBuilderComponent {
} }
type treeBuilder struct { type treeBuilder struct {
syncClient syncclient.SyncClient syncClient synctree.SyncClient
configuration nodeconf.NodeConf configuration nodeconf.NodeConf
headsNotifiable synctree.HeadNotifiable headsNotifiable synctree.HeadNotifiable
peerManager peermanager.PeerManager peerManager peermanager.PeerManager
requestManager requestmanager.RequestManager
spaceStorage spacestorage.SpaceStorage spaceStorage spacestorage.SpaceStorage
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
objectSync objectsync.ObjectSync objectSync objectsync.ObjectSync
@ -81,14 +82,15 @@ func (t *treeBuilder) Init(a *app.App) (err error) {
t.treesUsed = state.TreesUsed t.treesUsed = state.TreesUsed
t.builder = state.TreeBuilderFunc t.builder = state.TreeBuilderFunc
t.aclList = a.MustComponent(syncacl.CName).(*syncacl.SyncAcl) t.aclList = a.MustComponent(syncacl.CName).(*syncacl.SyncAcl)
t.spaceStorage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) t.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync) t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync)
t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater) t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
t.peerManager = a.MustComponent(peermanager.ManagerName).(peermanager.PeerManager) t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
t.requestManager = a.MustComponent(requestmanager.CName).(requestmanager.RequestManager)
t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync) t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync)
t.syncClient = a.MustComponent(syncclient.CName).(syncclient.SyncClient)
t.log = log.With(zap.String("spaceId", t.spaceId)) t.log = log.With(zap.String("spaceId", t.spaceId))
t.syncClient = synctree.NewSyncClient(t.spaceId, t.requestManager, t.peerManager)
return nil return nil
} }

View File

@ -9,8 +9,7 @@ import (
) )
const ( const (
ProviderName = "common.commonspace.peermanagerprovider" CName = "common.commonspace.peermanager"
ManagerName = "common.commonspace.peermanager"
) )
type PeerManager interface { type PeerManager interface {

View File

@ -0,0 +1,49 @@
package requestmanager
import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
const CName = "common.commonspace.requestmanager"
var log = logger.NewNamed(CName)
type RequestManager interface {
app.ComponentRunnable
SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func New() RequestManager {
return &requestManager{}
}
type requestManager struct {
}
func (r *requestManager) Init(a *app.App) (err error) {
return
}
func (r *requestManager) Name() (name string) {
return CName
}
func (r *requestManager) Run(ctx context.Context) (err error) {
return nil
}
func (r *requestManager) Close(ctx context.Context) (err error) {
return nil
}
func (r *requestManager) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
return nil, nil
}
func (r *requestManager) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}

View File

@ -1,49 +0,0 @@
package requestsender
import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
const CName = "common.commonspace.requestsender"
var log = logger.NewNamed(CName)
type RequestSender interface {
app.ComponentRunnable
SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func New() RequestSender {
return &requestSender{}
}
type requestSender struct {
}
func (r *requestSender) Init(a *app.App) (err error) {
return
}
func (r *requestSender) Name() (name string) {
return CName
}
func (r *requestSender) Run(ctx context.Context) (err error) {
return nil
}
func (r *requestSender) Close(ctx context.Context) (err error) {
return nil
}
func (r *requestSender) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
return nil, nil
}
func (r *requestSender) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}

View File

@ -55,7 +55,7 @@ func (s *settings) Init(a *app.App) (err error) {
s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent) s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
s.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.spaceIsDeleted = sharedState.SpaceIsDeleted s.spaceIsDeleted = sharedState.SpaceIsDeleted
deps := Deps{ deps := Deps{

View File

@ -121,7 +121,7 @@ func (s *space) Init(ctx context.Context) (err error) {
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider)
s.settings = s.app.MustComponent(settings.CName).(settings.Settings) s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync) s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync)
s.storage = s.app.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
return nil return nil
} }

View File

@ -16,15 +16,13 @@ import (
"github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objectmanager" "github.com/anyproto/any-sync/commonspace/objectmanager"
"github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder" "github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestsender" "github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/settings" "github.com/anyproto/any-sync/commonspace/settings"
"github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/streamsender"
"github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/pool"
@ -69,10 +67,10 @@ type spaceService struct {
func (s *spaceService) Init(a *app.App) (err error) { func (s *spaceService) Init(a *app.App) (err error) {
s.config = a.MustComponent("config").(config.ConfigGetter).GetSpace() s.config = a.MustComponent("config").(config.ConfigGetter).GetSpace()
s.account = a.MustComponent(accountservice.CName).(accountservice.Service) s.account = a.MustComponent(accountservice.CName).(accountservice.Service)
s.storageProvider = a.MustComponent(spacestorage.ProviderName).(spacestorage.SpaceStorageProvider) s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
s.peermanagerProvider = a.MustComponent(peermanager.ProviderName).(peermanager.PeerManagerProvider) s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider)
s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.metric, _ = a.Component(metric.CName).(metric.Metric) s.metric, _ = a.Component(metric.CName).(metric.Metric)
s.app = a s.app = a
@ -172,12 +170,10 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
Register(peerManager). Register(peerManager).
Register(newCommonStorage(st)). Register(newCommonStorage(st)).
Register(syncacl.New()). Register(syncacl.New()).
Register(streamsender.New()). Register(requestmanager.New()).
Register(requestsender.New()).
Register(deletionstate.New()). Register(deletionstate.New()).
Register(settings.New()). Register(settings.New()).
Register(objectmanager.New(s.treeManager)). Register(objectmanager.New(s.treeManager)).
Register(syncclient.New()).
Register(objecttreebuilder.New()). Register(objecttreebuilder.New()).
Register(objectsync.New()). Register(objectsync.New()).
Register(headsync.New()) Register(headsync.New())

View File

@ -22,7 +22,7 @@ func (i *InMemorySpaceStorageProvider) Init(a *app.App) (err error) {
} }
func (i *InMemorySpaceStorageProvider) Name() (name string) { func (i *InMemorySpaceStorageProvider) Name() (name string) {
return ProviderName return CName
} }
func (i *InMemorySpaceStorageProvider) WaitSpaceStorage(ctx context.Context, id string) (SpaceStorage, error) { func (i *InMemorySpaceStorageProvider) WaitSpaceStorage(ctx context.Context, id string) (SpaceStorage, error) {

View File

@ -27,7 +27,7 @@ func (i *InMemorySpaceStorage) Init(a *app.App) (err error) {
} }
func (i *InMemorySpaceStorage) Name() (name string) { func (i *InMemorySpaceStorage) Name() (name string) {
return StorageName return CName
} }
func NewInMemorySpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error) { func NewInMemorySpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error) {

View File

@ -12,10 +12,7 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
) )
const ( const CName = "common.commonspace.spacestorage"
ProviderName = "common.commonspace.spacestorageprovider"
StorageName = "common.commonspace.spacestorage"
)
var ( var (
ErrSpaceStorageExists = errors.New("space storage exists") ErrSpaceStorageExists = errors.New("space storage exists")

View File

@ -136,7 +136,7 @@ func (p *mockPeerManager) Init(a *app.App) (err error) {
} }
func (p *mockPeerManager) Name() (name string) { func (p *mockPeerManager) Name() (name string) {
return peermanager.ManagerName return peermanager.CName
} }
func (p *mockPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (p *mockPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
@ -163,7 +163,7 @@ func (m *mockPeerManagerProvider) Init(a *app.App) (err error) {
} }
func (m *mockPeerManagerProvider) Name() (name string) { func (m *mockPeerManagerProvider) Name() (name string) {
return peermanager.ProviderName return peermanager.CName
} }
func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId string) (sm peermanager.PeerManager, err error) { func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId string) (sm peermanager.PeerManager, err error) {

View File

@ -1,37 +0,0 @@
package streamsender
import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
const CName = "common.commonspace.streamsender"
type StreamSender interface {
app.Component
SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func New() StreamSender {
return &streamSender{}
}
type streamSender struct {
}
func (s *streamSender) Init(a *app.App) (err error) {
return
}
func (s *streamSender) Name() (name string) {
return CName
}
func (s *streamSender) SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}
func (s *streamSender) Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}

View File

@ -104,7 +104,7 @@ func (s *syncStatusProvider) Init(a *app.App) (err error) {
s.updateTimeout = syncTimeout s.updateTimeout = syncTimeout
s.spaceId = sharedState.SpaceId s.spaceId = sharedState.SpaceId
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
s.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
return return
} }