diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 8111895a..aa255f51 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -146,5 +146,6 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene func (s *space) Close() error { s.diffService.Close() - return s.syncService.Close() + s.syncService.Close() + return s.storage.Close() } diff --git a/common/commonspace/storage/storage.go b/common/commonspace/storage/storage.go index 0dc9df99..eada36f7 100644 --- a/common/commonspace/storage/storage.go +++ b/common/commonspace/storage/storage.go @@ -19,6 +19,7 @@ type SpaceStorage interface { ACLStorage() (storage.ListStorage, error) SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error) StoredIds() ([]string, error) + Close() error } type SpaceStorageCreatePayload struct { diff --git a/node/nodespace/service.go b/node/nodespace/service.go index 0920ef42..2bb60781 100644 --- a/node/nodespace/service.go +++ b/node/nodespace/service.go @@ -29,17 +29,21 @@ type Service interface { type service struct { conf config.Space spaceCache ocache.OCache + closeWaiter *ocache.CloseWaiter commonSpace commonspace.Service spaceStorageProvider storage.SpaceStorageProvider } func (s *service) Init(a *app.App) (err error) { + s.closeWaiter = ocache.NewCloseWaiter(func(ctx context.Context, id string) (value ocache.Object, err error) { + return s.commonSpace.GetSpace(ctx, id) + }) s.conf = a.MustComponent(config.CName).(*config.Config).Space s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service) s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider) s.spaceCache = ocache.New( func(ctx context.Context, id string) (value ocache.Object, err error) { - return s.commonSpace.GetSpace(ctx, id) + return s.closeWaiter.Load(ctx, id) }, ocache.WithLogger(log.Sugar()), ocache.WithGCPeriod(time.Minute), @@ -66,7 +70,7 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e if err != nil { return nil, err } - return v.(commonspace.Space), nil + return v.(*ocache.CloseWrapper).Value.(commonspace.Space), nil } func (s *service) Close(ctx context.Context) (err error) { diff --git a/node/storage/spacestorage.go b/node/storage/spacestorage.go index 52a4b52f..4f0d0a49 100644 --- a/node/storage/spacestorage.go +++ b/node/storage/spacestorage.go @@ -8,8 +8,13 @@ import ( "github.com/gogo/protobuf/proto" "path" "sync" + "time" ) +var defPogrebOptions = &pogreb.Options{ + BackgroundCompactionInterval: time.Minute * 5, +} + type spaceStorage struct { objDb *pogreb.DB keys spaceKeys @@ -18,7 +23,7 @@ type spaceStorage struct { func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) { dbPath := path.Join(rootPath, spaceId) - objDb, err := pogreb.Open(dbPath, nil) + objDb, err := pogreb.Open(dbPath, defPogrebOptions) if err != nil { return } @@ -51,11 +56,17 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) { // TODO: add payload verification dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id) - db, err := pogreb.Open(dbPath, nil) + db, err := pogreb.Open(dbPath, defPogrebOptions) if err != nil { return } + defer func() { + if err != nil { + db.Close() + } + }() + keys := spaceKeys{} has, err := db.Has(keys.HeaderKey()) if err != nil { @@ -145,3 +156,7 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) { err = nil return } + +func (s *spaceStorage) Close() (err error) { + return s.objDb.Close() +} diff --git a/pkg/ocache/closewaiter.go b/pkg/ocache/closewaiter.go new file mode 100644 index 00000000..52043c70 --- /dev/null +++ b/pkg/ocache/closewaiter.go @@ -0,0 +1,60 @@ +package ocache + +import ( + "context" + "sync" +) + +type CloseWrapper struct { + Value Object + ch chan struct{} +} + +func (c *CloseWrapper) Close() (err error) { + err = c.Value.Close() + close(c.ch) + return err +} + +type CloseWaiter struct { + load func(ctx context.Context, id string) (value Object, err error) + + mx sync.Mutex + closeMap map[string]chan struct{} +} + +func NewCloseWaiter(load func(ctx context.Context, id string) (value Object, err error)) *CloseWaiter { + return &CloseWaiter{ + load: load, + closeMap: make(map[string]chan struct{}), + } +} + +func (l *CloseWaiter) Load(ctx context.Context, id string) (value Object, err error) { + // this uses assumption of ocache, that for each id load function cannot be called simultaneously + var ch chan struct{} + l.mx.Lock() + if c, exists := l.closeMap[id]; exists { + ch = c + } + l.mx.Unlock() + if ch != nil { + <-ch + } + + value, err = l.load(ctx, id) + if err != nil { + return + } + + ch = make(chan struct{}) + l.mx.Lock() + l.closeMap[id] = ch + l.mx.Unlock() + + value = &CloseWrapper{ + Value: value, + ch: ch, + } + return +}