This commit is contained in:
Sergey Cherepanov 2023-01-19 15:25:34 +03:00
commit 7fdd4f215e
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
17 changed files with 375 additions and 50 deletions

View File

@ -2,38 +2,51 @@ package commonspace
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/commonspace/object/acl/syncacl"
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/settings" "golang.org/x/exp/slices"
) )
type commonSpaceGetter struct { type commonGetter struct {
spaceId string treegetter.TreeGetter
aclList *syncacl.SyncAcl spaceId string
treeGetter treegetter.TreeGetter reservedObjects []syncobjectgetter.SyncObject
settings settings.SettingsObject
} }
func newCommonSpaceGetter(spaceId string, aclList *syncacl.SyncAcl, treeGetter treegetter.TreeGetter, settings settings.SettingsObject) syncobjectgetter.SyncObjectGetter { func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter {
return &commonSpaceGetter{ return &commonGetter{
TreeGetter: getter,
spaceId: spaceId, spaceId: spaceId,
aclList: aclList,
treeGetter: treeGetter,
settings: settings,
} }
} }
func (c *commonSpaceGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
if c.aclList.Id() == objectId { c.reservedObjects = append(c.reservedObjects, object)
obj = c.aclList }
return
func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
if obj := c.getReservedObject(treeId); obj != nil {
return obj.(objecttree.ObjectTree), nil
} }
if c.settings.Id() == objectId { return c.TreeGetter.GetTree(ctx, spaceId, treeId)
obj = c.settings.(syncobjectgetter.SyncObject) }
return
func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject {
pos := slices.IndexFunc(c.reservedObjects, func(object syncobjectgetter.SyncObject) bool {
return object.Id() == id
})
if pos == -1 {
return nil
} }
t, err := c.treeGetter.GetTree(ctx, c.spaceId, objectId) return c.reservedObjects[pos]
}
func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) {
if obj := c.getReservedObject(objectId); obj != nil {
return obj, nil
}
t, err := c.TreeGetter.GetTree(ctx, c.spaceId, objectId)
if err != nil { if err != nil {
return return
} }

View File

@ -6,6 +6,7 @@ import (
) )
type SyncObject interface { type SyncObject interface {
Id() string
synchandler.SyncHandler synchandler.SyncHandler
} }

View File

@ -53,8 +53,10 @@ func NewChangeFromRoot(id string, ch *treechangeproto.RootChange, signature []by
AclHeadId: ch.AclHeadId, AclHeadId: ch.AclHeadId,
Id: id, Id: id,
IsSnapshot: true, IsSnapshot: true,
Timestamp: ch.Timestamp,
Identity: string(ch.Identity), Identity: string(ch.Identity),
Signature: signature, Signature: signature,
Data: []byte(ch.ChangeType),
} }
} }

View File

@ -151,7 +151,7 @@ func (c *changeBuilder) BuildContent(payload BuilderContent) (ch *Change, rawIdC
AclHeadId: payload.AclHeadId, AclHeadId: payload.AclHeadId,
SnapshotBaseId: payload.SnapshotBaseId, SnapshotBaseId: payload.SnapshotBaseId,
CurrentReadKeyHash: payload.CurrentReadKeyHash, CurrentReadKeyHash: payload.CurrentReadKeyHash,
Timestamp: int64(time.Now().Nanosecond()), Timestamp: time.Now().UnixNano(),
Identity: payload.Identity, Identity: payload.Identity,
IsSnapshot: payload.IsSnapshot, IsSnapshot: payload.IsSnapshot,
} }

View File

