Fix keys and remove close waiter
This commit is contained in:
parent
e0b01e2672
commit
ff2baaeb79
@ -29,21 +29,17 @@ type Service interface {
|
|||||||
type service struct {
|
type service struct {
|
||||||
conf config.Space
|
conf config.Space
|
||||||
spaceCache ocache.OCache
|
spaceCache ocache.OCache
|
||||||
closeWaiter *ocache.CloseWaiter
|
|
||||||
commonSpace commonspace.Service
|
commonSpace commonspace.Service
|
||||||
spaceStorageProvider storage.SpaceStorageProvider
|
spaceStorageProvider storage.SpaceStorageProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Init(a *app.App) (err error) {
|
func (s *service) Init(a *app.App) (err error) {
|
||||||
s.closeWaiter = ocache.NewCloseWaiter(func(ctx context.Context, id string) (value ocache.Object, err error) {
|
|
||||||
return s.commonSpace.GetSpace(ctx, id)
|
|
||||||
})
|
|
||||||
s.conf = a.MustComponent(config.CName).(*config.Config).Space
|
s.conf = a.MustComponent(config.CName).(*config.Config).Space
|
||||||
s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service)
|
s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service)
|
||||||
s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
||||||
s.spaceCache = ocache.New(
|
s.spaceCache = ocache.New(
|
||||||
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||||
return s.closeWaiter.Load(ctx, id)
|
return s.commonSpace.GetSpace(ctx, id)
|
||||||
},
|
},
|
||||||
ocache.WithLogger(log.Sugar()),
|
ocache.WithLogger(log.Sugar()),
|
||||||
ocache.WithGCPeriod(time.Minute),
|
ocache.WithGCPeriod(time.Minute),
|
||||||
@ -70,7 +66,7 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return v.(*ocache.CloseWrapper).Value.(commonspace.Space), nil
|
return v.(commonspace.Space), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Close(ctx context.Context) (err error) {
|
func (s *service) Close(ctx context.Context) (err error) {
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"bytes"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -10,15 +10,15 @@ type treeKeys struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t treeKeys) HeadsKey() []byte {
|
func (t treeKeys) HeadsKey() []byte {
|
||||||
return []byte(fmt.Sprintf("t/%s/heads", t.id))
|
return joinStringsToBytes("t", t.id, "heads")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t treeKeys) RootKey() []byte {
|
func (t treeKeys) RootKey() []byte {
|
||||||
return []byte(fmt.Sprintf("t/%s", t.id))
|
return joinStringsToBytes("t", t.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t treeKeys) RawChangeKey(id string) []byte {
|
func (t treeKeys) RawChangeKey(id string) []byte {
|
||||||
return []byte(fmt.Sprintf("t/%s/%s", t.id, id))
|
return joinStringsToBytes("t", t.id, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
type spaceKeys struct {
|
type spaceKeys struct {
|
||||||
@ -36,5 +36,25 @@ func (s spaceKeys) ACLKey() []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func isRootKey(key string) bool {
|
func isRootKey(key string) bool {
|
||||||
return strings.HasPrefix(key, "t/") && len(strings.Split(key, "/")) == 2
|
return strings.HasPrefix(key, "t/") && strings.Count(key, "/") == 2
|
||||||
|
}
|
||||||
|
|
||||||
|
func joinStringsToBytes(strs ...string) []byte {
|
||||||
|
var (
|
||||||
|
b bytes.Buffer
|
||||||
|
totalLen int
|
||||||
|
)
|
||||||
|
for _, s := range strs {
|
||||||
|
totalLen += len(s)
|
||||||
|
}
|
||||||
|
// adding separators
|
||||||
|
totalLen += len(strs) - 1
|
||||||
|
b.Grow(totalLen)
|
||||||
|
for idx, s := range strs {
|
||||||
|
if idx > 0 {
|
||||||
|
b.WriteString("/")
|
||||||
|
}
|
||||||
|
b.WriteString(s)
|
||||||
|
}
|
||||||
|
return b.Bytes()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -54,7 +54,6 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
|
|||||||
}
|
}
|
||||||
|
|
||||||
func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) {
|
func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) {
|
||||||
// TODO: add payload verification
|
|
||||||
dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id)
|
dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id)
|
||||||
db, err := pogreb.Open(dbPath, defPogrebOptions)
|
db, err := pogreb.Open(dbPath, defPogrebOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -1,60 +0,0 @@
|
|||||||
package ocache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type CloseWrapper struct {
|
|
||||||
Value Object
|
|
||||||
ch chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CloseWrapper) Close() (err error) {
|
|
||||||
err = c.Value.Close()
|
|
||||||
close(c.ch)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
type CloseWaiter struct {
|
|
||||||
load func(ctx context.Context, id string) (value Object, err error)
|
|
||||||
|
|
||||||
mx sync.Mutex
|
|
||||||
closeMap map[string]chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCloseWaiter(load func(ctx context.Context, id string) (value Object, err error)) *CloseWaiter {
|
|
||||||
return &CloseWaiter{
|
|
||||||
load: load,
|
|
||||||
closeMap: make(map[string]chan struct{}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *CloseWaiter) Load(ctx context.Context, id string) (value Object, err error) {
|
|
||||||
// this uses assumption of ocache, that for each id load function cannot be called simultaneously
|
|
||||||
var ch chan struct{}
|
|
||||||
l.mx.Lock()
|
|
||||||
if c, exists := l.closeMap[id]; exists {
|
|
||||||
ch = c
|
|
||||||
}
|
|
||||||
l.mx.Unlock()
|
|
||||||
if ch != nil {
|
|
||||||
<-ch
|
|
||||||
}
|
|
||||||
|
|
||||||
value, err = l.load(ctx, id)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ch = make(chan struct{})
|
|
||||||
l.mx.Lock()
|
|
||||||
l.closeMap[id] = ch
|
|
||||||
l.mx.Unlock()
|
|
||||||
|
|
||||||
value = &CloseWrapper{
|
|
||||||
Value: value,
|
|
||||||
ch: ch,
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user