From a6507db992f2634a789055e7e60e9610f36267c0 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 12 Nov 2022 18:17:01 +0100 Subject: [PATCH] Add sync loop to settings document --- client/api/service.go | 13 ++++++++++ client/document/service.go | 1 + common/commonspace/diffservice/diffservice.go | 5 ++-- .../commonspace/diffservice/headnotifiable.go | 6 +++++ .../settingsdocument/settingsdocument.go | 17 ++++++------ common/commonspace/space.go | 23 +++++++++++++--- .../periodicsync}/periodicsync.go | 10 ++++--- .../periodicsync}/periodicsync_test.go | 26 +++++++++++++------ 8 files changed, 75 insertions(+), 26 deletions(-) rename common/{commonspace/diffservice => util/periodicsync}/periodicsync.go (83%) rename common/{commonspace/diffservice => util/periodicsync}/periodicsync_test.go (55%) diff --git a/client/api/service.go b/client/api/service.go index 1f7ed647..7155ea38 100644 --- a/client/api/service.go +++ b/client/api/service.go @@ -65,6 +65,7 @@ func (s *service) Run(ctx context.Context) (err error) { mux.HandleFunc("/loadSpace", s.loadSpace) mux.HandleFunc("/allSpaceIds", s.allSpaceIds) mux.HandleFunc("/createDocument", s.createDocument) + mux.HandleFunc("/deleteDocument", s.deleteDocument) mux.HandleFunc("/allDocumentIds", s.allDocumentIds) mux.HandleFunc("/addText", s.addText) mux.HandleFunc("/dumpDocumentTree", s.dumpDocumentTree) @@ -134,6 +135,18 @@ func (s *service) createDocument(w http.ResponseWriter, req *http.Request) { sendText(w, http.StatusOK, id) } +func (s *service) deleteDocument(w http.ResponseWriter, req *http.Request) { + query := req.URL.Query() + spaceId := query.Get("spaceId") + documentId := query.Get("documentId") + err := s.controller.DeleteDocument(spaceId, documentId) + if err != nil { + sendText(w, http.StatusInternalServerError, err.Error()) + return + } + sendText(w, http.StatusOK, documentId) +} + func (s *service) allDocumentIds(w http.ResponseWriter, req *http.Request) { query := req.URL.Query() spaceId := query.Get("spaceId") diff --git a/client/document/service.go b/client/document/service.go index 57e1cd98..d6f727ea 100644 --- a/client/document/service.go +++ b/client/document/service.go @@ -14,6 +14,7 @@ import ( type Service interface { app.Component CreateDocument(spaceId string) (id string, err error) + DeleteDocument(spaceId, documentId string) (err error) AllDocumentIds(spaceId string) (ids []string, err error) AddText(spaceId, documentId, text string) (err error) DumpDocumentTree(spaceId, documentId string) (dump string, err error) diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index a0369ece..8888a111 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -9,6 +9,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" "go.uber.org/zap" "strings" ) @@ -25,7 +26,7 @@ type DiffService interface { type diffService struct { spaceId string - periodicSync PeriodicSync + periodicSync periodicsync.PeriodicSync storage storage.SpaceStorage diff ldiff.Diff log *zap.Logger @@ -46,7 +47,7 @@ func NewDiffService( l := log.With(zap.String("spaceId", spaceId)) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient) syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, l) - periodicSync := newPeriodicSync(syncPeriod, syncer, l) + periodicSync := periodicsync.NewPeriodicSync(syncPeriod, syncer.Sync, l) return &diffService{ spaceId: spaceId, diff --git a/common/commonspace/diffservice/headnotifiable.go b/common/commonspace/diffservice/headnotifiable.go index 80819bfe..8e987dc9 100644 --- a/common/commonspace/diffservice/headnotifiable.go +++ b/common/commonspace/diffservice/headnotifiable.go @@ -3,3 +3,9 @@ package diffservice type HeadNotifiable interface { UpdateHeads(id string, heads []string) } + +type HeadNotifiableFunc func(id string, heads []string) + +func (h HeadNotifiableFunc) UpdateHeads(id string, heads []string) { + h(id, heads) +} diff --git a/common/commonspace/settingsdocument/settingsdocument.go b/common/commonspace/settingsdocument/settingsdocument.go index a93a4fae..5ba52732 100644 --- a/common/commonspace/settingsdocument/settingsdocument.go +++ b/common/commonspace/settingsdocument/settingsdocument.go @@ -22,6 +22,7 @@ type SettingsDocument interface { tree.ObjectTree Refresh() DeleteObject(id string) (err error) + NotifyObjectUpdate(id string) } type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) @@ -33,7 +34,8 @@ type Deps struct { TreeGetter treegetter.TreeGetter Store spacestorage.SpaceStorage RemoveFunc RemoveObjectsFunc - prov deletedIdsProvider + // prov exists mainly for the ease of testing + prov deletedIdsProvider } type settingsDocument struct { @@ -70,9 +72,10 @@ func NewSettingsDocument(ctx context.Context, deps Deps, spaceId string) (doc Se return } -func (s *settingsDocument) NotifyHeadsUpdate(id string) { +func (s *settingsDocument) NotifyObjectUpdate(id string) { s.deletionStateLock.Lock() - if _, exists := s.deletionState[id]; exists { + if state, exists := s.deletionState[id]; exists && state == DeletionStateDeleted { + // marking the document as queued, that means that document appeared later than we checked the storage for deletion s.deletionState[id] = DeletionStateQueued } s.deletionStateLock.Unlock() @@ -109,8 +112,7 @@ func (s *settingsDocument) toBeDeleted(ids []string) { s.deletionStateLock.Unlock() continue } - // if not already deleted - // TODO: here we can possibly have problems if the document is synced later, maybe we should block syncing with deleted documents + // if the document is not in storage it can happen that it will appear later, for that we have NotifyObjectUpdate method if _, err := s.store.TreeStorage(id); err == nil { s.deletionState[id] = DeletionStateQueued s.deletionStateLock.Unlock() @@ -121,14 +123,13 @@ func (s *settingsDocument) toBeDeleted(ids []string) { // TODO: add logging continue } - // TODO: add loop to double check that everything that should be deleted is actually deleted s.deletionStateLock.Lock() } - + s.deletionState[id] = DeletionStateDeleted s.deletionStateLock.Unlock() } - // notifying about removal + // notifying diff service that the ids should not be synced anymore s.removeNotifyFunc(ids) } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 55d9f338..f8502ff5 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -19,6 +19,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync" "github.com/zeebo/errs" "go.uber.org/zap" "sync" @@ -41,7 +42,10 @@ type SpaceCreatePayload struct { ReplicationKey uint64 } -const SpaceTypeDerived = "derived.space" +const ( + SpaceTypeDerived = "derived.space" + SettingsSyncPeriodSeconds = 10 +) type SpaceDerivePayload struct { SigningKey signingkey.PrivKey @@ -92,6 +96,8 @@ type space struct { aclList *syncacl.SyncACL configuration nodeconf.Configuration settingsDocument settingsdocument.SettingsDocument + settingsSync periodicsync.PeriodicSync + headNotifiable diffservice.HeadNotifiable isClosed atomic.Bool } @@ -156,11 +162,19 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } + s.headNotifiable = diffservice.HeadNotifiableFunc(func(id string, heads []string) { + s.diffService.UpdateHeads(id, heads) + s.settingsDocument.NotifyObjectUpdate(id) + }) s.settingsDocument.Refresh() s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool()) objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument) s.syncService.Init(objectGetter) s.diffService.Init(initialIds) + s.settingsSync = periodicsync.NewPeriodicSync(SettingsSyncPeriodSeconds, func(ctx context.Context) error { + s.settingsDocument.Refresh() + return nil + }, log) return nil } @@ -190,7 +204,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay Payload: payload, StreamPool: s.syncService.StreamPool(), Configuration: s.configuration, - HeadNotifiable: s.diffService, + HeadNotifiable: s.headNotifiable, Listener: listener, AclList: s.aclList, CreateStorage: s.storage.CreateTreeStorage, @@ -208,7 +222,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay Payload: payload, StreamPool: s.syncService.StreamPool(), Configuration: s.configuration, - HeadNotifiable: s.diffService, + HeadNotifiable: s.headNotifiable, Listener: listener, AclList: s.aclList, CreateStorage: s.storage.CreateTreeStorage, @@ -225,7 +239,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene SpaceId: s.id, StreamPool: s.syncService.StreamPool(), Configuration: s.configuration, - HeadNotifiable: s.diffService, + HeadNotifiable: s.headNotifiable, Listener: listener, AclList: s.aclList, SpaceStorage: s.storage, @@ -250,6 +264,7 @@ func (s *space) Close() error { if err := s.syncService.Close(); err != nil { mError.Add(err) } + s.settingsSync.Close() if err := s.settingsDocument.Close(); err != nil { mError.Add(err) } diff --git a/common/commonspace/diffservice/periodicsync.go b/common/util/periodicsync/periodicsync.go similarity index 83% rename from common/commonspace/diffservice/periodicsync.go rename to common/util/periodicsync/periodicsync.go index a74b25cf..37647810 100644 --- a/common/commonspace/diffservice/periodicsync.go +++ b/common/util/periodicsync/periodicsync.go @@ -1,4 +1,4 @@ -package diffservice +package periodicsync import ( "context" @@ -11,7 +11,9 @@ type PeriodicSync interface { Close() } -func newPeriodicSync(periodSeconds int, syncer DiffSyncer, l *zap.Logger) *periodicSync { +type SyncerFunc func(ctx context.Context) error + +func NewPeriodicSync(periodSeconds int, syncer SyncerFunc, l *zap.Logger) PeriodicSync { ctx, cancel := context.WithCancel(context.Background()) return &periodicSync{ syncer: syncer, @@ -25,7 +27,7 @@ func newPeriodicSync(periodSeconds int, syncer DiffSyncer, l *zap.Logger) *perio type periodicSync struct { log *zap.Logger - syncer DiffSyncer + syncer SyncerFunc syncCtx context.Context syncCancel context.CancelFunc syncLoopDone chan struct{} @@ -42,7 +44,7 @@ func (p *periodicSync) syncLoop(periodSeconds int) { doSync := func() { ctx, cancel := context.WithTimeout(p.syncCtx, time.Minute) defer cancel() - if err := p.syncer.Sync(ctx); err != nil { + if err := p.syncer(ctx); err != nil { p.log.Warn("periodic sync error", zap.Error(err)) } } diff --git a/common/commonspace/diffservice/periodicsync_test.go b/common/util/periodicsync/periodicsync_test.go similarity index 55% rename from common/commonspace/diffservice/periodicsync_test.go rename to common/util/periodicsync/periodicsync_test.go index 068da366..c4463e41 100644 --- a/common/commonspace/diffservice/periodicsync_test.go +++ b/common/util/periodicsync/periodicsync_test.go @@ -1,9 +1,10 @@ -package diffservice +package periodicsync import ( + "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice/mock_diffservice" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" "testing" "time" ) @@ -14,25 +15,34 @@ func TestPeriodicSync_Run(t *testing.T) { defer ctrl.Finish() l := logger.NewNamed("sync") - diffSyncer := mock_diffservice.NewMockDiffSyncer(ctrl) + t.Run("diff syncer 1 time", func(t *testing.T) { secs := 0 - pSync := newPeriodicSync(secs, diffSyncer, l) - - diffSyncer.EXPECT().Sync(gomock.Any()).Times(1).Return(nil) + times := 0 + diffSyncer := func(ctx context.Context) (err error) { + times += 1 + return nil + } + pSync := NewPeriodicSync(secs, diffSyncer, l) pSync.Run() pSync.Close() + require.Equal(t, 1, times) }) t.Run("diff syncer 2 times", func(t *testing.T) { secs := 1 - pSync := newPeriodicSync(secs, diffSyncer, l) - diffSyncer.EXPECT().Sync(gomock.Any()).Times(2).Return(nil) + times := 0 + diffSyncer := func(ctx context.Context) (err error) { + times += 1 + return nil + } + pSync := NewPeriodicSync(secs, diffSyncer, l) pSync.Run() time.Sleep(time.Second * time.Duration(secs)) pSync.Close() + require.Equal(t, 2, times) }) }