diff --git a/client/filestorage/rpcstore/store.go b/client/filestorage/rpcstore/store.go index 70cf2cee..57a0632a 100644 --- a/client/filestorage/rpcstore/store.go +++ b/client/filestorage/rpcstore/store.go @@ -2,9 +2,9 @@ package rpcstore import ( "context" + "fmt" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "go.uber.org/multierr" "go.uber.org/zap" "sync" ) @@ -27,7 +27,12 @@ func (s *store) Get(ctx context.Context, k cid.Cid) (b blocks.Block, err error) if err = s.cm.Add(ctx, t); err != nil { return } - <-t.ready + select { + case <-t.ready: + case <-ctx.Done(): + return nil, ctx.Err() + } + if err = t.Validate(); err != nil { return } @@ -75,18 +80,26 @@ func (s *store) Add(ctx context.Context, bs []blocks.Block) error { if err := s.cm.Add(ctx, tasks...); err != nil { return err } - var errs []error + var errs = &ErrPartial{} for i := 0; i < len(tasks); i++ { select { case t := <-readyCh: if t.err != nil { - errs = append(errs, t.err) + errs.ErrorCids = append(errs.ErrorCids, ErrCid{Cid: t.cid, Err: t.err}) + } else { + errs.SuccessCids = append(errs.SuccessCids, t.cid) } case <-ctx.Done(): + if len(errs.SuccessCids) > 0 { + return errs + } return ctx.Err() } } - return multierr.Combine(errs...) + if len(errs.ErrorCids) > 0 { + return errs + } + return nil } func (s *store) Delete(ctx context.Context, c cid.Cid) error { @@ -105,3 +118,17 @@ func (s *store) Delete(ctx context.Context, c cid.Cid) error { func (s *store) Close() (err error) { return s.cm.Close() } + +type ErrPartial struct { + SuccessCids []cid.Cid + ErrorCids []ErrCid +} + +func (e ErrPartial) Error() string { + return fmt.Sprintf("cid errors; success: %d; error: %d", len(e.SuccessCids), len(e.ErrorCids)) +} + +type ErrCid struct { + Cid cid.Cid + Err error +} diff --git a/client/filestorage/syncer.go b/client/filestorage/syncer.go index 30ff9d15..410103b0 100644 --- a/client/filestorage/syncer.go +++ b/client/filestorage/syncer.go @@ -3,6 +3,7 @@ package filestorage import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/badgerfilestore" + "github.com/anytypeio/go-anytype-infrastructure-experiments/client/filestorage/rpcstore" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileblockstore" blocks "github.com/ipfs/go-block-format" "go.uber.org/zap" @@ -104,19 +105,35 @@ func (s *syncer) add(ctx context.Context, spaceOps badgerfilestore.SpaceCidOps) var bs []blocks.Block for b := range res { bs = append(bs, b) - doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, b.Cid()) } ctx = fileblockstore.CtxWithSpaceId(ctx, spaceOps.SpaceId) - if err := s.ps.origin.Add(ctx, bs); err != nil { + err := s.ps.origin.Add(ctx, bs) + if err != nil { log.Debug("syncer: can't add to remote store", zap.Error(err)) + if cerrs, ok := err.(*rpcstore.ErrPartial); ok { + log.Debug("partial sync", zap.Int("success", len(cerrs.SuccessCids)), zap.Int("error", len(cerrs.ErrorCids))) + if len(cerrs.SuccessCids) == 0 { + return + } + for _, doneCid := range cerrs.SuccessCids { + doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, doneCid) + } + } + } else { + for _, b := range bs { + doneCids.Add(spaceOps.SpaceId, badgerfilestore.OpAdd, b.Cid()) + } + } + + doneCount = int32(doneCids.Len()) + if doneCount == 0 { return } - if err := s.ps.index.Done(doneCids); err != nil { + if err = s.ps.index.Done(doneCids); err != nil { log.Error("syncer: index.Done error", zap.Error(err)) return } - doneCount = int32(doneCids.Len()) log.Info("successfully added cids", zap.Int32("count", doneCount), zap.Stringers("cids", doneCids.SpaceOps[0].Add)) return }