WIP refactoring syncstatus headsync

This commit is contained in:
mcrakhman 2022-12-25 15:08:26 +01:00 committed by Mikhail Iudin
parent 934f413807
commit 53e945b956
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
32 changed files with 390 additions and 286 deletions

View File

@ -2,15 +2,15 @@ package clientspace
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"go.uber.org/zap"
)
type statusReceiver struct {
}
func (s *statusReceiver) UpdateTree(ctx context.Context, treeId string, status statusservice.SyncStatus) (err error) {
log.With(zap.String("treeId", treeId), zap.Bool("synced", status == statusservice.SyncStatusSynced)).
func (s *statusReceiver) UpdateTree(ctx context.Context, treeId string, status syncstatus.SyncStatus) (err error) {
log.With(zap.String("treeId", treeId), zap.Bool("synced", status == syncstatus.SyncStatusSynced)).
Debug("updating sync status")
return nil
}

View File

@ -8,7 +8,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
)
@ -17,7 +16,7 @@ type Service interface {
CreateDocument(spaceId string) (id string, err error)
DeleteDocument(spaceId, documentId string) (err error)
AllDocumentIds(spaceId string) (ids []string, err error)
AllDocumentHeads(spaceId string) (ids []diffservice.TreeHeads, err error)
AllDocumentHeads(spaceId string) (ids []headsync.TreeHeads, err error)
AddText(spaceId, documentId, text string, isSnapshot bool) (root, head string, err error)
DumpDocumentTree(spaceId, documentId string) (dump string, err error)
TreeParams(spaceId, documentId string) (root string, head []string, err error)
@ -74,7 +73,7 @@ func (s *service) AllDocumentIds(spaceId string) (ids []string, err error) {
return
}
func (s *service) AllDocumentHeads(spaceId string) (ids []diffservice.TreeHeads, err error) {
func (s *service) AllDocumentHeads(spaceId string) (ids []headsync.TreeHeads, err error) {
space, err := s.spaceService.GetSpace(context.Background(), spaceId)
if err != nil {
return

View File

@ -3,7 +3,7 @@ package commonspace
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectgetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
)
@ -12,10 +12,10 @@ type commonSpaceGetter struct {
spaceId string
aclList *syncacl.SyncACL
treeGetter treegetter.TreeGetter
settings settingsdocument.SettingsDocument
settings settings.SettingsObject
}
func newCommonSpaceGetter(spaceId string, aclList *syncacl.SyncACL, treeGetter treegetter.TreeGetter, settings settingsdocument.SettingsDocument) objectgetter.ObjectGetter {
func newCommonSpaceGetter(spaceId string, aclList *syncacl.SyncACL, treeGetter treegetter.TreeGetter, settings settings.SettingsObject) objectgetter.ObjectGetter {
return &commonSpaceGetter{
spaceId: spaceId,
aclList: aclList,

View File

@ -1,12 +1,11 @@
package diffservice
package headsync
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
@ -31,7 +30,7 @@ func newDiffSyncer(
cache treegetter.TreeGetter,
storage storage.SpaceStorage,
clientFactory spacesyncproto.ClientFactory,
statusService statusservice.StatusService,
syncStatus syncstatus.SyncStatusUpdater,
log *zap.Logger) DiffSyncer {
return &diffSyncer{
diff: diff,
@ -41,7 +40,7 @@ func newDiffSyncer(
confConnector: confConnector,
clientFactory: clientFactory,
log: log,
statusService: statusService,
syncStatus: syncStatus,
}
}
@ -54,7 +53,7 @@ type diffSyncer struct {
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
deletionState deletionstate.DeletionState
statusService statusservice.StatusService
syncStatus syncstatus.SyncStatusUpdater
}
func (d *diffSyncer) Init(deletionState deletionstate.DeletionState) {
@ -96,26 +95,28 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
var (
cl = d.clientFactory.Client(p)
rdiff = remotediff.NewRemoteDiff(d.spaceId, cl)
stateCounter uint64 = 0
cl = d.clientFactory.Client(p)
rdiff = NewRemoteDiff(d.spaceId, cl)
stateCounter = d.syncStatus.StateCounter()
)
stateCounter = d.statusService.StateCounter()
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
err = rpcerr.Unwrap(err)
if err != nil && err != spacesyncproto.ErrSpaceMissing {
d.statusService.SetNodesOnline(p.Id(), false)
d.syncStatus.SetNodesOnline(p.Id(), false)
return err
}
d.statusService.SetNodesOnline(p.Id(), true)
d.syncStatus.SetNodesOnline(p.Id(), true)
if err == spacesyncproto.ErrSpaceMissing {
return d.sendPushSpaceRequest(ctx, cl)
}
totalLen := len(newIds) + len(changedIds) + len(removedIds)
// not syncing ids which were removed through settings document
filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds)
d.statusService.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
ctx = peer.CtxWithPeerId(ctx, p.Id())
d.pingTreesInCache(ctx, filteredIds)

View File

@ -1,15 +1,14 @@
package diffservice
package headsync
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto/mock_spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter/mock_treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf/mock_nodeconf"
@ -111,7 +110,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
spaceId := "spaceId"
aclRootId := "aclRootId"
l := logger.NewNamed(spaceId)
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, statusservice.NewNoOpStatusService(), l)
diffSyncer := newDiffSyncer(spaceId, diffMock, connectorMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l)
delState.EXPECT().AddObserver(gomock.Any())
diffSyncer.Init(delState)
@ -120,7 +119,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
GetResponsiblePeers(gomock.Any(), spaceId).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remotediff.NewRemoteDiff(spaceId, clientMock))).
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
Return([]string{"new"}, []string{"changed"}, nil, nil)
delState.EXPECT().FilterJoin(gomock.Any()).Return([]string{"new", "changed"})
for _, arg := range []string{"new", "changed"} {
@ -175,7 +174,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
GetResponsiblePeers(gomock.Any(), spaceId).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remotediff.NewRemoteDiff(spaceId, clientMock))).
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
Return(nil, nil, nil, spacesyncproto.ErrSpaceMissing)
stMock.EXPECT().ACLStorage().Return(aclStorageMock, nil)
@ -199,7 +198,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
GetResponsiblePeers(gomock.Any(), spaceId).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remotediff.NewRemoteDiff(spaceId, clientMock))).
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
Return(nil, nil, nil, spacesyncproto.ErrUnexpected)
require.NoError(t, diffSyncer.Sync(ctx))

View File

@ -1,13 +1,12 @@
//go:generate mockgen -destination mock_diffservice/mock_diffservice.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice DiffSyncer
package diffservice
//go:generate mockgen -destination mock_headsync/mock_headsync.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/headsync DiffSyncer
package headsync
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff"
@ -22,7 +21,7 @@ type TreeHeads struct {
Heads []string
}
type DiffService interface {
type HeadSync interface {
UpdateHeads(id string, heads []string)
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
RemoveObjects(ids []string)
@ -33,7 +32,7 @@ type DiffService interface {
Close() (err error)
}
type diffService struct {
type headSync struct {
spaceId string
periodicSync periodicsync.PeriodicSync
storage storage.SpaceStorage
@ -44,22 +43,22 @@ type diffService struct {
syncPeriod int
}
func NewDiffService(
func NewHeadSync(
spaceId string,
syncPeriod int,
storage storage.SpaceStorage,
confConnector nodeconf.ConfConnector,
cache treegetter.TreeGetter,
statusService statusservice.StatusService,
log *zap.Logger) DiffService {
syncStatus syncstatus.SyncStatusUpdater,
log *zap.Logger) HeadSync {
diff := ldiff.New(16, 16)
l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, statusService, l)
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
return &diffService{
return &headSync{
spaceId: spaceId,
storage: storage,
syncer: syncer,
@ -70,25 +69,25 @@ func NewDiffService(
}
}
func (d *diffService) Init(objectIds []string, deletionState deletionstate.DeletionState) {
func (d *headSync) Init(objectIds []string, deletionState deletionstate.DeletionState) {
d.fillDiff(objectIds)
d.syncer.Init(deletionState)
d.periodicSync.Run()
}
func (d *diffService) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
return remotediff.HandleRangeRequest(ctx, d.diff, req)
func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
return HandleRangeRequest(ctx, d.diff, req)
}
func (d *diffService) UpdateHeads(id string, heads []string) {
func (d *headSync) UpdateHeads(id string, heads []string) {
d.syncer.UpdateHeads(id, heads)
}
func (d *diffService) AllIds() []string {
func (d *headSync) AllIds() []string {
return d.diff.Ids()
}
func (d *diffService) DebugAllHeads() (res []TreeHeads) {
func (d *headSync) DebugAllHeads() (res []TreeHeads) {
els := d.diff.Elements()
for _, el := range els {
idHead := TreeHeads{
@ -100,16 +99,16 @@ func (d *diffService) DebugAllHeads() (res []TreeHeads) {
return
}
func (d *diffService) RemoveObjects(ids []string) {
func (d *headSync) RemoveObjects(ids []string) {
d.syncer.RemoveObjects(ids)
}
func (d *diffService) Close() (err error) {
func (d *headSync) Close() (err error) {
d.periodicSync.Close()
return nil
}
func (d *diffService) fillDiff(objectIds []string) {
func (d *headSync) fillDiff(objectIds []string) {
var els = make([]ldiff.Element, 0, len(objectIds))
for _, id := range objectIds {
st, err := d.storage.TreeStorage(id)

View File

@ -1,9 +1,9 @@
package diffservice
package headsync
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice/mock_diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/headsync/mock_headsync"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
mock_storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage/mock_storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ldiff"
@ -23,12 +23,12 @@ func TestDiffService(t *testing.T) {
storageMock := mock_storage.NewMockSpaceStorage(ctrl)
treeStorageMock := mock_storage2.NewMockTreeStorage(ctrl)
diffMock := mock_ldiff.NewMockDiff(ctrl)
syncer := mock_diffservice.NewMockDiffSyncer(ctrl)
syncer := mock_headsync.NewMockDiffSyncer(ctrl)
delState := mock_deletionstate.NewMockDeletionState(ctrl)
syncPeriod := 1
initId := "initId"
service := &diffService{
service := &headSync{
spaceId: spaceId,
storage: storageMock,
periodicSync: pSyncMock,

View File

@ -1,14 +1,14 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice (interfaces: DiffSyncer)
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/headsync (interfaces: DiffSyncer)
// Package mock_diffservice is a generated GoMock package.
package mock_diffservice
// Package mock_headsync is a generated GoMock package.
package mock_headsync
import (
context "context"
reflect "reflect"
deletionstate "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
deletionstate "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
gomock "github.com/golang/mock/gomock"
)

View File

@ -1,4 +1,4 @@
package remotediff
package headsync
import (
"context"

View File

@ -1,4 +1,4 @@
package remotediff
package headsync
import (
"context"

View File

@ -15,10 +15,10 @@ type rpcHandler struct {
}
func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
return r.s.DiffService().HandleRangeRequest(ctx, req)
return r.s.HeadSync().HandleRangeRequest(ctx, req)
}
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
// TODO: if needed we can launch full sync here
return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream)
return r.s.ObjectSync().StreamPool().AddAndReadStreamSync(stream)
}

View File

@ -5,11 +5,11 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/headsync"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
@ -110,19 +110,20 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast()
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
statusService := statusservice.NewNoOpStatusService()
syncStatus := syncstatus.NewNoOpSyncStatus()
// this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) {
statusService = statusservice.NewStatusService(st.Id(), lastConfiguration, st)
// TODO: move it to the client package and add possibility to inject SyncStatusProvider from the client
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
}
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, statusService, log)
syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod)
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, syncStatus, log)
objectSync := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod)
sp := &space{
id: id,
syncService: syncService,
diffService: diffService,
statusService: statusService,
objectSync: objectSync,
headSync: headSync,
syncStatus: syncStatus,
cache: s.treeGetter,
account: s.account,
configuration: lastConfiguration,

View File

@ -1,4 +1,4 @@
package settingsdocument
package settings
import (
"context"

View File

@ -1,8 +1,8 @@
package settingsdocument
package settings
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
"go.uber.org/zap"

View File

@ -1,8 +1,8 @@
package settingsdocument
package settings
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter/mock_treegetter"

View File

@ -1,4 +1,4 @@
//go:generate mockgen -destination mock_deletionstate/mock_deletionstate.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate DeletionState
//go:generate mockgen -destination mock_deletionstate/mock_deletionstate.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate DeletionState
package deletionstate
import (

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate (interfaces: DeletionState)
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate (interfaces: DeletionState)
// Package mock_deletionstate is a generated GoMock package.
package mock_deletionstate
@ -7,7 +7,7 @@ package mock_deletionstate
import (
reflect "reflect"
deletionstate "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
deletionstate "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
gomock "github.com/golang/mock/gomock"
)

View File

@ -1,4 +1,4 @@
package settingsdocument
package settings
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"

View File

@ -1,4 +1,4 @@
package settingsdocument
package settings
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"

View File

@ -0,0 +1,86 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings (interfaces: DeletedIdsProvider,Deleter)
// Package mock_settings is a generated GoMock package.
package mock_settings
import (
reflect "reflect"
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
gomock "github.com/golang/mock/gomock"
)
// MockDeletedIdsProvider is a mock of DeletedIdsProvider interface.
type MockDeletedIdsProvider struct {
ctrl *gomock.Controller
recorder *MockDeletedIdsProviderMockRecorder
}
// MockDeletedIdsProviderMockRecorder is the mock recorder for MockDeletedIdsProvider.
type MockDeletedIdsProviderMockRecorder struct {
mock *MockDeletedIdsProvider
}
// NewMockDeletedIdsProvider creates a new mock instance.
func NewMockDeletedIdsProvider(ctrl *gomock.Controller) *MockDeletedIdsProvider {
mock := &MockDeletedIdsProvider{ctrl: ctrl}
mock.recorder = &MockDeletedIdsProviderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockDeletedIdsProvider) EXPECT() *MockDeletedIdsProviderMockRecorder {
return m.recorder
}
// ProvideIds mocks base method.
func (m *MockDeletedIdsProvider) ProvideIds(arg0 tree.ObjectTree, arg1 string) ([]string, string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ProvideIds", arg0, arg1)
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(string)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// ProvideIds indicates an expected call of ProvideIds.
func (mr *MockDeletedIdsProviderMockRecorder) ProvideIds(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProvideIds", reflect.TypeOf((*MockDeletedIdsProvider)(nil).ProvideIds), arg0, arg1)
}
// MockDeleter is a mock of Deleter interface.
type MockDeleter struct {
ctrl *gomock.Controller
recorder *MockDeleterMockRecorder
}
// MockDeleterMockRecorder is the mock recorder for MockDeleter.
type MockDeleterMockRecorder struct {
mock *MockDeleter
}
// NewMockDeleter creates a new mock instance.
func NewMockDeleter(ctrl *gomock.Controller) *MockDeleter {
mock := &MockDeleter{ctrl: ctrl}
mock.recorder = &MockDeleterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockDeleter) EXPECT() *MockDeleterMockRecorder {
return m.recorder
}
// Delete mocks base method.
func (m *MockDeleter) Delete() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Delete")
}
// Delete indicates an expected call of Delete.
func (mr *MockDeleterMockRecorder) Delete() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDeleter)(nil).Delete))
}

View File

@ -1,8 +1,8 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument (interfaces: DeletedIdsProvider,Deleter)
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings (interfaces: DeletedIdsProvider,Deleter)
// Package mock_settingsdocument is a generated GoMock package.
package mock_settingsdocument
// Package mock_settings is a generated GoMock package.
package mock_settings
import (
reflect "reflect"

View File

@ -1,12 +1,12 @@
//go:generate mockgen -destination mock_settingsdocument/mock_settingsdocument.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument DeletedIdsProvider,Deleter
package settingsdocument
//go:generate mockgen -destination mock_settings/mock_settings.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings DeletedIdsProvider,Deleter
package settings
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
@ -15,18 +15,18 @@ import (
"go.uber.org/zap"
)
var log = logger.NewNamed("commonspace.settingsdocument")
var log = logger.NewNamed("commonspace.settings")
type SettingsDocument interface {
type SettingsObject interface {
synctree.SyncTree
Init(ctx context.Context) (err error)
DeleteObject(id string) (err error)
}
var (
ErrDeleteSelf = errors.New("cannot delete seld")
ErrAlreadyDeleted = errors.New("the document is already deleted")
ErrDocDoesNotExist = errors.New("the document does not exist")
ErrDeleteSelf = errors.New("cannot delete self")
ErrAlreadyDeleted = errors.New("the object is already deleted")
ErrObjDoesNotExist = errors.New("the object does not exist")
)
type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error)
@ -42,7 +42,7 @@ type Deps struct {
del Deleter
}
type settingsDocument struct {
type settingsObject struct {
synctree.SyncTree
account account.Service
spaceId string
@ -56,7 +56,7 @@ type settingsDocument struct {
lastChangeId string
}
func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument) {
func NewSettingsObject(deps Deps, spaceId string) (obj SettingsObject) {
var deleter Deleter
if deps.del == nil {
deleter = newDeleter(deps.Store, deps.DeletionState, deps.TreeGetter)
@ -71,7 +71,7 @@ func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument) {
loop.notify()
})
s := &settingsDocument{
s := &settingsObject{
loop: loop,
spaceId: spaceId,
account: deps.Account,
@ -88,11 +88,11 @@ func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument) {
s.prov = deps.prov
}
doc = s
obj = s
return
}
func (s *settingsDocument) updateIds(tr tree.ObjectTree, lastChangeId string) {
func (s *settingsObject) updateIds(tr tree.ObjectTree, lastChangeId string) {
s.lastChangeId = lastChangeId
ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId)
if err != nil {
@ -106,17 +106,17 @@ func (s *settingsDocument) updateIds(tr tree.ObjectTree, lastChangeId string) {
}
// Update is called as part of UpdateListener interface
func (s *settingsDocument) Update(tr tree.ObjectTree) {
func (s *settingsObject) Update(tr tree.ObjectTree) {
s.updateIds(tr, s.lastChangeId)
}
// Rebuild is called as part of UpdateListener interface (including when the object is built for the first time, e.g. on Init call)
func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
func (s *settingsObject) Rebuild(tr tree.ObjectTree) {
// at initial build "s" may not contain the object tree, so it is safer to provide it from the function parameter
s.updateIds(tr, "")
}
func (s *settingsDocument) Init(ctx context.Context) (err error) {
func (s *settingsObject) Init(ctx context.Context) (err error) {
settingsId := s.store.SpaceSettingsId()
log.Debug("space settings id", zap.String("id", settingsId))
s.SyncTree, err = s.buildFunc(ctx, settingsId, s)
@ -128,12 +128,12 @@ func (s *settingsDocument) Init(ctx context.Context) (err error) {
return
}
func (s *settingsDocument) Close() error {
func (s *settingsObject) Close() error {
s.loop.Close()
return s.SyncTree.Close()
}
func (s *settingsDocument) DeleteObject(id string) (err error) {
func (s *settingsObject) DeleteObject(id string) (err error) {
s.Lock()
defer s.Unlock()
if s.ID() == id {
@ -146,7 +146,7 @@ func (s *settingsDocument) DeleteObject(id string) (err error) {
}
_, err = s.store.TreeStorage(id)
if err != nil {
err = ErrDocDoesNotExist
err = ErrObjDoesNotExist
return
}

View File

@ -1,10 +1,10 @@
package settingsdocument
package settings
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account/mock_account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/mock_settingsdocument"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate/mock_deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/mock_settings"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
@ -42,12 +42,12 @@ func (t *testSyncTreeMock) Unlock() {
type settingsFixture struct {
spaceId string
docId string
doc *settingsDocument
doc *settingsObject
ctrl *gomock.Controller
treeGetter *mock_treegetter.MockTreeGetter
spaceStorage *mock_storage.MockSpaceStorage
provider *mock_settingsdocument.MockDeletedIdsProvider
deleter *mock_settingsdocument.MockDeleter
provider *mock_settings.MockDeletedIdsProvider
deleter *mock_settings.MockDeleter
syncTree *mock_synctree.MockSyncTree
delState *mock_deletionstate.MockDeletionState
account *mock_account.MockService
@ -55,21 +55,21 @@ type settingsFixture struct {
func newSettingsFixture(t *testing.T) *settingsFixture {
spaceId := "spaceId"
docId := "documentId"
objectId := "objectId"
ctrl := gomock.NewController(t)
acc := mock_account.NewMockService(ctrl)
treeGetter := mock_treegetter.NewMockTreeGetter(ctrl)
st := mock_storage.NewMockSpaceStorage(ctrl)
delState := mock_deletionstate.NewMockDeletionState(ctrl)
prov := mock_settingsdocument.NewMockDeletedIdsProvider(ctrl)
prov := mock_settings.NewMockDeletedIdsProvider(ctrl)
syncTree := mock_synctree.NewMockSyncTree(ctrl)
del := mock_settingsdocument.NewMockDeleter(ctrl)
del := mock_settings.NewMockDeleter(ctrl)
delState.EXPECT().AddObserver(gomock.Any())
buildFunc := BuildTreeFunc(func(ctx context.Context, id string, listener updatelistener.UpdateListener) (synctree.SyncTree, error) {
require.Equal(t, docId, id)
require.Equal(t, objectId, id)
return newTestObjMock(syncTree), nil
})
@ -82,10 +82,10 @@ func newSettingsFixture(t *testing.T) *settingsFixture {
prov: prov,
del: del,
}
doc := NewSettingsDocument(deps, spaceId).(*settingsDocument)
doc := NewSettingsObject(deps, spaceId).(*settingsObject)
return &settingsFixture{
spaceId: spaceId,
docId: docId,
docId: objectId,
doc: doc,
ctrl: ctrl,
treeGetter: treeGetter,
@ -102,7 +102,7 @@ func (fx *settingsFixture) stop() {
fx.ctrl.Finish()
}
func TestSettingsDocument_Init(t *testing.T) {
func TestSettingsObject_Init(t *testing.T) {
fx := newSettingsFixture(t)
defer fx.stop()
@ -116,7 +116,7 @@ func TestSettingsDocument_Init(t *testing.T) {
require.NoError(t, err)
}
func TestSettingsDocument_DeleteObject(t *testing.T) {
func TestSettingsObject_DeleteObject(t *testing.T) {
fx := newSettingsFixture(t)
defer fx.stop()
@ -164,7 +164,7 @@ func TestSettingsDocument_DeleteObject(t *testing.T) {
require.NoError(t, err)
}
func TestSettingsDocument_Rebuild(t *testing.T) {
func TestSettingsObject_Rebuild(t *testing.T) {
fx := newSettingsFixture(t)
defer fx.stop()

View File

@ -5,14 +5,14 @@ import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/headsync"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
@ -71,7 +71,7 @@ type Space interface {
Init(ctx context.Context) error
StoredIds() []string
DebugAllHeads() []diffservice.TreeHeads
DebugAllHeads() []headsync.TreeHeads
Description() (SpaceDescription, error)
SpaceSyncRpc() RpcHandler
@ -81,7 +81,7 @@ type Space interface {
BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (tree.ObjectTree, error)
DeleteTree(ctx context.Context, id string) (err error)
StatusService() statusservice.StatusService
StatusService() syncstatus.SyncStatusUpdater
Close() error
}
@ -93,22 +93,22 @@ type space struct {
rpc *rpcHandler
syncService syncservice.SyncService
diffService diffservice.DiffService
statusService statusservice.StatusService
storage storage.SpaceStorage
cache treegetter.TreeGetter
account account.Service
aclList *syncacl.SyncACL
configuration nodeconf.Configuration
settingsDocument settingsdocument.SettingsDocument
objectSync syncservice.SyncService
headSync headsync.HeadSync
syncStatus syncstatus.SyncStatusUpdater
storage storage.SpaceStorage
cache treegetter.TreeGetter
account account.Service
aclList *syncacl.SyncACL
configuration nodeconf.Configuration
settingsObject settings.SettingsObject
isClosed atomic.Bool
treesUsed atomic.Int32
}
func (s *space) LastUsage() time.Time {
return s.syncService.LastUsage()
return s.objectSync.LastUsage()
}
func (s *space) Locked() bool {
@ -164,10 +164,10 @@ func (s *space) Init(ctx context.Context) (err error) {
if err != nil {
return
}
s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool())
s.aclList = syncacl.NewSyncACL(aclList, s.objectSync.StreamPool())
deletionState := deletionstate.NewDeletionState(s.storage)
deps := settingsdocument.Deps{
deps := settings.Deps{
BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) {
res, err := s.BuildTree(ctx, id, listener)
if err != nil {
@ -181,16 +181,16 @@ func (s *space) Init(ctx context.Context) (err error) {
Store: s.storage,
DeletionState: deletionState,
}
s.settingsDocument = settingsdocument.NewSettingsDocument(deps, s.id)
s.settingsObject = settings.NewSettingsObject(deps, s.id)
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsDocument)
s.syncService.Init(objectGetter)
s.diffService.Init(initialIds, deletionState)
err = s.settingsDocument.Init(ctx)
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject)
s.objectSync.Init(objectGetter)
s.headSync.Init(initialIds, deletionState)
err = s.settingsObject.Init(ctx)
if err != nil {
return
}
s.statusService.Run()
s.syncStatus.Run()
return nil
}
@ -199,24 +199,24 @@ func (s *space) SpaceSyncRpc() RpcHandler {
return s.rpc
}
func (s *space) SyncService() syncservice.SyncService {
return s.syncService
func (s *space) ObjectSync() syncservice.SyncService {
return s.objectSync
}
func (s *space) DiffService() diffservice.DiffService {
return s.diffService
func (s *space) HeadSync() headsync.HeadSync {
return s.headSync
}
func (s *space) StatusService() statusservice.StatusService {
return s.statusService
func (s *space) StatusService() syncstatus.SyncStatusUpdater {
return s.syncStatus
}
func (s *space) StoredIds() []string {
return s.diffService.AllIds()
return s.headSync.AllIds()
}
func (s *space) DebugAllHeads() []diffservice.TreeHeads {
return s.diffService.DebugAllHeads()
func (s *space) DebugAllHeads() []headsync.TreeHeads {
return s.headSync.DebugAllHeads()
}
func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePayload) (id string, err error) {
@ -227,12 +227,12 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
SyncService: s.syncService,
SyncService: s.objectSync,
Configuration: s.configuration,
AclList: s.aclList,
SpaceStorage: s.storage,
StatusService: s.statusService,
HeadNotifiable: s.diffService,
SyncStatus: s.syncStatus,
HeadNotifiable: s.headSync,
}
return synctree.DeriveSyncTree(ctx, deps)
}
@ -245,12 +245,12 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
SyncService: s.syncService,
SyncService: s.objectSync,
Configuration: s.configuration,
AclList: s.aclList,
SpaceStorage: s.storage,
StatusService: s.statusService,
HeadNotifiable: s.diffService,
SyncStatus: s.syncStatus,
HeadNotifiable: s.headSync,
}
return synctree.CreateSyncTree(ctx, deps)
}
@ -262,20 +262,20 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
}
deps := synctree.BuildDeps{
SpaceId: s.id,
SyncService: s.syncService,
SyncService: s.objectSync,
Configuration: s.configuration,
HeadNotifiable: s.diffService,
HeadNotifiable: s.headSync,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
TreeUsage: &s.treesUsed,
StatusService: s.statusService,
SyncStatus: s.syncStatus,
}
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsDocument.DeleteObject(id)
return s.settingsObject.DeleteObject(id)
}
func (s *space) Close() error {
@ -285,13 +285,13 @@ func (s *space) Close() error {
log.With(zap.String("id", s.id)).Debug("space closed")
}()
var mError errs.Group
if err := s.diffService.Close(); err != nil {
if err := s.headSync.Close(); err != nil {
mError.Add(err)
}
if err := s.syncService.Close(); err != nil {
if err := s.objectSync.Close(); err != nil {
mError.Add(err)
}
if err := s.settingsDocument.Close(); err != nil {
if err := s.settingsObject.Close(); err != nil {
mError.Add(err)
}
if err := s.aclList.Close(); err != nil {
@ -300,7 +300,7 @@ func (s *space) Close() error {
if err := s.storage.Close(); err != nil {
mError.Add(err)
}
if err := s.statusService.Close(); err != nil {
if err := s.syncStatus.Close(); err != nil {
mError.Add(err)
}

View File

@ -1,40 +0,0 @@
package statusservice
type noOpStatusService struct{}
func NewNoOpStatusService() StatusService {
return &noOpStatusService{}
}
func (n *noOpStatusService) HeadsChange(treeId string, heads []string) {
}
func (n *noOpStatusService) HeadsReceive(senderId, treeId string, heads []string) {
}
func (n *noOpStatusService) Watch(treeId string) (err error) {
return
}
func (n *noOpStatusService) Unwatch(treeId string) {
}
func (n *noOpStatusService) SetNodesOnline(senderId string, online bool) {
}
func (n *noOpStatusService) StateCounter() uint64 {
return 0
}
func (n *noOpStatusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
}
func (n *noOpStatusService) SetUpdateReceiver(updater UpdateReceiver) {
}
func (n *noOpStatusService) Run() {
}
func (n *noOpStatusService) Close() error {
return nil
}

View File

@ -0,0 +1,30 @@
package syncstatus
type noOpSyncStatus struct{}
func NewNoOpSyncStatus() SyncStatusUpdater {
return &noOpSyncStatus{}
}
func (n *noOpSyncStatus) HeadsChange(treeId string, heads []string) {
}
func (n *noOpSyncStatus) HeadsReceive(senderId, treeId string, heads []string) {
}
func (n *noOpSyncStatus) SetNodesOnline(senderId string, online bool) {
}
func (n *noOpSyncStatus) StateCounter() uint64 {
return 0
}
func (n *noOpSyncStatus) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
}
func (n *noOpSyncStatus) Run() {
}
func (n *noOpSyncStatus) Close() error {
return nil
}

View File

@ -1,4 +1,4 @@
package statusservice
package syncstatus
import (
"context"
@ -15,31 +15,40 @@ import (
)
const (
statusServiceUpdateInterval = 5
statusServiceTimeout = time.Second
syncUpdateInterval = 5
syncTimeout = time.Second
)
var log = logger.NewNamed("commonspace.statusservice")
var log = logger.NewNamed("commonspace.syncstatus")
type UpdateReceiver interface {
UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error)
UpdateNodeConnection(online bool)
}
type StatusService interface {
type SyncStatusUpdater interface {
HeadsChange(treeId string, heads []string)
HeadsReceive(senderId, treeId string, heads []string)
Watch(treeId string) (err error)
Unwatch(treeId string)
SetNodesOnline(senderId string, online bool)
StateCounter() uint64
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
SetUpdateReceiver(updater UpdateReceiver)
Run()
Close() error
}
type SyncStatusWatcher interface {
Watch(treeId string) (err error)
Unwatch(treeId string)
SetUpdateReceiver(updater UpdateReceiver)
}
type SyncStatusProvider interface {
SyncStatusUpdater
SyncStatusWatcher
}
type SyncStatus int
const (
@ -60,7 +69,7 @@ type treeStatus struct {
heads []string
}
type statusService struct {
type syncStatusProvider struct {
sync.Mutex
configuration nodeconf.Configuration
periodicSync periodicsync.PeriodicSync
@ -74,36 +83,57 @@ type statusService struct {
nodesOnline bool
treeStatusBuf []treeStatus
updateIntervalSecs int
updateTimeout time.Duration
}
func NewStatusService(spaceId string, configuration nodeconf.Configuration, store storage.SpaceStorage) StatusService {
return &statusService{
spaceId: spaceId,
treeHeads: map[string]treeHeadsEntry{},
watchers: map[string]struct{}{},
configuration: configuration,
storage: store,
stateCounter: 0,
type SyncStatusDeps struct {
UpdateIntervalSecs int
UpdateTimeout time.Duration
Configuration nodeconf.Configuration
Storage storage.SpaceStorage
}
func DefaultDeps(configuration nodeconf.Configuration, store storage.SpaceStorage) SyncStatusDeps {
return SyncStatusDeps{
UpdateIntervalSecs: syncUpdateInterval,
UpdateTimeout: syncTimeout,
Configuration: configuration,
Storage: store,
}
}
func (s *statusService) SetUpdateReceiver(updater UpdateReceiver) {
func NewSyncStatusProvider(spaceId string, deps SyncStatusDeps) SyncStatusProvider {
return &syncStatusProvider{
spaceId: spaceId,
treeHeads: map[string]treeHeadsEntry{},
watchers: map[string]struct{}{},
updateIntervalSecs: deps.UpdateIntervalSecs,
updateTimeout: deps.UpdateTimeout,
configuration: deps.Configuration,
storage: deps.Storage,
stateCounter: 0,
}
}
func (s *syncStatusProvider) SetUpdateReceiver(updater UpdateReceiver) {
s.Lock()
defer s.Unlock()
s.updateReceiver = updater
}
func (s *statusService) Run() {
func (s *syncStatusProvider) Run() {
s.periodicSync = periodicsync.NewPeriodicSync(
statusServiceUpdateInterval,
statusServiceTimeout,
s.updateIntervalSecs,
s.updateTimeout,
s.update,
log)
s.periodicSync.Run()
}
func (s *statusService) HeadsChange(treeId string, heads []string) {
func (s *syncStatusProvider) HeadsChange(treeId string, heads []string) {
s.Lock()
defer s.Unlock()
@ -118,7 +148,7 @@ func (s *statusService) HeadsChange(treeId string, heads []string) {
s.stateCounter++
}
func (s *statusService) SetNodesOnline(senderId string, online bool) {
func (s *syncStatusProvider) SetNodesOnline(senderId string, online bool) {
if !s.isSenderResponsible(senderId) {
return
}
@ -129,7 +159,7 @@ func (s *statusService) SetNodesOnline(senderId string, online bool) {
s.nodesOnline = online
}
func (s *statusService) update(ctx context.Context) (err error) {
func (s *syncStatusProvider) update(ctx context.Context) (err error) {
s.treeStatusBuf = s.treeStatusBuf[:0]
s.Lock()
@ -158,7 +188,7 @@ func (s *statusService) update(ctx context.Context) (err error) {
return
}
func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) {
func (s *syncStatusProvider) HeadsReceive(senderId, treeId string, heads []string) {
s.Lock()
defer s.Unlock()
@ -187,7 +217,7 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) {
s.treeHeads[treeId] = curTreeHeads
}
func (s *statusService) Watch(treeId string) (err error) {
func (s *syncStatusProvider) Watch(treeId string) (err error) {
s.Lock()
defer s.Unlock()
_, ok := s.treeHeads[treeId]
@ -217,7 +247,7 @@ func (s *statusService) Watch(treeId string) (err error) {
return
}
func (s *statusService) Unwatch(treeId string) {
func (s *syncStatusProvider) Unwatch(treeId string) {
s.Lock()
defer s.Unlock()
@ -226,19 +256,19 @@ func (s *statusService) Unwatch(treeId string) {
}
}
func (s *statusService) Close() (err error) {
func (s *syncStatusProvider) Close() (err error) {
s.periodicSync.Close()
return
}
func (s *statusService) StateCounter() uint64 {
func (s *syncStatusProvider) StateCounter() uint64 {
s.Lock()
defer s.Unlock()
return s.stateCounter
}
func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
func (s *syncStatusProvider) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
// if sender is not a responsible node, then this should have no effect
if !s.isSenderResponsible(senderId) {
return
@ -261,6 +291,6 @@ func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []st
}
}
func (s *statusService) isSenderResponsible(senderId string) bool {
func (s *syncStatusProvider) isSenderResponsible(senderId string) bool {
return slices.Contains(s.configuration.NodeIds(s.spaceId), senderId)
}

View File

@ -5,10 +5,10 @@ import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
@ -40,13 +40,13 @@ type SyncTree interface {
type syncTree struct {
tree.ObjectTree
synchandler.SyncHandler
syncClient SyncClient
statusService statusservice.StatusService
notifiable HeadNotifiable
listener updatelistener.UpdateListener
treeUsage *atomic.Int32
isClosed bool
isDeleted bool
syncClient SyncClient
syncStatus syncstatus.SyncStatusUpdater
notifiable HeadNotifiable
listener updatelistener.UpdateListener
treeUsage *atomic.Int32
isClosed bool
isDeleted bool
}
var log = logger.NewNamed("commonspace.synctree").Sugar()
@ -63,7 +63,7 @@ type CreateDeps struct {
SyncService syncservice.SyncService
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
StatusService statusservice.StatusService
SyncStatus syncstatus.SyncStatusUpdater
HeadNotifiable HeadNotifiable
}
@ -77,7 +77,7 @@ type BuildDeps struct {
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
TreeUsage *atomic.Int32
StatusService statusservice.StatusService
SyncStatus syncstatus.SyncStatusUpdater
}
func newWrappedSyncClient(
@ -106,7 +106,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
deps.HeadNotifiable.UpdateHeads(id, heads)
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
deps.StatusService.HeadsChange(id, heads)
deps.SyncStatus.HeadsChange(id, heads)
syncClient.BroadcastAsync(headUpdate)
return
}
@ -127,7 +127,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
deps.HeadNotifiable.UpdateHeads(id, heads)
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
deps.StatusService.HeadsChange(id, heads)
deps.SyncStatus.HeadsChange(id, heads)
syncClient.BroadcastAsync(headUpdate)
return
}
@ -219,14 +219,14 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
deps.SyncService,
deps.Configuration)
syncTree := &syncTree{
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
treeUsage: deps.TreeUsage,
listener: deps.Listener,
statusService: deps.StatusService,
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
treeUsage: deps.TreeUsage,
listener: deps.Listener,
syncStatus: deps.SyncStatus,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.StatusService)
syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.SyncStatus)
syncTree.SyncHandler = syncHandler
t = syncTree
syncTree.Lock()
@ -266,7 +266,7 @@ func (s *syncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.ID(), res.Heads)
}
s.statusService.HeadsChange(s.ID(), res.Heads)
s.syncStatus.HeadsChange(s.ID(), res.Heads)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate)
return

View File

@ -2,9 +2,9 @@ package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage/mock_storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener/mock_updatelistener"
@ -72,7 +72,7 @@ func Test_DeriveSyncTree(t *testing.T) {
SpaceId: spaceId,
Payload: expectedPayload,
SpaceStorage: spaceStorageMock,
StatusService: statusservice.NewNoOpStatusService(),
SyncStatus: syncstatus.NewNoOpSyncStatus(),
HeadNotifiable: headNotifiableMock,
}
objTreeMock.EXPECT().ID().Return("id")
@ -111,7 +111,7 @@ func Test_CreateSyncTree(t *testing.T) {
SpaceId: spaceId,
Payload: expectedPayload,
SpaceStorage: spaceStorageMock,
StatusService: statusservice.NewNoOpStatusService(),
SyncStatus: syncstatus.NewNoOpSyncStatus(),
HeadNotifiable: headNotifiableMock,
}
@ -128,12 +128,12 @@ func Test_BuildSyncTree(t *testing.T) {
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
objTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl))
tr := &syncTree{
ObjectTree: objTreeMock,
SyncHandler: nil,
syncClient: syncClientMock,
listener: updateListenerMock,
isClosed: false,
statusService: statusservice.NewNoOpStatusService(),
ObjectTree: objTreeMock,
SyncHandler: nil,
syncClient: syncClientMock,
listener: updateListenerMock,
isClosed: false,
syncStatus: syncstatus.NewNoOpSyncStatus(),
}
headUpdate := &treechangeproto.TreeSyncMessage{}

View File

@ -3,8 +3,8 @@ package synctree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice"
@ -14,33 +14,32 @@ import (
)
type syncTreeHandler struct {
objTree tree.ObjectTree
syncClient SyncClient
statusService statusservice.StatusService
handlerLock sync.Mutex
queue ReceiveQueue
objTree tree.ObjectTree
syncClient SyncClient
syncStatus syncstatus.SyncStatusUpdater
handlerLock sync.Mutex
queue ReceiveQueue
}
const maxQueueSize = 5
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient, statusService statusservice.StatusService) synchandler.SyncHandler {
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.SyncStatusUpdater) synchandler.SyncHandler {
return &syncTreeHandler{
objTree: objTree,
syncClient: syncClient,
statusService: statusService,
queue: newReceiveQueue(maxQueueSize),
objTree: objTree,
syncClient: syncClient,
syncStatus: syncStatus,
queue: newReceiveQueue(maxQueueSize),
}
}
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
// TODO: when implementing sync status check msg heads before sending into queue
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled)
if err != nil {
return
}
s.statusService.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
s.syncStatus.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.ReplyId)
if queueFull {

View File

@ -3,7 +3,7 @@ package synctree
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/mock_synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree/mock_objecttree"
@ -50,10 +50,10 @@ func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture {
receiveQueueMock := mock_synctree.NewMockReceiveQueue(ctrl)
syncHandler := &syncTreeHandler{
objTree: objectTreeMock,
syncClient: syncClientMock,
queue: receiveQueueMock,
statusService: statusservice.NewNoOpStatusService(),
objTree: objectTreeMock,
syncClient: syncClientMock,
queue: receiveQueueMock,
syncStatus: syncstatus.NewNoOpSyncStatus(),
}
return &syncHandlerFixture{
ctrl: ctrl,