@ -0,0 +1,52 @@
package objecttree
import (
"errors"
)
var ErrLoadBeforeRoot = errors.New("can't load before root")
type HistoryTree interface {
ReadableObjectTree
}
type historyTree struct {
*objectTree
}
func (h *historyTree) rebuildFromStorage(beforeId string, include bool) (err error) {
ot := h.objectTree
ot.treeBuilder.Reset()
if beforeId == ot.Id() && !include {
return ErrLoadBeforeRoot
}
heads := []string{beforeId}
if beforeId == "" {
heads, err = ot.treeStorage.Heads()
if err != nil {
return
}
} else if !include {
beforeChange, err := ot.treeBuilder.loadChange(beforeId)
if err != nil {
return err
}
heads = beforeChange.PreviousIds
}
ot.tree, err = ot.treeBuilder.build(heads, nil, nil)
if err != nil {
return
}
ot.aclList.RLock()
defer ot.aclList.RUnlock()
state := ot.aclList.AclState()
if len(ot.keys) != len(state.UserReadKeys()) {
for key, value := range state.UserReadKeys() {
ot.keys[key] = value
}
}
return
}

View File

@ -125,6 +125,21 @@ func (mr *MockObjectTreeMockRecorder) Delete() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockObjectTree)(nil).Delete)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockObjectTree)(nil).Delete))
} }
// GetChange mocks base method.
func (m *MockObjectTree) GetChange(arg0 string) (*objecttree.Change, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetChange", arg0)
ret0, _ := ret[0].(*objecttree.Change)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetChange indicates an expected call of GetChange.
func (mr *MockObjectTreeMockRecorder) GetChange(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChange", reflect.TypeOf((*MockObjectTree)(nil).GetChange), arg0)
}
// HasChanges mocks base method. // HasChanges mocks base method.
func (m *MockObjectTree) HasChanges(arg0 ...string) bool { func (m *MockObjectTree) HasChanges(arg0 ...string) bool {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -5,7 +5,7 @@ import (
"context" "context"
"errors" "errors"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
list2 "github.com/anytypeio/any-sync/commonspace/object/acl/list" list "github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/keychain" "github.com/anytypeio/any-sync/commonspace/object/keychain"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
@ -23,6 +23,7 @@ type RWLocker interface {
var ( var (
ErrHasInvalidChanges = errors.New("the change is invalid") ErrHasInvalidChanges = errors.New("the change is invalid")
ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot") ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot")
ErrNoChangeInTree = errors.New("no such change in tree")
) )
type AddResultSummary int type AddResultSummary int
@ -43,7 +44,7 @@ type RawChangesPayload struct {
type ChangeIterateFunc = func(change *Change) bool type ChangeIterateFunc = func(change *Change) bool
type ChangeConvertFunc = func(decrypted []byte) (any, error) type ChangeConvertFunc = func(decrypted []byte) (any, error)
type ObjectTree interface { type ReadableObjectTree interface {
RWLocker RWLocker
Id() string Id() string
@ -51,11 +52,17 @@ type ObjectTree interface {
UnmarshalledHeader() *Change UnmarshalledHeader() *Change
Heads() []string Heads() []string
Root() *Change Root() *Change
HasChanges(...string) bool
DebugDump(parser DescriptionParser) (string, error)
HasChanges(...string) bool
GetChange(string) (*Change, error)
DebugDump(parser DescriptionParser) (string, error)
IterateRoot(convert ChangeConvertFunc, iterate ChangeIterateFunc) error IterateRoot(convert ChangeConvertFunc, iterate ChangeIterateFunc) error
IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error
}
type ObjectTree interface {
ReadableObjectTree
SnapshotPath() []string SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*treechangeproto.RawTreeChangeWithId, error) ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*treechangeproto.RawTreeChangeWithId, error)
@ -75,7 +82,7 @@ type objectTree struct {
validator ObjectTreeValidator validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader rawChangeLoader *rawChangeLoader
treeBuilder *treeBuilder treeBuilder *treeBuilder
aclList list2.AclList aclList list.AclList
id string id string
rawRoot *treechangeproto.RawTreeChangeWithId rawRoot *treechangeproto.RawTreeChangeWithId
@ -101,13 +108,13 @@ type objectTreeDeps struct {
treeStorage treestorage.TreeStorage treeStorage treestorage.TreeStorage
validator ObjectTreeValidator validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader rawChangeLoader *rawChangeLoader
aclList list2.AclList aclList list.AclList
} }
func defaultObjectTreeDeps( func defaultObjectTreeDeps(
rootChange *treechangeproto.RawTreeChangeWithId, rootChange *treechangeproto.RawTreeChangeWithId,
treeStorage treestorage.TreeStorage, treeStorage treestorage.TreeStorage,
aclList list2.AclList) objectTreeDeps { aclList list.AclList) objectTreeDeps {
keychain := keychain.NewKeychain() keychain := keychain.NewKeychain()
changeBuilder := NewChangeBuilder(keychain, rootChange) changeBuilder := NewChangeBuilder(keychain, rootChange)
@ -155,6 +162,13 @@ func (ot *objectTree) Storage() treestorage.TreeStorage {
return ot.treeStorage return ot.treeStorage
} }
func (ot *objectTree) GetChange(id string) (*Change, error) {
if ch, ok := ot.tree.attached[id]; ok {
return ch, nil
}
return nil, ErrNoChangeInTree
}
func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) { func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) {
payload, err := ot.prepareBuilderContent(content) payload, err := ot.prepareBuilderContent(content)
if err != nil { if err != nil {
@ -208,7 +222,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt
canWrite := state.HasPermission(content.Identity, aclrecordproto.AclUserPermissions_Writer) || canWrite := state.HasPermission(content.Identity, aclrecordproto.AclUserPermissions_Writer) ||
state.HasPermission(content.Identity, aclrecordproto.AclUserPermissions_Admin) state.HasPermission(content.Identity, aclrecordproto.AclUserPermissions_Admin)
if !canWrite { if !canWrite {
err = list2.ErrInsufficientPermissions err = list.ErrInsufficientPermissions
return return
} }
@ -471,7 +485,7 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate
} }
readKey, exists := ot.keys[c.ReadKeyHash] readKey, exists := ot.keys[c.ReadKeyHash]
if !exists { if !exists {
err = list2.ErrNoReadKey err = list.ErrNoReadKey
return return
} }

View File

@ -111,6 +111,24 @@ func prepareAclList(t *testing.T) list.AclList {
return aclList return aclList
} }
func prepareTreeDeps(aclList list.AclList) (*mockChangeCreator, objectTreeDeps) {
changeCreator := &mockChangeCreator{}
treeStorage := changeCreator.createNewTreeStorage("0", aclList.Head().Id)
root, _ := treeStorage.Root()
changeBuilder := &mockChangeBuilder{
originalBuilder: NewChangeBuilder(nil, root),
}
deps := objectTreeDeps{
changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(treeStorage, changeBuilder),
treeStorage: treeStorage,
rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder),
validator: &mockChangeValidator{},
aclList: aclList,
}
return changeCreator, deps
}
func prepareTreeContext(t *testing.T, aclList list.AclList) testTreeContext { func prepareTreeContext(t *testing.T, aclList list.AclList) testTreeContext {
changeCreator := &mockChangeCreator{} changeCreator := &mockChangeCreator{}
treeStorage := changeCreator.createNewTreeStorage("0", aclList.Head().Id) treeStorage := changeCreator.createNewTreeStorage("0", aclList.Head().Id)
@ -542,4 +560,87 @@ func TestObjectTree(t *testing.T) {
assert.Equal(t, ch, raw, "the changes in the storage should be the same") assert.Equal(t, ch, raw, "the changes in the storage should be the same")
} }
}) })
t.Run("test history tree not include", func(t *testing.T) {
changeCreator, deps := prepareTreeDeps(aclList)
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.createRaw("1", aclList.Head().Id, "0", false, "0"),
changeCreator.createRaw("2", aclList.Head().Id, "0", false, "1"),
changeCreator.createRaw("3", aclList.Head().Id, "0", true, "2"),
changeCreator.createRaw("4", aclList.Head().Id, "0", false, "2"),
changeCreator.createRaw("5", aclList.Head().Id, "0", false, "1"),
changeCreator.createRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
}
deps.treeStorage.TransactionAdd(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "6",
IncludeBeforeId: false,
})
require.NoError(t, err)
// check tree heads
assert.Equal(t, []string{"3", "4", "5"}, hTree.Heads())
// check tree iterate
var iterChangesId []string
err = hTree.IterateFrom(hTree.Root().Id, nil, func(change *Change) bool {
iterChangesId = append(iterChangesId, change.Id)
return true
})
require.NoError(t, err, "iterate should be without error")
assert.Equal(t, []string{"0", "1", "2", "3", "4", "5"}, iterChangesId)
assert.Equal(t, "0", hTree.Root().Id)
})
t.Run("test history tree include", func(t *testing.T) {
changeCreator, deps := prepareTreeDeps(aclList)
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.createRaw("1", aclList.Head().Id, "0", false, "0"),
changeCreator.createRaw("2", aclList.Head().Id, "0", false, "1"),
changeCreator.createRaw("3", aclList.Head().Id, "0", true, "2"),
changeCreator.createRaw("4", aclList.Head().Id, "0", false, "2"),
changeCreator.createRaw("5", aclList.Head().Id, "0", false, "1"),
changeCreator.createRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
}
deps.treeStorage.TransactionAdd(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "6",
IncludeBeforeId: true,
})
require.NoError(t, err)
// check tree heads
assert.Equal(t, []string{"6"}, hTree.Heads())
// check tree iterate
var iterChangesId []string
err = hTree.IterateFrom(hTree.Root().Id, nil, func(change *Change) bool {
iterChangesId = append(iterChangesId, change.Id)
return true
})
require.NoError(t, err, "iterate should be without error")
assert.Equal(t, []string{"0", "1", "2", "3", "4", "5", "6"}, iterChangesId)
assert.Equal(t, "0", hTree.Root().Id)
})
t.Run("test history tree root", func(t *testing.T) {
_, deps := prepareTreeDeps(aclList)
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "0",
IncludeBeforeId: true,
})
require.NoError(t, err)
// check tree heads
assert.Equal(t, []string{"0"}, hTree.Heads())
// check tree iterate
var iterChangesId []string
err = hTree.IterateFrom(hTree.Root().Id, nil, func(change *Change) bool {
iterChangesId = append(iterChangesId, change.Id)
return true
})
require.NoError(t, err, "iterate should be without error")
assert.Equal(t, []string{"0"}, iterChangesId)
assert.Equal(t, "0", hTree.Root().Id)
})
} }

