diff --git a/commonspace/deletionstate/deletionstate.go b/commonspace/deletionstate/deletionstate.go index 96f062e3..64da5417 100644 --- a/commonspace/deletionstate/deletionstate.go +++ b/commonspace/deletionstate/deletionstate.go @@ -44,7 +44,7 @@ func (st *objectDeletionState) Name() (name string) { return CName } -func NewObjectDeletionState() ObjectDeletionState { +func New() ObjectDeletionState { return &objectDeletionState{ log: log, queued: map[string]struct{}{}, diff --git a/commonspace/settings/settingsstate/deletionstate_test.go b/commonspace/deletionstate/deletionstate_test.go similarity index 96% rename from commonspace/settings/settingsstate/deletionstate_test.go rename to commonspace/deletionstate/deletionstate_test.go index ca2ea679..d95bcd9b 100644 --- a/commonspace/settings/settingsstate/deletionstate_test.go +++ b/commonspace/deletionstate/deletionstate_test.go @@ -1,4 +1,4 @@ -package settingsstate +package deletionstate import ( "github.com/anyproto/any-sync/app/logger" @@ -19,7 +19,7 @@ type fixture struct { func newFixture(t *testing.T) *fixture { ctrl := gomock.NewController(t) spaceStorage := mock_spacestorage.NewMockSpaceStorage(ctrl) - delState := NewObjectDeletionState(logger.NewNamed("test"), spaceStorage).(*objectDeletionState) + delState := New(logger.NewNamed("test"), spaceStorage).(*objectDeletionState) return &fixture{ ctrl: ctrl, delState: delState, diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 70238d01..1529bf6f 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -6,9 +6,9 @@ import ( "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/credentialprovider" + "github.com/anyproto/any-sync/commonspace/deletionstate" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -22,7 +22,7 @@ type DiffSyncer interface { Sync(ctx context.Context) error RemoveObjects(ids []string) UpdateHeads(id string, heads []string) - Init(deletionState settingsstate.ObjectDeletionState) + Init() Close() error } @@ -37,6 +37,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer { credentialProvider: hs.credentialProvider, log: log, syncStatus: hs.syncStatus, + deletionState: hs.deletionState, } } @@ -48,14 +49,13 @@ type diffSyncer struct { storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory log logger.CtxLogger - deletionState settingsstate.ObjectDeletionState + deletionState deletionstate.ObjectDeletionState credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusUpdater treeSyncer treemanager.TreeSyncer } -func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) { - d.deletionState = deletionState +func (d *diffSyncer) Init() { d.deletionState.AddObserver(d.RemoveObjects) d.treeSyncer = d.treeManager.NewTreeSyncer(d.spaceId, d.treeManager) } diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 048569bb..0d761fd5 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -7,10 +7,9 @@ import ( "github.com/anyproto/any-sync/app/logger" config2 "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" - "github.com/anyproto/any-sync/commonspace/headsync" + "github.com/anyproto/any-sync/commonspace/deletionstate" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" @@ -18,6 +17,7 @@ import ( "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/util/periodicsync" + "github.com/anyproto/any-sync/util/slice" "go.uber.org/zap" "golang.org/x/exp/slices" "sync/atomic" @@ -34,11 +34,13 @@ type TreeHeads struct { } type HeadSync interface { + app.ComponentRunnable + ExternalIds() []string + DebugAllHeads() (res []TreeHeads) + AllIds() []string UpdateHeads(id string, heads []string) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) RemoveObjects(ids []string) - AllIds() []string - DebugAllHeads() (res []TreeHeads) } type headSync struct { @@ -50,16 +52,16 @@ type headSync struct { storage spacestorage.SpaceStorage diff ldiff.Diff log logger.CtxLogger - syncer headsync.DiffSyncer + syncer DiffSyncer configuration nodeconf.NodeConf peerManager peermanager.PeerManager treeManager treemanager.TreeManager credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusProvider - deletionState settingsstate.ObjectDeletionState + deletionState deletionstate.ObjectDeletionState } -func New() *headSync { +func New() HeadSync { return &headSync{} } @@ -77,7 +79,7 @@ func (h *headSync) Init(a *app.App) (err error) { h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) - h.deletionState = a.MustComponent("deletionstate").(settingsstate.ObjectDeletionState) + h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState) h.syncer = newDiffSyncer(h) sync := func(ctx context.Context) (err error) { // for clients cancelling the sync process @@ -87,6 +89,8 @@ func (h *headSync) Init(a *app.App) (err error) { return h.syncer.Sync(ctx) } h.periodicSync = periodicsync.NewPeriodicSync(h.syncPeriod, time.Minute, sync, h.log) + // TODO: move to run? + h.syncer.Init() return nil } @@ -126,6 +130,13 @@ func (h *headSync) AllIds() []string { return h.diff.Ids() } +func (h *headSync) ExternalIds() []string { + settingsId := h.storage.SpaceSettingsId() + return slice.DiscardFromSlice(h.AllIds(), func(id string) bool { + return id == settingsId + }) +} + func (h *headSync) DebugAllHeads() (res []TreeHeads) { els := h.diff.Elements() for _, el := range els { diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index dd1e647e..cc728e4a 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -2,20 +2,17 @@ package syncacl import ( "github.com/anyproto/any-sync/commonspace/object/acl/list" - "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" ) type SyncAcl struct { list.AclList synchandler.SyncHandler - messagePool objectsync.MessagePool } -func NewSyncAcl(aclList list.AclList, messagePool objectsync.MessagePool) *SyncAcl { +func NewSyncAcl(aclList list.AclList) *SyncAcl { return &SyncAcl{ AclList: aclList, SyncHandler: nil, - messagePool: messagePool, } } diff --git a/commonspace/object/tree/synctree/utils_test.go b/commonspace/object/tree/synctree/utils_test.go index 8f57a9f2..a6d553a3 100644 --- a/commonspace/object/tree/synctree/utils_test.go +++ b/commonspace/object/tree/synctree/utils_test.go @@ -96,7 +96,7 @@ type testSyncHandler struct { // createSyncHandler creates a sync handler when a tree is already created func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler { factory := syncclient.NewRequestFactory() - syncClient := syncclient.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) + syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory) netTree := &broadcastTree{ ObjectTree: objTree, SyncClient: syncClient, @@ -108,7 +108,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo // createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler { factory := syncclient.NewRequestFactory() - syncClient := syncclient.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) + syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory) batcher := mb.New[protocolMsg](0) return &testSyncHandler{ diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 64ab66f6..0a836cff 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -29,6 +29,9 @@ const CName = "common.commonspace.objectsync" var log = logger.NewNamed(CName) type ObjectSync interface { + LastUsage() time.Time + HandleMessage(ctx context.Context, hm HandleMessage) (err error) + CloseThread(id string) (err error) app.ComponentRunnable } @@ -88,10 +91,15 @@ func (s *objectSync) Close(ctx context.Context) (err error) { return s.handleQueue.Close() } -func NewObjectSync() ObjectSync { +func New() ObjectSync { return &objectSync{} } +func (s *objectSync) LastUsage() time.Time { + // TODO: add time + return time.Time{} +} + func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { threadId := hm.Message.ObjectId hm.ReceiveTime = time.Now() @@ -169,3 +177,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp } return } + +func (s *objectSync) CloseThread(id string) (err error) { + return s.handleQueue.CloseThread(id) +} diff --git a/commonspace/objectsync/syncclient/syncclient.go b/commonspace/objectsync/syncclient/syncclient.go index 6041a2d0..99360eab 100644 --- a/commonspace/objectsync/syncclient/syncclient.go +++ b/commonspace/objectsync/syncclient/syncclient.go @@ -32,7 +32,7 @@ type syncClient struct { streamSender streamsender.StreamSender } -func NewSyncClient() SyncClient { +func New() SyncClient { return &syncClient{} } diff --git a/commonspace/objecttreebuilder/treebuilder.go b/commonspace/objecttreebuilder/treebuilder.go index f8bf346b..cf15c49c 100644 --- a/commonspace/objecttreebuilder/treebuilder.go +++ b/commonspace/objecttreebuilder/treebuilder.go @@ -12,6 +12,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" + "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/spacestate" @@ -40,12 +41,19 @@ type HistoryTreeOpts struct { } type TreeBuilder interface { - app.Component BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) - SetOnCloseHandler(handler func(id string)) +} + +type TreeBuilderComponent interface { + app.Component + TreeBuilder +} + +func New() TreeBuilderComponent { + return &treeBuilder{} } type treeBuilder struct { @@ -55,6 +63,7 @@ type treeBuilder struct { peerManager peermanager.PeerManager spaceStorage spacestorage.SpaceStorage syncStatus syncstatus.StatusUpdater + objectSync objectsync.ObjectSync log logger.CtxLogger builder objecttree.BuildObjectTreeFunc @@ -62,7 +71,6 @@ type treeBuilder struct { aclList list.AclList treesUsed *atomic.Int32 isClosed *atomic.Bool - onClose func(id string) } func (t *treeBuilder) Init(a *app.App) (err error) { @@ -78,8 +86,8 @@ func (t *treeBuilder) Init(a *app.App) (err error) { t.spaceStorage = state.SpaceStorage t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater) t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) + t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync) t.log = log.With(zap.String("spaceId", t.spaceId)) - t.onClose = state.Actions.OnObjectDelete return nil } @@ -87,10 +95,6 @@ func (t *treeBuilder) Name() (name string) { return CName } -func (t *treeBuilder) SetOnCloseHandler(handler func(id string)) { - t.onClose = handler -} - func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (ot objecttree.ObjectTree, err error) { if t.isClosed.Load() { // TODO: change to real error @@ -189,3 +193,9 @@ func (t *treeBuilder) PutTree(ctx context.Context, payload treestorage.TreeStora t.log.Debug("incrementing counter", zap.String("id", payload.RootRawChange.Id), zap.Int32("trees", t.treesUsed.Load())) return } + +func (t *treeBuilder) onClose(id string) { + t.treesUsed.Add(-1) + log.Debug("decrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()), zap.String("spaceId", t.spaceId)) + _ = t.objectSync.CloseThread(id) +} diff --git a/commonspace/requestsender/requestsender.go b/commonspace/requestsender/requestsender.go index 1ba166fb..d7143b4a 100644 --- a/commonspace/requestsender/requestsender.go +++ b/commonspace/requestsender/requestsender.go @@ -17,6 +17,10 @@ type RequestSender interface { QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) } +func New() RequestSender { + return &requestSender{} +} + type requestSender struct { } diff --git a/commonspace/settings/settings.go b/commonspace/settings/settings.go index 4e2177ae..95252388 100644 --- a/commonspace/settings/settings.go +++ b/commonspace/settings/settings.go @@ -9,52 +9,53 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/synctree" "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/objecttreebuilder" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/nodeconf" "go.uber.org/zap" + "sync/atomic" ) const CName = "common.commonspace.settings" type Settings interface { + DeleteTree(ctx context.Context, id string) (err error) + SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) + DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) app.ComponentRunnable } +func New() Settings { + return &settings{} +} + type settings struct { - account accountservice.Service - treeManager treemanager.TreeManager - storage spacestorage.SpaceStorage - configuration nodeconf.NodeConf - deletionState deletionstate.ObjectDeletionState - headsync headsync.HeadSync - spaceActions spacestate.SpaceActions - treeBuilder objecttreebuilder.TreeBuilder + account accountservice.Service + treeManager treemanager.TreeManager + storage spacestorage.SpaceStorage + configuration nodeconf.NodeConf + deletionState deletionstate.ObjectDeletionState + headsync headsync.HeadSync + treeBuilder objecttreebuilder.TreeBuilderComponent + spaceIsDeleted *atomic.Bool settingsObject SettingsObject } -func (s *settings) Run(ctx context.Context) (err error) { - return s.settingsObject.Init(ctx) -} - -func (s *settings) Close(ctx context.Context) (err error) { - return s.settingsObject.Close() -} - func (s *settings) Init(a *app.App) (err error) { s.account = a.MustComponent(accountservice.CName).(accountservice.Service) s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) s.headsync = a.MustComponent(headsync.CName).(headsync.HeadSync) s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) s.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState) - s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilder) + s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent) sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) - s.spaceActions = sharedState.Actions s.storage = sharedState.SpaceStorage + s.spaceIsDeleted = sharedState.SpaceIsDeleted deps := Deps{ BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) { @@ -77,7 +78,7 @@ func (s *settings) Init(a *app.App) (err error) { Configuration: s.configuration, DeletionState: s.deletionState, Provider: s.headsync, - OnSpaceDelete: s.spaceActions.OnSpaceDelete, + OnSpaceDelete: s.onSpaceDelete, } s.settingsObject = NewSettingsObject(deps, sharedState.SpaceId) return nil @@ -86,3 +87,31 @@ func (s *settings) Init(a *app.App) (err error) { func (s *settings) Name() (name string) { return CName } + +func (s *settings) Run(ctx context.Context) (err error) { + return s.settingsObject.Init(ctx) +} + +func (s *settings) Close(ctx context.Context) (err error) { + return s.settingsObject.Close() +} + +func (s *settings) DeleteTree(ctx context.Context, id string) (err error) { + return s.settingsObject.DeleteObject(id) +} + +func (s *settings) SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) { + return s.settingsObject.SpaceDeleteRawChange() +} + +func (s *settings) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) { + return s.settingsObject.DeleteSpace(ctx, deleteChange) +} + +func (s *settings) onSpaceDelete() { + err := s.storage.SetSpaceDeleted() + if err != nil { + log.Warn("failed to set space deleted") + } + s.spaceIsDeleted.Swap(true) +} diff --git a/commonspace/settings/settingsstate/deletionstate.go b/commonspace/settings/settingsstate/deletionstate.go deleted file mode 100644 index f36f4fd0..00000000 --- a/commonspace/settings/settingsstate/deletionstate.go +++ /dev/null @@ -1,135 +0,0 @@ -//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate ObjectDeletionState,StateBuilder,ChangeFactory -package settingsstate - -import ( - "github.com/anyproto/any-sync/app/logger" - "github.com/anyproto/any-sync/commonspace/spacestorage" - "go.uber.org/zap" - "sync" -) - -type StateUpdateObserver func(ids []string) - -type ObjectDeletionState interface { - AddObserver(observer StateUpdateObserver) - Add(ids map[string]struct{}) - GetQueued() (ids []string) - Delete(id string) (err error) - Exists(id string) bool - Filter(ids []string) (filtered []string) -} - -type objectDeletionState struct { - sync.RWMutex - log logger.CtxLogger - queued map[string]struct{} - deleted map[string]struct{} - stateUpdateObservers []StateUpdateObserver - storage spacestorage.SpaceStorage -} - -func NewObjectDeletionState(log logger.CtxLogger, storage spacestorage.SpaceStorage) ObjectDeletionState { - return &objectDeletionState{ - log: log, - queued: map[string]struct{}{}, - deleted: map[string]struct{}{}, - storage: storage, - } -} - -func (st *objectDeletionState) AddObserver(observer StateUpdateObserver) { - st.Lock() - defer st.Unlock() - st.stateUpdateObservers = append(st.stateUpdateObservers, observer) -} - -func (st *objectDeletionState) Add(ids map[string]struct{}) { - var added []string - st.Lock() - defer func() { - st.Unlock() - for _, ob := range st.stateUpdateObservers { - ob(added) - } - }() - - for id := range ids { - if _, exists := st.deleted[id]; exists { - continue - } - if _, exists := st.queued[id]; exists { - continue - } - - var status string - status, err := st.storage.TreeDeletedStatus(id) - if err != nil { - st.log.Warn("failed to get deleted status", zap.String("treeId", id), zap.Error(err)) - continue - } - - switch status { - case spacestorage.TreeDeletedStatusQueued: - st.queued[id] = struct{}{} - case spacestorage.TreeDeletedStatusDeleted: - st.deleted[id] = struct{}{} - default: - err := st.storage.SetTreeDeletedStatus(id, spacestorage.TreeDeletedStatusQueued) - if err != nil { - st.log.Warn("failed to set deleted status", zap.String("treeId", id), zap.Error(err)) - continue - } - st.queued[id] = struct{}{} - } - added = append(added, id) - } -} - -func (st *objectDeletionState) GetQueued() (ids []string) { - st.RLock() - defer st.RUnlock() - ids = make([]string, 0, len(st.queued)) - for id := range st.queued { - ids = append(ids, id) - } - return -} - -func (st *objectDeletionState) Delete(id string) (err error) { - st.Lock() - defer st.Unlock() - delete(st.queued, id) - st.deleted[id] = struct{}{} - err = st.storage.SetTreeDeletedStatus(id, spacestorage.TreeDeletedStatusDeleted) - if err != nil { - return - } - return -} - -func (st *objectDeletionState) Exists(id string) bool { - st.RLock() - defer st.RUnlock() - return st.exists(id) -} - -func (st *objectDeletionState) Filter(ids []string) (filtered []string) { - st.RLock() - defer st.RUnlock() - for _, id := range ids { - if !st.exists(id) { - filtered = append(filtered, id) - } - } - return -} - -func (st *objectDeletionState) exists(id string) bool { - if _, exists := st.deleted[id]; exists { - return true - } - if _, exists := st.queued[id]; exists { - return true - } - return false -} diff --git a/commonspace/settings/settingsstate/settingsstate.go b/commonspace/settings/settingsstate/settingsstate.go index a6c8b0b6..20619cb2 100644 --- a/commonspace/settings/settingsstate/settingsstate.go +++ b/commonspace/settings/settingsstate/settingsstate.go @@ -1,4 +1,4 @@ -//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate ObjectDeletionState,StateBuilder,ChangeFactory +//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate StateBuilder,ChangeFactory package settingsstate import "github.com/anyproto/any-sync/commonspace/spacesyncproto" diff --git a/commonspace/space.go b/commonspace/space.go index d8574acf..a5732e5b 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -3,36 +3,21 @@ package commonspace import ( "context" "errors" - "github.com/anyproto/any-sync/accountservice" - "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/headsync" - "github.com/anyproto/any-sync/commonspace/object/acl/list" - "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" - "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/objectsync" - "github.com/anyproto/any-sync/commonspace/peermanager" + "github.com/anyproto/any-sync/commonspace/objecttreebuilder" "github.com/anyproto/any-sync/commonspace/settings" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate" + "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" - "github.com/anyproto/any-sync/metric" - "github.com/anyproto/any-sync/net/peer" - "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/util/crypto" - "github.com/anyproto/any-sync/util/multiqueue" - "github.com/anyproto/any-sync/util/slice" - "github.com/cheggaaa/mb/v3" - "github.com/zeebo/errs" "go.uber.org/zap" "strconv" "strings" "sync" - "sync/atomic" "time" ) @@ -55,25 +40,6 @@ type SpaceCreatePayload struct { MasterKey crypto.PrivKey } -type HandleMessage struct { - Id uint64 - ReceiveTime time.Time - StartHandlingTime time.Time - Deadline time.Time - SenderId string - Message *spacesyncproto.ObjectSyncMessage - PeerCtx context.Context -} - -func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field { - return append(fields, - metric.SpaceId(m.Message.SpaceId), - metric.ObjectId(m.Message.ObjectId), - metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)), - metric.TotalDur(time.Since(m.ReceiveTime)), - ) -} - type SpaceDerivePayload struct { SigningKey crypto.PrivKey MasterKey crypto.PrivKey @@ -97,145 +63,72 @@ type Space interface { Id() string Init(ctx context.Context) error - StoredIds() []string - DebugAllHeads() []headsync.TreeHeads - Description() (SpaceDescription, error) - - CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) - PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) - BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) - DeleteTree(ctx context.Context, id string) (err error) - BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) - - SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) - DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) - - HeadSync() headsync.HeadSync - ObjectSync() objectsync.ObjectSync + TreeBuilder() objecttreebuilder.TreeBuilder SyncStatus() syncstatus.StatusUpdater Storage() spacestorage.SpaceStorage - HandleMessage(ctx context.Context, msg HandleMessage) (err error) + DeleteTree(ctx context.Context, id string) (err error) + SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) + DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) + + HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error) TryClose(objectTTL time.Duration) (close bool, err error) Close() error } type space struct { - id string mu sync.RWMutex header *spacesyncproto.RawSpaceHeaderWithId - objectSync objectsync.ObjectSync - headSync headsync.HeadSync - syncStatus syncstatus.StatusUpdater - storage spacestorage.SpaceStorage - treeManager *objectManager - account accountservice.Service - aclList *syncacl.SyncAcl - configuration nodeconf.NodeConf - settingsObject settings.SettingsObject - peerManager peermanager.PeerManager - treeBuilder objecttree.BuildObjectTreeFunc - metric metric.Metric + state *spacestate.SpaceState + app *app.App - handleQueue multiqueue.MultiQueue[HandleMessage] + treeBuilder objecttreebuilder.TreeBuilderComponent + headSync headsync.HeadSync + objectSync objectsync.ObjectSync + syncStatus syncstatus.StatusProvider + settings settings.Settings +} - isClosed *atomic.Bool - isDeleted *atomic.Bool - treesUsed *atomic.Int32 +func (s *space) DeleteTree(ctx context.Context, id string) (err error) { + return s.settings.DeleteTree(ctx, id) +} + +func (s *space) SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) { + return s.settings.SpaceDeleteRawChange(ctx) +} + +func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) { + return s.settings.DeleteSpace(ctx, deleteChange) +} + +func (s *space) HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error) { + return s.objectSync.HandleMessage(ctx, msg) +} + +func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder { + return s.treeBuilder } func (s *space) Id() string { - return s.id -} - -func (s *space) Description() (desc SpaceDescription, err error) { - root := s.aclList.Root() - settingsStorage, err := s.storage.TreeStorage(s.storage.SpaceSettingsId()) - if err != nil { - return - } - settingsRoot, err := settingsStorage.Root() - if err != nil { - return - } - - desc = SpaceDescription{ - SpaceHeader: s.header, - AclId: root.Id, - AclPayload: root.Payload, - SpaceSettingsId: settingsRoot.Id, - SpaceSettingsPayload: settingsRoot.RawChange, - } - return + return s.state.SpaceId } func (s *space) Init(ctx context.Context) (err error) { - log.With(zap.String("spaceId", s.id)).Debug("initializing space") - s.storage = newCommonStorage(s.storage) - - header, err := s.storage.SpaceHeader() + err = s.app.Start(ctx) if err != nil { return } - s.header = header - initialIds, err := s.storage.StoredIds() - if err != nil { - return - } - aclStorage, err := s.storage.AclStorage() - if err != nil { - return - } - aclList, err := list.BuildAclListWithIdentity(s.account.Account(), aclStorage) - if err != nil { - return - } - s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.SyncClient().MessagePool()) - s.treeManager.AddObject(s.aclList) - - deletionState := settingsstate.NewObjectDeletionState(log, s.storage) - deps := settings.Deps{ - BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) { - res, err := s.BuildTree(ctx, id, BuildTreeOpts{ - Listener: listener, - WaitTreeRemoteSync: false, - // space settings document should not have empty data - treeBuilder: objecttree.BuildObjectTree, - }) - log.Debug("building settings tree", zap.String("id", id), zap.String("spaceId", s.id)) - if err != nil { - return - } - t = res.(synctree.SyncTree) - return - }, - Account: s.account, - TreeManager: s.treeManager, - Store: s.storage, - DeletionState: deletionState, - Provider: s.headSync, - Configuration: s.configuration, - OnSpaceDelete: s.onSpaceDelete, - } - s.settingsObject = settings.NewSettingsObject(deps, s.id) - s.headSync.Init(initialIds, deletionState) - err = s.settingsObject.Init(ctx) - if err != nil { - return - } - s.treeManager.AddObject(s.settingsObject) - s.syncStatus.Run() - s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 100) + s.treeBuilder = s.app.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent) + s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync) + s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) + s.settings = s.app.MustComponent(settings.CName).(settings.Settings) + s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync) return nil } -func (s *space) ObjectSync() objectsync.ObjectSync { - return s.objectSync -} - -func (s *space) HeadSync() headsync.HeadSync { +func (s *space) HeadSync() headsync.HeadSyncExternal { return s.headSync } @@ -244,249 +137,28 @@ func (s *space) SyncStatus() syncstatus.StatusUpdater { } func (s *space) Storage() spacestorage.SpaceStorage { - return s.storage -} - -func (s *space) StoredIds() []string { - return slice.DiscardFromSlice(s.headSync.AllIds(), func(id string) bool { - return id == s.settingsObject.Id() - }) -} - -func (s *space) DebugAllHeads() []headsync.TreeHeads { - return s.headSync.DebugAllHeads() -} - -func (s *space) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) { - if s.isClosed.Load() { - err = ErrSpaceClosed - return - } - root, err := objecttree.CreateObjectTreeRoot(payload, s.aclList) - if err != nil { - return - } - - res = treestorage.TreeStorageCreatePayload{ - RootRawChange: root, - Changes: []*treechangeproto.RawTreeChangeWithId{root}, - Heads: []string{root.Id}, - } - return -} - -func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) { - if s.isClosed.Load() { - err = ErrSpaceClosed - return - } - deps := synctree.BuildDeps{ - SpaceId: s.id, - SyncClient: s.objectSync.SyncClient(), - Configuration: s.configuration, - HeadNotifiable: s.headSync, - Listener: listener, - AclList: s.aclList, - SpaceStorage: s.storage, - OnClose: s.onObjectClose, - SyncStatus: s.syncStatus, - PeerGetter: s.peerManager, - BuildObjectTree: s.treeBuilder, - } - t, err = synctree.PutSyncTree(ctx, payload, deps) - if err != nil { - return - } - s.treesUsed.Add(1) - log.Debug("incrementing counter", zap.String("id", payload.RootRawChange.Id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id)) - return -} - -type BuildTreeOpts struct { - Listener updatelistener.UpdateListener - WaitTreeRemoteSync bool - treeBuilder objecttree.BuildObjectTreeFunc -} - -type HistoryTreeOpts struct { - BeforeId string - Include bool - BuildFullTree bool -} - -func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) { - if s.isClosed.Load() { - err = ErrSpaceClosed - return - } - treeBuilder := opts.treeBuilder - if treeBuilder == nil { - treeBuilder = s.treeBuilder - } - deps := synctree.BuildDeps{ - SpaceId: s.id, - SyncClient: s.objectSync.SyncClient(), - Configuration: s.configuration, - HeadNotifiable: s.headSync, - Listener: opts.Listener, - AclList: s.aclList, - SpaceStorage: s.storage, - OnClose: s.onObjectClose, - SyncStatus: s.syncStatus, - WaitTreeRemoteSync: opts.WaitTreeRemoteSync, - PeerGetter: s.peerManager, - BuildObjectTree: treeBuilder, - } - s.treesUsed.Add(1) - log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id)) - if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil { - s.treesUsed.Add(-1) - log.Debug("decrementing counter, load failed", zap.String("id", id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id), zap.Error(err)) - return nil, err - } - return -} - -func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { - if s.isClosed.Load() { - err = ErrSpaceClosed - return - } - - params := objecttree.HistoryTreeParams{ - AclList: s.aclList, - BeforeId: opts.BeforeId, - IncludeBeforeId: opts.Include, - BuildFullTree: opts.BuildFullTree, - } - params.TreeStorage, err = s.storage.TreeStorage(id) - if err != nil { - return - } - return objecttree.BuildHistoryTree(params) -} - -func (s *space) DeleteTree(ctx context.Context, id string) (err error) { - return s.settingsObject.DeleteObject(id) -} - -func (s *space) SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) { - return s.settingsObject.SpaceDeleteRawChange() -} - -func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) { - return s.settingsObject.DeleteSpace(ctx, deleteChange) -} - -func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { - threadId := hm.Message.ObjectId - hm.ReceiveTime = time.Now() - if hm.Message.ReplyId != "" { - threadId += hm.Message.ReplyId - defer func() { - _ = s.handleQueue.CloseThread(threadId) - }() - } - if hm.PeerCtx == nil { - hm.PeerCtx = ctx - } - err = s.handleQueue.Add(ctx, threadId, hm) - if err == mb.ErrOverflowed { - log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId)) - // skip overflowed error - return nil - } - return -} - -func (s *space) handleMessage(msg HandleMessage) { - var err error - msg.StartHandlingTime = time.Now() - ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) - ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) - defer func() { - if s.metric == nil { - return - } - s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields( - zap.Error(err), - )...) - }() - - if !msg.Deadline.IsZero() { - now := time.Now() - if now.After(msg.Deadline) { - log.InfoCtx(ctx, "skip message: deadline exceed") - err = context.DeadlineExceeded - return - } - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, msg.Deadline) - defer cancel() - } - - if err = s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { - if msg.Message.ObjectId != "" { - // cleanup thread on error - _ = s.handleQueue.CloseThread(msg.Message.ObjectId) - } - log.InfoCtx(ctx, "handleMessage error", zap.Error(err)) - } -} - -func (s *space) onObjectClose(id string) { - s.treesUsed.Add(-1) - log.Debug("decrementing counter", zap.String("id", id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id)) - _ = s.handleQueue.CloseThread(id) -} - -func (s *space) onSpaceDelete() { - err := s.storage.SetSpaceDeleted() - if err != nil { - log.Debug("failed to set space deleted") - } - s.isDeleted.Swap(true) + return s.state.SpaceStorage } func (s *space) Close() error { - if s.isClosed.Swap(true) { - log.Warn("call space.Close on closed space", zap.String("id", s.id)) + if s.state.SpaceIsClosed.Swap(true) { + log.Warn("call space.Close on closed space", zap.String("id", s.state.SpaceId)) return nil } - log.With(zap.String("id", s.id)).Debug("space is closing") + log := log.With(zap.String("spaceId", s.state.SpaceId)) + log.Debug("space is closing") - var mError errs.Group - if err := s.handleQueue.Close(); err != nil { - mError.Add(err) - } - if err := s.headSync.Close(); err != nil { - mError.Add(err) - } - if err := s.objectSync.Close(); err != nil { - mError.Add(err) - } - if err := s.settingsObject.Close(); err != nil { - mError.Add(err) - } - if err := s.aclList.Close(); err != nil { - mError.Add(err) - } - if err := s.storage.Close(); err != nil { - mError.Add(err) - } - if err := s.syncStatus.Close(); err != nil { - mError.Add(err) - } - log.With(zap.String("id", s.id)).Debug("space closed") - return mError.Err() + err := s.app.Close(context.Background()) + log.Debug("space closed") + return err } func (s *space) TryClose(objectTTL time.Duration) (close bool, err error) { if time.Now().Sub(s.objectSync.LastUsage()) < objectTTL { return false, nil } - locked := s.treesUsed.Load() > 1 - log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check") + locked := s.state.TreesUsed.Load() > 1 + log.With(zap.Int32("trees used", s.state.TreesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.state.SpaceId)).Debug("space lock status check") if locked { return false, nil } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 3657d423..796a3daf 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -7,15 +7,24 @@ import ( "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" + "github.com/anyproto/any-sync/commonspace/deletionstate" "github.com/anyproto/any-sync/commonspace/headsync" "github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto" + "github.com/anyproto/any-sync/commonspace/object/acl/list" + "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" + "github.com/anyproto/any-sync/commonspace/objecttreebuilder" "github.com/anyproto/any-sync/commonspace/peermanager" + "github.com/anyproto/any-sync/commonspace/requestsender" + "github.com/anyproto/any-sync/commonspace/settings" + "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/commonspace/streamsender" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/peer" @@ -73,6 +82,7 @@ func (s *spaceService) Init(a *app.App) (err error) { } s.pool = a.MustComponent(pool.CName).(pool.Pool) s.metric, _ = a.Component(metric.CName).(metric.Metric) + s.app = a return nil } @@ -140,8 +150,6 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { } } } - - lastConfiguration := s.configurationService var ( spaceIsClosed = &atomic.Bool{} spaceIsDeleted = &atomic.Bool{} @@ -151,42 +159,57 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { return nil, err } spaceIsDeleted.Swap(isDeleted) - getter := NewObjectManager(st.Id(), s.treeManager, spaceIsClosed) - syncStatus := syncstatus.NewNoOpSyncStatus() - // this will work only for clients, not the best solution, but... - if !lastConfiguration.IsResponsible(st.Id()) { - // TODO: move it to the client package and add possibility to inject StatusProvider from the client - syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) - } - var builder objecttree.BuildObjectTreeFunc - if s.config.KeepTreeDataInMemory { - builder = objecttree.BuildObjectTree - } else { - builder = objecttree.BuildEmptyDataObjectTree - } - - peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id) + aclStorage, err := st.AclStorage() if err != nil { return nil, err } + aclList, err := list.BuildAclListWithIdentity(s.account.Account(), aclStorage) + if err != nil { + return nil, err + } + aclList = syncacl.NewSyncAcl(aclList) + state := &spacestate.SpaceState{ + SpaceId: st.Id(), + SpaceIsDeleted: spaceIsDeleted, + SpaceIsClosed: spaceIsClosed, + TreesUsed: &atomic.Int32{}, + AclList: aclList, + SpaceStorage: st, + } + if s.config.KeepTreeDataInMemory { + state.TreeBuilderFunc = objecttree.BuildObjectTree + } else { + state.TreeBuilderFunc = objecttree.BuildEmptyDataObjectTree + } + var syncStatus syncstatus.StatusProvider + if !s.configurationService.IsResponsible(st.Id()) { + // TODO: move it to the client package and add possibility to inject StatusProvider from the client + syncStatus = syncstatus.NewSyncStatusProvider() + } else { + syncStatus = syncstatus.NewNoOpSyncStatus() + } + //lastConfiguration := s.configurationService + // + //peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id) + //if err != nil { + // return nil, err + //} + spaceApp := s.app.ChildApp() + spaceApp.Register(state). + Register(syncStatus). + Register(NewObjectManager(s.treeManager)). + Register(streamsender.New()). + Register(requestsender.New()). + Register(deletionstate.New()). + Register(settings.New()). + Register(syncclient.New()). + Register(objecttreebuilder.New()). + Register(objectsync.New()). + Register(headsync.New()) - headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, s.credentialProvider, log) - objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter, st) sp := &space{ - id: id, - objectSync: objectSync, - headSync: headSync, - syncStatus: syncStatus, - treeManager: getter, - account: s.account, - configuration: lastConfiguration, - peerManager: peerManager, - storage: st, - treesUsed: &atomic.Int32{}, - treeBuilder: builder, - isClosed: spaceIsClosed, - isDeleted: spaceIsDeleted, - metric: s.metric, + state: state, + app: spaceApp, } return sp, nil } diff --git a/commonspace/spacestate/shareddata.go b/commonspace/spacestate/shareddata.go index bf231cb7..9d6eac2f 100644 --- a/commonspace/spacestate/shareddata.go +++ b/commonspace/spacestate/shareddata.go @@ -10,11 +10,6 @@ import ( const CName = "common.commonspace.shareddata" -type SpaceActions interface { - OnObjectDelete(id string) - OnSpaceDelete() -} - type SpaceState struct { SpaceId string SpaceIsDeleted *atomic.Bool @@ -23,7 +18,6 @@ type SpaceState struct { AclList list.AclList SpaceStorage spacestorage.SpaceStorage TreeBuilderFunc objecttree.BuildObjectTreeFunc - Actions SpaceActions } func (s *SpaceState) Init(a *app.App) (err error) { diff --git a/commonspace/streamsender/streamsender.go b/commonspace/streamsender/streamsender.go index 9482a81a..99a45f5c 100644 --- a/commonspace/streamsender/streamsender.go +++ b/commonspace/streamsender/streamsender.go @@ -8,7 +8,30 @@ import ( const CName = "common.commonspace.streamsender" type StreamSender interface { - app.ComponentRunnable + app.Component SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) } + +func New() StreamSender { + return &streamSender{} +} + +type streamSender struct { +} + +func (s *streamSender) Init(a *app.App) (err error) { + return +} + +func (s *streamSender) Name() (name string) { + return CName +} + +func (s *streamSender) SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + return nil +} + +func (s *streamSender) Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) { + return nil +}