Simplify deletion pipeline and change other stuff

This commit is contained in:
mcrakhman 2022-11-24 01:17:56 +01:00 committed by Mikhail Iudin
parent 87997b6b4b
commit bba4efaa50
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
15 changed files with 362 additions and 185 deletions

View File

@ -0,0 +1,28 @@
package commonspace
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
)
type commonStorage struct {
storage.SpaceStorage
}
func newCommonStorage(spaceStorage storage.SpaceStorage) storage.SpaceStorage {
return &commonStorage{
SpaceStorage: spaceStorage,
}
}
func (c *commonStorage) CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (store treestorage.TreeStorage, err error) {
status, err := c.TreeDeletedStatus(payload.RootRawChange.Id)
if err != nil {
return
}
if status == "" {
return c.CreateTreeStorage(payload)
}
err = storage.ErrTreeStorageAlreadyDeleted
return
}

View File

@ -4,6 +4,7 @@ package diffservice
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
@ -20,7 +21,7 @@ type DiffService interface {
RemoveObjects(ids []string) RemoveObjects(ids []string)
AllIds() []string AllIds() []string
Init(objectIds []string) Init(objectIds []string, deletionState *deletionstate.DeletionState)
Close() (err error) Close() (err error)
} }
@ -60,8 +61,9 @@ func NewDiffService(
} }
} }
func (d *diffService) Init(objectIds []string) { func (d *diffService) Init(objectIds []string, deletionState *deletionstate.DeletionState) {
d.fillDiff(objectIds) d.fillDiff(objectIds)
d.syncer.Init(deletionState)
d.periodicSync.Run() d.periodicSync.Run()
} }

View File