View File

@ -41,16 +41,20 @@ func (tb *treeBuilder) Reset() {
} }
func (tb *treeBuilder) Build(theirHeads []string, newChanges []*Change) (*Tree, error) { func (tb *treeBuilder) Build(theirHeads []string, newChanges []*Change) (*Tree, error) {
var proposedHeads []string
tb.cache = make(map[string]*Change)
heads, err := tb.treeStorage.Heads() heads, err := tb.treeStorage.Heads()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return tb.build(heads, theirHeads, newChanges)
}
func (tb *treeBuilder) build(heads []string, theirHeads []string, newChanges []*Change) (*Tree, error) {
var proposedHeads []string
tb.cache = make(map[string]*Change)
// TODO: we can actually get this from tree (though not sure, that there would always be // TODO: we can actually get this from tree (though not sure, that there would always be
// an invariant where the tree has the closest common snapshot of heads) // an invariant where the tree has the closest common snapshot of heads)
// so if optimization is critical we can change this to inject from tree directly // so if optimization is critical we can change this to inject from tree directly,
// but then we have to be sure that invariant stays true // but then we have to be sure that invariant stays true
oldBreakpoint, err := tb.findBreakpoint(heads, true) oldBreakpoint, err := tb.findBreakpoint(heads, true)
if err != nil { if err != nil {

View File

@ -41,6 +41,15 @@ func BuildObjectTree(treeStorage treestorage.TreeStorage, aclList list.AclList)
return buildObjectTree(deps) return buildObjectTree(deps)
} }
func BuildHistoryTree(params HistoryTreeParams) (HistoryTree, error) {
rootChange, err := params.TreeStorage.Root()
if err != nil {
return nil, err
}
deps := defaultObjectTreeDeps(rootChange, params.TreeStorage, params.AclList)
return buildHistoryTree(deps, params)
}
func CreateDerivedObjectTree( func CreateDerivedObjectTree(
payload ObjectTreeCreatePayload, payload ObjectTreeCreatePayload,
aclList list.AclList, aclList list.AclList,
@ -118,7 +127,6 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
aclList: deps.aclList, aclList: deps.aclList,
changeBuilder: deps.changeBuilder, changeBuilder: deps.changeBuilder,
rawChangeLoader: deps.rawChangeLoader, rawChangeLoader: deps.rawChangeLoader,
tree: nil,
keys: make(map[uint64]*symmetric.Key), keys: make(map[uint64]*symmetric.Key),
newChangesBuf: make([]*Change, 0, 10), newChangesBuf: make([]*Change, 0, 10),
difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10),
@ -146,3 +154,44 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
return objTree, nil return objTree, nil
} }
type HistoryTreeParams struct {
TreeStorage treestorage.TreeStorage
AclList list.AclList
BeforeId string
IncludeBeforeId bool
}
func buildHistoryTree(deps objectTreeDeps, params HistoryTreeParams) (ht HistoryTree, err error) {
objTree := &objectTree{
treeStorage: deps.treeStorage,
treeBuilder: deps.treeBuilder,
validator: deps.validator,
aclList: deps.aclList,
changeBuilder: deps.changeBuilder,
rawChangeLoader: deps.rawChangeLoader,
keys: make(map[uint64]*symmetric.Key),
newChangesBuf: make([]*Change, 0, 10),
difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
newSnapshotsBuf: make([]*Change, 0, 10),
}
hTree := &historyTree{objectTree: objTree}
err = hTree.rebuildFromStorage(params.BeforeId, params.IncludeBeforeId)
if err != nil {
return nil, err
}
objTree.id = objTree.treeStorage.Id()
objTree.rawRoot, err = objTree.treeStorage.Root()
if err != nil {
return nil, err
}
header, err := objTree.changeBuilder.ConvertFromRaw(objTree.rawRoot, false)
if err != nil {
return nil, err
}
objTree.root = header
return hTree, nil
}

View File

@ -9,6 +9,7 @@ import (
reflect "reflect" reflect "reflect"
objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
updatelistener "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto" spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -249,6 +250,21 @@ func (mr *MockSyncTreeMockRecorder) Delete() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockSyncTree)(nil).Delete)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockSyncTree)(nil).Delete))
} }
// GetChange mocks base method.
func (m *MockSyncTree) GetChange(arg0 string) (*objecttree.Change, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetChange", arg0)
ret0, _ := ret[0].(*objecttree.Change)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetChange indicates an expected call of GetChange.
func (mr *MockSyncTreeMockRecorder) GetChange(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChange", reflect.TypeOf((*MockSyncTree)(nil).GetChange), arg0)
}
// HandleMessage mocks base method. // HandleMessage mocks base method.
func (m *MockSyncTree) HandleMessage(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error { func (m *MockSyncTree) HandleMessage(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -415,6 +431,18 @@ func (mr *MockSyncTreeMockRecorder) Root() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockSyncTree)(nil).Root)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockSyncTree)(nil).Root))
} }
// SetListener mocks base method.
func (m *MockSyncTree) SetListener(arg0 updatelistener.UpdateListener) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetListener", arg0)
}
// SetListener indicates an expected call of SetListener.
func (mr *MockSyncTreeMockRecorder) SetListener(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetListener", reflect.TypeOf((*MockSyncTree)(nil).SetListener), arg0)
}
// SnapshotPath mocks base method. // SnapshotPath mocks base method.
func (m *MockSyncTree) SnapshotPath() []string { func (m *MockSyncTree) SnapshotPath() []string {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -30,9 +30,14 @@ type HeadNotifiable interface {
UpdateHeads(id string, heads []string) UpdateHeads(id string, heads []string)
} }
type ListenerSetter interface {
SetListener(listener updatelistener.UpdateListener)
}
type SyncTree interface { type SyncTree interface {
objecttree.ObjectTree objecttree.ObjectTree
synchandler.SyncHandler synchandler.SyncHandler
ListenerSetter
Ping(ctx context.Context) (err error) Ping(ctx context.Context) (err error)
} }
@ -210,6 +215,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
return return
} }
func (s *syncTree) SetListener(listener updatelistener.UpdateListener) {
// this should be called under lock
s.listener = listener
}
func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) { func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) {
if err = s.checkAlive(); err != nil { if err = s.checkAlive(); err != nil {
return return

View File

@ -20,7 +20,7 @@ type ObjectSync interface {
MessagePool() MessagePool MessagePool() MessagePool
ActionQueue() ActionQueue ActionQueue() ActionQueue
Init(getter syncobjectgetter.SyncObjectGetter) Init()
Close() (err error) Close() (err error)
} }
@ -35,7 +35,10 @@ type objectSync struct {
cancelSync context.CancelFunc cancelSync context.CancelFunc
} }
func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync ObjectSync) { func NewObjectSync(
spaceId string,
streamManager StreamManager,
objectGetter syncobjectgetter.SyncObjectGetter) (objectSync ObjectSync) {
msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return objectSync.HandleMessage(ctx, senderId, message) return objectSync.HandleMessage(ctx, senderId, message)
}) })
@ -43,6 +46,7 @@ func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync Obje
objectSync = newObjectSync( objectSync = newObjectSync(
spaceId, spaceId,
msgPool, msgPool,
objectGetter,
syncCtx, syncCtx,
cancel) cancel)
return return
@ -51,20 +55,21 @@ func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync Obje
func newObjectSync( func newObjectSync(
spaceId string, spaceId string,
streamPool MessagePool, streamPool MessagePool,
objectGetter syncobjectgetter.SyncObjectGetter,
syncCtx context.Context, syncCtx context.Context,
cancel context.CancelFunc, cancel context.CancelFunc,
) *objectSync { ) *objectSync {
return &objectSync{ return &objectSync{
streamPool: streamPool, objectGetter: objectGetter,
spaceId: spaceId, streamPool: streamPool,
spaceId: spaceId,
syncCtx: syncCtx, syncCtx: syncCtx,
cancelSync: cancel, cancelSync: cancel,
actionQueue: NewDefaultActionQueue(), actionQueue: NewDefaultActionQueue(),
} }
} }
func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) { func (s *objectSync) Init() {
s.objectGetter = objectGetter
s.actionQueue.Run() s.actionQueue.Run()
} }

