From 9c1f0acf8b6874fb7e8deec359c113196275ac39 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Tue, 3 Jan 2023 15:08:07 +0300 Subject: [PATCH] files: rpcstore.AddAsync + lazy connect in clientmgr --- client/filestorage/proxystore.go | 3 +- client/filestorage/proxystore_test.go | 13 +++++ client/filestorage/rpcstore/clientmgr.go | 26 +++++++--- client/filestorage/rpcstore/service.go | 5 +- client/filestorage/rpcstore/store.go | 65 +++++++++++++++--------- client/filestorage/syncer.go | 23 ++------- 6 files changed, 83 insertions(+), 52 deletions(-) diff --git a/client/filestorage/proxystore.go b/client/filestorage/proxystore.go index 4a9a3173..a4b98b20 100644 --- a/client/filestorage/proxystore.go +++ b/client/filestorage/proxystore.go @@ -3,6 +3,7 @@ package filestorage import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore" + "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/rpcstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -13,7 +14,7 @@ import ( type proxyStore struct { cache fileblockstore.BlockStoreLocal - origin fileblockstore.BlockStore + origin rpcstore.RpcStore index *badgerfilestore.FileBadgerIndex } diff --git a/client/filestorage/proxystore_test.go b/client/filestorage/proxystore_test.go index 7ef77911..3aef49fb 100644 --- a/client/filestorage/proxystore_test.go +++ b/client/filestorage/proxystore_test.go @@ -219,6 +219,19 @@ func (t *testStore) Add(ctx context.Context, bs []blocks.Block) error { return nil } +func (t *testStore) AddAsync(ctx context.Context, bs []blocks.Block) (successCh chan cid.Cid) { + successCh = make(chan cid.Cid, len(bs)) + go func() { + defer close(successCh) + for _, b := range bs { + if err := t.Add(ctx, []blocks.Block{b}); err == nil { + successCh <- b.Cid() + } + } + }() + return successCh +} + func (t *testStore) Delete(ctx context.Context, c cid.Cid) error { t.mu.Lock() defer t.mu.Unlock() diff --git a/client/filestorage/rpcstore/clientmgr.go b/client/filestorage/rpcstore/clientmgr.go index 04131949..8e6157a3 100644 --- a/client/filestorage/rpcstore/clientmgr.go +++ b/client/filestorage/rpcstore/clientmgr.go @@ -32,7 +32,8 @@ func newClientManager(s *service) *clientManager { ocache.WithLogger(log.Sugar()), ocache.WithGCPeriod(0), ), - s: s, + checkPeersCh: make(chan struct{}), + s: s, } cm.ctx, cm.ctxCancel = context.WithCancel(context.Background()) go cm.checkPeerLoop() @@ -41,16 +42,27 @@ func newClientManager(s *service) *clientManager { // clientManager manages clients, removes unused ones, and adds new ones if necessary type clientManager struct { - mb *mb.MB[*task] - ctx context.Context - ctxCancel context.CancelFunc - ocache ocache.OCache + mb *mb.MB[*task] + ctx context.Context + ctxCancel context.CancelFunc + ocache ocache.OCache + checkPeersCh chan struct{} s *service mu sync.RWMutex } func (m *clientManager) Add(ctx context.Context, ts ...*task) (err error) { + defer func() { + m.mu.Lock() + if m.ocache.Len() == 0 { + select { + case m.checkPeersCh <- struct{}{}: + default: + } + } + m.mu.Unlock() + }() return m.mb.Add(ctx, ts...) } @@ -62,6 +74,8 @@ func (m *clientManager) checkPeerLoop() { select { case <-m.ctx.Done(): return + case <-m.checkPeersCh: + m.checkPeers() case <-ticker.C: m.checkPeers() } @@ -75,7 +89,7 @@ func (m *clientManager) checkPeers() { // reached connection limit, can't add new peers return } - if m.ocache.Len() != 0 && m.mb.Len() == 0 { + if m.mb.Len() == 0 { // has empty queue, no need new peers return } diff --git a/client/filestorage/rpcstore/service.go b/client/filestorage/rpcstore/service.go index 9a3803c8..f1be16d2 100644 --- a/client/filestorage/rpcstore/service.go +++ b/client/filestorage/rpcstore/service.go @@ -3,7 +3,6 @@ package rpcstore import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" ) @@ -17,7 +16,7 @@ func New() Service { } type Service interface { - NewStore() fileblockstore.BlockStore + NewStore() RpcStore app.Component } @@ -36,7 +35,7 @@ func (s *service) Name() (name string) { return CName } -func (s *service) NewStore() fileblockstore.BlockStore { +func (s *service) NewStore() RpcStore { cm := newClientManager(s) return &store{ s: s, diff --git a/client/filestorage/rpcstore/store.go b/client/filestorage/rpcstore/store.go index 57a0632a..697545cd 100644 --- a/client/filestorage/rpcstore/store.go +++ b/client/filestorage/rpcstore/store.go @@ -2,9 +2,10 @@ package rpcstore import ( "context" - "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + "go.uber.org/multierr" "go.uber.org/zap" "sync" ) @@ -16,6 +17,11 @@ func init() { close(closedBlockChan) } +type RpcStore interface { + fileblockstore.BlockStore + AddAsync(ctx context.Context, bs []blocks.Block) (successCh chan cid.Cid) +} + type store struct { s *service cm *clientManager @@ -80,28 +86,53 @@ func (s *store) Add(ctx context.Context, bs []blocks.Block) error { if err := s.cm.Add(ctx, tasks...); err != nil { return err } - var errs = &ErrPartial{} + var errs []error for i := 0; i < len(tasks); i++ { select { case t := <-readyCh: if t.err != nil { - errs.ErrorCids = append(errs.ErrorCids, ErrCid{Cid: t.cid, Err: t.err}) - } else { - errs.SuccessCids = append(errs.SuccessCids, t.cid) + errs = append(errs, t.err) } case <-ctx.Done(): - if len(errs.SuccessCids) > 0 { - return errs - } return ctx.Err() } } - if len(errs.ErrorCids) > 0 { - return errs + if len(errs) > 0 { + return multierr.Combine(errs...) } return nil } +func (s *store) AddAsync(ctx context.Context, bs []blocks.Block) (successCh chan cid.Cid) { + successCh = make(chan cid.Cid, len(bs)) + go func() { + defer close(successCh) + var readyCh = make(chan *task) + var tasks = make([]*task, len(bs)) + for i, b := range bs { + tasks[i] = newTask(ctx, taskOpPut, b.Cid(), readyCh) + tasks[i].data = b.RawData() + } + if err := s.cm.Add(ctx, tasks...); err != nil { + log.Info("addAsync: can't add tasks", zap.Error(err)) + return + } + for i := 0; i < len(tasks); i++ { + select { + case t := <-readyCh: + if t.err == nil { + successCh <- t.cid + } else { + log.Info("addAsync: task error", zap.Error(t.err)) + } + case <-ctx.Done(): + return + } + } + }() + return +} + func (s *store) Delete(ctx context.Context, c cid.Cid) error { t := newTask(ctx, taskOpDelete, c, nil) if err := s.cm.Add(ctx, t); err != nil { @@ -118,17 +149,3 @@ func (s *store) Delete(ctx context.Context, c cid.Cid) error { func (s *store) Close() (err error) { return s.cm.Close() } - -type ErrPartial struct { - SuccessCids []cid.Cid - ErrorCids []ErrCid -} - -func (e ErrPartial) Error() string { - return fmt.Sprintf("cid errors; success: %d; error: %d", len(e.SuccessCids), len(e.ErrorCids)) -} - -type ErrCid struct { - Cid cid.Cid - Err error -} diff --git a/client/filestorage/syncer.go b/client/filestorage/syncer.go index 410103b0..e3310eab 100644 --- a/client/filestorage/syncer.go +++ b/client/filestorage/syncer.go @@ -3,7 +3,6 @@ package filestorage import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore" - "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/rpcstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" blocks "github.com/ipfs/go-block-format" "go.uber.org/zap" @@ -107,22 +106,10 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) bs = append(bs, b) } ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId) - err := s.ps.origin.Add(ctx, bs) - if err != nil { - log.Debug("syncer: can't add to remote store", zap.Error(err)) - if cerrs, ok := err.(*rpcstore.ErrPartial); ok { - log.Debug("partial sync", zap.Int("success", len(cerrs.SuccessCids)), zap.Int("error", len(cerrs.ErrorCids))) - if len(cerrs.SuccessCids) == 0 { - return - } - for _, doneCid := range cerrs.SuccessCids { - doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, doneCid) - } - } - } else { - for _, b := range bs { - doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, b.Cid()) - } + + successCidsCh := s.ps.origin.AddAsync(ctx, bs) + for doneCid := range successCidsCh { + doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, doneCid) } doneCount = int32(doneCids.Len()) @@ -130,7 +117,7 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) return } - if err = s.ps.index.Done(doneCids); err != nil { + if err := s.ps.index.Done(doneCids); err != nil { log.Error("syncer: index.Done error", zap.Error(err)) return }