From 73774d99c22dd0880a1fdb2c259f16ae2b964a49 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 22 Dec 2022 14:50:20 +0300 Subject: [PATCH] files: fix syncer loop, check ctx.Done --- .../badgerfilestore/badgerfilestore.go | 21 +++++++++++- client/filestorage/proxystore.go | 24 ++++++++------ client/filestorage/proxystore_test.go | 12 +++++++ client/filestorage/rpcstore/client.go | 18 +++++++++-- client/filestorage/rpcstore/store.go | 18 ++++++++--- client/filestorage/syncer.go | 32 ++++++++++++------- .../fileblockstore/fileblockstore.go | 1 + 7 files changed, 96 insertions(+), 30 deletions(-) diff --git a/client/filestorage/badgerfilestore/badgerfilestore.go b/client/filestorage/badgerfilestore/badgerfilestore.go index 7860841a..3a693020 100644 --- a/client/filestorage/badgerfilestore/badgerfilestore.go +++ b/client/filestorage/badgerfilestore/badgerfilestore.go @@ -79,8 +79,27 @@ func (f *badgerStorage) Delete(ctx context.Context, c cid.Cid) error { func (f *badgerStorage) ExistsCids(ctx context.Context, ks []cid.Cid) (exists []cid.Cid, err error) { err = f.db.View(func(txn *badger.Txn) error { for _, k := range ks { - if _, e := txn.Get(key(k)); e == nil { + _, e := txn.Get(key(k)) + if e == nil { exists = append(exists, k) + } else if e != badger.ErrKeyNotFound { + return e + } + } + return nil + }) + return +} + +func (f *badgerStorage) NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error) { + notExists = bs[:0] + err = f.db.View(func(txn *badger.Txn) error { + for _, b := range bs { + _, e := txn.Get(key(b.Cid())) + if e == badger.ErrKeyNotFound { + notExists = append(notExists, b) + } else if e != nil { + return e } } return nil diff --git a/client/filestorage/proxystore.go b/client/filestorage/proxystore.go index ad726288..4a9a3173 100644 --- a/client/filestorage/proxystore.go +++ b/client/filestorage/proxystore.go @@ -95,26 +95,32 @@ func (c *proxyStore) GetMany(ctx context.Context, ks []cid.Cid) <-chan blocks.Bl return results } -func (c *proxyStore) Add(ctx context.Context, bs []blocks.Block) error { +func (c *proxyStore) Add(ctx context.Context, bs []blocks.Block) (err error) { + if bs, err = c.cache.NotExistsBlocks(ctx, bs); err != nil { + return + } + if len(bs) == 0 { + return nil + } + if err = c.cache.Add(ctx, bs); err != nil { + return + } indexCids := badgerfilestore.NewCids() defer indexCids.Release() for _, b := range bs { indexCids.Add(fileblockstore.CtxGetSpaceId(ctx), badgerfilestore.OpAdd, b.Cid()) } - if err := c.index.Add(indexCids); err != nil { - return err - } - return c.cache.Add(ctx, bs) + return c.index.Add(indexCids) } func (c *proxyStore) Delete(ctx context.Context, k cid.Cid) error { + if err := c.cache.Delete(ctx, k); err != nil { + return err + } indexCids := badgerfilestore.NewCids() defer indexCids.Release() indexCids.Add(fileblockstore.CtxGetSpaceId(ctx), badgerfilestore.OpDelete, k) - if err := c.index.Add(indexCids); err != nil { - return err - } - return c.cache.Delete(ctx, k) + return c.index.Add(indexCids) } func (c *proxyStore) Close() (err error) { diff --git a/client/filestorage/proxystore_test.go b/client/filestorage/proxystore_test.go index c33c86bd..7ef77911 100644 --- a/client/filestorage/proxystore_test.go +++ b/client/filestorage/proxystore_test.go @@ -160,6 +160,18 @@ type testStore struct { mu sync.Mutex } +func (t *testStore) NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error) { + t.mu.Lock() + defer t.mu.Unlock() + notExists = bs[:0] + for _, b := range bs { + if _, ok := t.store[b.Cid().String()]; !ok { + notExists = append(notExists, b) + } + } + return +} + func (t *testStore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) { t.mu.Lock() defer t.mu.Unlock() diff --git a/client/filestorage/rpcstore/client.go b/client/filestorage/rpcstore/client.go index 0b0327e6..76776ee5 100644 --- a/client/filestorage/rpcstore/client.go +++ b/client/filestorage/rpcstore/client.go @@ -126,7 +126,11 @@ func (c *client) put(ctx context.Context, t *task) (err error) { return rpcerr.Unwrap(err) } log.Debug("put cid", zap.String("cid", t.cid.String())) - t.ready <- t + select { + case t.ready <- t: + case <-ctx.Done(): + return ctx.Err() + } c.stat.Add(st, len(t.data)) return } @@ -184,7 +188,11 @@ func (c *client) readStream(stream fileproto.DRPCFile_GetBlocksClient) { if err != nil { log.Warn("cid receive error", zap.Error(err)) } else { - t.ready <- t + select { + case t.ready <- t: + case <-t.ctx.Done(): + } + } } } @@ -269,7 +277,11 @@ func (c *client) Close() error { c.waitCIDMu.Lock() for id, t := range c.waitCIDs { t.err = ErrClientClosed - t.ready <- t + select { + case t.ready <- t: + case <-t.ctx.Done(): + } + delete(c.waitCIDs, id) } c.waitCIDMu.Unlock() diff --git a/client/filestorage/rpcstore/store.go b/client/filestorage/rpcstore/store.go index 89cf2592..70cf2cee 100644 --- a/client/filestorage/rpcstore/store.go +++ b/client/filestorage/rpcstore/store.go @@ -77,9 +77,13 @@ func (s *store) Add(ctx context.Context, bs []blocks.Block) error { } var errs []error for i := 0; i < len(tasks); i++ { - t := <-readyCh - if t.err != nil { - errs = append(errs, t.err) + select { + case t := <-readyCh: + if t.err != nil { + errs = append(errs, t.err) + } + case <-ctx.Done(): + return ctx.Err() } } return multierr.Combine(errs...) @@ -90,8 +94,12 @@ func (s *store) Delete(ctx context.Context, c cid.Cid) error { if err := s.cm.Add(ctx, t); err != nil { return err } - <-t.ready - return t.err + select { + case t := <-t.ready: + return t.err + case <-ctx.Done(): + return ctx.Err() + } } func (s *store) Close() (err error) { diff --git a/client/filestorage/syncer.go b/client/filestorage/syncer.go index ee4d2d23..30ff9d15 100644 --- a/client/filestorage/syncer.go +++ b/client/filestorage/syncer.go @@ -7,6 +7,7 @@ import ( blocks "github.com/ipfs/go-block-format" "go.uber.org/zap" "sync" + "sync/atomic" "time" ) @@ -31,14 +32,14 @@ func (s *syncer) run(ctx context.Context) { } } -func (s *syncer) sync(ctx context.Context) (l int) { +func (s *syncer) sync(ctx context.Context) (doneCount int32) { cids, err := s.ps.index.List(syncerOpBatch) if err != nil { log.Error("index list error", zap.Error(err)) return } defer cids.Release() - l = cids.Len() + l := cids.Len() log.Debug("remote file sync, got tasks to sync", zap.Int("count", l)) if l == 0 { return @@ -47,34 +48,35 @@ func (s *syncer) sync(ctx context.Context) (l int) { ctx, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() var wg sync.WaitGroup + var doneAtomic atomic.Int32 for _, sOps := range cids.SpaceOps { if len(sOps.Load) > 0 { wg.Add(1) go func(opt badgerfilestore.SpaceCidOps) { defer wg.Done() - s.load(ctx, opt) + doneAtomic.Add(s.load(ctx, opt)) }(sOps) } if len(sOps.Delete) > 0 { wg.Add(1) go func(opt badgerfilestore.SpaceCidOps) { defer wg.Done() - s.delete(ctx, opt) + doneAtomic.Add(s.delete(ctx, opt)) }(sOps) } if len(sOps.Add) > 0 { wg.Add(1) go func(opt badgerfilestore.SpaceCidOps) { defer wg.Done() - s.add(ctx, opt) + doneAtomic.Add(s.add(ctx, opt)) }(sOps) } } wg.Wait() - return + return doneAtomic.Load() } -func (s *syncer) load(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) { +func (s *syncer) load(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) (doneCount int32) { ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId) res := s.ps.origin.GetMany(ctx, spaceOps.Load) doneCids := badgerfilestore.NewCids() @@ -90,10 +92,12 @@ func (s *syncer) load(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) log.Error("syncer: index.Done error", zap.Error(err)) return } - log.Info("successfully loaded cids", zap.Int("count", doneCids.Len())) + doneCount = int32(doneCids.Len()) + log.Info("successfully loaded cids", zap.Int32("count", doneCount)) + return } -func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) { +func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) (doneCount int32) { doneCids := badgerfilestore.NewCids() defer doneCids.Release() res := s.ps.cache.GetMany(ctx, spaceOps.Add) @@ -112,10 +116,12 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) log.Error("syncer: index.Done error", zap.Error(err)) return } - log.Info("successfully added cids", zap.Int("count", doneCids.Len()), zap.Stringers("cids", doneCids.SpaceOps[0].Add)) + doneCount = int32(doneCids.Len()) + log.Info("successfully added cids", zap.Int32("count", doneCount), zap.Stringers("cids", doneCids.SpaceOps[0].Add)) + return } -func (s *syncer) delete(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) { +func (s *syncer) delete(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) (doneCount int32) { doneCids := badgerfilestore.NewCids() defer doneCids.Release() ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId) @@ -129,5 +135,7 @@ func (s *syncer) delete(ctx context.Context, spaceOps badgerfilestore.SpaceCidOp if err := s.ps.index.Done(doneCids); err != nil { log.Error("syncer: index.Done error", zap.Error(err)) } - log.Info("successfully removed cids", zap.Int("count", doneCids.Len())) + doneCount = int32(doneCids.Len()) + log.Info("successfully removed cids", zap.Int32("count", doneCount)) + return } diff --git a/common/commonfile/fileblockstore/fileblockstore.go b/common/commonfile/fileblockstore/fileblockstore.go index 0eb08a68..9cfa522e 100644 --- a/common/commonfile/fileblockstore/fileblockstore.go +++ b/common/commonfile/fileblockstore/fileblockstore.go @@ -33,6 +33,7 @@ type BlockStore interface { type BlockStoreLocal interface { BlockStore ExistsCids(ctx context.Context, ks []cid.Cid) (exists []cid.Cid, err error) + NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error) } type BlockStoreSpaceIds interface {