WIP further space refactoring

This commit is contained in:
mcrakhman 2023-06-01 14:07:16 +02:00
parent b0fa43fb14
commit eeb87dd144
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
17 changed files with 248 additions and 608 deletions

View File

@ -44,7 +44,7 @@ func (st *objectDeletionState) Name() (name string) {
return CName
}
func NewObjectDeletionState() ObjectDeletionState {
func New() ObjectDeletionState {
return &objectDeletionState{
log: log,
queued: map[string]struct{}{},

View File

@ -1,4 +1,4 @@
package settingsstate
package deletionstate
import (
"github.com/anyproto/any-sync/app/logger"
@ -19,7 +19,7 @@ type fixture struct {
func newFixture(t *testing.T) *fixture {
ctrl := gomock.NewController(t)
spaceStorage := mock_spacestorage.NewMockSpaceStorage(ctrl)
delState := NewObjectDeletionState(logger.NewNamed("test"), spaceStorage).(*objectDeletionState)
delState := New(logger.NewNamed("test"), spaceStorage).(*objectDeletionState)
return &fixture{
ctrl: ctrl,
delState: delState,

View File

@ -6,9 +6,9 @@ import (
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
@ -22,7 +22,7 @@ type DiffSyncer interface {
Sync(ctx context.Context) error
RemoveObjects(ids []string)
UpdateHeads(id string, heads []string)
Init(deletionState settingsstate.ObjectDeletionState)
Init()
Close() error
}
@ -37,6 +37,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
credentialProvider: hs.credentialProvider,
log: log,
syncStatus: hs.syncStatus,
deletionState: hs.deletionState,
}
}
@ -48,14 +49,13 @@ type diffSyncer struct {
storage spacestorage.SpaceStorage
clientFactory spacesyncproto.ClientFactory
log logger.CtxLogger
deletionState settingsstate.ObjectDeletionState
deletionState deletionstate.ObjectDeletionState
credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusUpdater
treeSyncer treemanager.TreeSyncer
}
func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) {
d.deletionState = deletionState
func (d *diffSyncer) Init() {
d.deletionState.AddObserver(d.RemoveObjects)
d.treeSyncer = d.treeManager.NewTreeSyncer(d.spaceId, d.treeManager)
}

View File

@ -7,10 +7,9 @@ import (
"github.com/anyproto/any-sync/app/logger"
config2 "github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
@ -18,6 +17,7 @@ import (
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/periodicsync"
"github.com/anyproto/any-sync/util/slice"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"sync/atomic"
@ -34,11 +34,13 @@ type TreeHeads struct {
}
type HeadSync interface {
app.ComponentRunnable
ExternalIds() []string
DebugAllHeads() (res []TreeHeads)
AllIds() []string
UpdateHeads(id string, heads []string)
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
RemoveObjects(ids []string)
AllIds() []string
DebugAllHeads() (res []TreeHeads)
}
type headSync struct {
@ -50,16 +52,16 @@ type headSync struct {
storage spacestorage.SpaceStorage
diff ldiff.Diff
log logger.CtxLogger
syncer headsync.DiffSyncer
syncer DiffSyncer
configuration nodeconf.NodeConf
peerManager peermanager.PeerManager
treeManager treemanager.TreeManager
credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusProvider
deletionState settingsstate.ObjectDeletionState
deletionState deletionstate.ObjectDeletionState
}
func New() *headSync {
func New() HeadSync {
return &headSync{}
}
@ -77,7 +79,7 @@ func (h *headSync) Init(a *app.App) (err error) {
h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider)
h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider)
h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
h.deletionState = a.MustComponent("deletionstate").(settingsstate.ObjectDeletionState)
h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
h.syncer = newDiffSyncer(h)
sync := func(ctx context.Context) (err error) {
// for clients cancelling the sync process
@ -87,6 +89,8 @@ func (h *headSync) Init(a *app.App) (err error) {
return h.syncer.Sync(ctx)
}
h.periodicSync = periodicsync.NewPeriodicSync(h.syncPeriod, time.Minute, sync, h.log)
// TODO: move to run?
h.syncer.Init()
return nil
}
@ -126,6 +130,13 @@ func (h *headSync) AllIds() []string {
return h.diff.Ids()
}
func (h *headSync) ExternalIds() []string {
settingsId := h.storage.SpaceSettingsId()
return slice.DiscardFromSlice(h.AllIds(), func(id string) bool {
return id == settingsId
})
}
func (h *headSync) DebugAllHeads() (res []TreeHeads) {
els := h.diff.Elements()
for _, el := range els {

View File

@ -2,20 +2,17 @@ package syncacl
import (
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
)
type SyncAcl struct {
list.AclList
synchandler.SyncHandler
messagePool objectsync.MessagePool
}
func NewSyncAcl(aclList list.AclList, messagePool objectsync.MessagePool) *SyncAcl {
func NewSyncAcl(aclList list.AclList) *SyncAcl {
return &SyncAcl{
AclList: aclList,
SyncHandler: nil,
messagePool: messagePool,
}
}

View File

@ -96,7 +96,7 @@ type testSyncHandler struct {
// createSyncHandler creates a sync handler when a tree is already created
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler {
factory := syncclient.NewRequestFactory()
syncClient := syncclient.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory)
netTree := &broadcastTree{
ObjectTree: objTree,
SyncClient: syncClient,
@ -108,7 +108,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler {
factory := syncclient.NewRequestFactory()
syncClient := syncclient.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory)
syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory)
batcher := mb.New[protocolMsg](0)
return &testSyncHandler{

View File

@ -29,6 +29,9 @@ const CName = "common.commonspace.objectsync"
var log = logger.NewNamed(CName)
type ObjectSync interface {
LastUsage() time.Time
HandleMessage(ctx context.Context, hm HandleMessage) (err error)
CloseThread(id string) (err error)
app.ComponentRunnable
}
@ -88,10 +91,15 @@ func (s *objectSync) Close(ctx context.Context) (err error) {
return s.handleQueue.Close()
}
func NewObjectSync() ObjectSync {
func New() ObjectSync {
return &objectSync{}
}
func (s *objectSync) LastUsage() time.Time {
// TODO: add time
return time.Time{}
}
func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
threadId := hm.Message.ObjectId
hm.ReceiveTime = time.Now()
@ -169,3 +177,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
}
return
}
func (s *objectSync) CloseThread(id string) (err error) {
return s.handleQueue.CloseThread(id)
}

View File

@ -32,7 +32,7 @@ type syncClient struct {
streamSender streamsender.StreamSender
}
func NewSyncClient() SyncClient {
func New() SyncClient {
return &syncClient{}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestate"
@ -40,12 +41,19 @@ type HistoryTreeOpts struct {
}
type TreeBuilder interface {
app.Component
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error)
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
SetOnCloseHandler(handler func(id string))
}
type TreeBuilderComponent interface {
app.Component
TreeBuilder
}
func New() TreeBuilderComponent {
return &treeBuilder{}
}
type treeBuilder struct {
@ -55,6 +63,7 @@ type treeBuilder struct {
peerManager peermanager.PeerManager
spaceStorage spacestorage.SpaceStorage
syncStatus syncstatus.StatusUpdater
objectSync objectsync.ObjectSync
log logger.CtxLogger
builder objecttree.BuildObjectTreeFunc
@ -62,7 +71,6 @@ type treeBuilder struct {
aclList list.AclList
treesUsed *atomic.Int32
isClosed *atomic.Bool
onClose func(id string)
}
func (t *treeBuilder) Init(a *app.App) (err error) {
@ -78,8 +86,8 @@ func (t *treeBuilder) Init(a *app.App) (err error) {
t.spaceStorage = state.SpaceStorage
t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync)
t.log = log.With(zap.String("spaceId", t.spaceId))
t.onClose = state.Actions.OnObjectDelete
return nil
}
@ -87,10 +95,6 @@ func (t *treeBuilder) Name() (name string) {
return CName
}
func (t *treeBuilder) SetOnCloseHandler(handler func(id string)) {
t.onClose = handler
}
func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (ot objecttree.ObjectTree, err error) {
if t.isClosed.Load() {
// TODO: change to real error
@ -189,3 +193,9 @@ func (t *treeBuilder) PutTree(ctx context.Context, payload treestorage.TreeStora
t.log.Debug("incrementing counter", zap.String("id", payload.RootRawChange.Id), zap.Int32("trees", t.treesUsed.Load()))
return
}
func (t *treeBuilder) onClose(id string) {
t.treesUsed.Add(-1)
log.Debug("decrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()), zap.String("spaceId", t.spaceId))
_ = t.objectSync.CloseThread(id)
}

View File

@ -17,6 +17,10 @@ type RequestSender interface {
QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func New() RequestSender {
return &requestSender{}
}
type requestSender struct {
}

View File

@ -9,52 +9,53 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
"sync/atomic"
)
const CName = "common.commonspace.settings"
type Settings interface {
DeleteTree(ctx context.Context, id string) (err error)
SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error)
DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error)
app.ComponentRunnable
}
func New() Settings {
return &settings{}
}
type settings struct {
account accountservice.Service
treeManager treemanager.TreeManager
storage spacestorage.SpaceStorage
configuration nodeconf.NodeConf
deletionState deletionstate.ObjectDeletionState
headsync headsync.HeadSync
spaceActions spacestate.SpaceActions
treeBuilder objecttreebuilder.TreeBuilder
account accountservice.Service
treeManager treemanager.TreeManager
storage spacestorage.SpaceStorage
configuration nodeconf.NodeConf
deletionState deletionstate.ObjectDeletionState
headsync headsync.HeadSync
treeBuilder objecttreebuilder.TreeBuilderComponent
spaceIsDeleted *atomic.Bool
settingsObject SettingsObject
}
func (s *settings) Run(ctx context.Context) (err error) {
return s.settingsObject.Init(ctx)
}
func (s *settings) Close(ctx context.Context) (err error) {
return s.settingsObject.Close()
}
func (s *settings) Init(a *app.App) (err error) {
s.account = a.MustComponent(accountservice.CName).(accountservice.Service)
s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
s.headsync = a.MustComponent(headsync.CName).(headsync.HeadSync)
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
s.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilder)
s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
s.spaceActions = sharedState.Actions
s.storage = sharedState.SpaceStorage
s.spaceIsDeleted = sharedState.SpaceIsDeleted
deps := Deps{
BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) {
@ -77,7 +78,7 @@ func (s *settings) Init(a *app.App) (err error) {
Configuration: s.configuration,
DeletionState: s.deletionState,
Provider: s.headsync,
OnSpaceDelete: s.spaceActions.OnSpaceDelete,
OnSpaceDelete: s.onSpaceDelete,
}
s.settingsObject = NewSettingsObject(deps, sharedState.SpaceId)
return nil
@ -86,3 +87,31 @@ func (s *settings) Init(a *app.App) (err error) {
func (s *settings) Name() (name string) {
return CName
}
func (s *settings) Run(ctx context.Context) (err error) {
return s.settingsObject.Init(ctx)
}
func (s *settings) Close(ctx context.Context) (err error) {
return s.settingsObject.Close()
}
func (s *settings) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsObject.DeleteObject(id)
}
func (s *settings) SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) {
return s.settingsObject.SpaceDeleteRawChange()
}
func (s *settings) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) {
return s.settingsObject.DeleteSpace(ctx, deleteChange)
}
func (s *settings) onSpaceDelete() {
err := s.storage.SetSpaceDeleted()
if err != nil {
log.Warn("failed to set space deleted")
}
s.spaceIsDeleted.Swap(true)
}

View File

@ -1,135 +0,0 @@
//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate ObjectDeletionState,StateBuilder,ChangeFactory
package settingsstate
import (
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"go.uber.org/zap"
"sync"
)
type StateUpdateObserver func(ids []string)
type ObjectDeletionState interface {
AddObserver(observer StateUpdateObserver)
Add(ids map[string]struct{})
GetQueued() (ids []string)
Delete(id string) (err error)
Exists(id string) bool
Filter(ids []string) (filtered []string)
}
type objectDeletionState struct {
sync.RWMutex
log logger.CtxLogger
queued map[string]struct{}
deleted map[string]struct{}
stateUpdateObservers []StateUpdateObserver
storage spacestorage.SpaceStorage
}
func NewObjectDeletionState(log logger.CtxLogger, storage spacestorage.SpaceStorage) ObjectDeletionState {
return &objectDeletionState{
log: log,
queued: map[string]struct{}{},
deleted: map[string]struct{}{},
storage: storage,
}
}
func (st *objectDeletionState) AddObserver(observer StateUpdateObserver) {
st.Lock()
defer st.Unlock()
st.stateUpdateObservers = append(st.stateUpdateObservers, observer)
}
func (st *objectDeletionState) Add(ids map[string]struct{}) {
var added []string
st.Lock()
defer func() {
st.Unlock()
for _, ob := range st.stateUpdateObservers {
ob(added)
}
}()
for id := range ids {
if _, exists := st.deleted[id]; exists {
continue
}
if _, exists := st.queued[id]; exists {
continue
}
var status string
status, err := st.storage.TreeDeletedStatus(id)
if err != nil {
st.log.Warn("failed to get deleted status", zap.String("treeId", id), zap.Error(err))
continue
}
switch status {
case spacestorage.TreeDeletedStatusQueued:
st.queued[id] = struct{}{}
case spacestorage.TreeDeletedStatusDeleted:
st.deleted[id] = struct{}{}
default:
err := st.storage.SetTreeDeletedStatus(id, spacestorage.TreeDeletedStatusQueued)
if err != nil {
st.log.Warn("failed to set deleted status", zap.String("treeId", id), zap.Error(err))
continue
}
st.queued[id] = struct{}{}
}
added = append(added, id)
}
}
func (st *objectDeletionState) GetQueued() (ids []string) {
st.RLock()
defer st.RUnlock()
ids = make([]string, 0, len(st.queued))
for id := range st.queued {
ids = append(ids, id)
}
return
}
func (st *objectDeletionState) Delete(id string) (err error) {
st.Lock()
defer st.Unlock()
delete(st.queued, id)
st.deleted[id] = struct{}{}
err = st.storage.SetTreeDeletedStatus(id, spacestorage.TreeDeletedStatusDeleted)
if err != nil {
return
}
return
}
func (st *objectDeletionState) Exists(id string) bool {
st.RLock()
defer st.RUnlock()
return st.exists(id)
}
func (st *objectDeletionState) Filter(ids []string) (filtered []string) {
st.RLock()
defer st.RUnlock()
for _, id := range ids {
if !st.exists(id) {
filtered = append(filtered, id)
}
}
return
}
func (st *objectDeletionState) exists(id string) bool {
if _, exists := st.deleted[id]; exists {
return true
}
if _, exists := st.queued[id]; exists {
return true
}
return false
}

View File

@ -1,4 +1,4 @@
//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate ObjectDeletionState,StateBuilder,ChangeFactory
//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate StateBuilder,ChangeFactory
package settingsstate
import "github.com/anyproto/any-sync/commonspace/spacesyncproto"

View File

@ -3,36 +3,21 @@ package commonspace
import (
"context"
"errors"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/settings"
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/crypto"
"github.com/anyproto/any-sync/util/multiqueue"
"github.com/anyproto/any-sync/util/slice"
"github.com/cheggaaa/mb/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -55,25 +40,6 @@ type SpaceCreatePayload struct {
MasterKey crypto.PrivKey
}
type HandleMessage struct {
Id uint64
ReceiveTime time.Time
StartHandlingTime time.Time
Deadline time.Time
SenderId string
Message *spacesyncproto.ObjectSyncMessage
PeerCtx context.Context
}
func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
return append(fields,
metric.SpaceId(m.Message.SpaceId),
metric.ObjectId(m.Message.ObjectId),
metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)),
metric.TotalDur(time.Since(m.ReceiveTime)),
)
}
type SpaceDerivePayload struct {
SigningKey crypto.PrivKey
MasterKey crypto.PrivKey
@ -97,145 +63,72 @@ type Space interface {
Id() string
Init(ctx context.Context) error
StoredIds() []string
DebugAllHeads() []headsync.TreeHeads
Description() (SpaceDescription, error)
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
DeleteTree(ctx context.Context, id string) (err error)
BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error)
SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error)
DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error)
HeadSync() headsync.HeadSync
ObjectSync() objectsync.ObjectSync
TreeBuilder() objecttreebuilder.TreeBuilder
SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage
HandleMessage(ctx context.Context, msg HandleMessage) (err error)
DeleteTree(ctx context.Context, id string) (err error)
SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error)
DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error)
HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error)
TryClose(objectTTL time.Duration) (close bool, err error)
Close() error
}
type space struct {
id string
mu sync.RWMutex
header *spacesyncproto.RawSpaceHeaderWithId
objectSync objectsync.ObjectSync
headSync headsync.HeadSync
syncStatus syncstatus.StatusUpdater
storage spacestorage.SpaceStorage
treeManager *objectManager
account accountservice.Service
aclList *syncacl.SyncAcl
configuration nodeconf.NodeConf
settingsObject settings.SettingsObject
peerManager peermanager.PeerManager
treeBuilder objecttree.BuildObjectTreeFunc
metric metric.Metric
state *spacestate.SpaceState
app *app.App
handleQueue multiqueue.MultiQueue[HandleMessage]
treeBuilder objecttreebuilder.TreeBuilderComponent
headSync headsync.HeadSync
objectSync objectsync.ObjectSync
syncStatus syncstatus.StatusProvider
settings settings.Settings
}
isClosed *atomic.Bool
isDeleted *atomic.Bool
treesUsed *atomic.Int32
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settings.DeleteTree(ctx, id)
}
func (s *space) SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) {
return s.settings.SpaceDeleteRawChange(ctx)
}
func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) {
return s.settings.DeleteSpace(ctx, deleteChange)
}
func (s *space) HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error) {
return s.objectSync.HandleMessage(ctx, msg)
}
func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder {
return s.treeBuilder
}
func (s *space) Id() string {
return s.id
}
func (s *space) Description() (desc SpaceDescription, err error) {
root := s.aclList.Root()
settingsStorage, err := s.storage.TreeStorage(s.storage.SpaceSettingsId())
if err != nil {
return
}
settingsRoot, err := settingsStorage.Root()
if err != nil {
return
}
desc = SpaceDescription{
SpaceHeader: s.header,
AclId: root.Id,
AclPayload: root.Payload,
SpaceSettingsId: settingsRoot.Id,
SpaceSettingsPayload: settingsRoot.RawChange,
}
return
return s.state.SpaceId
}
func (s *space) Init(ctx context.Context) (err error) {
log.With(zap.String("spaceId", s.id)).Debug("initializing space")
s.storage = newCommonStorage(s.storage)
header, err := s.storage.SpaceHeader()
err = s.app.Start(ctx)
if err != nil {
return
}
s.header = header
initialIds, err := s.storage.StoredIds()
if err != nil {
return
}
aclStorage, err := s.storage.AclStorage()
if err != nil {
return
}
aclList, err := list.BuildAclListWithIdentity(s.account.Account(), aclStorage)
if err != nil {
return
}
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.SyncClient().MessagePool())
s.treeManager.AddObject(s.aclList)
deletionState := settingsstate.NewObjectDeletionState(log, s.storage)
deps := settings.Deps{
BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) {
res, err := s.BuildTree(ctx, id, BuildTreeOpts{
Listener: listener,
WaitTreeRemoteSync: false,
// space settings document should not have empty data
treeBuilder: objecttree.BuildObjectTree,
})
log.Debug("building settings tree", zap.String("id", id), zap.String("spaceId", s.id))
if err != nil {
return
}
t = res.(synctree.SyncTree)
return
},
Account: s.account,
TreeManager: s.treeManager,
Store: s.storage,
DeletionState: deletionState,
Provider: s.headSync,
Configuration: s.configuration,
OnSpaceDelete: s.onSpaceDelete,
}
s.settingsObject = settings.NewSettingsObject(deps, s.id)
s.headSync.Init(initialIds, deletionState)
err = s.settingsObject.Init(ctx)
if err != nil {
return
}
s.treeManager.AddObject(s.settingsObject)
s.syncStatus.Run()
s.handleQueue = multiqueue.New[HandleMessage](s.handleMessage, 100)
s.treeBuilder = s.app.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync)
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider)
s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync)
return nil
}
func (s *space) ObjectSync() objectsync.ObjectSync {
return s.objectSync
}
func (s *space) HeadSync() headsync.HeadSync {
func (s *space) HeadSync() headsync.HeadSyncExternal {
return s.headSync
}
@ -244,249 +137,28 @@ func (s *space) SyncStatus() syncstatus.StatusUpdater {
}
func (s *space) Storage() spacestorage.SpaceStorage {
return s.storage
}
func (s *space) StoredIds() []string {
return slice.DiscardFromSlice(s.headSync.AllIds(), func(id string) bool {
return id == s.settingsObject.Id()
})
}
func (s *space) DebugAllHeads() []headsync.TreeHeads {
return s.headSync.DebugAllHeads()
}
func (s *space) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) {
if s.isClosed.Load() {
err = ErrSpaceClosed
return
}
root, err := objecttree.CreateObjectTreeRoot(payload, s.aclList)
if err != nil {
return
}
res = treestorage.TreeStorageCreatePayload{
RootRawChange: root,
Changes: []*treechangeproto.RawTreeChangeWithId{root},
Heads: []string{root.Id},
}
return
}
func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) {
if s.isClosed.Load() {
err = ErrSpaceClosed
return
}
deps := synctree.BuildDeps{
SpaceId: s.id,
SyncClient: s.objectSync.SyncClient(),
Configuration: s.configuration,
HeadNotifiable: s.headSync,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
OnClose: s.onObjectClose,
SyncStatus: s.syncStatus,
PeerGetter: s.peerManager,
BuildObjectTree: s.treeBuilder,
}
t, err = synctree.PutSyncTree(ctx, payload, deps)
if err != nil {
return
}
s.treesUsed.Add(1)
log.Debug("incrementing counter", zap.String("id", payload.RootRawChange.Id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id))
return
}
type BuildTreeOpts struct {
Listener updatelistener.UpdateListener
WaitTreeRemoteSync bool
treeBuilder objecttree.BuildObjectTreeFunc
}
type HistoryTreeOpts struct {
BeforeId string
Include bool
BuildFullTree bool
}
func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) {
if s.isClosed.Load() {
err = ErrSpaceClosed
return
}
treeBuilder := opts.treeBuilder
if treeBuilder == nil {
treeBuilder = s.treeBuilder
}
deps := synctree.BuildDeps{
SpaceId: s.id,
SyncClient: s.objectSync.SyncClient(),
Configuration: s.configuration,
HeadNotifiable: s.headSync,
Listener: opts.Listener,
AclList: s.aclList,
SpaceStorage: s.storage,
OnClose: s.onObjectClose,
SyncStatus: s.syncStatus,
WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
PeerGetter: s.peerManager,
BuildObjectTree: treeBuilder,
}
s.treesUsed.Add(1)
log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id))
if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil {
s.treesUsed.Add(-1)
log.Debug("decrementing counter, load failed", zap.String("id", id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id), zap.Error(err))
return nil, err
}
return
}
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
if s.isClosed.Load() {
err = ErrSpaceClosed
return
}
params := objecttree.HistoryTreeParams{
AclList: s.aclList,
BeforeId: opts.BeforeId,
IncludeBeforeId: opts.Include,
BuildFullTree: opts.BuildFullTree,
}
params.TreeStorage, err = s.storage.TreeStorage(id)
if err != nil {
return
}
return objecttree.BuildHistoryTree(params)
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsObject.DeleteObject(id)
}
func (s *space) SpaceDeleteRawChange(ctx context.Context) (raw *treechangeproto.RawTreeChangeWithId, err error) {
return s.settingsObject.SpaceDeleteRawChange()
}
func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) {
return s.settingsObject.DeleteSpace(ctx, deleteChange)
}
func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
threadId := hm.Message.ObjectId
hm.ReceiveTime = time.Now()
if hm.Message.ReplyId != "" {
threadId += hm.Message.ReplyId
defer func() {
_ = s.handleQueue.CloseThread(threadId)
}()
}
if hm.PeerCtx == nil {
hm.PeerCtx = ctx
}
err = s.handleQueue.Add(ctx, threadId, hm)
if err == mb.ErrOverflowed {
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))
// skip overflowed error
return nil
}
return
}
func (s *space) handleMessage(msg HandleMessage) {
var err error
msg.StartHandlingTime = time.Now()
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
defer func() {
if s.metric == nil {
return
}
s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields(
zap.Error(err),
)...)
}()
if !msg.Deadline.IsZero() {
now := time.Now()
if now.After(msg.Deadline) {
log.InfoCtx(ctx, "skip message: deadline exceed")
err = context.DeadlineExceeded
return
}
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, msg.Deadline)
defer cancel()
}
if err = s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
if msg.Message.ObjectId != "" {
// cleanup thread on error
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
}
log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
}
}
func (s *space) onObjectClose(id string) {
s.treesUsed.Add(-1)
log.Debug("decrementing counter", zap.String("id", id), zap.Int32("trees", s.treesUsed.Load()), zap.String("spaceId", s.id))
_ = s.handleQueue.CloseThread(id)
}
func (s *space) onSpaceDelete() {
err := s.storage.SetSpaceDeleted()
if err != nil {
log.Debug("failed to set space deleted")
}
s.isDeleted.Swap(true)
return s.state.SpaceStorage
}
func (s *space) Close() error {
if s.isClosed.Swap(true) {
log.Warn("call space.Close on closed space", zap.String("id", s.id))
if s.state.SpaceIsClosed.Swap(true) {
log.Warn("call space.Close on closed space", zap.String("id", s.state.SpaceId))
return nil
}
log.With(zap.String("id", s.id)).Debug("space is closing")
log := log.With(zap.String("spaceId", s.state.SpaceId))
log.Debug("space is closing")
var mError errs.Group
if err := s.handleQueue.Close(); err != nil {
mError.Add(err)
}
if err := s.headSync.Close(); err != nil {
mError.Add(err)
}
if err := s.objectSync.Close(); err != nil {
mError.Add(err)
}
if err := s.settingsObject.Close(); err != nil {
mError.Add(err)
}
if err := s.aclList.Close(); err != nil {
mError.Add(err)
}
if err := s.storage.Close(); err != nil {
mError.Add(err)
}
if err := s.syncStatus.Close(); err != nil {
mError.Add(err)
}
log.With(zap.String("id", s.id)).Debug("space closed")
return mError.Err()
err := s.app.Close(context.Background())
log.Debug("space closed")
return err
}
func (s *space) TryClose(objectTTL time.Duration) (close bool, err error) {
if time.Now().Sub(s.objectSync.LastUsage()) < objectTTL {
return false, nil
}
locked := s.treesUsed.Load() > 1
log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check")
locked := s.state.TreesUsed.Load() > 1
log.With(zap.Int32("trees used", s.state.TreesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.state.SpaceId)).Debug("space lock status check")
if locked {
return false, nil
}

View File

@ -7,15 +7,24 @@ import (
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestsender"
"github.com/anyproto/any-sync/commonspace/settings"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/streamsender"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
@ -73,6 +82,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
}
s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.metric, _ = a.Component(metric.CName).(metric.Metric)
s.app = a
return nil
}
@ -140,8 +150,6 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
}
}
}
lastConfiguration := s.configurationService
var (
spaceIsClosed = &atomic.Bool{}
spaceIsDeleted = &atomic.Bool{}
@ -151,42 +159,57 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
return nil, err
}
spaceIsDeleted.Swap(isDeleted)
getter := NewObjectManager(st.Id(), s.treeManager, spaceIsClosed)
syncStatus := syncstatus.NewNoOpSyncStatus()
// this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) {
// TODO: move it to the client package and add possibility to inject StatusProvider from the client
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
}
var builder objecttree.BuildObjectTreeFunc
if s.config.KeepTreeDataInMemory {
builder = objecttree.BuildObjectTree
} else {
builder = objecttree.BuildEmptyDataObjectTree
}
peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id)
aclStorage, err := st.AclStorage()
if err != nil {
return nil, err
}
aclList, err := list.BuildAclListWithIdentity(s.account.Account(), aclStorage)
if err != nil {
return nil, err
}
aclList = syncacl.NewSyncAcl(aclList)
state := &spacestate.SpaceState{
SpaceId: st.Id(),
SpaceIsDeleted: spaceIsDeleted,
SpaceIsClosed: spaceIsClosed,
TreesUsed: &atomic.Int32{},
AclList: aclList,
SpaceStorage: st,
}
if s.config.KeepTreeDataInMemory {
state.TreeBuilderFunc = objecttree.BuildObjectTree
} else {
state.TreeBuilderFunc = objecttree.BuildEmptyDataObjectTree
}
var syncStatus syncstatus.StatusProvider
if !s.configurationService.IsResponsible(st.Id()) {
// TODO: move it to the client package and add possibility to inject StatusProvider from the client
syncStatus = syncstatus.NewSyncStatusProvider()
} else {
syncStatus = syncstatus.NewNoOpSyncStatus()
}
//lastConfiguration := s.configurationService
//
//peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id)
//if err != nil {
// return nil, err
//}
spaceApp := s.app.ChildApp()
spaceApp.Register(state).
Register(syncStatus).
Register(NewObjectManager(s.treeManager)).
Register(streamsender.New()).
Register(requestsender.New()).
Register(deletionstate.New()).
Register(settings.New()).
Register(syncclient.New()).
Register(objecttreebuilder.New()).
Register(objectsync.New()).
Register(headsync.New())
headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, s.credentialProvider, log)
objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter, st)
sp := &space{
id: id,
objectSync: objectSync,
headSync: headSync,
syncStatus: syncStatus,
treeManager: getter,
account: s.account,
configuration: lastConfiguration,
peerManager: peerManager,
storage: st,
treesUsed: &atomic.Int32{},
treeBuilder: builder,
isClosed: spaceIsClosed,
isDeleted: spaceIsDeleted,
metric: s.metric,
state: state,
app: spaceApp,
}
return sp, nil
}

View File

@ -10,11 +10,6 @@ import (
const CName = "common.commonspace.shareddata"
type SpaceActions interface {
OnObjectDelete(id string)
OnSpaceDelete()
}
type SpaceState struct {
SpaceId string
SpaceIsDeleted *atomic.Bool
@ -23,7 +18,6 @@ type SpaceState struct {
AclList list.AclList
SpaceStorage spacestorage.SpaceStorage
TreeBuilderFunc objecttree.BuildObjectTreeFunc
Actions SpaceActions
}
func (s *SpaceState) Init(a *app.App) (err error) {

View File

@ -8,7 +8,30 @@ import (
const CName = "common.commonspace.streamsender"
type StreamSender interface {
app.ComponentRunnable
app.Component
SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func New() StreamSender {
return &streamSender{}
}
type streamSender struct {
}
func (s *streamSender) Init(a *app.App) (err error) {
return
}
func (s *streamSender) Name() (name string) {
return CName
}
func (s *streamSender) SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}
func (s *streamSender) Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}