@ -3,6 +3,7 @@ package diffservice
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
@ -11,7 +12,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff"
"go.uber.org/zap" "go.uber.org/zap"
"sync"
"time" "time"
) )
@ -19,6 +19,7 @@ type DiffSyncer interface {
Sync(ctx context.Context) error Sync(ctx context.Context) error
RemoveObjects(ids []string) RemoveObjects(ids []string)
UpdateHeads(id string, heads []string) UpdateHeads(id string, heads []string)
Init(deletionState *deletionstate.DeletionState)
} }
func newDiffSyncer( func newDiffSyncer(
@ -37,12 +38,10 @@ func newDiffSyncer(
confConnector: confConnector, confConnector: confConnector,
clientFactory: clientFactory, clientFactory: clientFactory,
log: log, log: log,
removedIds: map[string]struct{}{},
} }
} }
type diffSyncer struct { type diffSyncer struct {
sync.Mutex
spaceId string spaceId string
diff ldiff.Diff diff ldiff.Diff
confConnector nodeconf.ConfConnector confConnector nodeconf.ConfConnector
@ -50,22 +49,22 @@ type diffSyncer struct {
storage storage.SpaceStorage storage storage.SpaceStorage
clientFactory spacesyncproto.ClientFactory clientFactory spacesyncproto.ClientFactory
log *zap.Logger log *zap.Logger
removedIds map[string]struct{} deletionState *deletionstate.DeletionState
}
func (d *diffSyncer) Init(deletionState *deletionstate.DeletionState) {
d.deletionState = deletionState
d.deletionState.AddObserver(d.RemoveObjects)
} }
func (d *diffSyncer) RemoveObjects(ids []string) { func (d *diffSyncer) RemoveObjects(ids []string) {
d.Lock()
defer d.Unlock()
for _, id := range ids { for _, id := range ids {
d.diff.RemoveId(id) d.diff.RemoveId(id)
d.removedIds[id] = struct{}{}
} }
} }
func (d *diffSyncer) UpdateHeads(id string, heads []string) { func (d *diffSyncer) UpdateHeads(id string, heads []string) {
d.Lock() if d.deletionState.Exists(id) {
defer d.Unlock()
if _, exists := d.removedIds[id]; exists {
return return
} }
d.diff.Set(ldiff.Element{ d.diff.Set(ldiff.Element{
@ -101,30 +100,17 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
if err == spacesyncproto.ErrSpaceMissing { if err == spacesyncproto.ErrSpaceMissing {
return d.sendPushSpaceRequest(ctx, cl) return d.sendPushSpaceRequest(ctx, cl)
} }
var afterFilterIds []string totalLen := len(newIds) + len(changedIds) + len(removedIds)
filter := func(ids []string) {
for _, id := range ids {
if _, exists := d.removedIds[id]; !exists {
afterFilterIds = append(afterFilterIds, id)
}
}
}
d.Lock()
totalLen := 0
// not syncing ids which were removed through settings document // not syncing ids which were removed through settings document
for _, ids := range [][]string{newIds, changedIds, removedIds} { filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds)
totalLen += len(ids)
filter(ids)
}
d.Unlock()
ctx = peer.CtxWithPeerId(ctx, p.Id()) ctx = peer.CtxWithPeerId(ctx, p.Id())
d.pingTreesInCache(ctx, afterFilterIds) d.pingTreesInCache(ctx, filteredIds)
d.log.Info("sync done:", zap.Int("newIds", len(newIds)), d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)), zap.Int("changedIds", len(changedIds)),
zap.Int("removedIds", len(removedIds)), zap.Int("removedIds", len(removedIds)),
zap.Int("filteredIds", totalLen-len(afterFilterIds))) zap.Int("already deleted ids", totalLen-len(filteredIds)))
return return
} }

View File

@ -34,7 +34,7 @@ func (p pushSpaceRequestMatcher) Matches(x interface{}) bool {
return false return false
} }
return res.AclPayloadId == p.aclRootId && res.SpaceHeader == p.spaceHeader return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader
} }
func (p pushSpaceRequestMatcher) String() string { func (p pushSpaceRequestMatcher) String() string {

View File

@ -0,0 +1,53 @@
package settingsdocument
import (
"context"
)
type deleteLoop struct {
deleteCtx context.Context
deleteCancel context.CancelFunc
deleteChan chan struct{}
deleteFunc func()
loopDone chan struct{}
}
func newDeleteLoop(deleteFunc func()) *deleteLoop {
ctx, cancel := context.WithCancel(context.Background())
return &deleteLoop{
deleteCtx: ctx,
deleteCancel: cancel,
deleteChan: make(chan struct{}, 1),
deleteFunc: deleteFunc,
loopDone: make(chan struct{}),
}
}
func (dl *deleteLoop) Run() {
go dl.loop()
}
func (dl *deleteLoop) loop() {
defer close(dl.loopDone)
dl.deleteFunc()
for {
select {
case <-dl.deleteCtx.Done():
return
case <-dl.deleteChan:
dl.deleteFunc()
}
}
}
func (dl *deleteLoop) notify() {
select {
case dl.deleteChan <- struct{}{}:
default:
}
}
func (dl *deleteLoop) Close() {
dl.deleteCancel()
<-dl.loopDone
}

View File

@ -0,0 +1,36 @@
package settingsdocument
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"go.uber.org/zap"
)
type deleter struct {
st storage.SpaceStorage
state *deletionstate.DeletionState
getter treegetter.TreeGetter
}
func newDeleter(st storage.SpaceStorage, state *deletionstate.DeletionState, getter treegetter.TreeGetter) *deleter {
return &deleter{st, state, getter}
}
func (d *deleter) delete() {
allQueued := d.state.GetQueued()
for _, id := range allQueued {
if _, err := d.st.TreeStorage(id); err == nil {
err := d.getter.DeleteTree(context.Background(), d.st.Id(), id)
if err != nil {
log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object")
continue
}
}
err := d.state.Delete(id)
if err != nil {
log.With(zap.String("id", id), zap.Error(err)).Error("failed to mark object as deleted")
}
}
}

View File

@ -0,0 +1,126 @@
package deletionstate
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"sync"
)
type StateUpdateObserver func(ids []string)
type DeletionState struct {
sync.RWMutex
queued map[string]struct{}
deleted map[string]struct{}
stateUpdateObservers []StateUpdateObserver
storage storage.SpaceStorage
}
func NewDeletionState(storage storage.SpaceStorage) *DeletionState {
return &DeletionState{
queued: map[string]struct{}{},
deleted: map[string]struct{}{},
storage: storage,
}
}
func (st *DeletionState) AddObserver(observer StateUpdateObserver) {
st.Lock()
defer st.Unlock()
st.stateUpdateObservers = append(st.stateUpdateObservers, observer)
}
func (st *DeletionState) Add(ids []string) (err error) {
st.Lock()
defer func() {
st.Unlock()
if err != nil {
return
}
for _, ob := range st.stateUpdateObservers {
ob(ids)
}
}()
for _, id := range ids {
if _, exists := st.deleted[id]; exists {
continue
}
if _, exists := st.queued[id]; exists {
continue
}
var status string
status, err = st.storage.TreeDeletedStatus(id)
if err != nil {
return
}
switch status {
case storage.TreeDeletedStatusQueued:
st.queued[id] = struct{}{}
case storage.TreeDeletedStatusDeleted:
st.deleted[id] = struct{}{}
default:
st.queued[id] = struct{}{}
err = st.storage.SetTreeDeletedStatus(id, storage.TreeDeletedStatusQueued)
if err != nil {
return
}
}
}
return
}
func (st *DeletionState) GetQueued() (ids []string) {
st.RLock()
defer st.RUnlock()
ids = make([]string, 0, len(st.queued))
for id := range st.queued {
ids = append(ids, id)
}
return
}
func (st *DeletionState) Delete(id string) (err error) {
st.Lock()
defer st.Unlock()
delete(st.queued, id)
st.deleted[id] = struct{}{}
err = st.storage.SetTreeDeletedStatus(id, storage.TreeDeletedStatusQueued)
if err != nil {
return
}
return
}
func (st *DeletionState) Exists(id string) bool {
st.RLock()
defer st.RUnlock()
return st.exists(id)
}
func (st *DeletionState) FilterJoin(ids ...[]string) (filtered []string) {
st.RLock()
defer st.RUnlock()
filter := func(ids []string) {
for _, id := range ids {
if !st.exists(id) {
filtered = append(filtered, id)
}
}
}
for _, arr := range ids {
filter(arr)
}
return
}
func (st *DeletionState) exists(id string) bool {
if _, exists := st.deleted[id]; exists {
return true
}
if _, exists := st.queued[id]; exists {
return true
}
return false
}