View File

@ -2,8 +2,11 @@ package settings
import ( import (
"context" "context"
"time"
) )
const deleteLoopInterval = time.Second * 20
type deleteLoop struct { type deleteLoop struct {
deleteCtx context.Context deleteCtx context.Context
deleteCancel context.CancelFunc deleteCancel context.CancelFunc
@ -30,12 +33,17 @@ func (dl *deleteLoop) Run() {
func (dl *deleteLoop) loop() { func (dl *deleteLoop) loop() {
defer close(dl.loopDone) defer close(dl.loopDone)
dl.deleteFunc() dl.deleteFunc()
ticker := time.NewTicker(deleteLoopInterval)
defer ticker.Stop()
for { for {
select { select {
case <-dl.deleteCtx.Done(): case <-dl.deleteCtx.Done():
return return
case <-dl.deleteChan: case <-dl.deleteChan:
dl.deleteFunc() dl.deleteFunc()
ticker.Reset(deleteLoopInterval)
case <-ticker.C:
dl.deleteFunc()
} }
} }
} }

View File

@ -14,7 +14,6 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
@ -86,6 +85,7 @@ type Space interface {
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
DeleteTree(ctx context.Context, id string) (err error) DeleteTree(ctx context.Context, id string) (err error)
BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error)
HeadSync() headsync.HeadSync HeadSync() headsync.HeadSync
ObjectSync() objectsync.ObjectSync ObjectSync() objectsync.ObjectSync
@ -104,7 +104,7 @@ type space struct {
headSync headsync.HeadSync headSync headsync.HeadSync
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
storage spacestorage.SpaceStorage storage spacestorage.SpaceStorage
cache treegetter.TreeGetter cache *commonGetter
account accountservice.Service account accountservice.Service
aclList *syncacl.SyncAcl aclList *syncacl.SyncAcl
configuration nodeconf.Configuration configuration nodeconf.Configuration
@ -171,6 +171,7 @@ func (s *space) Init(ctx context.Context) (err error) {
return return
} }
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool()) s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool())
s.cache.AddObject(s.aclList)
deletionState := deletionstate.NewDeletionState(s.storage) deletionState := deletionstate.NewDeletionState(s.storage)
deps := settings.Deps{ deps := settings.Deps{
@ -191,9 +192,8 @@ func (s *space) Init(ctx context.Context) (err error) {
DeletionState: deletionState, DeletionState: deletionState,
} }
s.settingsObject = settings.NewSettingsObject(deps, s.id) s.settingsObject = settings.NewSettingsObject(deps, s.id)
s.cache.AddObject(s.settingsObject)
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject) s.objectSync.Init()
s.objectSync.Init(objectGetter)
s.headSync.Init(initialIds, deletionState) s.headSync.Init(initialIds, deletionState)
err = s.settingsObject.Init(ctx) err = s.settingsObject.Init(ctx)
if err != nil { if err != nil {
@ -289,6 +289,11 @@ type BuildTreeOpts struct {
WaitTreeRemoteSync bool WaitTreeRemoteSync bool
} }
type HistoryTreeOpts struct {
BeforeId string
Include bool
}
func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) { func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) {
if s.isClosed.Load() { if s.isClosed.Load() {
err = ErrSpaceClosed err = ErrSpaceClosed
@ -310,6 +315,24 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
} }
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
if s.isClosed.Load() {
err = ErrSpaceClosed
return
}
params := objecttree.HistoryTreeParams{
AclList: s.aclList,
BeforeId: opts.BeforeId,
IncludeBeforeId: opts.Include,
}
params.TreeStorage, err = s.storage.TreeStorage(id)
if err != nil {
return
}
return objecttree.BuildHistoryTree(params)
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) { func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsObject.DeleteObject(id) return s.settingsObject.DeleteObject(id)
} }

