diff --git a/common/commonspace/commonstorage.go b/common/commonspace/commonstorage.go new file mode 100644 index 00000000..ab6d5ca6 --- /dev/null +++ b/common/commonspace/commonstorage.go @@ -0,0 +1,28 @@ +package commonspace + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" +) + +type commonStorage struct { + storage.SpaceStorage +} + +func newCommonStorage(spaceStorage storage.SpaceStorage) storage.SpaceStorage { + return &commonStorage{ + SpaceStorage: spaceStorage, + } +} + +func (c *commonStorage) CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (store treestorage.TreeStorage, err error) { + status, err := c.TreeDeletedStatus(payload.RootRawChange.Id) + if err != nil { + return + } + if status == "" { + return c.CreateTreeStorage(payload) + } + err = storage.ErrTreeStorageAlreadyDeleted + return +} diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index 8888a111..1c5df8b3 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -4,6 +4,7 @@ package diffservice import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" @@ -20,7 +21,7 @@ type DiffService interface { RemoveObjects(ids []string) AllIds() []string - Init(objectIds []string) + Init(objectIds []string, deletionState *deletionstate.DeletionState) Close() (err error) } @@ -60,8 +61,9 @@ func NewDiffService( } } -func (d *diffService) Init(objectIds []string) { +func (d *diffService) Init(objectIds []string, deletionState *deletionstate.DeletionState) { d.fillDiff(objectIds) + d.syncer.Init(deletionState) d.periodicSync.Run() } diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 31c5071a..4f92d570 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -3,6 +3,7 @@ package diffservice import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" @@ -11,7 +12,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff" "go.uber.org/zap" - "sync" "time" ) @@ -19,6 +19,7 @@ type DiffSyncer interface { Sync(ctx context.Context) error RemoveObjects(ids []string) UpdateHeads(id string, heads []string) + Init(deletionState *deletionstate.DeletionState) } func newDiffSyncer( @@ -37,12 +38,10 @@ func newDiffSyncer( confConnector: confConnector, clientFactory: clientFactory, log: log, - removedIds: map[string]struct{}{}, } } type diffSyncer struct { - sync.Mutex spaceId string diff ldiff.Diff confConnector nodeconf.ConfConnector @@ -50,22 +49,22 @@ type diffSyncer struct { storage storage.SpaceStorage clientFactory spacesyncproto.ClientFactory log *zap.Logger - removedIds map[string]struct{} + deletionState *deletionstate.DeletionState +} + +func (d *diffSyncer) Init(deletionState *deletionstate.DeletionState) { + d.deletionState = deletionState + d.deletionState.AddObserver(d.RemoveObjects) } func (d *diffSyncer) RemoveObjects(ids []string) { - d.Lock() - defer d.Unlock() for _, id := range ids { d.diff.RemoveId(id) - d.removedIds[id] = struct{}{} } } func (d *diffSyncer) UpdateHeads(id string, heads []string) { - d.Lock() - defer d.Unlock() - if _, exists := d.removedIds[id]; exists { + if d.deletionState.Exists(id) { return } d.diff.Set(ldiff.Element{ @@ -101,30 +100,17 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) if err == spacesyncproto.ErrSpaceMissing { return d.sendPushSpaceRequest(ctx, cl) } - var afterFilterIds []string - filter := func(ids []string) { - for _, id := range ids { - if _, exists := d.removedIds[id]; !exists { - afterFilterIds = append(afterFilterIds, id) - } - } - } - d.Lock() - totalLen := 0 + totalLen := len(newIds) + len(changedIds) + len(removedIds) // not syncing ids which were removed through settings document - for _, ids := range [][]string{newIds, changedIds, removedIds} { - totalLen += len(ids) - filter(ids) - } - d.Unlock() + filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds) ctx = peer.CtxWithPeerId(ctx, p.Id()) - d.pingTreesInCache(ctx, afterFilterIds) + d.pingTreesInCache(ctx, filteredIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds)), - zap.Int("filteredIds", totalLen-len(afterFilterIds))) + zap.Int("already deleted ids", totalLen-len(filteredIds))) return } diff --git a/common/commonspace/diffservice/diffsyncer_test.go b/common/commonspace/diffservice/diffsyncer_test.go index 13001df8..e6074a71 100644 --- a/common/commonspace/diffservice/diffsyncer_test.go +++ b/common/commonspace/diffservice/diffsyncer_test.go @@ -34,7 +34,7 @@ func (p pushSpaceRequestMatcher) Matches(x interface{}) bool { return false } - return res.AclPayloadId == p.aclRootId && res.SpaceHeader == p.spaceHeader + return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader } func (p pushSpaceRequestMatcher) String() string { diff --git a/common/commonspace/settingsdocument/deleteloop.go b/common/commonspace/settingsdocument/deleteloop.go new file mode 100644 index 00000000..f122fd83 --- /dev/null +++ b/common/commonspace/settingsdocument/deleteloop.go @@ -0,0 +1,53 @@ +package settingsdocument + +import ( + "context" +) + +type deleteLoop struct { + deleteCtx context.Context + deleteCancel context.CancelFunc + deleteChan chan struct{} + deleteFunc func() + loopDone chan struct{} +} + +func newDeleteLoop(deleteFunc func()) *deleteLoop { + ctx, cancel := context.WithCancel(context.Background()) + return &deleteLoop{ + deleteCtx: ctx, + deleteCancel: cancel, + deleteChan: make(chan struct{}, 1), + deleteFunc: deleteFunc, + loopDone: make(chan struct{}), + } +} + +func (dl *deleteLoop) Run() { + go dl.loop() +} + +func (dl *deleteLoop) loop() { + defer close(dl.loopDone) + dl.deleteFunc() + for { + select { + case <-dl.deleteCtx.Done(): + return + case <-dl.deleteChan: + dl.deleteFunc() + } + } +} + +func (dl *deleteLoop) notify() { + select { + case dl.deleteChan <- struct{}{}: + default: + } +} + +func (dl *deleteLoop) Close() { + dl.deleteCancel() + <-dl.loopDone +} diff --git a/common/commonspace/settingsdocument/deleter.go b/common/commonspace/settingsdocument/deleter.go new file mode 100644 index 00000000..48b6c871 --- /dev/null +++ b/common/commonspace/settingsdocument/deleter.go @@ -0,0 +1,36 @@ +package settingsdocument + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" + "go.uber.org/zap" +) + +type deleter struct { + st storage.SpaceStorage + state *deletionstate.DeletionState + getter treegetter.TreeGetter +} + +func newDeleter(st storage.SpaceStorage, state *deletionstate.DeletionState, getter treegetter.TreeGetter) *deleter { + return &deleter{st, state, getter} +} + +func (d *deleter) delete() { + allQueued := d.state.GetQueued() + for _, id := range allQueued { + if _, err := d.st.TreeStorage(id); err == nil { + err := d.getter.DeleteTree(context.Background(), d.st.Id(), id) + if err != nil { + log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object") + continue + } + } + err := d.state.Delete(id) + if err != nil { + log.With(zap.String("id", id), zap.Error(err)).Error("failed to mark object as deleted") + } + } +} diff --git a/common/commonspace/settingsdocument/deletionstate/deletionstate.go b/common/commonspace/settingsdocument/deletionstate/deletionstate.go new file mode 100644 index 00000000..3387dd9d --- /dev/null +++ b/common/commonspace/settingsdocument/deletionstate/deletionstate.go @@ -0,0 +1,126 @@ +package deletionstate + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "sync" +) + +type StateUpdateObserver func(ids []string) + +type DeletionState struct { + sync.RWMutex + queued map[string]struct{} + deleted map[string]struct{} + stateUpdateObservers []StateUpdateObserver + storage storage.SpaceStorage +} + +func NewDeletionState(storage storage.SpaceStorage) *DeletionState { + return &DeletionState{ + queued: map[string]struct{}{}, + deleted: map[string]struct{}{}, + storage: storage, + } +} + +func (st *DeletionState) AddObserver(observer StateUpdateObserver) { + st.Lock() + defer st.Unlock() + st.stateUpdateObservers = append(st.stateUpdateObservers, observer) +} + +func (st *DeletionState) Add(ids []string) (err error) { + st.Lock() + defer func() { + st.Unlock() + if err != nil { + return + } + for _, ob := range st.stateUpdateObservers { + ob(ids) + } + }() + + for _, id := range ids { + if _, exists := st.deleted[id]; exists { + continue + } + if _, exists := st.queued[id]; exists { + continue + } + + var status string + status, err = st.storage.TreeDeletedStatus(id) + if err != nil { + return + } + + switch status { + case storage.TreeDeletedStatusQueued: + st.queued[id] = struct{}{} + case storage.TreeDeletedStatusDeleted: + st.deleted[id] = struct{}{} + default: + st.queued[id] = struct{}{} + err = st.storage.SetTreeDeletedStatus(id, storage.TreeDeletedStatusQueued) + if err != nil { + return + } + } + } + return +} + +func (st *DeletionState) GetQueued() (ids []string) { + st.RLock() + defer st.RUnlock() + ids = make([]string, 0, len(st.queued)) + for id := range st.queued { + ids = append(ids, id) + } + return +} + +func (st *DeletionState) Delete(id string) (err error) { + st.Lock() + defer st.Unlock() + delete(st.queued, id) + st.deleted[id] = struct{}{} + err = st.storage.SetTreeDeletedStatus(id, storage.TreeDeletedStatusQueued) + if err != nil { + return + } + return +} + +func (st *DeletionState) Exists(id string) bool { + st.RLock() + defer st.RUnlock() + return st.exists(id) +} + +func (st *DeletionState) FilterJoin(ids ...[]string) (filtered []string) { + st.RLock() + defer st.RUnlock() + filter := func(ids []string) { + for _, id := range ids { + if !st.exists(id) { + filtered = append(filtered, id) + } + } + } + for _, arr := range ids { + filter(arr) + } + return +} + +func (st *DeletionState) exists(id string) bool { + if _, exists := st.deleted[id]; exists { + return true + } + if _, exists := st.queued[id]; exists { + return true + } + return false +} diff --git a/common/commonspace/settingsdocument/provider.go b/common/commonspace/settingsdocument/idprovider.go similarity index 100% rename from common/commonspace/settingsdocument/provider.go rename to common/commonspace/settingsdocument/idprovider.go diff --git a/common/commonspace/settingsdocument/settingsdocument.go b/common/commonspace/settingsdocument/settingsdocument.go index ca275663..3060cb20 100644 --- a/common/commonspace/settingsdocument/settingsdocument.go +++ b/common/commonspace/settingsdocument/settingsdocument.go @@ -3,98 +3,108 @@ package settingsdocument import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" + "go.uber.org/zap" ) +var log = logger.NewNamed("commonspace.settingsdocument") + type SettingsDocument interface { tree.ObjectTree Init(ctx context.Context) (err error) Refresh() DeleteObject(id string) (err error) - NotifyObjectUpdate(id string) } type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) -type RemoveObjectsFunc func([]string) type Deps struct { - BuildFunc BuildTreeFunc - Account account.Service - TreeGetter treegetter.TreeGetter - Store spacestorage.SpaceStorage - RemoveFunc RemoveObjectsFunc + BuildFunc BuildTreeFunc + Account account.Service + TreeGetter treegetter.TreeGetter + Store spacestorage.SpaceStorage + DeletionState *deletionstate.DeletionState // prov exists mainly for the ease of testing prov deletedIdsProvider } type settingsDocument struct { tree.ObjectTree - account account.Service - spaceId string - treeGetter treegetter.TreeGetter - store spacestorage.SpaceStorage - prov deletedIdsProvider - removeNotifyFunc RemoveObjectsFunc - buildFunc BuildTreeFunc + account account.Service + spaceId string + treeGetter treegetter.TreeGetter + store spacestorage.SpaceStorage + prov deletedIdsProvider + buildFunc BuildTreeFunc + loop deleteLoop - queue *settingsQueue - documentIds []string - lastChangeId string + deletionState *deletionstate.DeletionState + lastChangeId string } func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument, err error) { + deleter := newDeleter(deps.Store, deps.DeletionState, deps.TreeGetter) + loop := newDeleteLoop(func() { + deleter.delete() + }) + deps.DeletionState.AddObserver(func(ids []string) { + loop.notify() + }) + s := &settingsDocument{ - account: deps.Account, - spaceId: spaceId, - queue: newSettingsQueue(), - treeGetter: deps.TreeGetter, - store: deps.Store, - removeNotifyFunc: deps.RemoveFunc, - buildFunc: deps.BuildFunc, + spaceId: spaceId, + account: deps.Account, + deletionState: deps.DeletionState, + treeGetter: deps.TreeGetter, + store: deps.Store, + buildFunc: deps.BuildFunc, } // this is needed mainly for testing if deps.prov == nil { s.prov = &provider{} } + doc = s return } -func (s *settingsDocument) NotifyObjectUpdate(id string) { - s.queue.queueIfDeleted(id) -} - func (s *settingsDocument) Update(tr tree.ObjectTree) { ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId) if err != nil { + log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to update state") return } - s.documentIds = append(s.documentIds, ids...) s.lastChangeId = lastId - s.queue.add(ids) - s.removeNotifyFunc(ids) - s.deleteQueued() + if err = s.deletionState.Add(ids); err != nil { + log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete") + } } func (s *settingsDocument) Rebuild(tr tree.ObjectTree) { ids, lastId, err := s.prov.ProvideIds(tr, "") if err != nil { + log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to rebuild state") return } - s.documentIds = ids s.lastChangeId = lastId - s.queue.add(ids) - s.removeNotifyFunc(ids) - s.deleteQueued() + if err = s.deletionState.Add(ids); err != nil { + log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete") + } } func (s *settingsDocument) Init(ctx context.Context) (err error) { s.ObjectTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s) + if err != nil { + return + } + s.loop.Run() return } @@ -104,29 +114,19 @@ func (s *settingsDocument) Refresh() { if s.lastChangeId == "" { s.Rebuild(s) } else { - s.deleteQueued() + s.Update(s) } } -func (s *settingsDocument) deleteQueued() { - allQueued := s.queue.getQueued() - for _, id := range allQueued { - if _, err := s.store.TreeStorage(id); err == nil { - err := s.treeGetter.DeleteTree(context.Background(), s.spaceId, id) - if err != nil { - // TODO: some errors may tell us that the tree is actually deleted, so we should have more checks here - // TODO: add logging - continue - } - } - s.queue.delete(id) - } +func (s *settingsDocument) Close() error { + s.loop.Close() + return s.ObjectTree.Close() } func (s *settingsDocument) DeleteObject(id string) (err error) { s.Lock() defer s.Unlock() - if s.queue.exists(id) { + if s.deletionState.Exists(id) { return nil } diff --git a/common/commonspace/settingsdocument/settingsqueue.go b/common/commonspace/settingsdocument/settingsqueue.go deleted file mode 100644 index 52a7329f..00000000 --- a/common/commonspace/settingsdocument/settingsqueue.go +++ /dev/null @@ -1,69 +0,0 @@ -package settingsdocument - -import "sync" - -type settingsQueue struct { - sync.Mutex - queued map[string]struct{} - deleted map[string]struct{} -} - -func newSettingsQueue() *settingsQueue { - return &settingsQueue{ - Mutex: sync.Mutex{}, - queued: map[string]struct{}{}, - deleted: map[string]struct{}{}, - } -} - -func (q *settingsQueue) add(ids []string) { - q.Lock() - defer q.Unlock() - for _, id := range ids { - if _, exists := q.deleted[id]; exists { - continue - } - if _, exists := q.queued[id]; exists { - continue - } - q.queued[id] = struct{}{} - } -} - -func (q *settingsQueue) getQueued() (ids []string) { - q.Lock() - defer q.Unlock() - ids = make([]string, 0, len(q.queued)) - for id := range q.queued { - ids = append(ids, id) - } - return -} - -func (q *settingsQueue) delete(id string) { - q.Lock() - defer q.Unlock() - delete(q.queued, id) - q.deleted[id] = struct{}{} -} - -func (q *settingsQueue) queueIfDeleted(id string) { - q.Lock() - defer q.Unlock() - if _, exists := q.deleted[id]; exists { - delete(q.deleted, id) - q.queued[id] = struct{}{} - } -} - -func (q *settingsQueue) exists(id string) bool { - q.Lock() - defer q.Unlock() - if _, exists := q.deleted[id]; exists { - return true - } - if _, exists := q.queued[id]; exists { - return true - } - return false -} diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 8ee4ba5b..9a1f39ff 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" @@ -19,7 +20,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" "github.com/zeebo/errs" "go.uber.org/zap" "sync" @@ -96,8 +96,6 @@ type space struct { aclList *syncacl.SyncACL configuration nodeconf.Configuration settingsDocument settingsdocument.SettingsDocument - settingsSync periodicsync.PeriodicSync - headNotifiable diffservice.HeadNotifiable isClosed atomic.Bool } @@ -132,6 +130,8 @@ func (s *space) Description() (desc SpaceDescription, err error) { } func (s *space) Init(ctx context.Context) (err error) { + s.storage = newCommonStorage(s.storage) + header, err := s.storage.SpaceHeader() if err != nil { return @@ -152,33 +152,27 @@ func (s *space) Init(ctx context.Context) (err error) { } s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool()) + deletionState := deletionstate.NewDeletionState(s.storage) deps := settingsdocument.Deps{ - BuildFunc: s.BuildTree, - Account: s.account, - TreeGetter: s.cache, - Store: s.storage, - RemoveFunc: s.diffService.RemoveObjects, + BuildFunc: s.BuildTree, + Account: s.account, + TreeGetter: s.cache, + Store: s.storage, + DeletionState: deletionState, } s.settingsDocument, err = settingsdocument.NewSettingsDocument(deps, s.id) if err != nil { return } - s.headNotifiable = diffservice.HeadNotifiableFunc(func(id string, heads []string) { - s.diffService.UpdateHeads(id, heads) - s.settingsDocument.NotifyObjectUpdate(id) - }) + + objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument) + s.syncService.Init(objectGetter) + s.diffService.Init(initialIds, deletionState) err = s.settingsDocument.Init(ctx) if err != nil { return } - objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument) - s.syncService.Init(objectGetter) - s.diffService.Init(initialIds) - s.settingsSync = periodicsync.NewPeriodicSync(SettingsSyncPeriodSeconds, func(ctx context.Context) error { - s.settingsDocument.Refresh() - return nil - }, log) - s.settingsSync.Run() + return nil } @@ -208,10 +202,10 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay Payload: payload, StreamPool: s.syncService.StreamPool(), Configuration: s.configuration, - HeadNotifiable: s.headNotifiable, + HeadNotifiable: s.diffService, Listener: listener, AclList: s.aclList, - CreateStorage: s.storage.CreateTreeStorage, + SpaceStorage: s.storage, } return synctree.DeriveSyncTree(ctx, deps) } @@ -226,10 +220,10 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay Payload: payload, StreamPool: s.syncService.StreamPool(), Configuration: s.configuration, - HeadNotifiable: s.headNotifiable, + HeadNotifiable: s.diffService, Listener: listener, AclList: s.aclList, - CreateStorage: s.storage.CreateTreeStorage, + SpaceStorage: s.storage, } return synctree.CreateSyncTree(ctx, deps) } @@ -243,7 +237,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene SpaceId: s.id, StreamPool: s.syncService.StreamPool(), Configuration: s.configuration, - HeadNotifiable: s.headNotifiable, + HeadNotifiable: s.diffService, Listener: listener, AclList: s.aclList, SpaceStorage: s.storage, @@ -268,7 +262,6 @@ func (s *space) Close() error { if err := s.syncService.Close(); err != nil { mError.Add(err) } - s.settingsSync.Close() if err := s.settingsDocument.Close(); err != nil { mError.Add(err) } diff --git a/common/commonspace/storage/storage.go b/common/commonspace/storage/storage.go index ba9d0e26..bbca9c55 100644 --- a/common/commonspace/storage/storage.go +++ b/common/commonspace/storage/storage.go @@ -12,12 +12,23 @@ import ( const CName = "commonspace.storage" -var ErrSpaceStorageExists = errors.New("space storage exists") -var ErrSpaceStorageMissing = errors.New("space storage missing") +var ( + ErrSpaceStorageExists = errors.New("space storage exists") + ErrSpaceStorageMissing = errors.New("space storage missing") + + ErrTreeStorageAlreadyDeleted = errors.New("tree storage already deleted") +) + +const ( + TreeDeletedStatusQueued = "queued" + TreeDeletedStatusDeleted = "deleted" +) type SpaceStorage interface { storage.Provider Id() string + SetTreeDeletedStatus(id, state string) error + TreeDeletedStatus(id string) (string, error) SpaceSettingsId() string ACLStorage() (storage.ListStorage, error) SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error) diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index f9445fe0..024aa1dd 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -48,7 +48,7 @@ type CreateDeps struct { StreamPool syncservice.StreamPool Listener updatelistener.UpdateListener AclList list.ACLList - CreateStorage storage.TreeStorageCreatorFunc + SpaceStorage spacestorage.SpaceStorage } type BuildDeps struct { @@ -63,7 +63,7 @@ type BuildDeps struct { } func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { - t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) + t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage) if err != nil { return } @@ -81,6 +81,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler t = syncTree + syncTree.Lock() + defer syncTree.Unlock() + syncTree.listener.Rebuild(syncTree) headUpdate := syncClient.CreateHeadUpdate(t, nil) err = syncClient.BroadcastAsync(headUpdate) @@ -88,7 +91,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er } func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { - t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) + t, err = createObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage) if err != nil { return } @@ -106,6 +109,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler t = syncTree + syncTree.Lock() + defer syncTree.Unlock() + syncTree.listener.Rebuild(syncTree) headUpdate := syncClient.CreateHeadUpdate(t, nil) err = syncClient.BroadcastAsync(headUpdate) @@ -186,6 +192,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tr syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler t = syncTree + syncTree.Lock() + defer syncTree.Unlock() + syncTree.listener.Rebuild(syncTree) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // here we will have different behaviour based on who is sending this update diff --git a/common/pkg/acl/storage/provider.go b/common/pkg/acl/storage/provider.go index 51b53b4f..fd065f78 100644 --- a/common/pkg/acl/storage/provider.go +++ b/common/pkg/acl/storage/provider.go @@ -5,9 +5,11 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" ) -var ErrUnknownTreeId = errors.New("tree does not exist") -var ErrTreeExists = errors.New("tree already exists") -var ErrUnkownChange = errors.New("change doesn't exist") +var ( + ErrUnknownTreeId = errors.New("tree does not exist") + ErrTreeExists = errors.New("tree already exists") + ErrUnknownChange = errors.New("change doesn't exist") +) type TreeStorageCreatePayload struct { RootRawChange *treechangeproto.RawTreeChangeWithId diff --git a/node/storage/treestorage.go b/node/storage/treestorage.go index f3da3c83..9cca39eb 100644 --- a/node/storage/treestorage.go +++ b/node/storage/treestorage.go @@ -122,7 +122,7 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha return } if res == nil { - err = storage.ErrUnkownChange + err = storage.ErrUnknownChange } raw = &treechangeproto.RawTreeChangeWithId{