Change cache logic
This commit is contained in:
parent
533880e9f1
commit
131f4a6968
7
common/commonspace/cache/treecache.go
vendored
7
common/commonspace/cache/treecache.go
vendored
@ -3,6 +3,7 @@ package cache
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
||||||
)
|
)
|
||||||
@ -23,8 +24,6 @@ type TreeResult struct {
|
|||||||
type BuildFunc = func(ctx context.Context, id string, listener synctree.UpdateListener) (tree.ObjectTree, error)
|
type BuildFunc = func(ctx context.Context, id string, listener synctree.UpdateListener) (tree.ObjectTree, error)
|
||||||
|
|
||||||
type TreeCache interface {
|
type TreeCache interface {
|
||||||
GetTree(ctx context.Context, id string) (TreeResult, error)
|
app.ComponentRunnable
|
||||||
SetBuildFunc(f BuildFunc)
|
GetTree(ctx context.Context, spaceId, treeId string) (TreeResult, error)
|
||||||
|
|
||||||
Close() error
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -119,7 +119,7 @@ func (d *diffService) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
|
|
||||||
func (d *diffService) pingTreesInCache(ctx context.Context, trees []string) {
|
func (d *diffService) pingTreesInCache(ctx context.Context, trees []string) {
|
||||||
for _, tId := range trees {
|
for _, tId := range trees {
|
||||||
_, _ = d.cache.GetTree(ctx, tId)
|
_, _ = d.cache.GetTree(ctx, d.spaceId, tId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -30,7 +30,7 @@ func New() Service {
|
|||||||
type Service interface {
|
type Service interface {
|
||||||
CreateSpace(ctx context.Context, cache cache.TreeCache, payload SpaceCreatePayload) (Space, error)
|
CreateSpace(ctx context.Context, cache cache.TreeCache, payload SpaceCreatePayload) (Space, error)
|
||||||
DeriveSpace(ctx context.Context, cache cache.TreeCache, payload SpaceDerivePayload) (Space, error)
|
DeriveSpace(ctx context.Context, cache cache.TreeCache, payload SpaceDerivePayload) (Space, error)
|
||||||
GetSpace(ctx context.Context, id string, cache cache.TreeCache) (sp Space, err error)
|
GetSpace(ctx context.Context, id string) (sp Space, err error)
|
||||||
app.Component
|
app.Component
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,12 +38,14 @@ type service struct {
|
|||||||
config config.Space
|
config config.Space
|
||||||
configurationService nodeconf.Service
|
configurationService nodeconf.Service
|
||||||
storageProvider storage.SpaceStorageProvider
|
storageProvider storage.SpaceStorageProvider
|
||||||
|
cache cache.TreeCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Init(a *app.App) (err error) {
|
func (s *service) Init(a *app.App) (err error) {
|
||||||
s.config = a.MustComponent(config.CName).(*config.Config).Space
|
s.config = a.MustComponent(config.CName).(*config.Config).Space
|
||||||
s.storageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
s.storageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
||||||
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
||||||
|
s.cache = a.MustComponent(cache.CName).(cache.TreeCache)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,7 +129,7 @@ func (s *service) CreateSpace(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.GetSpace(ctx, spaceId, cache)
|
return s.GetSpace(ctx, spaceId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) DeriveSpace(
|
func (s *service) DeriveSpace(
|
||||||
@ -219,22 +221,22 @@ func (s *service) DeriveSpace(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.GetSpace(ctx, spaceId, cache)
|
return s.GetSpace(ctx, spaceId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) GetSpace(ctx context.Context, id string, cache cache.TreeCache) (Space, error) {
|
func (s *service) GetSpace(ctx context.Context, id string) (Space, error) {
|
||||||
st, err := s.storageProvider.SpaceStorage(id)
|
st, err := s.storageProvider.SpaceStorage(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
lastConfiguration := s.configurationService.GetLast()
|
lastConfiguration := s.configurationService.GetLast()
|
||||||
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, lastConfiguration, cache, log)
|
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, lastConfiguration, s.cache, log)
|
||||||
syncService := syncservice.NewSyncService(id, diffService, cache, lastConfiguration)
|
syncService := syncservice.NewSyncService(id, diffService, s.cache, lastConfiguration)
|
||||||
sp := &space{
|
sp := &space{
|
||||||
id: id,
|
id: id,
|
||||||
syncService: syncService,
|
syncService: syncService,
|
||||||
diffService: diffService,
|
diffService: diffService,
|
||||||
cache: cache,
|
cache: s.cache,
|
||||||
storage: st,
|
storage: st,
|
||||||
}
|
}
|
||||||
if err := sp.Init(ctx); err != nil {
|
if err := sp.Init(ctx); err != nil {
|
||||||
|
|||||||
@ -73,8 +73,6 @@ func (s *space) Init(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
s.diffService.Init(initialIds)
|
s.diffService.Init(initialIds)
|
||||||
s.syncService.Init()
|
s.syncService.Init()
|
||||||
// basically this provides access for the external cache to use space's tree building functions
|
|
||||||
s.cache.SetBuildFunc(s.BuildTree)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,6 +146,5 @@ func (s *space) BuildTree(ctx context.Context, id string, listener synctree.Upda
|
|||||||
|
|
||||||
func (s *space) Close() error {
|
func (s *space) Close() error {
|
||||||
s.diffService.Close()
|
s.diffService.Close()
|
||||||
s.cache.Close()
|
|
||||||
return s.syncService.Close()
|
return s.syncService.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,6 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type syncHandler struct {
|
type syncHandler struct {
|
||||||
|
spaceId string
|
||||||
treeCache cache.TreeCache
|
treeCache cache.TreeCache
|
||||||
syncClient SyncClient
|
syncClient SyncClient
|
||||||
}
|
}
|
||||||
@ -18,8 +19,9 @@ type SyncHandler interface {
|
|||||||
HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
|
HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
|
func newSyncHandler(spaceId string, treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
|
||||||
return &syncHandler{
|
return &syncHandler{
|
||||||
|
spaceId: spaceId,
|
||||||
treeCache: treeCache,
|
treeCache: treeCache,
|
||||||
syncClient: syncClient,
|
syncClient: syncClient,
|
||||||
}
|
}
|
||||||
@ -48,7 +50,7 @@ func (s *syncHandler) HandleHeadUpdate(
|
|||||||
fullRequest *spacesyncproto.ObjectFullSyncRequest
|
fullRequest *spacesyncproto.ObjectFullSyncRequest
|
||||||
result tree.AddResult
|
result tree.AddResult
|
||||||
)
|
)
|
||||||
res, err := s.treeCache.GetTree(ctx, msg.TreeId)
|
res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -100,7 +102,7 @@ func (s *syncHandler) HandleFullSyncRequest(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
res, err := s.treeCache.GetTree(ctx, msg.TreeId)
|
res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -136,7 +138,7 @@ func (s *syncHandler) HandleFullSyncResponse(
|
|||||||
senderId string,
|
senderId string,
|
||||||
response *spacesyncproto.ObjectFullSyncResponse,
|
response *spacesyncproto.ObjectFullSyncResponse,
|
||||||
msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
res, err := s.treeCache.GetTree(ctx, msg.TreeId)
|
res, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,7 +49,7 @@ func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.T
|
|||||||
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
return syncHandler.HandleMessage(ctx, senderId, message)
|
return syncHandler.HandleMessage(ctx, senderId, message)
|
||||||
})
|
})
|
||||||
syncHandler = newSyncHandler(cache, streamPool)
|
syncHandler = newSyncHandler(spaceId, cache, streamPool)
|
||||||
return newSyncService(spaceId, headNotifiable, syncHandler, streamPool, configuration)
|
return newSyncService(spaceId, headNotifiable, syncHandler, streamPool, configuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -3,8 +3,10 @@ package nodecache
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/node/nodespace"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -12,9 +14,45 @@ import (
|
|||||||
var log = logger.NewNamed("treecache")
|
var log = logger.NewNamed("treecache")
|
||||||
var ErrCacheObjectWithoutTree = errors.New("cache object contains no tree")
|
var ErrCacheObjectWithoutTree = errors.New("cache object contains no tree")
|
||||||
|
|
||||||
|
type ctxKey int
|
||||||
|
|
||||||
|
const spaceKey ctxKey = 0
|
||||||
|
|
||||||
type treeCache struct {
|
type treeCache struct {
|
||||||
gcttl int
|
gcttl int
|
||||||
cache ocache.OCache
|
cache ocache.OCache
|
||||||
|
nodeService nodespace.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *treeCache) Run(ctx context.Context) (err error) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *treeCache) Close(ctx context.Context) (err error) {
|
||||||
|
return c.cache.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *treeCache) Init(a *app.App) (err error) {
|
||||||
|
c.nodeService = a.MustComponent(nodespace.CName).(nodespace.Service)
|
||||||
|
c.cache = ocache.New(
|
||||||
|
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||||
|
spaceId := ctx.Value(spaceKey).(string)
|
||||||
|
space, err := c.nodeService.GetSpace(ctx, spaceId)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return space.BuildTree(ctx, id, nil)
|
||||||
|
},
|
||||||
|
ocache.WithLogger(log.Sugar()),
|
||||||
|
ocache.WithGCPeriod(time.Minute),
|
||||||
|
ocache.WithTTL(time.Duration(c.gcttl)*time.Second),
|
||||||
|
ocache.WithRefCounter(false),
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *treeCache) Name() (name string) {
|
||||||
|
return cache.CName
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNodeCache(ttl int) cache.TreeCache {
|
func NewNodeCache(ttl int) cache.TreeCache {
|
||||||
@ -23,24 +61,9 @@ func NewNodeCache(ttl int) cache.TreeCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *treeCache) SetBuildFunc(buildFunc cache.BuildFunc) {
|
func (c *treeCache) GetTree(ctx context.Context, spaceId, id string) (res cache.TreeResult, err error) {
|
||||||
c.cache = ocache.New(
|
|
||||||
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
|
||||||
return buildFunc(ctx, id, nil)
|
|
||||||
},
|
|
||||||
ocache.WithLogger(log.Sugar()),
|
|
||||||
ocache.WithGCPeriod(time.Minute),
|
|
||||||
ocache.WithTTL(time.Duration(c.gcttl)*time.Second),
|
|
||||||
ocache.WithRefCounter(false),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *treeCache) Close() (err error) {
|
|
||||||
return c.cache.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *treeCache) GetTree(ctx context.Context, id string) (res cache.TreeResult, err error) {
|
|
||||||
var cacheRes ocache.Object
|
var cacheRes ocache.Object
|
||||||
|
ctx = context.WithValue(ctx, spaceKey, spaceId)
|
||||||
cacheRes, err = c.cache.Get(ctx, id)
|
cacheRes, err = c.cache.Get(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cache.TreeResult{}, err
|
return cache.TreeResult{}, err
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/node/nodespace/nodecache"
|
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -40,7 +39,7 @@ func (s *service) Init(a *app.App) (err error) {
|
|||||||
s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
||||||
s.spaceCache = ocache.New(
|
s.spaceCache = ocache.New(
|
||||||
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||||
return s.commonSpace.GetSpace(ctx, id, nodecache.NewNodeCache(s.conf.GCTTL))
|
return s.commonSpace.GetSpace(ctx, id)
|
||||||
},
|
},
|
||||||
ocache.WithLogger(log.Sugar()),
|
ocache.WithLogger(log.Sugar()),
|
||||||
ocache.WithGCPeriod(time.Minute),
|
ocache.WithGCPeriod(time.Minute),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user