View File

@ -118,7 +118,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast() lastConfiguration := s.configurationService.GetLast()
confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool)
getter := newCommonGetter(st.Id(), s.treeGetter)
syncStatus := syncstatus.NewNoOpSyncStatus() syncStatus := syncstatus.NewNoOpSyncStatus()
// this will work only for clients, not the best solution, but... // this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) { if !lastConfiguration.IsResponsible(st.Id()) {
@ -126,21 +126,20 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
} }
// TODO: [che] remove *5 headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, getter, syncStatus, log)
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log)
streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id) streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
objectSync := objectsync.NewObjectSync(streamManager, id) objectSync := objectsync.NewObjectSync(id, streamManager, getter)
sp := &space{ sp := &space{
id: id, id: id,
objectSync: objectSync, objectSync: objectSync,
headSync: headSync, headSync: headSync,
syncStatus: syncStatus, syncStatus: syncStatus,
cache: s.treeGetter, cache: getter,
account: s.account, account: s.account,
configuration: lastConfiguration, configuration: lastConfiguration,
storage: st, storage: st,

View File

@ -49,6 +49,7 @@ func (s *BaseDrpcServer) Run(ctx context.Context, params Params) (err error) {
return err return err
} }
tlsList := params.Converter(tcpList, params.TimeoutMillis) tlsList := params.Converter(tcpList, params.TimeoutMillis)
s.listeners = append(s.listeners, tlsList)
go s.serve(ctx, tlsList) go s.serve(ctx, tlsList)
} }
return return