files: fix syncer loop, check ctx.Done
This commit is contained in:
parent
ec900d6e9d
commit
555ff804fe
@ -79,8 +79,27 @@ func (f *badgerStorage) Delete(ctx context.Context, c cid.Cid) error {
|
||||
func (f *badgerStorage) ExistsCids(ctx context.Context, ks []cid.Cid) (exists []cid.Cid, err error) {
|
||||
err = f.db.View(func(txn *badger.Txn) error {
|
||||
for _, k := range ks {
|
||||
if _, e := txn.Get(key(k)); e == nil {
|
||||
_, e := txn.Get(key(k))
|
||||
if e == nil {
|
||||
exists = append(exists, k)
|
||||
} else if e != badger.ErrKeyNotFound {
|
||||
return e
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (f *badgerStorage) NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error) {
|
||||
notExists = bs[:0]
|
||||
err = f.db.View(func(txn *badger.Txn) error {
|
||||
for _, b := range bs {
|
||||
_, e := txn.Get(key(b.Cid()))
|
||||
if e == badger.ErrKeyNotFound {
|
||||
notExists = append(notExists, b)
|
||||
} else if e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@ -95,26 +95,32 @@ func (c *proxyStore) GetMany(ctx context.Context, ks []cid.Cid) <-chan blocks.Bl
|
||||
return results
|
||||
}
|
||||
|
||||
func (c *proxyStore) Add(ctx context.Context, bs []blocks.Block) error {
|
||||
func (c *proxyStore) Add(ctx context.Context, bs []blocks.Block) (err error) {
|
||||
if bs, err = c.cache.NotExistsBlocks(ctx, bs); err != nil {
|
||||
return
|
||||
}
|
||||
if len(bs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err = c.cache.Add(ctx, bs); err != nil {
|
||||
return
|
||||
}
|
||||
indexCids := badgerfilestore.NewCids()
|
||||
defer indexCids.Release()
|
||||
for _, b := range bs {
|
||||
indexCids.Add(fileblockstore.CtxGetSpaceId(ctx), badgerfilestore.OpAdd, b.Cid())
|
||||
}
|
||||
if err := c.index.Add(indexCids); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.cache.Add(ctx, bs)
|
||||
return c.index.Add(indexCids)
|
||||
}
|
||||
|
||||
func (c *proxyStore) Delete(ctx context.Context, k cid.Cid) error {
|
||||
if err := c.cache.Delete(ctx, k); err != nil {
|
||||
return err
|
||||
}
|
||||
indexCids := badgerfilestore.NewCids()
|
||||
defer indexCids.Release()
|
||||
indexCids.Add(fileblockstore.CtxGetSpaceId(ctx), badgerfilestore.OpDelete, k)
|
||||
if err := c.index.Add(indexCids); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.cache.Delete(ctx, k)
|
||||
return c.index.Add(indexCids)
|
||||
}
|
||||
|
||||
func (c *proxyStore) Close() (err error) {
|
||||
|
||||
@ -160,6 +160,18 @@ type testStore struct {
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (t *testStore) NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
notExists = bs[:0]
|
||||
for _, b := range bs {
|
||||
if _, ok := t.store[b.Cid().String()]; !ok {
|
||||
notExists = append(notExists, b)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t *testStore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
@ -126,7 +126,11 @@ func (c *client) put(ctx context.Context, t *task) (err error) {
|
||||
return rpcerr.Unwrap(err)
|
||||
}
|
||||
log.Debug("put cid", zap.String("cid", t.cid.String()))
|
||||
t.ready <- t
|
||||
select {
|
||||
case t.ready <- t:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
c.stat.Add(st, len(t.data))
|
||||
return
|
||||
}
|
||||
@ -184,7 +188,11 @@ func (c *client) readStream(stream fileproto.DRPCFile_GetBlocksClient) {
|
||||
if err != nil {
|
||||
log.Warn("cid receive error", zap.Error(err))
|
||||
} else {
|
||||
t.ready <- t
|
||||
select {
|
||||
case t.ready <- t:
|
||||
case <-t.ctx.Done():
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -269,7 +277,11 @@ func (c *client) Close() error {
|
||||
c.waitCIDMu.Lock()
|
||||
for id, t := range c.waitCIDs {
|
||||
t.err = ErrClientClosed
|
||||
t.ready <- t
|
||||
select {
|
||||
case t.ready <- t:
|
||||
case <-t.ctx.Done():
|
||||
}
|
||||
|
||||
delete(c.waitCIDs, id)
|
||||
}
|
||||
c.waitCIDMu.Unlock()
|
||||
|
||||
@ -77,9 +77,13 @@ func (s *store) Add(ctx context.Context, bs []blocks.Block) error {
|
||||
}
|
||||
var errs []error
|
||||
for i := 0; i < len(tasks); i++ {
|
||||
t := <-readyCh
|
||||
if t.err != nil {
|
||||
errs = append(errs, t.err)
|
||||
select {
|
||||
case t := <-readyCh:
|
||||
if t.err != nil {
|
||||
errs = append(errs, t.err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return multierr.Combine(errs...)
|
||||
@ -90,8 +94,12 @@ func (s *store) Delete(ctx context.Context, c cid.Cid) error {
|
||||
if err := s.cm.Add(ctx, t); err != nil {
|
||||
return err
|
||||
}
|
||||
<-t.ready
|
||||
return t.err
|
||||
select {
|
||||
case t := <-t.ready:
|
||||
return t.err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *store) Close() (err error) {
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -31,14 +32,14 @@ func (s *syncer) run(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncer) sync(ctx context.Context) (l int) {
|
||||
func (s *syncer) sync(ctx context.Context) (doneCount int32) {
|
||||
cids, err := s.ps.index.List(syncerOpBatch)
|
||||
if err != nil {
|
||||
log.Error("index list error", zap.Error(err))
|
||||
return
|
||||
}
|
||||
defer cids.Release()
|
||||
l = cids.Len()
|
||||
l := cids.Len()
|
||||
log.Debug("remote file sync, got tasks to sync", zap.Int("count", l))
|
||||
if l == 0 {
|
||||
return
|
||||
@ -47,34 +48,35 @@ func (s *syncer) sync(ctx context.Context) (l int) {
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
var wg sync.WaitGroup
|
||||
var doneAtomic atomic.Int32
|
||||
for _, sOps := range cids.SpaceOps {
|
||||
if len(sOps.Load) > 0 {
|
||||
wg.Add(1)
|
||||
go func(opt badgerfilestore.SpaceCidOps) {
|
||||
defer wg.Done()
|
||||
s.load(ctx, opt)
|
||||
doneAtomic.Add(s.load(ctx, opt))
|
||||
}(sOps)
|
||||
}
|
||||
if len(sOps.Delete) > 0 {
|
||||
wg.Add(1)
|
||||
go func(opt badgerfilestore.SpaceCidOps) {
|
||||
defer wg.Done()
|
||||
s.delete(ctx, opt)
|
||||
doneAtomic.Add(s.delete(ctx, opt))
|
||||
}(sOps)
|
||||
}
|
||||
if len(sOps.Add) > 0 {
|
||||
wg.Add(1)
|
||||
go func(opt badgerfilestore.SpaceCidOps) {
|
||||
defer wg.Done()
|
||||
s.add(ctx, opt)
|
||||
doneAtomic.Add(s.add(ctx, opt))
|
||||
}(sOps)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return
|
||||
return doneAtomic.Load()
|
||||
}
|
||||
|
||||
func (s *syncer) load(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) {
|
||||
func (s *syncer) load(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) (doneCount int32) {
|
||||
ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId)
|
||||
res := s.ps.origin.GetMany(ctx, spaceOps.Load)
|
||||
doneCids := badgerfilestore.NewCids()
|
||||
@ -90,10 +92,12 @@ func (s *syncer) load(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps)
|
||||
log.Error("syncer: index.Done error", zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Info("successfully loaded cids", zap.Int("count", doneCids.Len()))
|
||||
doneCount = int32(doneCids.Len())
|
||||
log.Info("successfully loaded cids", zap.Int32("count", doneCount))
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) {
|
||||
func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) (doneCount int32) {
|
||||
doneCids := badgerfilestore.NewCids()
|
||||
defer doneCids.Release()
|
||||
res := s.ps.cache.GetMany(ctx, spaceOps.Add)
|
||||
@ -112,10 +116,12 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps)
|
||||
log.Error("syncer: index.Done error", zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Info("successfully added cids", zap.Int("count", doneCids.Len()), zap.Stringers("cids", doneCids.SpaceOps[0].Add))
|
||||
doneCount = int32(doneCids.Len())
|
||||
log.Info("successfully added cids", zap.Int32("count", doneCount), zap.Stringers("cids", doneCids.SpaceOps[0].Add))
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncer) delete(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) {
|
||||
func (s *syncer) delete(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) (doneCount int32) {
|
||||
doneCids := badgerfilestore.NewCids()
|
||||
defer doneCids.Release()
|
||||
ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId)
|
||||
@ -129,5 +135,7 @@ func (s *syncer) delete(ctx context.Context, spaceOps badgerfilestore.SpaceCidOp
|
||||
if err := s.ps.index.Done(doneCids); err != nil {
|
||||
log.Error("syncer: index.Done error", zap.Error(err))
|
||||
}
|
||||
log.Info("successfully removed cids", zap.Int("count", doneCids.Len()))
|
||||
doneCount = int32(doneCids.Len())
|
||||
log.Info("successfully removed cids", zap.Int32("count", doneCount))
|
||||
return
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ type BlockStore interface {
|
||||
type BlockStoreLocal interface {
|
||||
BlockStore
|
||||
ExistsCids(ctx context.Context, ks []cid.Cid) (exists []cid.Cid, err error)
|
||||
NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error)
|
||||
}
|
||||
|
||||
type BlockStoreSpaceIds interface {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user