Simplify space locked solution
This commit is contained in:
parent
af5077d1c2
commit
0d0bd82dc7
@ -106,16 +106,6 @@ type space struct {
|
|||||||
treesUsed atomic.Int32
|
treesUsed atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *space) StartTree() {
|
|
||||||
s.treesUsed.Add(1)
|
|
||||||
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("starting tree")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *space) CloseTree() {
|
|
||||||
s.treesUsed.Add(-1)
|
|
||||||
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("closing tree")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *space) LastUsage() time.Time {
|
func (s *space) LastUsage() time.Time {
|
||||||
return s.syncService.LastUsage()
|
return s.syncService.LastUsage()
|
||||||
}
|
}
|
||||||
@ -228,15 +218,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
deps := synctree.CreateDeps{
|
deps := synctree.CreateDeps{
|
||||||
SpaceId: s.id,
|
SpaceId: s.id,
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
StreamPool: s.syncService.StreamPool(),
|
StreamPool: s.syncService.StreamPool(),
|
||||||
Configuration: s.configuration,
|
Configuration: s.configuration,
|
||||||
HeadNotifiable: s.diffService,
|
HeadNotifiable: s.diffService,
|
||||||
Listener: listener,
|
Listener: listener,
|
||||||
AclList: s.aclList,
|
AclList: s.aclList,
|
||||||
SpaceStorage: s.storage,
|
SpaceStorage: s.storage,
|
||||||
TreeUsageController: s,
|
TreeUsage: &s.treesUsed,
|
||||||
}
|
}
|
||||||
return synctree.DeriveSyncTree(ctx, deps)
|
return synctree.DeriveSyncTree(ctx, deps)
|
||||||
}
|
}
|
||||||
@ -247,15 +237,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
deps := synctree.CreateDeps{
|
deps := synctree.CreateDeps{
|
||||||
SpaceId: s.id,
|
SpaceId: s.id,
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
StreamPool: s.syncService.StreamPool(),
|
StreamPool: s.syncService.StreamPool(),
|
||||||
Configuration: s.configuration,
|
Configuration: s.configuration,
|
||||||
HeadNotifiable: s.diffService,
|
HeadNotifiable: s.diffService,
|
||||||
Listener: listener,
|
Listener: listener,
|
||||||
AclList: s.aclList,
|
AclList: s.aclList,
|
||||||
SpaceStorage: s.storage,
|
SpaceStorage: s.storage,
|
||||||
TreeUsageController: s,
|
TreeUsage: &s.treesUsed,
|
||||||
}
|
}
|
||||||
return synctree.CreateSyncTree(ctx, deps)
|
return synctree.CreateSyncTree(ctx, deps)
|
||||||
}
|
}
|
||||||
@ -266,14 +256,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
deps := synctree.BuildDeps{
|
deps := synctree.BuildDeps{
|
||||||
SpaceId: s.id,
|
SpaceId: s.id,
|
||||||
StreamPool: s.syncService.StreamPool(),
|
StreamPool: s.syncService.StreamPool(),
|
||||||
Configuration: s.configuration,
|
Configuration: s.configuration,
|
||||||
HeadNotifiable: s.diffService,
|
HeadNotifiable: s.diffService,
|
||||||
Listener: listener,
|
Listener: listener,
|
||||||
AclList: s.aclList,
|
AclList: s.aclList,
|
||||||
SpaceStorage: s.storage,
|
SpaceStorage: s.storage,
|
||||||
TreeUsageController: s,
|
TreeUsage: &s.treesUsed,
|
||||||
}
|
}
|
||||||
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
|
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package synctree
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
|
||||||
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||||
@ -16,6 +17,8 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -28,21 +31,16 @@ type SyncTree interface {
|
|||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
type TreeUsageController interface {
|
|
||||||
StartTree()
|
|
||||||
CloseTree()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncTree sends head updates to sync service and also sends new changes to update listener
|
// SyncTree sends head updates to sync service and also sends new changes to update listener
|
||||||
type syncTree struct {
|
type syncTree struct {
|
||||||
tree.ObjectTree
|
tree.ObjectTree
|
||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
syncClient SyncClient
|
syncClient SyncClient
|
||||||
notifiable diffservice.HeadNotifiable
|
notifiable diffservice.HeadNotifiable
|
||||||
listener updatelistener.UpdateListener
|
listener updatelistener.UpdateListener
|
||||||
usageController TreeUsageController
|
treeUsage *atomic.Int32
|
||||||
isClosed bool
|
isClosed bool
|
||||||
isDeleted bool
|
isDeleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var log = logger.NewNamed("commonspace.synctree").Sugar()
|
var log = logger.NewNamed("commonspace.synctree").Sugar()
|
||||||
@ -53,27 +51,27 @@ var buildObjectTree = tree.BuildObjectTree
|
|||||||
var createSyncClient = newSyncClient
|
var createSyncClient = newSyncClient
|
||||||
|
|
||||||
type CreateDeps struct {
|
type CreateDeps struct {
|
||||||
SpaceId string
|
SpaceId string
|
||||||
Payload tree.ObjectTreeCreatePayload
|
Payload tree.ObjectTreeCreatePayload
|
||||||
Configuration nodeconf.Configuration
|
Configuration nodeconf.Configuration
|
||||||
HeadNotifiable diffservice.HeadNotifiable
|
HeadNotifiable diffservice.HeadNotifiable
|
||||||
StreamPool syncservice.StreamPool
|
StreamPool syncservice.StreamPool
|
||||||
Listener updatelistener.UpdateListener
|
Listener updatelistener.UpdateListener
|
||||||
AclList list.ACLList
|
AclList list.ACLList
|
||||||
SpaceStorage spacestorage.SpaceStorage
|
SpaceStorage spacestorage.SpaceStorage
|
||||||
TreeUsageController TreeUsageController
|
TreeUsage *atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
type BuildDeps struct {
|
type BuildDeps struct {
|
||||||
SpaceId string
|
SpaceId string
|
||||||
StreamPool syncservice.StreamPool
|
StreamPool syncservice.StreamPool
|
||||||
Configuration nodeconf.Configuration
|
Configuration nodeconf.Configuration
|
||||||
HeadNotifiable diffservice.HeadNotifiable
|
HeadNotifiable diffservice.HeadNotifiable
|
||||||
Listener updatelistener.UpdateListener
|
Listener updatelistener.UpdateListener
|
||||||
AclList list.ACLList
|
AclList list.ACLList
|
||||||
SpaceStorage spacestorage.SpaceStorage
|
SpaceStorage spacestorage.SpaceStorage
|
||||||
TreeStorage storage.TreeStorage
|
TreeStorage storage.TreeStorage
|
||||||
TreeUsageController TreeUsageController
|
TreeUsage *atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) {
|
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) {
|
||||||
@ -87,11 +85,11 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
|
|||||||
sharedFactory,
|
sharedFactory,
|
||||||
deps.Configuration)
|
deps.Configuration)
|
||||||
syncTree := &syncTree{
|
syncTree := &syncTree{
|
||||||
ObjectTree: objTree,
|
ObjectTree: objTree,
|
||||||
syncClient: syncClient,
|
syncClient: syncClient,
|
||||||
notifiable: deps.HeadNotifiable,
|
notifiable: deps.HeadNotifiable,
|
||||||
usageController: deps.TreeUsageController,
|
treeUsage: deps.TreeUsage,
|
||||||
listener: deps.Listener,
|
listener: deps.Listener,
|
||||||
}
|
}
|
||||||
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
||||||
syncTree.SyncHandler = syncHandler
|
syncTree.SyncHandler = syncHandler
|
||||||
@ -101,9 +99,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
|
|||||||
if syncTree.listener != nil {
|
if syncTree.listener != nil {
|
||||||
syncTree.listener.Rebuild(syncTree)
|
syncTree.listener.Rebuild(syncTree)
|
||||||
}
|
}
|
||||||
if syncTree.usageController != nil {
|
syncTree.treeUsage.Add(1)
|
||||||
syncTree.usageController.StartTree()
|
|
||||||
}
|
|
||||||
|
|
||||||
headUpdate := syncClient.CreateHeadUpdate(t, nil)
|
headUpdate := syncClient.CreateHeadUpdate(t, nil)
|
||||||
err = syncClient.BroadcastAsync(headUpdate)
|
err = syncClient.BroadcastAsync(headUpdate)
|
||||||
@ -121,11 +117,11 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
|
|||||||
GetRequestFactory(),
|
GetRequestFactory(),
|
||||||
deps.Configuration)
|
deps.Configuration)
|
||||||
syncTree := &syncTree{
|
syncTree := &syncTree{
|
||||||
ObjectTree: objTree,
|
ObjectTree: objTree,
|
||||||
syncClient: syncClient,
|
syncClient: syncClient,
|
||||||
notifiable: deps.HeadNotifiable,
|
notifiable: deps.HeadNotifiable,
|
||||||
usageController: deps.TreeUsageController,
|
treeUsage: deps.TreeUsage,
|
||||||
listener: deps.Listener,
|
listener: deps.Listener,
|
||||||
}
|
}
|
||||||
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
||||||
syncTree.SyncHandler = syncHandler
|
syncTree.SyncHandler = syncHandler
|
||||||
@ -136,9 +132,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
|
|||||||
if syncTree.listener != nil {
|
if syncTree.listener != nil {
|
||||||
syncTree.listener.Rebuild(syncTree)
|
syncTree.listener.Rebuild(syncTree)
|
||||||
}
|
}
|
||||||
if syncTree.usageController != nil {
|
syncTree.treeUsage.Add(1)
|
||||||
syncTree.usageController.StartTree()
|
|
||||||
}
|
|
||||||
|
|
||||||
headUpdate := syncClient.CreateHeadUpdate(t, nil)
|
headUpdate := syncClient.CreateHeadUpdate(t, nil)
|
||||||
err = syncClient.BroadcastAsync(headUpdate)
|
err = syncClient.BroadcastAsync(headUpdate)
|
||||||
@ -188,6 +182,10 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if resp.GetContent().GetFullSyncResponse() == nil {
|
||||||
|
err = fmt.Errorf("expected to get full sync response, but got something else")
|
||||||
|
return
|
||||||
|
}
|
||||||
fullSyncResp := resp.GetContent().GetFullSyncResponse()
|
fullSyncResp := resp.GetContent().GetFullSyncResponse()
|
||||||
|
|
||||||
payload := storage.TreeStorageCreatePayload{
|
payload := storage.TreeStorageCreatePayload{
|
||||||
@ -197,6 +195,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// basically building tree with in-memory storage and validating that it was without errors
|
// basically building tree with in-memory storage and validating that it was without errors
|
||||||
|
log.With(zap.String("id", id)).Debug("validating tree")
|
||||||
err = tree.ValidateRawTree(payload, deps.AclList)
|
err = tree.ValidateRawTree(payload, deps.AclList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -220,11 +219,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
|||||||
GetRequestFactory(),
|
GetRequestFactory(),
|
||||||
deps.Configuration)
|
deps.Configuration)
|
||||||
syncTree := &syncTree{
|
syncTree := &syncTree{
|
||||||
ObjectTree: objTree,
|
ObjectTree: objTree,
|
||||||
syncClient: syncClient,
|
syncClient: syncClient,
|
||||||
notifiable: deps.HeadNotifiable,
|
notifiable: deps.HeadNotifiable,
|
||||||
usageController: deps.TreeUsageController,
|
treeUsage: deps.TreeUsage,
|
||||||
listener: deps.Listener,
|
listener: deps.Listener,
|
||||||
}
|
}
|
||||||
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
syncHandler := newSyncTreeHandler(syncTree, syncClient)
|
||||||
syncTree.SyncHandler = syncHandler
|
syncTree.SyncHandler = syncHandler
|
||||||
@ -234,9 +233,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
|||||||
if syncTree.listener != nil {
|
if syncTree.listener != nil {
|
||||||
syncTree.listener.Rebuild(syncTree)
|
syncTree.listener.Rebuild(syncTree)
|
||||||
}
|
}
|
||||||
if syncTree.usageController != nil {
|
syncTree.treeUsage.Add(1)
|
||||||
syncTree.usageController.StartTree()
|
|
||||||
}
|
|
||||||
|
|
||||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
||||||
// here we will have different behaviour based on who is sending this update
|
// here we will have different behaviour based on who is sending this update
|
||||||
@ -330,9 +327,7 @@ func (s *syncTree) Close() (err error) {
|
|||||||
if s.isClosed {
|
if s.isClosed {
|
||||||
return ErrSyncTreeClosed
|
return ErrSyncTreeClosed
|
||||||
}
|
}
|
||||||
if s.usageController != nil {
|
s.treeUsage.Add(-1)
|
||||||
s.usageController.CloseTree()
|
|
||||||
}
|
|
||||||
s.isClosed = true
|
s.isClosed = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -209,6 +209,23 @@ func (s *service) registerClientCommands() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}}
|
}}
|
||||||
|
s.clientCommands["all-spaces"] = Command{Cmd: func(server peers.Peer, params []string) (res string, err error) {
|
||||||
|
if len(params) != 0 {
|
||||||
|
err = ErrIncorrectParamsCount
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp, err := client.AllSpaces(context.Background(), server.Address, &apiproto.AllSpacesRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for treeIdx, spaceId := range resp.SpaceIds {
|
||||||
|
res += spaceId
|
||||||
|
if treeIdx != len(resp.SpaceIds)-1 {
|
||||||
|
res += "\n"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) registerNodeCommands() {
|
func (s *service) registerNodeCommands() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user