View File

@ -3,98 +3,108 @@ package settingsdocument
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"go.uber.org/zap"
) )
var log = logger.NewNamed("commonspace.settingsdocument")
type SettingsDocument interface { type SettingsDocument interface {
tree.ObjectTree tree.ObjectTree
Init(ctx context.Context) (err error) Init(ctx context.Context) (err error)
Refresh() Refresh()
DeleteObject(id string) (err error) DeleteObject(id string) (err error)
NotifyObjectUpdate(id string)
} }
type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error)
type RemoveObjectsFunc func([]string)
type Deps struct { type Deps struct {
BuildFunc BuildTreeFunc BuildFunc BuildTreeFunc
Account account.Service Account account.Service
TreeGetter treegetter.TreeGetter TreeGetter treegetter.TreeGetter
Store spacestorage.SpaceStorage Store spacestorage.SpaceStorage
RemoveFunc RemoveObjectsFunc DeletionState *deletionstate.DeletionState
// prov exists mainly for the ease of testing // prov exists mainly for the ease of testing
prov deletedIdsProvider prov deletedIdsProvider
} }
type settingsDocument struct { type settingsDocument struct {
tree.ObjectTree tree.ObjectTree
account account.Service account account.Service
spaceId string spaceId string
treeGetter treegetter.TreeGetter treeGetter treegetter.TreeGetter
store spacestorage.SpaceStorage store spacestorage.SpaceStorage
prov deletedIdsProvider prov deletedIdsProvider
removeNotifyFunc RemoveObjectsFunc buildFunc BuildTreeFunc
buildFunc BuildTreeFunc loop deleteLoop
queue *settingsQueue deletionState *deletionstate.DeletionState
documentIds []string lastChangeId string
lastChangeId string
} }
func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument, err error) { func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument, err error) {
deleter := newDeleter(deps.Store, deps.DeletionState, deps.TreeGetter)
loop := newDeleteLoop(func() {
deleter.delete()
})
deps.DeletionState.AddObserver(func(ids []string) {
loop.notify()
})
s := &settingsDocument{ s := &settingsDocument{
account: deps.Account, spaceId: spaceId,
spaceId: spaceId, account: deps.Account,
queue: newSettingsQueue(), deletionState: deps.DeletionState,
treeGetter: deps.TreeGetter, treeGetter: deps.TreeGetter,
store: deps.Store, store: deps.Store,
removeNotifyFunc: deps.RemoveFunc, buildFunc: deps.BuildFunc,
buildFunc: deps.BuildFunc,
} }
// this is needed mainly for testing // this is needed mainly for testing
if deps.prov == nil { if deps.prov == nil {
s.prov = &provider{} s.prov = &provider{}
} }
doc = s doc = s
return return
} }
func (s *settingsDocument) NotifyObjectUpdate(id string) {
s.queue.queueIfDeleted(id)
}
func (s *settingsDocument) Update(tr tree.ObjectTree) { func (s *settingsDocument) Update(tr tree.ObjectTree) {
ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId) ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId)
if err != nil { if err != nil {
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to update state")
return return
} }
s.documentIds = append(s.documentIds, ids...)
s.lastChangeId = lastId s.lastChangeId = lastId
s.queue.add(ids) if err = s.deletionState.Add(ids); err != nil {
s.removeNotifyFunc(ids) log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete")
s.deleteQueued() }
} }
func (s *settingsDocument) Rebuild(tr tree.ObjectTree) { func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
ids, lastId, err := s.prov.ProvideIds(tr, "") ids, lastId, err := s.prov.ProvideIds(tr, "")
if err != nil { if err != nil {
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to rebuild state")
return return
} }
s.documentIds = ids
s.lastChangeId = lastId s.lastChangeId = lastId
s.queue.add(ids) if err = s.deletionState.Add(ids); err != nil {
s.removeNotifyFunc(ids) log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete")
s.deleteQueued() }
} }
func (s *settingsDocument) Init(ctx context.Context) (err error) { func (s *settingsDocument) Init(ctx context.Context) (err error) {
s.ObjectTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s) s.ObjectTree, err = s.buildFunc(ctx, s.store.SpaceSettingsId(), s)
if err != nil {
return
}
s.loop.Run()
return return
} }
@ -104,29 +114,19 @@ func (s *settingsDocument) Refresh() {
if s.lastChangeId == "" { if s.lastChangeId == "" {
s.Rebuild(s) s.Rebuild(s)
} else { } else {
s.deleteQueued() s.Update(s)
} }
} }
func (s *settingsDocument) deleteQueued() { func (s *settingsDocument) Close() error {
allQueued := s.queue.getQueued() s.loop.Close()
for _, id := range allQueued { return s.ObjectTree.Close()
if _, err := s.store.TreeStorage(id); err == nil {
err := s.treeGetter.DeleteTree(context.Background(), s.spaceId, id)
if err != nil {
// TODO: some errors may tell us that the tree is actually deleted, so we should have more checks here
// TODO: add logging
continue
}
}
s.queue.delete(id)
}
} }
func (s *settingsDocument) DeleteObject(id string) (err error) { func (s *settingsDocument) DeleteObject(id string) (err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if s.queue.exists(id) { if s.deletionState.Exists(id) {
return nil return nil
} }

View File

@ -1,69 +0,0 @@
package settingsdocument
import "sync"
type settingsQueue struct {
sync.Mutex
queued map[string]struct{}
deleted map[string]struct{}
}
func newSettingsQueue() *settingsQueue {
return &settingsQueue{
Mutex: sync.Mutex{},
queued: map[string]struct{}{},
deleted: map[string]struct{}{},
}
}
func (q *settingsQueue) add(ids []string) {
q.Lock()
defer q.Unlock()
for _, id := range ids {
if _, exists := q.deleted[id]; exists {
continue
}
if _, exists := q.queued[id]; exists {
continue
}
q.queued[id] = struct{}{}
}
}
func (q *settingsQueue) getQueued() (ids []string) {
q.Lock()
defer q.Unlock()
ids = make([]string, 0, len(q.queued))
for id := range q.queued {
ids = append(ids, id)
}
return
}
func (q *settingsQueue) delete(id string) {
q.Lock()
defer q.Unlock()
delete(q.queued, id)
q.deleted[id] = struct{}{}
}
func (q *settingsQueue) queueIfDeleted(id string) {
q.Lock()
defer q.Unlock()
if _, exists := q.deleted[id]; exists {
delete(q.deleted, id)
q.queued[id] = struct{}{}
}
}
func (q *settingsQueue) exists(id string) bool {
q.Lock()
defer q.Unlock()
if _, exists := q.deleted[id]; exists {
return true
}
if _, exists := q.queued[id]; exists {
return true
}
return false
}

View File

@ -7,6 +7,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl"
@ -19,7 +20,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"sync" "sync"
@ -96,8 +96,6 @@ type space struct {
aclList *syncacl.SyncACL aclList *syncacl.SyncACL
configuration nodeconf.Configuration configuration nodeconf.Configuration
settingsDocument settingsdocument.SettingsDocument settingsDocument settingsdocument.SettingsDocument
settingsSync periodicsync.PeriodicSync
headNotifiable diffservice.HeadNotifiable
isClosed atomic.Bool isClosed atomic.Bool
} }
@ -132,6 +130,8 @@ func (s *space) Description() (desc SpaceDescription, err error) {
} }
func (s *space) Init(ctx context.Context) (err error) { func (s *space) Init(ctx context.Context) (err error) {
s.storage = newCommonStorage(s.storage)
header, err := s.storage.SpaceHeader() header, err := s.storage.SpaceHeader()
if err != nil { if err != nil {
return return
@ -152,33 +152,27 @@ func (s *space) Init(ctx context.Context) (err error) {
} }
s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool()) s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool())
deletionState := deletionstate.NewDeletionState(s.storage)
deps := settingsdocument.Deps{ deps := settingsdocument.Deps{
BuildFunc: s.BuildTree, BuildFunc: s.BuildTree,
Account: s.account, Account: s.account,
TreeGetter: s.cache, TreeGetter: s.cache,
Store: s.storage, Store: s.storage,
RemoveFunc: s.diffService.RemoveObjects, DeletionState: deletionState,
} }
s.settingsDocument, err = settingsdocument.NewSettingsDocument(deps, s.id) s.settingsDocument, err = settingsdocument.NewSettingsDocument(deps, s.id)
if err != nil { if err != nil {
return return
} }
s.headNotifiable = diffservice.HeadNotifiableFunc(func(id string, heads []string) {
s.diffService.UpdateHeads(id, heads) objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument)
s.settingsDocument.NotifyObjectUpdate(id) s.syncService.Init(objectGetter)
}) s.diffService.Init(initialIds, deletionState)
err = s.settingsDocument.Init(ctx) err = s.settingsDocument.Init(ctx)
if err != nil { if err != nil {
return return
} }
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument)
s.syncService.Init(objectGetter)
s.diffService.Init(initialIds)
s.settingsSync = periodicsync.NewPeriodicSync(SettingsSyncPeriodSeconds, func(ctx context.Context) error {
s.settingsDocument.Refresh()
return nil
}, log)
s.settingsSync.Run()
return nil return nil
} }
@ -208,10 +202,10 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
Payload: payload, Payload: payload,
StreamPool: s.syncService.StreamPool(), StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration, Configuration: s.configuration,
HeadNotifiable: s.headNotifiable, HeadNotifiable: s.diffService,
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
CreateStorage: s.storage.CreateTreeStorage, SpaceStorage: s.storage,
} }
return synctree.DeriveSyncTree(ctx, deps) return synctree.DeriveSyncTree(ctx, deps)
} }
@ -226,10 +220,10 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
Payload: payload, Payload: payload,
StreamPool: s.syncService.StreamPool(), StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration, Configuration: s.configuration,
HeadNotifiable: s.headNotifiable, HeadNotifiable: s.diffService,
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
CreateStorage: s.storage.CreateTreeStorage, SpaceStorage: s.storage,
} }
return synctree.CreateSyncTree(ctx, deps) return synctree.CreateSyncTree(ctx, deps)
} }
@ -243,7 +237,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
SpaceId: s.id, SpaceId: s.id,
StreamPool: s.syncService.StreamPool(), StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration, Configuration: s.configuration,
HeadNotifiable: s.headNotifiable, HeadNotifiable: s.diffService,
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
@ -268,7 +262,6 @@ func (s *space) Close() error {
if err := s.syncService.Close(); err != nil { if err := s.syncService.Close(); err != nil {
mError.Add(err) mError.Add(err)
} }
s.settingsSync.Close()
if err := s.settingsDocument.Close(); err != nil { if err := s.settingsDocument.Close(); err != nil {
mError.Add(err) mError.Add(err)
} }

View File

@ -12,12 +12,23 @@ import (
const CName = "commonspace.storage" const CName = "commonspace.storage"
var ErrSpaceStorageExists = errors.New("space storage exists") var (
var ErrSpaceStorageMissing = errors.New("space storage missing") ErrSpaceStorageExists = errors.New("space storage exists")
ErrSpaceStorageMissing = errors.New("space storage missing")
ErrTreeStorageAlreadyDeleted = errors.New("tree storage already deleted")
)
const (
TreeDeletedStatusQueued = "queued"
TreeDeletedStatusDeleted = "deleted"
)
type SpaceStorage interface { type SpaceStorage interface {
storage.Provider storage.Provider
Id() string Id() string
SetTreeDeletedStatus(id, state string) error
TreeDeletedStatus(id string) (string, error)
SpaceSettingsId() string SpaceSettingsId() string
ACLStorage() (storage.ListStorage, error) ACLStorage() (storage.ListStorage, error)
SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error) SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error)

View File

@ -48,7 +48,7 @@ type CreateDeps struct {
StreamPool syncservice.StreamPool StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener
AclList list.ACLList AclList list.ACLList
CreateStorage storage.TreeStorageCreatorFunc SpaceStorage spacestorage.SpaceStorage
} }
type BuildDeps struct { type BuildDeps struct {
@ -63,7 +63,7 @@ type BuildDeps struct {
} }
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage)
if err != nil { if err != nil {
return return
} }
@ -81,6 +81,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
t = syncTree t = syncTree
syncTree.Lock()
defer syncTree.Unlock()
syncTree.listener.Rebuild(syncTree)
headUpdate := syncClient.CreateHeadUpdate(t, nil) headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate) err = syncClient.BroadcastAsync(headUpdate)
@ -88,7 +91,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er
} }
func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) t, err = createObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage)
if err != nil { if err != nil {
return return
} }
@ -106,6 +109,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
t = syncTree t = syncTree
syncTree.Lock()
defer syncTree.Unlock()
syncTree.listener.Rebuild(syncTree)
headUpdate := syncClient.CreateHeadUpdate(t, nil) headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate) err = syncClient.BroadcastAsync(headUpdate)
@ -186,6 +192,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tr
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
t = syncTree t = syncTree
syncTree.Lock()
defer syncTree.Unlock()
syncTree.listener.Rebuild(syncTree)
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// here we will have different behaviour based on who is sending this update // here we will have different behaviour based on who is sending this update

View File

@ -5,9 +5,11 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
) )
var ErrUnknownTreeId = errors.New("tree does not exist") var (
var ErrTreeExists = errors.New("tree already exists") ErrUnknownTreeId = errors.New("tree does not exist")
var ErrUnkownChange = errors.New("change doesn't exist") ErrTreeExists = errors.New("tree already exists")
ErrUnknownChange = errors.New("change doesn't exist")
)
type TreeStorageCreatePayload struct { type TreeStorageCreatePayload struct {
RootRawChange *treechangeproto.RawTreeChangeWithId RootRawChange *treechangeproto.RawTreeChangeWithId

View File

@ -122,7 +122,7 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
return return
} }
if res == nil { if res == nil {
err = storage.ErrUnkownChange err = storage.ErrUnknownChange
} }
raw = &treechangeproto.RawTreeChangeWithId{ raw = &treechangeproto.RawTreeChangeWithId{