Simplify space locked solution

This commit is contained in:
mcrakhman 2022-12-03 11:48:18 +01:00
parent 494b7552ce
commit 9ae5710854
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 87 additions and 92 deletions

View File

@ -106,16 +106,6 @@ type space struct {
treesUsed atomic.Int32 treesUsed atomic.Int32
} }
func (s *space) StartTree() {
s.treesUsed.Add(1)
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("starting tree")
}
func (s *space) CloseTree() {
s.treesUsed.Add(-1)
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("closing tree")
}
func (s *space) LastUsage() time.Time { func (s *space) LastUsage() time.Time {
return s.syncService.LastUsage() return s.syncService.LastUsage()
} }
@ -228,15 +218,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
return return
} }
deps := synctree.CreateDeps{ deps := synctree.CreateDeps{
SpaceId: s.id, SpaceId: s.id,
Payload: payload, Payload: payload,
StreamPool: s.syncService.StreamPool(), StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration, Configuration: s.configuration,
HeadNotifiable: s.diffService, HeadNotifiable: s.diffService,
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsageController: s, TreeUsage: &s.treesUsed,
} }
return synctree.DeriveSyncTree(ctx, deps) return synctree.DeriveSyncTree(ctx, deps)
} }
@ -247,15 +237,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
return return
} }
deps := synctree.CreateDeps{ deps := synctree.CreateDeps{
SpaceId: s.id, SpaceId: s.id,
Payload: payload, Payload: payload,
StreamPool: s.syncService.StreamPool(), StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration, Configuration: s.configuration,
HeadNotifiable: s.diffService, HeadNotifiable: s.diffService,
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsageController: s, TreeUsage: &s.treesUsed,
} }
return synctree.CreateSyncTree(ctx, deps) return synctree.CreateSyncTree(ctx, deps)
} }
@ -266,14 +256,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
return return
} }
deps := synctree.BuildDeps{ deps := synctree.BuildDeps{
SpaceId: s.id, SpaceId: s.id,
StreamPool: s.syncService.StreamPool(), StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration, Configuration: s.configuration,
HeadNotifiable: s.diffService, HeadNotifiable: s.diffService,
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsageController: s, TreeUsage: &s.treesUsed,
} }
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
} }

View File

@ -18,6 +18,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.uber.org/zap" "go.uber.org/zap"
"sync/atomic"
) )
var ( var (
@ -30,21 +31,16 @@ type SyncTree interface {
synchandler.SyncHandler synchandler.SyncHandler
} }
type TreeUsageController interface {
StartTree()
CloseTree()
}
// SyncTree sends head updates to sync service and also sends new changes to update listener // SyncTree sends head updates to sync service and also sends new changes to update listener
type syncTree struct { type syncTree struct {
tree.ObjectTree tree.ObjectTree
synchandler.SyncHandler synchandler.SyncHandler
syncClient SyncClient syncClient SyncClient
notifiable diffservice.HeadNotifiable notifiable diffservice.HeadNotifiable
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
usageController TreeUsageController treeUsage *atomic.Int32
isClosed bool isClosed bool
isDeleted bool isDeleted bool
} }
var log = logger.NewNamed("commonspace.synctree").Sugar() var log = logger.NewNamed("commonspace.synctree").Sugar()
@ -55,27 +51,27 @@ var buildObjectTree = tree.BuildObjectTree
var createSyncClient = newSyncClient var createSyncClient = newSyncClient
type CreateDeps struct { type CreateDeps struct {
SpaceId string SpaceId string
Payload tree.ObjectTreeCreatePayload Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable HeadNotifiable diffservice.HeadNotifiable
StreamPool syncservice.StreamPool StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener
AclList list.ACLList AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage SpaceStorage spacestorage.SpaceStorage
TreeUsageController TreeUsageController TreeUsage *atomic.Int32
} }
type BuildDeps struct { type BuildDeps struct {
SpaceId string SpaceId string
StreamPool syncservice.StreamPool StreamPool syncservice.StreamPool
Configuration nodeconf.Configuration Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener
AclList list.ACLList AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage TreeStorage storage.TreeStorage
TreeUsageController TreeUsageController TreeUsage *atomic.Int32
} }
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) { func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) {
@ -89,11 +85,11 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
sharedFactory, sharedFactory,
deps.Configuration) deps.Configuration)
syncTree := &syncTree{ syncTree := &syncTree{
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController, treeUsage: deps.TreeUsage,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
@ -103,9 +99,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
if syncTree.listener != nil { if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree) syncTree.listener.Rebuild(syncTree)
} }
if syncTree.usageController != nil { syncTree.treeUsage.Add(1)
syncTree.usageController.StartTree()
}
headUpdate := syncClient.CreateHeadUpdate(t, nil) headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate) err = syncClient.BroadcastAsync(headUpdate)
@ -123,11 +117,11 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
GetRequestFactory(), GetRequestFactory(),
deps.Configuration) deps.Configuration)
syncTree := &syncTree{ syncTree := &syncTree{
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController, treeUsage: deps.TreeUsage,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
@ -138,9 +132,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
if syncTree.listener != nil { if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree) syncTree.listener.Rebuild(syncTree)
} }
if syncTree.usageController != nil { syncTree.treeUsage.Add(1)
syncTree.usageController.StartTree()
}
headUpdate := syncClient.CreateHeadUpdate(t, nil) headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate) err = syncClient.BroadcastAsync(headUpdate)
@ -227,11 +219,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
GetRequestFactory(), GetRequestFactory(),
deps.Configuration) deps.Configuration)
syncTree := &syncTree{ syncTree := &syncTree{
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController, treeUsage: deps.TreeUsage,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler syncTree.SyncHandler = syncHandler
@ -241,9 +233,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
if syncTree.listener != nil { if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree) syncTree.listener.Rebuild(syncTree)
} }
if syncTree.usageController != nil { syncTree.treeUsage.Add(1)
syncTree.usageController.StartTree()
}
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// here we will have different behaviour based on who is sending this update // here we will have different behaviour based on who is sending this update
@ -337,9 +327,7 @@ func (s *syncTree) Close() (err error) {
if s.isClosed { if s.isClosed {
return ErrSyncTreeClosed return ErrSyncTreeClosed
} }
if s.usageController != nil { s.treeUsage.Add(-1)
s.usageController.CloseTree()
}
s.isClosed = true s.isClosed = true
return return
} }

View File

@ -209,6 +209,23 @@ func (s *service) registerClientCommands() {
} }
return return
}} }}
s.clientCommands["all-spaces"] = Command{Cmd: func(server peers.Peer, params []string) (res string, err error) {
if len(params) != 0 {
err = ErrIncorrectParamsCount
return
}
resp, err := client.AllSpaces(context.Background(), server.Address, &apiproto.AllSpacesRequest{})
if err != nil {
return
}
for treeIdx, spaceId := range resp.SpaceIds {
res += spaceId
if treeIdx != len(resp.SpaceIds)-1 {
res += "\n"
}
}
return
}}
} }
func (s *service) registerNodeCommands() { func (s *service) registerNodeCommands() {