Add close waiter and close logic for db
This commit is contained in:
parent
933b1e3a4b
commit
003a8688d8
@ -146,5 +146,6 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
|
|||||||
|
|
||||||
func (s *space) Close() error {
|
func (s *space) Close() error {
|
||||||
s.diffService.Close()
|
s.diffService.Close()
|
||||||
return s.syncService.Close()
|
s.syncService.Close()
|
||||||
|
return s.storage.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,6 +19,7 @@ type SpaceStorage interface {
|
|||||||
ACLStorage() (storage.ListStorage, error)
|
ACLStorage() (storage.ListStorage, error)
|
||||||
SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error)
|
SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error)
|
||||||
StoredIds() ([]string, error)
|
StoredIds() ([]string, error)
|
||||||
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type SpaceStorageCreatePayload struct {
|
type SpaceStorageCreatePayload struct {
|
||||||
|
|||||||
@ -29,17 +29,21 @@ type Service interface {
|
|||||||
type service struct {
|
type service struct {
|
||||||
conf config.Space
|
conf config.Space
|
||||||
spaceCache ocache.OCache
|
spaceCache ocache.OCache
|
||||||
|
closeWaiter *ocache.CloseWaiter
|
||||||
commonSpace commonspace.Service
|
commonSpace commonspace.Service
|
||||||
spaceStorageProvider storage.SpaceStorageProvider
|
spaceStorageProvider storage.SpaceStorageProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Init(a *app.App) (err error) {
|
func (s *service) Init(a *app.App) (err error) {
|
||||||
|
s.closeWaiter = ocache.NewCloseWaiter(func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||||
|
return s.commonSpace.GetSpace(ctx, id)
|
||||||
|
})
|
||||||
s.conf = a.MustComponent(config.CName).(*config.Config).Space
|
s.conf = a.MustComponent(config.CName).(*config.Config).Space
|
||||||
s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service)
|
s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service)
|
||||||
s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider)
|
||||||
s.spaceCache = ocache.New(
|
s.spaceCache = ocache.New(
|
||||||
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||||
return s.commonSpace.GetSpace(ctx, id)
|
return s.closeWaiter.Load(ctx, id)
|
||||||
},
|
},
|
||||||
ocache.WithLogger(log.Sugar()),
|
ocache.WithLogger(log.Sugar()),
|
||||||
ocache.WithGCPeriod(time.Minute),
|
ocache.WithGCPeriod(time.Minute),
|
||||||
@ -66,7 +70,7 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return v.(commonspace.Space), nil
|
return v.(*ocache.CloseWrapper).Value.(commonspace.Space), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Close(ctx context.Context) (err error) {
|
func (s *service) Close(ctx context.Context) (err error) {
|
||||||
|
|||||||
@ -8,8 +8,13 @@ import (
|
|||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var defPogrebOptions = &pogreb.Options{
|
||||||
|
BackgroundCompactionInterval: time.Minute * 5,
|
||||||
|
}
|
||||||
|
|
||||||
type spaceStorage struct {
|
type spaceStorage struct {
|
||||||
objDb *pogreb.DB
|
objDb *pogreb.DB
|
||||||
keys spaceKeys
|
keys spaceKeys
|
||||||
@ -18,7 +23,7 @@ type spaceStorage struct {
|
|||||||
|
|
||||||
func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) {
|
func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) {
|
||||||
dbPath := path.Join(rootPath, spaceId)
|
dbPath := path.Join(rootPath, spaceId)
|
||||||
objDb, err := pogreb.Open(dbPath, nil)
|
objDb, err := pogreb.Open(dbPath, defPogrebOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -51,11 +56,17 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
|
|||||||
func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) {
|
func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) {
|
||||||
// TODO: add payload verification
|
// TODO: add payload verification
|
||||||
dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id)
|
dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id)
|
||||||
db, err := pogreb.Open(dbPath, nil)
|
db, err := pogreb.Open(dbPath, defPogrebOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
db.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
keys := spaceKeys{}
|
keys := spaceKeys{}
|
||||||
has, err := db.Has(keys.HeaderKey())
|
has, err := db.Has(keys.HeaderKey())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -145,3 +156,7 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) {
|
|||||||
err = nil
|
err = nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *spaceStorage) Close() (err error) {
|
||||||
|
return s.objDb.Close()
|
||||||
|
}
|
||||||
|
|||||||
60
pkg/ocache/closewaiter.go
Normal file
60
pkg/ocache/closewaiter.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package ocache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CloseWrapper struct {
|
||||||
|
Value Object
|
||||||
|
ch chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CloseWrapper) Close() (err error) {
|
||||||
|
err = c.Value.Close()
|
||||||
|
close(c.ch)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseWaiter struct {
|
||||||
|
load func(ctx context.Context, id string) (value Object, err error)
|
||||||
|
|
||||||
|
mx sync.Mutex
|
||||||
|
closeMap map[string]chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCloseWaiter(load func(ctx context.Context, id string) (value Object, err error)) *CloseWaiter {
|
||||||
|
return &CloseWaiter{
|
||||||
|
load: load,
|
||||||
|
closeMap: make(map[string]chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *CloseWaiter) Load(ctx context.Context, id string) (value Object, err error) {
|
||||||
|
// this uses assumption of ocache, that for each id load function cannot be called simultaneously
|
||||||
|
var ch chan struct{}
|
||||||
|
l.mx.Lock()
|
||||||
|
if c, exists := l.closeMap[id]; exists {
|
||||||
|
ch = c
|
||||||
|
}
|
||||||
|
l.mx.Unlock()
|
||||||
|
if ch != nil {
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err = l.load(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ch = make(chan struct{})
|
||||||
|
l.mx.Lock()
|
||||||
|
l.closeMap[id] = ch
|
||||||
|
l.mx.Unlock()
|
||||||
|
|
||||||
|
value = &CloseWrapper{
|
||||||
|
Value: value,
|
||||||
|
ch: ch,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user