Stopped space from unloading if trees are still there

This commit is contained in:
mcrakhman 2022-12-02 19:45:43 +01:00
parent aaecd2ae6d
commit 494b7552ce
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 114 additions and 59 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/zeebo/errs"
@ -65,6 +66,9 @@ func NewSpaceId(id string, repKey uint64) string {
}
type Space interface {
ocache.ObjectLocker
ocache.ObjectLastUsage
Id() string
Init(ctx context.Context) error
@ -98,13 +102,29 @@ type space struct {
configuration nodeconf.Configuration
settingsDocument settingsdocument.SettingsDocument
isClosed atomic.Bool
isClosed atomic.Bool
treesUsed atomic.Int32
}
func (s *space) StartTree() {
s.treesUsed.Add(1)
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("starting tree")
}
func (s *space) CloseTree() {
s.treesUsed.Add(-1)
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("closing tree")
}
func (s *space) LastUsage() time.Time {
return s.syncService.LastUsage()
}
func (s *space) Locked() bool {
log.With(zap.Bool("locked", s.treesUsed.Load() > 1)).Debug("space lock status check")
return s.treesUsed.Load() > 1
}
func (s *space) Id() string {
return s.id
}
@ -131,6 +151,7 @@ func (s *space) Description() (desc SpaceDescription, err error) {
}
func (s *space) Init(ctx context.Context) (err error) {
log.With(zap.String("spaceId", s.id)).Debug("initializing space")
s.storage = newCommonStorage(s.storage)
header, err := s.storage.SpaceHeader()
@ -207,14 +228,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
return
}
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
TreeUsageController: s,
}
return synctree.DeriveSyncTree(ctx, deps)
}
@ -225,14 +247,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
return
}
deps := synctree.CreateDeps{
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
SpaceId: s.id,
Payload: payload,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
TreeUsageController: s,
}
return synctree.CreateSyncTree(ctx, deps)
}
@ -243,13 +266,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
return
}
deps := synctree.BuildDeps{
SpaceId: s.id,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
SpaceId: s.id,
StreamPool: s.syncService.StreamPool(),
Configuration: s.configuration,
HeadNotifiable: s.diffService,
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
TreeUsageController: s,
}
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
}

View File

@ -9,6 +9,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"go.uber.org/zap"
"time"
)
@ -36,6 +37,7 @@ type syncService struct {
stopStreamLoop context.CancelFunc
connector nodeconf.ConfConnector
streamLoopDone chan struct{}
log *zap.SugaredLogger // TODO: change to logger
}
func NewSyncService(
@ -62,6 +64,7 @@ func newSyncService(
connector: connector,
clientFactory: clientFactory,
spaceId: spaceId,
log: log.With(zap.String("id", spaceId)),
streamLoopDone: make(chan struct{}),
}
}
@ -83,6 +86,7 @@ func (s *syncService) LastUsage() time.Time {
}
func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
s.log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId)).Debug("handling message")
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
if err != nil {
return
@ -93,18 +97,21 @@ func (s *syncService) HandleMessage(ctx context.Context, senderId string, messag
func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
defer close(s.streamLoopDone)
checkResponsiblePeers := func() {
s.log.Debug("dialing responsible peers")
respPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId)
if err != nil {
s.log.Error("failed to dial peers", zap.Error(err))
return
}
for _, p := range respPeers {
if s.streamPool.HasActiveStream(p.Id()) {
s.log.Debug("has active stream for", zap.String("id", p.Id()))
continue
}
stream, err := s.clientFactory.Client(p).Stream(ctx)
if err != nil {
err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)
s.log.Errorf("failed to open stream: %v", err)
// so here probably the request is failed because there is no such space,
// but diffService should handle such cases by sending pushSpace
continue
@ -113,9 +120,10 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
if err != nil {
err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
s.log.Errorf("failed to send first message to stream: %v", err)
continue
}
s.log.Debug("continue reading stream for", zap.String("id", p.Id()))
s.streamPool.AddAndReadStreamAsync(stream)
}
}

View File

@ -30,15 +30,21 @@ type SyncTree interface {
synchandler.SyncHandler
}
type TreeUsageController interface {
StartTree()
CloseTree()
}
// SyncTree sends head updates to sync service and also sends new changes to update listener
type syncTree struct {
tree.ObjectTree
synchandler.SyncHandler
syncClient SyncClient
notifiable diffservice.HeadNotifiable
listener updatelistener.UpdateListener
isClosed bool
isDeleted bool
syncClient SyncClient
notifiable diffservice.HeadNotifiable
listener updatelistener.UpdateListener
usageController TreeUsageController
isClosed bool
isDeleted bool
}
var log = logger.NewNamed("commonspace.synctree").Sugar()
@ -49,25 +55,27 @@ var buildObjectTree = tree.BuildObjectTree
var createSyncClient = newSyncClient
type CreateDeps struct {
SpaceId string
Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
SpaceId string
Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
TreeUsageController TreeUsageController
}
type BuildDeps struct {
SpaceId string
StreamPool syncservice.StreamPool
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
SpaceId string
StreamPool syncservice.StreamPool
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
TreeUsageController TreeUsageController
}
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) {
@ -81,10 +89,11 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
sharedFactory,
deps.Configuration)
syncTree := &syncTree{
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
listener: deps.Listener,
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController,
listener: deps.Listener,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
@ -94,6 +103,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree)
}
if syncTree.usageController != nil {
syncTree.usageController.StartTree()
}
headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate)
@ -111,10 +123,11 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
GetRequestFactory(),
deps.Configuration)
syncTree := &syncTree{
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
listener: deps.Listener,
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController,
listener: deps.Listener,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
@ -125,6 +138,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree)
}
if syncTree.usageController != nil {
syncTree.usageController.StartTree()
}
headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate)
@ -211,10 +227,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
GetRequestFactory(),
deps.Configuration)
syncTree := &syncTree{
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
listener: deps.Listener,
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController,
listener: deps.Listener,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
@ -224,6 +241,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree)
}
if syncTree.usageController != nil {
syncTree.usageController.StartTree()
}
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// here we will have different behaviour based on who is sending this update
@ -317,6 +337,9 @@ func (s *syncTree) Close() (err error) {
if s.isClosed {
return ErrSyncTreeClosed
}
if s.usageController != nil {
s.usageController.CloseTree()
}
s.isClosed = true
return
}