files: rpcstore.AddAsync + lazy connect in clientmgr

This commit is contained in:
Sergey Cherepanov 2023-01-03 15:08:07 +03:00 committed by Mikhail Iudin
parent 4294c7d2f9
commit 9c1f0acf8b
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
6 changed files with 83 additions and 52 deletions

View File

@ -3,6 +3,7 @@ package filestorage
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/rpcstore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -13,7 +14,7 @@ import (
type proxyStore struct { type proxyStore struct {
cache fileblockstore.BlockStoreLocal cache fileblockstore.BlockStoreLocal
origin fileblockstore.BlockStore origin rpcstore.RpcStore
index *badgerfilestore.FileBadgerIndex index *badgerfilestore.FileBadgerIndex
} }

View File

@ -219,6 +219,19 @@ func (t *testStore) Add(ctx context.Context, bs []blocks.Block) error {
return nil return nil
} }
func (t *testStore) AddAsync(ctx context.Context, bs []blocks.Block) (successCh chan cid.Cid) {
successCh = make(chan cid.Cid, len(bs))
go func() {
defer close(successCh)
for _, b := range bs {
if err := t.Add(ctx, []blocks.Block{b}); err == nil {
successCh <- b.Cid()
}
}
}()
return successCh
}
func (t *testStore) Delete(ctx context.Context, c cid.Cid) error { func (t *testStore) Delete(ctx context.Context, c cid.Cid) error {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()

View File

@ -32,7 +32,8 @@ func newClientManager(s *service) *clientManager {
ocache.WithLogger(log.Sugar()), ocache.WithLogger(log.Sugar()),
ocache.WithGCPeriod(0), ocache.WithGCPeriod(0),
), ),
s: s, checkPeersCh: make(chan struct{}),
s: s,
} }
cm.ctx, cm.ctxCancel = context.WithCancel(context.Background()) cm.ctx, cm.ctxCancel = context.WithCancel(context.Background())
go cm.checkPeerLoop() go cm.checkPeerLoop()
@ -41,16 +42,27 @@ func newClientManager(s *service) *clientManager {
// clientManager manages clients, removes unused ones, and adds new ones if necessary // clientManager manages clients, removes unused ones, and adds new ones if necessary
type clientManager struct { type clientManager struct {
mb *mb.MB[*task] mb *mb.MB[*task]
ctx context.Context ctx context.Context
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
ocache ocache.OCache ocache ocache.OCache
checkPeersCh chan struct{}
s *service s *service
mu sync.RWMutex mu sync.RWMutex
} }
func (m *clientManager) Add(ctx context.Context, ts ...*task) (err error) { func (m *clientManager) Add(ctx context.Context, ts ...*task) (err error) {
defer func() {
m.mu.Lock()
if m.ocache.Len() == 0 {
select {
case m.checkPeersCh <- struct{}{}:
default:
}
}
m.mu.Unlock()
}()
return m.mb.Add(ctx, ts...) return m.mb.Add(ctx, ts...)
} }
@ -62,6 +74,8 @@ func (m *clientManager) checkPeerLoop() {
select { select {
case <-m.ctx.Done(): case <-m.ctx.Done():
return return
case <-m.checkPeersCh:
m.checkPeers()
case <-ticker.C: case <-ticker.C:
m.checkPeers() m.checkPeers()
} }
@ -75,7 +89,7 @@ func (m *clientManager) checkPeers() {
// reached connection limit, can't add new peers // reached connection limit, can't add new peers
return return
} }
if m.ocache.Len() != 0 && m.mb.Len() == 0 { if m.mb.Len() == 0 {
// has empty queue, no need new peers // has empty queue, no need new peers
return return
} }

View File

@ -3,7 +3,6 @@ package rpcstore
import ( import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
) )
@ -17,7 +16,7 @@ func New() Service {
} }
type Service interface { type Service interface {
NewStore() fileblockstore.BlockStore NewStore() RpcStore
app.Component app.Component
} }
@ -36,7 +35,7 @@ func (s *service) Name() (name string) {
return CName return CName
} }
func (s *service) NewStore() fileblockstore.BlockStore { func (s *service) NewStore() RpcStore {
cm := newClientManager(s) cm := newClientManager(s)
return &store{ return &store{
s: s, s: s,

View File

@ -2,9 +2,10 @@ package rpcstore
import ( import (
"context" "context"
"fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.uber.org/multierr"
"go.uber.org/zap" "go.uber.org/zap"
"sync" "sync"
) )
@ -16,6 +17,11 @@ func init() {
close(closedBlockChan) close(closedBlockChan)
} }
type RpcStore interface {
fileblockstore.BlockStore
AddAsync(ctx context.Context, bs []blocks.Block) (successCh chan cid.Cid)
}
type store struct { type store struct {
s *service s *service
cm *clientManager cm *clientManager
@ -80,28 +86,53 @@ func (s *store) Add(ctx context.Context, bs []blocks.Block) error {
if err := s.cm.Add(ctx, tasks...); err != nil { if err := s.cm.Add(ctx, tasks...); err != nil {
return err return err
} }
var errs = &ErrPartial{} var errs []error
for i := 0; i < len(tasks); i++ { for i := 0; i < len(tasks); i++ {
select { select {
case t := <-readyCh: case t := <-readyCh:
if t.err != nil { if t.err != nil {
errs.ErrorCids = append(errs.ErrorCids, ErrCid{Cid: t.cid, Err: t.err}) errs = append(errs, t.err)
} else {
errs.SuccessCids = append(errs.SuccessCids, t.cid)
} }
case <-ctx.Done(): case <-ctx.Done():
if len(errs.SuccessCids) > 0 {
return errs
}
return ctx.Err() return ctx.Err()
} }
} }
if len(errs.ErrorCids) > 0 { if len(errs) > 0 {
return errs return multierr.Combine(errs...)
} }
return nil return nil
} }
func (s *store) AddAsync(ctx context.Context, bs []blocks.Block) (successCh chan cid.Cid) {
successCh = make(chan cid.Cid, len(bs))
go func() {
defer close(successCh)
var readyCh = make(chan *task)
var tasks = make([]*task, len(bs))
for i, b := range bs {
tasks[i] = newTask(ctx, taskOpPut, b.Cid(), readyCh)
tasks[i].data = b.RawData()
}
if err := s.cm.Add(ctx, tasks...); err != nil {
log.Info("addAsync: can't add tasks", zap.Error(err))
return
}
for i := 0; i < len(tasks); i++ {
select {
case t := <-readyCh:
if t.err == nil {
successCh <- t.cid
} else {
log.Info("addAsync: task error", zap.Error(t.err))
}
case <-ctx.Done():
return
}
}
}()
return
}
func (s *store) Delete(ctx context.Context, c cid.Cid) error { func (s *store) Delete(ctx context.Context, c cid.Cid) error {
t := newTask(ctx, taskOpDelete, c, nil) t := newTask(ctx, taskOpDelete, c, nil)
if err := s.cm.Add(ctx, t); err != nil { if err := s.cm.Add(ctx, t); err != nil {
@ -118,17 +149,3 @@ func (s *store) Delete(ctx context.Context, c cid.Cid) error {
func (s *store) Close() (err error) { func (s *store) Close() (err error) {
return s.cm.Close() return s.cm.Close()
} }
type ErrPartial struct {
SuccessCids []cid.Cid
ErrorCids []ErrCid
}
func (e ErrPartial) Error() string {
return fmt.Sprintf("cid errors; success: %d; error: %d", len(e.SuccessCids), len(e.ErrorCids))
}
type ErrCid struct {
Cid cid.Cid
Err error
}

View File

@ -3,7 +3,6 @@ package filestorage
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/rpcstore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"go.uber.org/zap" "go.uber.org/zap"
@ -107,22 +106,10 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps)
bs = append(bs, b) bs = append(bs, b)
} }
ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId) ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId)
err := s.ps.origin.Add(ctx, bs)
if err != nil { successCidsCh := s.ps.origin.AddAsync(ctx, bs)
log.Debug("syncer: can't add to remote store", zap.Error(err)) for doneCid := range successCidsCh {
if cerrs, ok := err.(*rpcstore.ErrPartial); ok { doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, doneCid)
log.Debug("partial sync", zap.Int("success", len(cerrs.SuccessCids)), zap.Int("error", len(cerrs.ErrorCids)))
if len(cerrs.SuccessCids) == 0 {
return
}
for _, doneCid := range cerrs.SuccessCids {
doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, doneCid)
}
}
} else {
for _, b := range bs {
doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, b.Cid())
}
} }
doneCount = int32(doneCids.Len()) doneCount = int32(doneCids.Len())
@ -130,7 +117,7 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps)
return return
} }
if err = s.ps.index.Done(doneCids); err != nil { if err := s.ps.index.Done(doneCids); err != nil {
log.Error("syncer: index.Done error", zap.Error(err)) log.Error("syncer: index.Done error", zap.Error(err))
return return
} }