Merge pull request #26 from anytypeio/files

files:partial upload
This commit is contained in:
Sergey Cherepanov 2022-12-29 22:36:41 +03:00 committed by GitHub
commit a55306aa27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 9 deletions

View File

@ -2,9 +2,9 @@ package rpcstore
import ( import (
"context" "context"
"fmt"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.uber.org/multierr"
"go.uber.org/zap" "go.uber.org/zap"
"sync" "sync"
) )
@ -27,7 +27,12 @@ func (s *store) Get(ctx context.Context, k cid.Cid) (b blocks.Block, err error)
if err = s.cm.Add(ctx, t); err != nil { if err = s.cm.Add(ctx, t); err != nil {
return return
} }
<-t.ready select {
case <-t.ready:
case <-ctx.Done():
return nil, ctx.Err()
}
if err = t.Validate(); err != nil { if err = t.Validate(); err != nil {
return return
} }
@ -75,18 +80,26 @@ func (s *store) Add(ctx context.Context, bs []blocks.Block) error {
if err := s.cm.Add(ctx, tasks...); err != nil { if err := s.cm.Add(ctx, tasks...); err != nil {
return err return err
} }
var errs []error var errs = &ErrPartial{}
for i := 0; i < len(tasks); i++ { for i := 0; i < len(tasks); i++ {
select { select {
case t := <-readyCh: case t := <-readyCh:
if t.err != nil { if t.err != nil {
errs = append(errs, t.err) errs.ErrorCids = append(errs.ErrorCids, ErrCid{Cid: t.cid, Err: t.err})
} else {
errs.SuccessCids = append(errs.SuccessCids, t.cid)
} }
case <-ctx.Done(): case <-ctx.Done():
if len(errs.SuccessCids) > 0 {
return errs
}
return ctx.Err() return ctx.Err()
} }
} }
return multierr.Combine(errs...) if len(errs.ErrorCids) > 0 {
return errs
}
return nil
} }
func (s *store) Delete(ctx context.Context, c cid.Cid) error { func (s *store) Delete(ctx context.Context, c cid.Cid) error {
@ -105,3 +118,17 @@ func (s *store) Delete(ctx context.Context, c cid.Cid) error {
func (s *store) Close() (err error) { func (s *store) Close() (err error) {
return s.cm.Close() return s.cm.Close()
} }
type ErrPartial struct {
SuccessCids []cid.Cid
ErrorCids []ErrCid
}
func (e ErrPartial) Error() string {
return fmt.Sprintf("cid errors; success: %d; error: %d", len(e.SuccessCids), len(e.ErrorCids))
}
type ErrCid struct {
Cid cid.Cid
Err error
}

View File

@ -3,6 +3,7 @@ package filestorage
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/rpcstore"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"go.uber.org/zap" "go.uber.org/zap"
@ -104,19 +105,35 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps)
var bs []blocks.Block var bs []blocks.Block
for b := range res { for b := range res {
bs = append(bs, b) bs = append(bs, b)
doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, b.Cid())
} }
ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId) ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId)
if err := s.ps.origin.Add(ctx, bs); err != nil { err := s.ps.origin.Add(ctx, bs)
if err != nil {
log.Debug("syncer: can't add to remote store", zap.Error(err)) log.Debug("syncer: can't add to remote store", zap.Error(err))
if cerrs, ok := err.(*rpcstore.ErrPartial); ok {
log.Debug("partial sync", zap.Int("success", len(cerrs.SuccessCids)), zap.Int("error", len(cerrs.ErrorCids)))
if len(cerrs.SuccessCids) == 0 {
return
}
for _, doneCid := range cerrs.SuccessCids {
doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, doneCid)
}
}
} else {
for _, b := range bs {
doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, b.Cid())
}
}
doneCount = int32(doneCids.Len())
if doneCount == 0 {
return return
} }
if err := s.ps.index.Done(doneCids); err != nil { if err = s.ps.index.Done(doneCids); err != nil {
log.Error("syncer: index.Done error", zap.Error(err)) log.Error("syncer: index.Done error", zap.Error(err))
return return
} }
doneCount = int32(doneCids.Len())
log.Info("successfully added cids", zap.Int32("count", doneCount), zap.Stringers("cids", doneCids.SpaceOps[0].Add)) log.Info("successfully added cids", zap.Int32("count", doneCount), zap.Stringers("cids", doneCids.SpaceOps[0].Add))
return return
} }