Diff service notify logic

This commit is contained in:
mcrakhman 2022-11-12 16:24:06 +01:00 committed by Mikhail Iudin
parent 696cc37055
commit 676976e7d3
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
6 changed files with 78 additions and 24 deletions

View File

@ -16,7 +16,7 @@ import (
type DiffService interface {
HeadNotifiable
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
RemoveObject(id string)
RemoveObjects(ids []string)
AllIds() []string
Init(objectIds []string)
@ -29,6 +29,7 @@ type diffService struct {
storage storage.SpaceStorage
diff ldiff.Diff
log *zap.Logger
syncer DiffSyncer
syncPeriod int
}
@ -50,6 +51,7 @@ func NewDiffService(
return &diffService{
spaceId: spaceId,
storage: storage,
syncer: syncer,
periodicSync: periodicSync,
diff: diff,
log: log,
@ -77,9 +79,11 @@ func (d *diffService) AllIds() []string {
return d.diff.Ids()
}
func (d *diffService) RemoveObject(id string) {
// TODO: add space document to remove ids
d.diff.RemoveId(id)
func (d *diffService) RemoveObjects(ids []string) {
for _, id := range ids {
d.diff.RemoveId(id)
}
d.syncer.RemoveObjects(ids)
}
func (d *diffService) Close() (err error) {

View File

@ -11,11 +11,13 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff"
"go.uber.org/zap"
"sync"
"time"
)
type DiffSyncer interface {
Sync(ctx context.Context) error
RemoveObjects(ids []string)
}
func newDiffSyncer(
@ -34,10 +36,12 @@ func newDiffSyncer(
confConnector: confConnector,
clientFactory: clientFactory,
log: log,
removedIds: map[string]struct{}{},
}
}
type diffSyncer struct {
sync.Mutex
spaceId string
diff ldiff.Diff
confConnector nodeconf.ConfConnector
@ -45,6 +49,15 @@ type diffSyncer struct {
storage storage.SpaceStorage
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
removedIds map[string]struct{}
}
func (d *diffSyncer) RemoveObjects(ids []string) {
d.Lock()
defer d.Unlock()
for _, id := range ids {
d.removedIds[id] = struct{}{}
}
}
func (d *diffSyncer) Sync(ctx context.Context) error {
@ -74,15 +87,30 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
if err == spacesyncproto.ErrSpaceMissing {
return d.sendPushSpaceRequest(ctx, cl)
}
var afterFilterIds []string
filter := func(ids []string) {
for _, id := range ids {
if _, exists := d.removedIds[id]; !exists {
afterFilterIds = append(afterFilterIds, id)
}
}
}
d.Lock()
totalLen := 0
// not syncing ids which were removed through settings document
for _, ids := range [][]string{newIds, changedIds, removedIds} {
totalLen += len(ids)
filter(ids)
}
d.Unlock()
ctx = peer.CtxWithPeerId(ctx, p.Id())
d.pingTreesInCache(ctx, newIds)
d.pingTreesInCache(ctx, changedIds)
d.pingTreesInCache(ctx, removedIds)
d.pingTreesInCache(ctx, afterFilterIds)
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)),
zap.Int("removedIds", len(removedIds)))
zap.Int("removedIds", len(removedIds)),
zap.Int("filteredIds", totalLen-len(afterFilterIds)))
return
}

View File

@ -23,17 +23,25 @@ func (p *provider) convert(decrypted []byte) (res any, err error) {
func (p *provider) ProvideIds(tr tree.ObjectTree, startId string) (ids []string, lastId string, err error) {
processChange := func(change *tree.Change) bool {
// ignoring first change if startId is not ""
if change.Id == startId {
// ignoring root change which has empty model or startId change
lastId = change.Id
if change.Model == nil || (change.Id == startId && startId != "") {
return true
}
deleteChange := change.Model.(*spacesyncproto.SettingsData)
// getting data from snapshot if we start from it
if change.Id == tr.Root().Id {
ids = deleteChange.Snapshot.DeletedIds
return true
}
// otherwise getting data from content
for _, cnt := range deleteChange.Content {
if cnt.GetObjectDelete() != nil {
ids = append(ids, cnt.GetObjectDelete().GetId())
}
}
lastId = change.Id
return true
}
if startId == "" {

View File

@ -19,37 +19,42 @@ const (
type SettingsDocument interface {
tree.ObjectTree
Init()
DeleteObject(id string) (err error)
}
type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error)
type RemoveObjectsFunc func([]string)
type Deps struct {
BuildFunc BuildTreeFunc
Account account.Service
TreeGetter treegetter.TreeGetter
Store spacestorage.SpaceStorage
RemoveFunc RemoveObjectsFunc
prov deletedIdsProvider
}
type settingsDocument struct {
tree.ObjectTree
account account.Service
spaceId string
deletionState map[string]DeletionState
treeGetter treegetter.TreeGetter
store spacestorage.SpaceStorage
lastChangeId string
prov deletedIdsProvider
account account.Service
spaceId string
deletionState map[string]DeletionState
treeGetter treegetter.TreeGetter
store spacestorage.SpaceStorage
lastChangeId string
prov deletedIdsProvider
removeNotifyFunc RemoveObjectsFunc
}
func NewSettingsDocument(ctx context.Context, deps Deps, spaceId string) (doc SettingsDocument, err error) {
s := &settingsDocument{
account: deps.Account,
spaceId: spaceId,
deletionState: map[string]DeletionState{},
treeGetter: deps.TreeGetter,
store: deps.Store,
account: deps.Account,
spaceId: spaceId,
deletionState: map[string]DeletionState{},
treeGetter: deps.TreeGetter,
store: deps.Store,
removeNotifyFunc: deps.RemoveFunc,
}
s.ObjectTree, err = deps.BuildFunc(ctx, deps.Store.SpaceSettingsId(), s)
if err != nil {
@ -104,6 +109,8 @@ func (s *settingsDocument) toBeDeleted(ids []string) {
}
s.deletionState[id] = DeletionStateDeleted
}
// notifying about removal
s.removeNotifyFunc(ids)
}
func (s *settingsDocument) DeleteObject(id string) (err error) {

View File

@ -72,6 +72,7 @@ type Space interface {
DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error)
CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tree.ObjectTree, error)
BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (tree.ObjectTree, error)
DeleteTree(ctx context.Context, id string) (err error)
Close() error
}
@ -149,11 +150,13 @@ func (s *space) Init(ctx context.Context) (err error) {
Account: s.account,
TreeGetter: s.cache,
Store: s.storage,
RemoveFunc: s.diffService.RemoveObjects,
}
s.settingsDocument, err = settingsdocument.NewSettingsDocument(context.Background(), deps, s.id)
if err != nil {
return
}
s.settingsDocument.Init()
s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool())
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument)
s.syncService.Init(objectGetter)
@ -230,6 +233,10 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsDocument.DeleteObject(id)
}
func (s *space) Close() error {
log.With(zap.String("id", s.id)).Debug("space is closing")
defer func() {

View File

@ -463,7 +463,7 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate
return
}
ot.tree.Iterate(ot.tree.RootId(), func(c *Change) (isContinue bool) {
ot.tree.Iterate(id, func(c *Change) (isContinue bool) {
var model any
// if already saved as a model
if c.Model != nil {