Stopped space from unloading if trees are still there

This commit is contained in:
mcrakhman 2022-12-02 19:45:43 +01:00 committed by Mikhail Iudin
parent c85ae1a335
commit af5077d1c2
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
3 changed files with 114 additions and 59 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/zeebo/errs" "github.com/zeebo/errs"
@ -65,6 +66,9 @@ func NewSpaceId(id string, repKey uint64) string {
} }
type Space interface { type Space interface {
ocache.ObjectLocker
ocache.ObjectLastUsage
Id() string Id() string
Init(ctx context.Context) error Init(ctx context.Context) error
@ -99,12 +103,28 @@ type space struct {
settingsDocument settingsdocument.SettingsDocument settingsDocument settingsdocument.SettingsDocument
isClosed atomic.Bool isClosed atomic.Bool
treesUsed atomic.Int32
}
func (s *space) StartTree() {
s.treesUsed.Add(1)
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("starting tree")
}
func (s *space) CloseTree() {
s.treesUsed.Add(-1)
log.With(zap.Int32("trees used", s.treesUsed.Load())).Debug("closing tree")
} }
func (s *space) LastUsage() time.Time { func (s *space) LastUsage() time.Time {
return s.syncService.LastUsage() return s.syncService.LastUsage()
} }
func (s *space) Locked() bool {
log.With(zap.Bool("locked", s.treesUsed.Load() > 1)).Debug("space lock status check")
return s.treesUsed.Load() > 1
}
func (s *space) Id() string { func (s *space) Id() string {
return s.id return s.id
} }
@ -131,6 +151,7 @@ func (s *space) Description() (desc SpaceDescription, err error) {
} }
func (s *space) Init(ctx context.Context) (err error) { func (s *space) Init(ctx context.Context) (err error) {
log.With(zap.String("spaceId", s.id)).Debug("initializing space")
s.storage = newCommonStorage(s.storage) s.storage = newCommonStorage(s.storage)
header, err := s.storage.SpaceHeader() header, err := s.storage.SpaceHeader()
@ -215,6 +236,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsageController: s,
} }
return synctree.DeriveSyncTree(ctx, deps) return synctree.DeriveSyncTree(ctx, deps)
} }
@ -233,6 +255,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsageController: s,
} }
return synctree.CreateSyncTree(ctx, deps) return synctree.CreateSyncTree(ctx, deps)
} }
@ -250,6 +273,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
Listener: listener, Listener: listener,
AclList: s.aclList, AclList: s.aclList,
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsageController: s,
} }
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"go.uber.org/zap"
"time" "time"
) )
@ -36,6 +37,7 @@ type syncService struct {
stopStreamLoop context.CancelFunc stopStreamLoop context.CancelFunc
connector nodeconf.ConfConnector connector nodeconf.ConfConnector
streamLoopDone chan struct{} streamLoopDone chan struct{}
log *zap.SugaredLogger // TODO: change to logger
} }
func NewSyncService( func NewSyncService(
@ -62,6 +64,7 @@ func newSyncService(
connector: connector, connector: connector,
clientFactory: clientFactory, clientFactory: clientFactory,
spaceId: spaceId, spaceId: spaceId,
log: log.With(zap.String("id", spaceId)),
streamLoopDone: make(chan struct{}), streamLoopDone: make(chan struct{}),
} }
} }
@ -83,6 +86,7 @@ func (s *syncService) LastUsage() time.Time {
} }
func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
s.log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId)).Debug("handling message")
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId) obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
if err != nil { if err != nil {
return return
@ -93,18 +97,21 @@ func (s *syncService) HandleMessage(ctx context.Context, senderId string, messag
func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
defer close(s.streamLoopDone) defer close(s.streamLoopDone)
checkResponsiblePeers := func() { checkResponsiblePeers := func() {
s.log.Debug("dialing responsible peers")
respPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId) respPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId)
if err != nil { if err != nil {
s.log.Error("failed to dial peers", zap.Error(err))
return return
} }
for _, p := range respPeers { for _, p := range respPeers {
if s.streamPool.HasActiveStream(p.Id()) { if s.streamPool.HasActiveStream(p.Id()) {
s.log.Debug("has active stream for", zap.String("id", p.Id()))
continue continue
} }
stream, err := s.clientFactory.Client(p).Stream(ctx) stream, err := s.clientFactory.Client(p).Stream(ctx)
if err != nil { if err != nil {
err = rpcerr.Unwrap(err) err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err) s.log.Errorf("failed to open stream: %v", err)
// so here probably the request is failed because there is no such space, // so here probably the request is failed because there is no such space,
// but diffService should handle such cases by sending pushSpace // but diffService should handle such cases by sending pushSpace
continue continue
@ -113,9 +120,10 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
if err != nil { if err != nil {
err = rpcerr.Unwrap(err) err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) s.log.Errorf("failed to send first message to stream: %v", err)
continue continue
} }
s.log.Debug("continue reading stream for", zap.String("id", p.Id()))
s.streamPool.AddAndReadStreamAsync(stream) s.streamPool.AddAndReadStreamAsync(stream)
} }
} }

View File

@ -28,6 +28,11 @@ type SyncTree interface {
synchandler.SyncHandler synchandler.SyncHandler
} }
type TreeUsageController interface {
StartTree()
CloseTree()
}
// SyncTree sends head updates to sync service and also sends new changes to update listener // SyncTree sends head updates to sync service and also sends new changes to update listener
type syncTree struct { type syncTree struct {
tree.ObjectTree tree.ObjectTree
@ -35,6 +40,7 @@ type syncTree struct {
syncClient SyncClient syncClient SyncClient
notifiable diffservice.HeadNotifiable notifiable diffservice.HeadNotifiable
listener updatelistener.UpdateListener listener updatelistener.UpdateListener
usageController TreeUsageController
isClosed bool isClosed bool
isDeleted bool isDeleted bool
} }
@ -55,6 +61,7 @@ type CreateDeps struct {
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener
AclList list.ACLList AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage SpaceStorage spacestorage.SpaceStorage
TreeUsageController TreeUsageController
} }
type BuildDeps struct { type BuildDeps struct {
@ -66,6 +73,7 @@ type BuildDeps struct {
AclList list.ACLList AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage TreeStorage storage.TreeStorage
TreeUsageController TreeUsageController
} }
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) { func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error) {
@ -82,6 +90,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
@ -92,6 +101,9 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
if syncTree.listener != nil { if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree) syncTree.listener.Rebuild(syncTree)
} }
if syncTree.usageController != nil {
syncTree.usageController.StartTree()
}
headUpdate := syncClient.CreateHeadUpdate(t, nil) headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate) err = syncClient.BroadcastAsync(headUpdate)
@ -112,6 +124,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
@ -123,6 +136,9 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t SyncTree, err error
if syncTree.listener != nil { if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree) syncTree.listener.Rebuild(syncTree)
} }
if syncTree.usageController != nil {
syncTree.usageController.StartTree()
}
headUpdate := syncClient.CreateHeadUpdate(t, nil) headUpdate := syncClient.CreateHeadUpdate(t, nil)
err = syncClient.BroadcastAsync(headUpdate) err = syncClient.BroadcastAsync(headUpdate)
@ -207,6 +223,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
ObjectTree: objTree, ObjectTree: objTree,
syncClient: syncClient, syncClient: syncClient,
notifiable: deps.HeadNotifiable, notifiable: deps.HeadNotifiable,
usageController: deps.TreeUsageController,
listener: deps.Listener, listener: deps.Listener,
} }
syncHandler := newSyncTreeHandler(syncTree, syncClient) syncHandler := newSyncTreeHandler(syncTree, syncClient)
@ -217,6 +234,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
if syncTree.listener != nil { if syncTree.listener != nil {
syncTree.listener.Rebuild(syncTree) syncTree.listener.Rebuild(syncTree)
} }
if syncTree.usageController != nil {
syncTree.usageController.StartTree()
}
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// here we will have different behaviour based on who is sending this update // here we will have different behaviour based on who is sending this update
@ -310,6 +330,9 @@ func (s *syncTree) Close() (err error) {
if s.isClosed { if s.isClosed {
return ErrSyncTreeClosed return ErrSyncTreeClosed
} }
if s.usageController != nil {
s.usageController.CloseTree()
}
s.isClosed = true s.isClosed = true
return return
} }