From ff2baaeb79e9537a3fa0305ffde384a6275eacc9 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 14 Oct 2022 13:32:40 +0200 Subject: [PATCH] Fix keys and remove close waiter --- node/nodespace/service.go | 8 ++--- node/storage/keys.go | 30 +++++++++++++++--- node/storage/spacestorage.go | 1 - pkg/ocache/closewaiter.go | 60 ------------------------------------ 4 files changed, 27 insertions(+), 72 deletions(-) delete mode 100644 pkg/ocache/closewaiter.go diff --git a/node/nodespace/service.go b/node/nodespace/service.go index 2bb60781..0920ef42 100644 --- a/node/nodespace/service.go +++ b/node/nodespace/service.go @@ -29,21 +29,17 @@ type Service interface { type service struct { conf config.Space spaceCache ocache.OCache - closeWaiter *ocache.CloseWaiter commonSpace commonspace.Service spaceStorageProvider storage.SpaceStorageProvider } func (s *service) Init(a *app.App) (err error) { - s.closeWaiter = ocache.NewCloseWaiter(func(ctx context.Context, id string) (value ocache.Object, err error) { - return s.commonSpace.GetSpace(ctx, id) - }) s.conf = a.MustComponent(config.CName).(*config.Config).Space s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.Service) s.spaceStorageProvider = a.MustComponent(storage.CName).(storage.SpaceStorageProvider) s.spaceCache = ocache.New( func(ctx context.Context, id string) (value ocache.Object, err error) { - return s.closeWaiter.Load(ctx, id) + return s.commonSpace.GetSpace(ctx, id) }, ocache.WithLogger(log.Sugar()), ocache.WithGCPeriod(time.Minute), @@ -70,7 +66,7 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e if err != nil { return nil, err } - return v.(*ocache.CloseWrapper).Value.(commonspace.Space), nil + return v.(commonspace.Space), nil } func (s *service) Close(ctx context.Context) (err error) { diff --git a/node/storage/keys.go b/node/storage/keys.go index b3782d8b..0d4b8de5 100644 --- a/node/storage/keys.go +++ b/node/storage/keys.go @@ -1,7 +1,7 @@ package storage import ( - "fmt" + "bytes" "strings" ) @@ -10,15 +10,15 @@ type treeKeys struct { } func (t treeKeys) HeadsKey() []byte { - return []byte(fmt.Sprintf("t/%s/heads", t.id)) + return joinStringsToBytes("t", t.id, "heads") } func (t treeKeys) RootKey() []byte { - return []byte(fmt.Sprintf("t/%s", t.id)) + return joinStringsToBytes("t", t.id) } func (t treeKeys) RawChangeKey(id string) []byte { - return []byte(fmt.Sprintf("t/%s/%s", t.id, id)) + return joinStringsToBytes("t", t.id, id) } type spaceKeys struct { @@ -36,5 +36,25 @@ func (s spaceKeys) ACLKey() []byte { } func isRootKey(key string) bool { - return strings.HasPrefix(key, "t/") && len(strings.Split(key, "/")) == 2 + return strings.HasPrefix(key, "t/") && strings.Count(key, "/") == 2 +} + +func joinStringsToBytes(strs ...string) []byte { + var ( + b bytes.Buffer + totalLen int + ) + for _, s := range strs { + totalLen += len(s) + } + // adding separators + totalLen += len(strs) - 1 + b.Grow(totalLen) + for idx, s := range strs { + if idx > 0 { + b.WriteString("/") + } + b.WriteString(s) + } + return b.Bytes() } diff --git a/node/storage/spacestorage.go b/node/storage/spacestorage.go index 4f0d0a49..c2faab6c 100644 --- a/node/storage/spacestorage.go +++ b/node/storage/spacestorage.go @@ -54,7 +54,6 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS } func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreatePayload) (store spacestorage.SpaceStorage, err error) { - // TODO: add payload verification dbPath := path.Join(rootPath, payload.SpaceHeaderWithId.Id) db, err := pogreb.Open(dbPath, defPogrebOptions) if err != nil { diff --git a/pkg/ocache/closewaiter.go b/pkg/ocache/closewaiter.go deleted file mode 100644 index 52043c70..00000000 --- a/pkg/ocache/closewaiter.go +++ /dev/null @@ -1,60 +0,0 @@ -package ocache - -import ( - "context" - "sync" -) - -type CloseWrapper struct { - Value Object - ch chan struct{} -} - -func (c *CloseWrapper) Close() (err error) { - err = c.Value.Close() - close(c.ch) - return err -} - -type CloseWaiter struct { - load func(ctx context.Context, id string) (value Object, err error) - - mx sync.Mutex - closeMap map[string]chan struct{} -} - -func NewCloseWaiter(load func(ctx context.Context, id string) (value Object, err error)) *CloseWaiter { - return &CloseWaiter{ - load: load, - closeMap: make(map[string]chan struct{}), - } -} - -func (l *CloseWaiter) Load(ctx context.Context, id string) (value Object, err error) { - // this uses assumption of ocache, that for each id load function cannot be called simultaneously - var ch chan struct{} - l.mx.Lock() - if c, exists := l.closeMap[id]; exists { - ch = c - } - l.mx.Unlock() - if ch != nil { - <-ch - } - - value, err = l.load(ctx, id) - if err != nil { - return - } - - ch = make(chan struct{}) - l.mx.Lock() - l.closeMap[id] = ch - l.mx.Unlock() - - value = &CloseWrapper{ - Value: value, - ch: ch, - } - return -}