diff --git a/commonspace/commongetter.go b/commonspace/commongetter.go index dd374d8b..d5f67538 100644 --- a/commonspace/commongetter.go +++ b/commonspace/commongetter.go @@ -2,38 +2,51 @@ package commonspace import ( "context" - "github.com/anytypeio/any-sync/commonspace/object/acl/syncacl" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" + "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/treegetter" - "github.com/anytypeio/any-sync/commonspace/settings" + "golang.org/x/exp/slices" ) -type commonSpaceGetter struct { - spaceId string - aclList *syncacl.SyncAcl - treeGetter treegetter.TreeGetter - settings settings.SettingsObject +type commonGetter struct { + treegetter.TreeGetter + spaceId string + reservedObjects []syncobjectgetter.SyncObject } -func newCommonSpaceGetter(spaceId string, aclList *syncacl.SyncAcl, treeGetter treegetter.TreeGetter, settings settings.SettingsObject) syncobjectgetter.SyncObjectGetter { - return &commonSpaceGetter{ +func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter { + return &commonGetter{ + TreeGetter: getter, spaceId: spaceId, - aclList: aclList, - treeGetter: treeGetter, - settings: settings, } } -func (c *commonSpaceGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { - if c.aclList.Id() == objectId { - obj = c.aclList - return +func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) { + c.reservedObjects = append(c.reservedObjects, object) +} + +func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { + if obj := c.getReservedObject(treeId); obj != nil { + return obj.(objecttree.ObjectTree), nil } - if c.settings.Id() == objectId { - obj = c.settings.(syncobjectgetter.SyncObject) - return + return c.TreeGetter.GetTree(ctx, spaceId, treeId) +} + +func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject { + pos := slices.IndexFunc(c.reservedObjects, func(object syncobjectgetter.SyncObject) bool { + return object.Id() == id + }) + if pos == -1 { + return nil } - t, err := c.treeGetter.GetTree(ctx, c.spaceId, objectId) + return c.reservedObjects[pos] +} + +func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { + if obj := c.getReservedObject(objectId); obj != nil { + return obj, nil + } + t, err := c.TreeGetter.GetTree(ctx, c.spaceId, objectId) if err != nil { return } diff --git a/commonspace/object/syncobjectgetter/syncobjectgetter.go b/commonspace/object/syncobjectgetter/syncobjectgetter.go index 182d1fca..7e40b47a 100644 --- a/commonspace/object/syncobjectgetter/syncobjectgetter.go +++ b/commonspace/object/syncobjectgetter/syncobjectgetter.go @@ -6,6 +6,7 @@ import ( ) type SyncObject interface { + Id() string synchandler.SyncHandler } diff --git a/commonspace/object/tree/objecttree/change.go b/commonspace/object/tree/objecttree/change.go index 9b31e66b..8ded2d43 100644 --- a/commonspace/object/tree/objecttree/change.go +++ b/commonspace/object/tree/objecttree/change.go @@ -53,8 +53,10 @@ func NewChangeFromRoot(id string, ch *treechangeproto.RootChange, signature []by AclHeadId: ch.AclHeadId, Id: id, IsSnapshot: true, + Timestamp: ch.Timestamp, Identity: string(ch.Identity), Signature: signature, + Data: []byte(ch.ChangeType), } } diff --git a/commonspace/object/tree/objecttree/changebuilder.go b/commonspace/object/tree/objecttree/changebuilder.go index bd53c293..03f84b4a 100644 --- a/commonspace/object/tree/objecttree/changebuilder.go +++ b/commonspace/object/tree/objecttree/changebuilder.go @@ -151,7 +151,7 @@ func (c *changeBuilder) BuildContent(payload BuilderContent) (ch *Change, rawIdC AclHeadId: payload.AclHeadId, SnapshotBaseId: payload.SnapshotBaseId, CurrentReadKeyHash: payload.CurrentReadKeyHash, - Timestamp: int64(time.Now().Nanosecond()), + Timestamp: time.Now().UnixNano(), Identity: payload.Identity, IsSnapshot: payload.IsSnapshot, } diff --git a/commonspace/object/tree/objecttree/historytree.go b/commonspace/object/tree/objecttree/historytree.go new file mode 100644 index 00000000..c81143fc --- /dev/null +++ b/commonspace/object/tree/objecttree/historytree.go @@ -0,0 +1,52 @@ +package objecttree + +import ( + "errors" +) + +var ErrLoadBeforeRoot = errors.New("can't load before root") + +type HistoryTree interface { + ReadableObjectTree +} + +type historyTree struct { + *objectTree +} + +func (h *historyTree) rebuildFromStorage(beforeId string, include bool) (err error) { + ot := h.objectTree + ot.treeBuilder.Reset() + if beforeId == ot.Id() && !include { + return ErrLoadBeforeRoot + } + + heads := []string{beforeId} + if beforeId == "" { + heads, err = ot.treeStorage.Heads() + if err != nil { + return + } + } else if !include { + beforeChange, err := ot.treeBuilder.loadChange(beforeId) + if err != nil { + return err + } + heads = beforeChange.PreviousIds + } + + ot.tree, err = ot.treeBuilder.build(heads, nil, nil) + if err != nil { + return + } + ot.aclList.RLock() + defer ot.aclList.RUnlock() + state := ot.aclList.AclState() + + if len(ot.keys) != len(state.UserReadKeys()) { + for key, value := range state.UserReadKeys() { + ot.keys[key] = value + } + } + return +} diff --git a/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go b/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go index 6e743ba5..1198794c 100644 --- a/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go +++ b/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go @@ -125,6 +125,21 @@ func (mr *MockObjectTreeMockRecorder) Delete() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockObjectTree)(nil).Delete)) } +// GetChange mocks base method. +func (m *MockObjectTree) GetChange(arg0 string) (*objecttree.Change, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChange", arg0) + ret0, _ := ret[0].(*objecttree.Change) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetChange indicates an expected call of GetChange. +func (mr *MockObjectTreeMockRecorder) GetChange(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChange", reflect.TypeOf((*MockObjectTree)(nil).GetChange), arg0) +} + // HasChanges mocks base method. func (m *MockObjectTree) HasChanges(arg0 ...string) bool { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index 176d7192..9e5141b0 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -5,7 +5,7 @@ import ( "context" "errors" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" - list2 "github.com/anytypeio/any-sync/commonspace/object/acl/list" + list "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/keychain" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" @@ -23,6 +23,7 @@ type RWLocker interface { var ( ErrHasInvalidChanges = errors.New("the change is invalid") ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot") + ErrNoChangeInTree = errors.New("no such change in tree") ) type AddResultSummary int @@ -43,7 +44,7 @@ type RawChangesPayload struct { type ChangeIterateFunc = func(change *Change) bool type ChangeConvertFunc = func(decrypted []byte) (any, error) -type ObjectTree interface { +type ReadableObjectTree interface { RWLocker Id() string @@ -51,11 +52,17 @@ type ObjectTree interface { UnmarshalledHeader() *Change Heads() []string Root() *Change - HasChanges(...string) bool - DebugDump(parser DescriptionParser) (string, error) + HasChanges(...string) bool + GetChange(string) (*Change, error) + + DebugDump(parser DescriptionParser) (string, error) IterateRoot(convert ChangeConvertFunc, iterate ChangeIterateFunc) error IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error +} + +type ObjectTree interface { + ReadableObjectTree SnapshotPath() []string ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*treechangeproto.RawTreeChangeWithId, error) @@ -75,7 +82,7 @@ type objectTree struct { validator ObjectTreeValidator rawChangeLoader *rawChangeLoader treeBuilder *treeBuilder - aclList list2.AclList + aclList list.AclList id string rawRoot *treechangeproto.RawTreeChangeWithId @@ -101,13 +108,13 @@ type objectTreeDeps struct { treeStorage treestorage.TreeStorage validator ObjectTreeValidator rawChangeLoader *rawChangeLoader - aclList list2.AclList + aclList list.AclList } func defaultObjectTreeDeps( rootChange *treechangeproto.RawTreeChangeWithId, treeStorage treestorage.TreeStorage, - aclList list2.AclList) objectTreeDeps { + aclList list.AclList) objectTreeDeps { keychain := keychain.NewKeychain() changeBuilder := NewChangeBuilder(keychain, rootChange) @@ -155,6 +162,13 @@ func (ot *objectTree) Storage() treestorage.TreeStorage { return ot.treeStorage } +func (ot *objectTree) GetChange(id string) (*Change, error) { + if ch, ok := ot.tree.attached[id]; ok { + return ch, nil + } + return nil, ErrNoChangeInTree +} + func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) { payload, err := ot.prepareBuilderContent(content) if err != nil { @@ -208,7 +222,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt canWrite := state.HasPermission(content.Identity, aclrecordproto.AclUserPermissions_Writer) || state.HasPermission(content.Identity, aclrecordproto.AclUserPermissions_Admin) if !canWrite { - err = list2.ErrInsufficientPermissions + err = list.ErrInsufficientPermissions return } @@ -471,7 +485,7 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate } readKey, exists := ot.keys[c.ReadKeyHash] if !exists { - err = list2.ErrNoReadKey + err = list.ErrNoReadKey return } diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index 03f4322b..24506bf0 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -111,6 +111,24 @@ func prepareAclList(t *testing.T) list.AclList { return aclList } +func prepareTreeDeps(aclList list.AclList) (*mockChangeCreator, objectTreeDeps) { + changeCreator := &mockChangeCreator{} + treeStorage := changeCreator.createNewTreeStorage("0", aclList.Head().Id) + root, _ := treeStorage.Root() + changeBuilder := &mockChangeBuilder{ + originalBuilder: NewChangeBuilder(nil, root), + } + deps := objectTreeDeps{ + changeBuilder: changeBuilder, + treeBuilder: newTreeBuilder(treeStorage, changeBuilder), + treeStorage: treeStorage, + rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder), + validator: &mockChangeValidator{}, + aclList: aclList, + } + return changeCreator, deps +} + func prepareTreeContext(t *testing.T, aclList list.AclList) testTreeContext { changeCreator := &mockChangeCreator{} treeStorage := changeCreator.createNewTreeStorage("0", aclList.Head().Id) @@ -542,4 +560,87 @@ func TestObjectTree(t *testing.T) { assert.Equal(t, ch, raw, "the changes in the storage should be the same") } }) + + t.Run("test history tree not include", func(t *testing.T) { + changeCreator, deps := prepareTreeDeps(aclList) + + rawChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.createRaw("1", aclList.Head().Id, "0", false, "0"), + changeCreator.createRaw("2", aclList.Head().Id, "0", false, "1"), + changeCreator.createRaw("3", aclList.Head().Id, "0", true, "2"), + changeCreator.createRaw("4", aclList.Head().Id, "0", false, "2"), + changeCreator.createRaw("5", aclList.Head().Id, "0", false, "1"), + changeCreator.createRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"), + } + deps.treeStorage.TransactionAdd(rawChanges, []string{"6"}) + hTree, err := buildHistoryTree(deps, HistoryTreeParams{ + BeforeId: "6", + IncludeBeforeId: false, + }) + require.NoError(t, err) + // check tree heads + assert.Equal(t, []string{"3", "4", "5"}, hTree.Heads()) + + // check tree iterate + var iterChangesId []string + err = hTree.IterateFrom(hTree.Root().Id, nil, func(change *Change) bool { + iterChangesId = append(iterChangesId, change.Id) + return true + }) + require.NoError(t, err, "iterate should be without error") + assert.Equal(t, []string{"0", "1", "2", "3", "4", "5"}, iterChangesId) + assert.Equal(t, "0", hTree.Root().Id) + }) + + t.Run("test history tree include", func(t *testing.T) { + changeCreator, deps := prepareTreeDeps(aclList) + + rawChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.createRaw("1", aclList.Head().Id, "0", false, "0"), + changeCreator.createRaw("2", aclList.Head().Id, "0", false, "1"), + changeCreator.createRaw("3", aclList.Head().Id, "0", true, "2"), + changeCreator.createRaw("4", aclList.Head().Id, "0", false, "2"), + changeCreator.createRaw("5", aclList.Head().Id, "0", false, "1"), + changeCreator.createRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"), + } + deps.treeStorage.TransactionAdd(rawChanges, []string{"6"}) + hTree, err := buildHistoryTree(deps, HistoryTreeParams{ + BeforeId: "6", + IncludeBeforeId: true, + }) + require.NoError(t, err) + // check tree heads + assert.Equal(t, []string{"6"}, hTree.Heads()) + + // check tree iterate + var iterChangesId []string + err = hTree.IterateFrom(hTree.Root().Id, nil, func(change *Change) bool { + iterChangesId = append(iterChangesId, change.Id) + return true + }) + require.NoError(t, err, "iterate should be without error") + assert.Equal(t, []string{"0", "1", "2", "3", "4", "5", "6"}, iterChangesId) + assert.Equal(t, "0", hTree.Root().Id) + }) + + t.Run("test history tree root", func(t *testing.T) { + _, deps := prepareTreeDeps(aclList) + hTree, err := buildHistoryTree(deps, HistoryTreeParams{ + BeforeId: "0", + IncludeBeforeId: true, + }) + require.NoError(t, err) + // check tree heads + assert.Equal(t, []string{"0"}, hTree.Heads()) + + // check tree iterate + var iterChangesId []string + err = hTree.IterateFrom(hTree.Root().Id, nil, func(change *Change) bool { + iterChangesId = append(iterChangesId, change.Id) + return true + }) + require.NoError(t, err, "iterate should be without error") + assert.Equal(t, []string{"0"}, iterChangesId) + assert.Equal(t, "0", hTree.Root().Id) + }) } diff --git a/commonspace/object/tree/objecttree/treebuilder.go b/commonspace/object/tree/objecttree/treebuilder.go index e82952b0..ab5592c7 100644 --- a/commonspace/object/tree/objecttree/treebuilder.go +++ b/commonspace/object/tree/objecttree/treebuilder.go @@ -41,16 +41,20 @@ func (tb *treeBuilder) Reset() { } func (tb *treeBuilder) Build(theirHeads []string, newChanges []*Change) (*Tree, error) { - var proposedHeads []string - tb.cache = make(map[string]*Change) heads, err := tb.treeStorage.Heads() if err != nil { return nil, err } + return tb.build(heads, theirHeads, newChanges) +} + +func (tb *treeBuilder) build(heads []string, theirHeads []string, newChanges []*Change) (*Tree, error) { + var proposedHeads []string + tb.cache = make(map[string]*Change) // TODO: we can actually get this from tree (though not sure, that there would always be // an invariant where the tree has the closest common snapshot of heads) - // so if optimization is critical we can change this to inject from tree directly + // so if optimization is critical we can change this to inject from tree directly, // but then we have to be sure that invariant stays true oldBreakpoint, err := tb.findBreakpoint(heads, true) if err != nil { diff --git a/commonspace/object/tree/objecttree/objecttreefactory.go b/commonspace/object/tree/objecttree/treefactory.go similarity index 73% rename from commonspace/object/tree/objecttree/objecttreefactory.go rename to commonspace/object/tree/objecttree/treefactory.go index 1bd5a885..0f8a337d 100644 --- a/commonspace/object/tree/objecttree/objecttreefactory.go +++ b/commonspace/object/tree/objecttree/treefactory.go @@ -41,6 +41,15 @@ func BuildObjectTree(treeStorage treestorage.TreeStorage, aclList list.AclList) return buildObjectTree(deps) } +func BuildHistoryTree(params HistoryTreeParams) (HistoryTree, error) { + rootChange, err := params.TreeStorage.Root() + if err != nil { + return nil, err + } + deps := defaultObjectTreeDeps(rootChange, params.TreeStorage, params.AclList) + return buildHistoryTree(deps, params) +} + func CreateDerivedObjectTree( payload ObjectTreeCreatePayload, aclList list.AclList, @@ -118,7 +127,6 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { aclList: deps.aclList, changeBuilder: deps.changeBuilder, rawChangeLoader: deps.rawChangeLoader, - tree: nil, keys: make(map[uint64]*symmetric.Key), newChangesBuf: make([]*Change, 0, 10), difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), @@ -146,3 +154,44 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { return objTree, nil } + +type HistoryTreeParams struct { + TreeStorage treestorage.TreeStorage + AclList list.AclList + BeforeId string + IncludeBeforeId bool +} + +func buildHistoryTree(deps objectTreeDeps, params HistoryTreeParams) (ht HistoryTree, err error) { + objTree := &objectTree{ + treeStorage: deps.treeStorage, + treeBuilder: deps.treeBuilder, + validator: deps.validator, + aclList: deps.aclList, + changeBuilder: deps.changeBuilder, + rawChangeLoader: deps.rawChangeLoader, + keys: make(map[uint64]*symmetric.Key), + newChangesBuf: make([]*Change, 0, 10), + difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), + notSeenIdxBuf: make([]int, 0, 10), + newSnapshotsBuf: make([]*Change, 0, 10), + } + + hTree := &historyTree{objectTree: objTree} + err = hTree.rebuildFromStorage(params.BeforeId, params.IncludeBeforeId) + if err != nil { + return nil, err + } + objTree.id = objTree.treeStorage.Id() + objTree.rawRoot, err = objTree.treeStorage.Root() + if err != nil { + return nil, err + } + + header, err := objTree.changeBuilder.ConvertFromRaw(objTree.rawRoot, false) + if err != nil { + return nil, err + } + objTree.root = header + return hTree, nil +} diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index abdbfb10..e2817539 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -9,6 +9,7 @@ import ( reflect "reflect" objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" + updatelistener "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" treestorage "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -249,6 +250,21 @@ func (mr *MockSyncTreeMockRecorder) Delete() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockSyncTree)(nil).Delete)) } +// GetChange mocks base method. +func (m *MockSyncTree) GetChange(arg0 string) (*objecttree.Change, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChange", arg0) + ret0, _ := ret[0].(*objecttree.Change) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetChange indicates an expected call of GetChange. +func (mr *MockSyncTreeMockRecorder) GetChange(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChange", reflect.TypeOf((*MockSyncTree)(nil).GetChange), arg0) +} + // HandleMessage mocks base method. func (m *MockSyncTree) HandleMessage(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error { m.ctrl.T.Helper() @@ -415,6 +431,18 @@ func (mr *MockSyncTreeMockRecorder) Root() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockSyncTree)(nil).Root)) } +// SetListener mocks base method. +func (m *MockSyncTree) SetListener(arg0 updatelistener.UpdateListener) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetListener", arg0) +} + +// SetListener indicates an expected call of SetListener. +func (mr *MockSyncTreeMockRecorder) SetListener(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetListener", reflect.TypeOf((*MockSyncTree)(nil).SetListener), arg0) +} + // SnapshotPath mocks base method. func (m *MockSyncTree) SnapshotPath() []string { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 2b7e27f6..a168437d 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -30,9 +30,14 @@ type HeadNotifiable interface { UpdateHeads(id string, heads []string) } +type ListenerSetter interface { + SetListener(listener updatelistener.UpdateListener) +} + type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler + ListenerSetter Ping(ctx context.Context) (err error) } @@ -210,6 +215,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy return } +func (s *syncTree) SetListener(listener updatelistener.UpdateListener) { + // this should be called under lock + s.listener = listener +} + func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) { if err = s.checkAlive(); err != nil { return diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index d9c9cc27..5031dbf2 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -20,7 +20,7 @@ type ObjectSync interface { MessagePool() MessagePool ActionQueue() ActionQueue - Init(getter syncobjectgetter.SyncObjectGetter) + Init() Close() (err error) } @@ -35,7 +35,10 @@ type objectSync struct { cancelSync context.CancelFunc } -func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync ObjectSync) { +func NewObjectSync( + spaceId string, + streamManager StreamManager, + objectGetter syncobjectgetter.SyncObjectGetter) (objectSync ObjectSync) { msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { return objectSync.HandleMessage(ctx, senderId, message) }) @@ -43,6 +46,7 @@ func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync Obje objectSync = newObjectSync( spaceId, msgPool, + objectGetter, syncCtx, cancel) return @@ -51,20 +55,21 @@ func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync Obje func newObjectSync( spaceId string, streamPool MessagePool, + objectGetter syncobjectgetter.SyncObjectGetter, syncCtx context.Context, cancel context.CancelFunc, ) *objectSync { return &objectSync{ - streamPool: streamPool, - spaceId: spaceId, + objectGetter: objectGetter, + streamPool: streamPool, + spaceId: spaceId, syncCtx: syncCtx, cancelSync: cancel, actionQueue: NewDefaultActionQueue(), } } -func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) { - s.objectGetter = objectGetter +func (s *objectSync) Init() { s.actionQueue.Run() } diff --git a/commonspace/settings/deleteloop.go b/commonspace/settings/deleteloop.go index b479020c..fe244ab6 100644 --- a/commonspace/settings/deleteloop.go +++ b/commonspace/settings/deleteloop.go @@ -2,8 +2,11 @@ package settings import ( "context" + "time" ) +const deleteLoopInterval = time.Second * 20 + type deleteLoop struct { deleteCtx context.Context deleteCancel context.CancelFunc @@ -30,12 +33,17 @@ func (dl *deleteLoop) Run() { func (dl *deleteLoop) loop() { defer close(dl.loopDone) dl.deleteFunc() + ticker := time.NewTicker(deleteLoopInterval) + defer ticker.Stop() for { select { case <-dl.deleteCtx.Done(): return case <-dl.deleteChan: dl.deleteFunc() + ticker.Reset(deleteLoopInterval) + case <-ticker.C: + dl.deleteFunc() } } } diff --git a/commonspace/space.go b/commonspace/space.go index 9dda2ece..43127f00 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -14,7 +14,6 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" - "github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" @@ -86,6 +85,7 @@ type Space interface { PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) DeleteTree(ctx context.Context, id string) (err error) + BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) HeadSync() headsync.HeadSync ObjectSync() objectsync.ObjectSync @@ -104,7 +104,7 @@ type space struct { headSync headsync.HeadSync syncStatus syncstatus.StatusUpdater storage spacestorage.SpaceStorage - cache treegetter.TreeGetter + cache *commonGetter account accountservice.Service aclList *syncacl.SyncAcl configuration nodeconf.Configuration @@ -171,6 +171,7 @@ func (s *space) Init(ctx context.Context) (err error) { return } s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool()) + s.cache.AddObject(s.aclList) deletionState := deletionstate.NewDeletionState(s.storage) deps := settings.Deps{ @@ -191,9 +192,8 @@ func (s *space) Init(ctx context.Context) (err error) { DeletionState: deletionState, } s.settingsObject = settings.NewSettingsObject(deps, s.id) - - objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject) - s.objectSync.Init(objectGetter) + s.cache.AddObject(s.settingsObject) + s.objectSync.Init() s.headSync.Init(initialIds, deletionState) err = s.settingsObject.Init(ctx) if err != nil { @@ -289,6 +289,11 @@ type BuildTreeOpts struct { WaitTreeRemoteSync bool } +type HistoryTreeOpts struct { + BeforeId string + Include bool +} + func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) { if s.isClosed.Load() { err = ErrSpaceClosed @@ -310,6 +315,24 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } +func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) { + if s.isClosed.Load() { + err = ErrSpaceClosed + return + } + + params := objecttree.HistoryTreeParams{ + AclList: s.aclList, + BeforeId: opts.BeforeId, + IncludeBeforeId: opts.Include, + } + params.TreeStorage, err = s.storage.TreeStorage(id) + if err != nil { + return + } + return objecttree.BuildHistoryTree(params) +} + func (s *space) DeleteTree(ctx context.Context, id string) (err error) { return s.settingsObject.DeleteObject(id) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 52e44a4a..5606d01e 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -118,7 +118,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) - + getter := newCommonGetter(st.Id(), s.treeGetter) syncStatus := syncstatus.NewNoOpSyncStatus() // this will work only for clients, not the best solution, but... if !lastConfiguration.IsResponsible(st.Id()) { @@ -126,21 +126,20 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) } - // TODO: [che] remove *5 - headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log) + headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, getter, syncStatus, log) streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id) if err != nil { return nil, err } - objectSync := objectsync.NewObjectSync(streamManager, id) + objectSync := objectsync.NewObjectSync(id, streamManager, getter) sp := &space{ id: id, objectSync: objectSync, headSync: headSync, syncStatus: syncStatus, - cache: s.treeGetter, + cache: getter, account: s.account, configuration: lastConfiguration, storage: st, diff --git a/net/rpc/server/baseserver.go b/net/rpc/server/baseserver.go index e7282904..0f0138a2 100644 --- a/net/rpc/server/baseserver.go +++ b/net/rpc/server/baseserver.go @@ -49,6 +49,7 @@ func (s *BaseDrpcServer) Run(ctx context.Context, params Params) (err error) { return err } tlsList := params.Converter(tcpList, params.TimeoutMillis) + s.listeners = append(s.listeners, tlsList) go s.serve(ctx, tlsList) } return