Change object creation to do it through cache
This commit is contained in:
parent
fd690d5758
commit
0f257cbc0c
@ -53,8 +53,7 @@ func (s *service) CreateDocument(spaceId string) (id string, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
id, err = textdocument.CreateTextDocument(context.Background(), space, s.account)
|
||||
return
|
||||
return textdocument.CreateTextDocument(context.Background(), space, s.account)
|
||||
}
|
||||
|
||||
func (s *service) DeleteDocument(spaceId, documentId string) (err error) {
|
||||
|
||||
@ -33,7 +33,12 @@ func CreateTextDocument(
|
||||
SpaceId: space.Id(),
|
||||
Identity: account.Account().Identity,
|
||||
}
|
||||
return space.CreateTree(ctx, payload)
|
||||
obj, err := space.CreateTree(ctx, payload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
id = obj.Id()
|
||||
return
|
||||
}
|
||||
|
||||
func NewTextDocument(ctx context.Context, space commonspace.Space, id string, listener updatelistener.UpdateListener, account accountservice.Service) (doc TextDocument, err error) {
|
||||
|
||||
@ -21,6 +21,19 @@ type ObjectTreeCreatePayload struct {
|
||||
IsEncrypted bool
|
||||
}
|
||||
|
||||
func CreateObjectTreeRoot(payload ObjectTreeCreatePayload, aclList list.AclList) (root *treechangeproto.RawTreeChangeWithId, err error) {
|
||||
bytes := make([]byte, 32)
|
||||
_, err = rand.Read(bytes)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return createObjectTreeRoot(payload, time.Now().UnixNano(), bytes, aclList)
|
||||
}
|
||||
|
||||
func DeriveObjectTreeRoot(payload ObjectTreeCreatePayload, aclList list.AclList) (root *treechangeproto.RawTreeChangeWithId, err error) {
|
||||
return createObjectTreeRoot(payload, 0, nil, aclList)
|
||||
}
|
||||
|
||||
func BuildObjectTree(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error) {
|
||||
rootChange, err := treeStorage.Root()
|
||||
if err != nil {
|
||||
@ -55,6 +68,29 @@ func createObjectTree(
|
||||
seed []byte,
|
||||
aclList list.AclList,
|
||||
createStorage treestorage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
|
||||
raw, err := createObjectTreeRoot(payload, timestamp, seed, aclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// create storage
|
||||
st, err := createStorage(treestorage.TreeStorageCreatePayload{
|
||||
RootRawChange: raw,
|
||||
Changes: []*treechangeproto.RawTreeChangeWithId{raw},
|
||||
Heads: []string{raw.Id},
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return BuildObjectTree(st, aclList)
|
||||
}
|
||||
|
||||
func createObjectTreeRoot(
|
||||
payload ObjectTreeCreatePayload,
|
||||
timestamp int64,
|
||||
seed []byte,
|
||||
aclList list.AclList) (root *treechangeproto.RawTreeChangeWithId, err error) {
|
||||
aclList.RLock()
|
||||
aclHeadId := aclList.Head().Id
|
||||
aclList.RUnlock()
|
||||
@ -72,22 +108,8 @@ func createObjectTree(
|
||||
Seed: seed,
|
||||
}
|
||||
|
||||
_, raw, err := NewChangeBuilder(keychain.NewKeychain(), nil).BuildInitialContent(cnt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// create storage
|
||||
st, err := createStorage(treestorage.TreeStorageCreatePayload{
|
||||
RootRawChange: raw,
|
||||
Changes: []*treechangeproto.RawTreeChangeWithId{raw},
|
||||
Heads: []string{raw.Id},
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return BuildObjectTree(st, aclList)
|
||||
_, root, err = NewChangeBuilder(keychain.NewKeychain(), nil).BuildInitialContent(cnt)
|
||||
return
|
||||
}
|
||||
|
||||
func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
|
||||
|
||||
@ -51,22 +51,9 @@ type syncTree struct {
|
||||
|
||||
var log = logger.NewNamed("commonspace.synctree").Sugar()
|
||||
|
||||
var createDerivedObjectTree = objecttree.CreateDerivedObjectTree
|
||||
var createObjectTree = objecttree.CreateObjectTree
|
||||
var buildObjectTree = objecttree.BuildObjectTree
|
||||
var createSyncClient = newWrappedSyncClient
|
||||
|
||||
type CreateDeps struct {
|
||||
SpaceId string
|
||||
Payload objecttree.ObjectTreeCreatePayload
|
||||
Configuration nodeconf.Configuration
|
||||
ObjectSync objectsync.ObjectSync
|
||||
AclList list.AclList
|
||||
SpaceStorage spacestorage.SpaceStorage
|
||||
SyncStatus syncstatus.StatusUpdater
|
||||
HeadNotifiable HeadNotifiable
|
||||
}
|
||||
|
||||
type BuildDeps struct {
|
||||
SpaceId string
|
||||
ObjectSync objectsync.ObjectSync
|
||||
@ -89,49 +76,6 @@ func newWrappedSyncClient(
|
||||
return newQueuedClient(syncClient, objectSync.ActionQueue())
|
||||
}
|
||||
|
||||
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) {
|
||||
objTree, err := createDerivedObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
syncClient := createSyncClient(
|
||||
deps.SpaceId,
|
||||
sharedFactory,
|
||||
deps.ObjectSync,
|
||||
deps.Configuration)
|
||||
|
||||
id = objTree.Id()
|
||||
heads := objTree.Heads()
|
||||
|
||||
deps.HeadNotifiable.UpdateHeads(id, heads)
|
||||
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
|
||||
deps.SyncStatus.HeadsChange(id, heads)
|
||||
syncClient.BroadcastAsync(headUpdate)
|
||||
return
|
||||
}
|
||||
|
||||
func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) {
|
||||
objTree, err := createObjectTree(deps.Payload, deps.AclList, deps.SpaceStorage.CreateTreeStorage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
syncClient := createSyncClient(
|
||||
deps.SpaceId,
|
||||
sharedFactory,
|
||||
deps.ObjectSync,
|
||||
deps.Configuration)
|
||||
|
||||
id = objTree.Id()
|
||||
heads := objTree.Heads()
|
||||
|
||||
deps.HeadNotifiable.UpdateHeads(id, heads)
|
||||
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
|
||||
deps.SyncStatus.HeadsChange(id, heads)
|
||||
syncClient.BroadcastAsync(headUpdate)
|
||||
return
|
||||
}
|
||||
|
||||
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
||||
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
@ -208,6 +152,14 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
||||
return buildSyncTree(ctx, true, deps)
|
||||
}
|
||||
|
||||
func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, deps BuildDeps) (t SyncTree, err error) {
|
||||
deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return buildSyncTree(ctx, true, deps)
|
||||
}
|
||||
|
||||
func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t SyncTree, err error) {
|
||||
objTree, err := buildObjectTree(deps.TreeStorage, deps.AclList)
|
||||
if err != nil {
|
||||
|
||||
@ -2,17 +2,13 @@ package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/acl/list"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/acl/list/mock_list"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/objecttree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/objecttree/mock_objecttree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/synctree/mock_synctree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/synctree/updatelistener"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/synctree/updatelistener/mock_updatelistener"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectsync"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacestorage/mock_spacestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncstatus"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
"github.com/golang/mock/gomock"
|
||||
@ -44,81 +40,6 @@ func syncClientFuncCreator(client SyncClient) func(spaceId string, factory Reque
|
||||
}
|
||||
}
|
||||
|
||||
func Test_DeriveSyncTree(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
||||
aclListMock := mock_list.NewMockAclList(ctrl)
|
||||
objTreeMock := newTestObjMock(mock_objecttree.NewMockObjectTree(ctrl))
|
||||
spaceStorageMock := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
||||
headNotifiableMock := mock_synctree.NewMockHeadNotifiable(ctrl)
|
||||
spaceId := "spaceId"
|
||||
expectedPayload := objecttree.ObjectTreeCreatePayload{SpaceId: spaceId}
|
||||
createDerivedObjectTree = func(payload objecttree.ObjectTreeCreatePayload, l list.AclList, create treestorage.TreeStorageCreatorFunc) (objTree objecttree.ObjectTree, err error) {
|
||||
require.Equal(t, l, aclListMock)
|
||||
require.Equal(t, expectedPayload, payload)
|
||||
return objTreeMock, nil
|
||||
}
|
||||
createSyncClient = syncClientFuncCreator(syncClientMock)
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"h1"})
|
||||
headNotifiableMock.EXPECT().UpdateHeads("id", []string{"h1"})
|
||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
|
||||
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
||||
deps := CreateDeps{
|
||||
AclList: aclListMock,
|
||||
SpaceId: spaceId,
|
||||
Payload: expectedPayload,
|
||||
SpaceStorage: spaceStorageMock,
|
||||
SyncStatus: syncstatus.NewNoOpSyncStatus(),
|
||||
HeadNotifiable: headNotifiableMock,
|
||||
}
|
||||
objTreeMock.EXPECT().Id().Return("id")
|
||||
|
||||
_, err := DeriveSyncTree(ctx, deps)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func Test_CreateSyncTree(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
|
||||
aclListMock := mock_list.NewMockAclList(ctrl)
|
||||
objTreeMock := newTestObjMock(mock_objecttree.NewMockObjectTree(ctrl))
|
||||
spaceStorageMock := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
||||
headNotifiableMock := mock_synctree.NewMockHeadNotifiable(ctrl)
|
||||
spaceId := "spaceId"
|
||||
expectedPayload := objecttree.ObjectTreeCreatePayload{SpaceId: spaceId}
|
||||
createObjectTree = func(payload objecttree.ObjectTreeCreatePayload, l list.AclList, create treestorage.TreeStorageCreatorFunc) (objTree objecttree.ObjectTree, err error) {
|
||||
require.Equal(t, l, aclListMock)
|
||||
require.Equal(t, expectedPayload, payload)
|
||||
return objTreeMock, nil
|
||||
}
|
||||
|
||||
createSyncClient = syncClientFuncCreator(syncClientMock)
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"h1"})
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
headNotifiableMock.EXPECT().UpdateHeads("id", []string{"h1"})
|
||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Any(), gomock.Nil()).Return(headUpdate)
|
||||
syncClientMock.EXPECT().BroadcastAsync(gomock.Eq(headUpdate)).Return(nil)
|
||||
objTreeMock.EXPECT().Id().Return("id")
|
||||
deps := CreateDeps{
|
||||
AclList: aclListMock,
|
||||
SpaceId: spaceId,
|
||||
Payload: expectedPayload,
|
||||
SpaceStorage: spaceStorageMock,
|
||||
SyncStatus: syncstatus.NewNoOpSyncStatus(),
|
||||
HeadNotifiable: headNotifiableMock,
|
||||
}
|
||||
|
||||
_, err := CreateSyncTree(ctx, deps)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func Test_BuildSyncTree(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
@ -12,6 +12,8 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/objecttree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/synctree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/synctree/updatelistener"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/tree/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/treegetter"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectsync"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settings"
|
||||
@ -44,6 +46,10 @@ type SpaceCreatePayload struct {
|
||||
ReplicationKey uint64
|
||||
}
|
||||
|
||||
type spaceCtxKey int
|
||||
|
||||
const treePayloadKey spaceCtxKey = 0
|
||||
|
||||
const SpaceTypeDerived = "derived.space"
|
||||
|
||||
type SpaceDerivePayload struct {
|
||||
@ -76,8 +82,8 @@ type Space interface {
|
||||
|
||||
SpaceSyncRpc() RpcHandler
|
||||
|
||||
DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (string, error)
|
||||
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (string, error)
|
||||
DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (objecttree.ObjectTree, error)
|
||||
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (objecttree.ObjectTree, error)
|
||||
BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (objecttree.ObjectTree, error)
|
||||
DeleteTree(ctx context.Context, id string) (err error)
|
||||
|
||||
@ -219,40 +225,43 @@ func (s *space) DebugAllHeads() []headsync.TreeHeads {
|
||||
return s.headSync.DebugAllHeads()
|
||||
}
|
||||
|
||||
func (s *space) DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (id string, err error) {
|
||||
func (s *space) DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (t objecttree.ObjectTree, err error) {
|
||||
if s.isClosed.Load() {
|
||||
err = ErrSpaceClosed
|
||||
return
|
||||
}
|
||||
deps := synctree.CreateDeps{
|
||||
SpaceId: s.id,
|
||||
Payload: payload,
|
||||
ObjectSync: s.objectSync,
|
||||
Configuration: s.configuration,
|
||||
AclList: s.aclList,
|
||||
SpaceStorage: s.storage,
|
||||
SyncStatus: s.syncStatus,
|
||||
HeadNotifiable: s.headSync,
|
||||
root, err := objecttree.DeriveObjectTreeRoot(payload, s.aclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return synctree.DeriveSyncTree(ctx, deps)
|
||||
res := treestorage.TreeStorageCreatePayload{
|
||||
RootRawChange: root,
|
||||
Changes: []*treechangeproto.RawTreeChangeWithId{root},
|
||||
Heads: []string{root.Id},
|
||||
}
|
||||
ctx = context.WithValue(ctx, treePayloadKey, res)
|
||||
// here we must be sure that the object is created synchronously,
|
||||
// so there won't be any conflicts, therefore we do it through cache
|
||||
return s.cache.GetTree(ctx, s.id, root.Id)
|
||||
}
|
||||
|
||||
func (s *space) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (id string, err error) {
|
||||
func (s *space) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (t objecttree.ObjectTree, err error) {
|
||||
if s.isClosed.Load() {
|
||||
err = ErrSpaceClosed
|
||||
return
|
||||
}
|
||||
deps := synctree.CreateDeps{
|
||||
SpaceId: s.id,
|
||||
Payload: payload,
|
||||
ObjectSync: s.objectSync,
|
||||
Configuration: s.configuration,
|
||||
AclList: s.aclList,
|
||||
SpaceStorage: s.storage,
|
||||
SyncStatus: s.syncStatus,
|
||||
HeadNotifiable: s.headSync,
|
||||
root, err := objecttree.CreateObjectTreeRoot(payload, s.aclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return synctree.CreateSyncTree(ctx, deps)
|
||||
|
||||
res := treestorage.TreeStorageCreatePayload{
|
||||
RootRawChange: root,
|
||||
Changes: []*treechangeproto.RawTreeChangeWithId{root},
|
||||
Heads: []string{root.Id},
|
||||
}
|
||||
ctx = context.WithValue(ctx, treePayloadKey, res)
|
||||
return s.cache.GetTree(ctx, s.id, root.Id)
|
||||
}
|
||||
|
||||
func (s *space) BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) {
|
||||
@ -260,6 +269,10 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
|
||||
err = ErrSpaceClosed
|
||||
return
|
||||
}
|
||||
if payload, exists := ctx.Value(treePayloadKey).(treestorage.TreeStorageCreatePayload); exists {
|
||||
return s.putTree(ctx, payload, listener)
|
||||
}
|
||||
|
||||
deps := synctree.BuildDeps{
|
||||
SpaceId: s.id,
|
||||
ObjectSync: s.objectSync,
|
||||
@ -306,3 +319,28 @@ func (s *space) Close() error {
|
||||
|
||||
return mError.Err()
|
||||
}
|
||||
|
||||
func (s *space) putTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) {
|
||||
if s.isClosed.Load() {
|
||||
err = ErrSpaceClosed
|
||||
return
|
||||
}
|
||||
deps := synctree.BuildDeps{
|
||||
SpaceId: s.id,
|
||||
ObjectSync: s.objectSync,
|
||||
Configuration: s.configuration,
|
||||
HeadNotifiable: s.headSync,
|
||||
Listener: listener,
|
||||
AclList: s.aclList,
|
||||
SpaceStorage: s.storage,
|
||||
TreeUsage: &s.treesUsed,
|
||||
SyncStatus: s.syncStatus,
|
||||
}
|
||||
t, err = synctree.PutSyncTree(ctx, payload, deps)
|
||||
// this can happen only for derived trees, when we've synced same tree already
|
||||
if err == treestorage.ErrTreeExists {
|
||||
ctx = context.WithValue(ctx, treePayloadKey, nil)
|
||||
return synctree.BuildSyncTreeOrGetRemote(ctx, payload.RootRawChange.Id, deps)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user