From 4754c9704b4c6e40490220ad9f19635e5e8956c8 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 17 Sep 2022 20:54:23 +0200 Subject: [PATCH] Move diff logic to separate service, simplify send sync --- common/commonspace/diffservice/diffservice.go | 155 +++++++ .../{ => diffservice}/periodicsync.go | 2 +- common/commonspace/remotediff/remotediff.go | 2 +- .../commonspace/remotediff/remotediff_test.go | 2 +- common/commonspace/rpchandler.go | 3 +- common/commonspace/service.go | 7 +- common/commonspace/space.go | 138 ++---- .../spacesyncproto/protos/spacesync.proto | 12 +- .../commonspace/spacesyncproto/spacesync.go | 16 +- .../spacesyncproto/spacesync.pb.go | 421 +++++++++++++----- common/commonspace/syncservice/streampool.go | 68 ++- common/commonspace/syncservice/synchandler.go | 57 +-- common/commonspace/syncservice/syncservice.go | 86 ++-- 13 files changed, 652 insertions(+), 317 deletions(-) create mode 100644 common/commonspace/diffservice/diffservice.go rename common/commonspace/{ => diffservice}/periodicsync.go (98%) diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go new file mode 100644 index 00000000..b530d05f --- /dev/null +++ b/common/commonspace/diffservice/diffservice.go @@ -0,0 +1,155 @@ +package diffservice + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" + "go.uber.org/zap" + "strings" + "time" +) + +type DiffService interface { + HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) + UpdateHeads(id string, heads []string) + RemoveObject(id string) + + Init(objectIds []string) + Close() (err error) +} + +type diffService struct { + spaceId string + periodicSync *periodicSync + storage storage.Storage + nconf nodeconf.Configuration + diff ldiff.Diff + cache cache.TreeCache + log *zap.Logger + + syncPeriod int +} + +func NewDiffService( + spaceId string, + syncPeriod int, + storage storage.Storage, + nconf nodeconf.Configuration, + cache cache.TreeCache, + log *zap.Logger) DiffService { + return &diffService{ + spaceId: spaceId, + storage: storage, + nconf: nconf, + cache: cache, + log: log, + syncPeriod: syncPeriod, + } +} + +func (d *diffService) Init(objectIds []string) { + d.periodicSync = newPeriodicSync(d.syncPeriod, d.sync, d.log.With(zap.String("spaceId", d.spaceId))) + d.diff = ldiff.New(16, 16) + d.fillDiff(objectIds) +} + +func (d *diffService) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { + return remotediff.HandleRangeRequest(ctx, d.diff, req) +} + +func (d *diffService) UpdateHeads(id string, heads []string) { + d.diff.Set(ldiff.Element{ + Id: id, + Head: concatStrings(heads), + }) +} + +func (d *diffService) RemoveObject(id string) { + // TODO: add space document to remove ids + d.diff.RemoveId(id) +} + +func (d *diffService) Close() (err error) { + d.periodicSync.Close() + return nil +} + +func (d *diffService) sync(ctx context.Context) error { + st := time.Now() + // diffing with responsible peers according to configuration + peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId) + if err != nil { + return err + } + for _, p := range peers { + if err := d.syncWithPeer(ctx, p); err != nil { + d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) + } + } + d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) + return nil +} + +func (d *diffService) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { + cl := spacesyncproto.NewDRPCSpaceClient(p) + rdiff := remotediff.NewRemoteDiff(d.spaceId, cl) + newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) + if err != nil { + return nil + } + + d.pingTreesInCache(ctx, newIds) + d.pingTreesInCache(ctx, changedIds) + + d.log.Info("sync done:", zap.Int("newIds", len(newIds)), + zap.Int("changedIds", len(changedIds)), + zap.Int("removedIds", len(removedIds))) + return +} + +func (d *diffService) pingTreesInCache(ctx context.Context, trees []string) { + for _, tId := range trees { + _, _ = d.cache.GetTree(ctx, tId) + } +} + +func (d *diffService) fillDiff(objectIds []string) { + var els = make([]ldiff.Element, 0, len(objectIds)) + for _, id := range objectIds { + st, err := d.storage.Storage(id) + if err != nil { + continue + } + heads, err := st.(treestorage.TreeStorage).Heads() + if err != nil { + continue + } + els = append(els, ldiff.Element{ + Id: id, + Head: concatStrings(heads), + }) + } + d.diff.Set(els...) +} + +func concatStrings(strs []string) string { + var ( + b strings.Builder + totalLen int + ) + for _, s := range strs { + totalLen += len(s) + } + + b.Grow(totalLen) + for _, s := range strs { + b.WriteString(s) + } + return b.String() +} diff --git a/common/commonspace/periodicsync.go b/common/commonspace/diffservice/periodicsync.go similarity index 98% rename from common/commonspace/periodicsync.go rename to common/commonspace/diffservice/periodicsync.go index 219f953e..59616eab 100644 --- a/common/commonspace/periodicsync.go +++ b/common/commonspace/diffservice/periodicsync.go @@ -1,4 +1,4 @@ -package commonspace +package diffservice import ( "context" diff --git a/common/commonspace/remotediff/remotediff.go b/common/commonspace/remotediff/remotediff.go index 1a01814c..8121b1c1 100644 --- a/common/commonspace/remotediff/remotediff.go +++ b/common/commonspace/remotediff/remotediff.go @@ -60,7 +60,7 @@ func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff return } -func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { +func HandleRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { ranges := make([]ldiff.Range, 0, len(req.Ranges)) for _, reqRange := range req.Ranges { ranges = append(ranges, ldiff.Range{ diff --git a/common/commonspace/remotediff/remotediff_test.go b/common/commonspace/remotediff/remotediff_test.go index d209b753..f6b3e7e5 100644 --- a/common/commonspace/remotediff/remotediff_test.go +++ b/common/commonspace/remotediff/remotediff_test.go @@ -37,5 +37,5 @@ type mockClient struct { } func (m *mockClient) HeadSync(ctx context.Context, in *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { - return HandlerRangeRequest(ctx, m.l, in) + return HandleRangeRequest(ctx, m.l, in) } diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index 8d8aacac..dbc5b7aa 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -2,7 +2,6 @@ package commonspace import ( "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" ) @@ -16,7 +15,7 @@ type rpcHandler struct { } func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { - return remotediff.HandlerRangeRequest(ctx, r.s.diff, req) + return r.s.DiffService().HandleRangeRequest(ctx, req) } func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 3c084562..9598d53a 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -5,6 +5,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" @@ -44,12 +45,14 @@ func (s *service) Name() (name string) { } func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) { - syncService := syncservice.NewSyncService(id, nil, s.configurationService.GetLast()) + lastConfiguration := s.configurationService.GetLast() + diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, s.storage, lastConfiguration, s.cache, log) + syncService := syncservice.NewSyncService(id, diffService, s.cache, lastConfiguration) sp := &space{ id: id, - nconf: s.configurationService.GetLast(), conf: s.config, syncService: syncService, + diffService: diffService, cache: s.cache, storage: s.storage, } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 5723201b..91657bb7 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -2,24 +2,17 @@ package commonspace import ( "context" - "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" - "go.uber.org/zap" - "math/rand" "sync" - "time" ) type Space interface { @@ -27,6 +20,7 @@ type Space interface { SpaceSyncRpc() RpcHandler SyncService() syncservice.SyncService + DiffService() diffservice.DiffService CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) @@ -35,18 +29,40 @@ type Space interface { } type space struct { - id string - nconf nodeconf.Configuration - conf config.Space - diff ldiff.Diff - mu sync.RWMutex + id string + conf config.Space + mu sync.RWMutex - rpc *rpcHandler - periodicSync *periodicSync - syncService syncservice.SyncService - storage storage.Storage - cache cache.TreeCache - aclList list.ACLList + rpc *rpcHandler + + syncService syncservice.SyncService + diffService diffservice.DiffService + storage storage.Storage + cache cache.TreeCache + aclList list.ACLList +} + +func (s *space) Id() string { + return s.id +} + +func (s *space) Init(ctx context.Context) error { + s.rpc = &rpcHandler{s: s} + s.diffService.Init(s.getObjectIds()) + s.syncService.Init() + return nil +} + +func (s *space) SpaceSyncRpc() RpcHandler { + return s.rpc +} + +func (s *space) SyncService() syncservice.SyncService { + return s.syncService +} + +func (s *space) DiffService() diffservice.DiffService { + return s.diffService } func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) { @@ -63,10 +79,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTr return s.syncService.StreamPool().SendSync( peerId, - spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id), - func(syncMessage *spacesyncproto.ObjectSyncMessage) bool { - return syncMessage.TreeId == id && syncMessage.GetContent().GetFullSyncResponse() != nil - }, + spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, ""), ) } @@ -105,85 +118,12 @@ func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTr return synctree.BuildSyncTree(s.syncService, store.(treestorage.TreeStorage), listener, s.aclList) } -func (s *space) Id() string { - return s.id -} - -func (s *space) Init(ctx context.Context) error { - s.diff = ldiff.New(16, 16) - s.periodicSync = newPeriodicSync(s.conf.SyncPeriod, s.sync, log.With(zap.String("spaceId", s.id))) - s.rpc = &rpcHandler{s: s} - s.testFill() +func (s *space) getObjectIds() []string { + // TODO: add space object logic return nil } -func (s *space) SpaceSyncRpc() RpcHandler { - return s.rpc -} - -func (s *space) SyncService() syncservice.SyncService { - return s.syncService -} - -func (s *space) testFill() { - var n = 1000 - var els = make([]ldiff.Element, 0, n) - rand.Seed(time.Now().UnixNano()) - for i := 0; i < n; i++ { - if rand.Intn(n) > 2 { - id := fmt.Sprintf("%s.%d", s.id, i) - head := "head." + id - if rand.Intn(n) > n-10 { - head += ".modified" - } - el := ldiff.Element{ - Id: id, - Head: head, - } - els = append(els, el) - } - } - s.diff.Set(els...) -} - -func (s *space) sync(ctx context.Context) error { - st := time.Now() - // diffing with responsible peers according to configuration - peers, err := s.nconf.ResponsiblePeers(ctx, s.id) - if err != nil { - return err - } - for _, p := range peers { - if err := s.syncWithPeer(ctx, p); err != nil { - log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) - } - } - log.Info("synced", zap.String("spaceId", s.id), zap.Duration("dur", time.Since(st))) - return nil -} - -func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { - cl := spacesyncproto.NewDRPCSpaceClient(p) - rdiff := remotediff.NewRemoteDiff(s.id, cl) - newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff) - if err != nil { - return nil - } - - s.pingTreesInCache(ctx, newIds) - s.pingTreesInCache(ctx, changedIds) - - log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds))) - return -} - -func (s *space) pingTreesInCache(ctx context.Context, trees []string) { - for _, tId := range trees { - _, _ = s.cache.GetTree(ctx, tId) - } -} - func (s *space) Close() error { - s.periodicSync.Close() + s.diffService.Close() return s.syncService.Close() } diff --git a/common/commonspace/spacesyncproto/protos/spacesync.proto b/common/commonspace/spacesyncproto/protos/spacesync.proto index 7fd7d411..22f7e82d 100644 --- a/common/commonspace/spacesyncproto/protos/spacesync.proto +++ b/common/commonspace/spacesyncproto/protos/spacesync.proto @@ -51,9 +51,10 @@ message ObjectSyncMessage { ObjectSyncContentValue content = 2; acl.TreeHeader treeHeader = 3; string treeId = 4; - - string identity = 5; - string peerSignature = 6; + string trackingId = 5; +// +// string identity = 5; +// string peerSignature = 6; } // ObjectSyncContentValue provides different types for object sync @@ -62,6 +63,7 @@ message ObjectSyncContentValue { ObjectHeadUpdate headUpdate = 1; ObjectFullSyncRequest fullSyncRequest = 2; ObjectFullSyncResponse fullSyncResponse = 3; + ObjectErrorResponse errorResponse = 4; } } @@ -85,3 +87,7 @@ message ObjectFullSyncResponse { repeated acl.RawTreeChangeWithId changes = 2; repeated string snapshotPath = 3; } + +message ObjectErrorResponse { + string error = 1; +} diff --git a/common/commonspace/spacesyncproto/spacesync.go b/common/commonspace/spacesyncproto/spacesync.go index 66de459d..25ec1ba6 100644 --- a/common/commonspace/spacesyncproto/spacesync.go +++ b/common/commonspace/spacesyncproto/spacesync.go @@ -4,7 +4,7 @@ import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclch type SpaceStream = DRPCSpace_StreamStream -func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage { +func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage { return &ObjectSyncMessage{ Content: &ObjectSyncContentValue{ Value: &ObjectSyncContentValue_HeadUpdate{HeadUpdate: update}, @@ -14,7 +14,7 @@ func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId s } } -func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage { +func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage { return &ObjectSyncMessage{ Content: &ObjectSyncContentValue{ Value: &ObjectSyncContentValue_FullSyncRequest{FullSyncRequest: request}, @@ -24,7 +24,7 @@ func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, t } } -func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage { +func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage { return &ObjectSyncMessage{ Content: &ObjectSyncContentValue{ Value: &ObjectSyncContentValue_FullSyncResponse{FullSyncResponse: response}, @@ -33,3 +33,13 @@ func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader TreeId: treeId, } } + +func WrapError(err error, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage { + return &ObjectSyncMessage{ + Content: &ObjectSyncContentValue{ + Value: &ObjectSyncContentValue_ErrorResponse{ErrorResponse: &ObjectErrorResponse{Error: err.Error()}}, + }, + TreeHeader: header, + TreeId: treeId, + } +} diff --git a/common/commonspace/spacesyncproto/spacesync.pb.go b/common/commonspace/spacesyncproto/spacesync.pb.go index 64419bb0..04b77518 100644 --- a/common/commonspace/spacesyncproto/spacesync.pb.go +++ b/common/commonspace/spacesyncproto/spacesync.pb.go @@ -320,12 +320,11 @@ func (m *HeadSyncResponse) GetResults() []*HeadSyncResult { // ObjectSyncMessage is a message sent on object sync type ObjectSyncMessage struct { - SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` - Content *ObjectSyncContentValue `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` - TreeHeader *aclpb.TreeHeader `protobuf:"bytes,3,opt,name=treeHeader,proto3" json:"treeHeader,omitempty"` - TreeId string `protobuf:"bytes,4,opt,name=treeId,proto3" json:"treeId,omitempty"` - Identity string `protobuf:"bytes,5,opt,name=identity,proto3" json:"identity,omitempty"` - PeerSignature string `protobuf:"bytes,6,opt,name=peerSignature,proto3" json:"peerSignature,omitempty"` + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` + Content *ObjectSyncContentValue `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` + TreeHeader *aclpb.TreeHeader `protobuf:"bytes,3,opt,name=treeHeader,proto3" json:"treeHeader,omitempty"` + TreeId string `protobuf:"bytes,4,opt,name=treeId,proto3" json:"treeId,omitempty"` + TrackingId string `protobuf:"bytes,5,opt,name=trackingId,proto3" json:"trackingId,omitempty"` } func (m *ObjectSyncMessage) Reset() { *m = ObjectSyncMessage{} } @@ -389,16 +388,9 @@ func (m *ObjectSyncMessage) GetTreeId() string { return "" } -func (m *ObjectSyncMessage) GetIdentity() string { +func (m *ObjectSyncMessage) GetTrackingId() string { if m != nil { - return m.Identity - } - return "" -} - -func (m *ObjectSyncMessage) GetPeerSignature() string { - if m != nil { - return m.PeerSignature + return m.TrackingId } return "" } @@ -409,6 +401,7 @@ type ObjectSyncContentValue struct { // *ObjectSyncContentValue_HeadUpdate // *ObjectSyncContentValue_FullSyncRequest // *ObjectSyncContentValue_FullSyncResponse + // *ObjectSyncContentValue_ErrorResponse Value isObjectSyncContentValue_Value `protobuf_oneof:"value"` } @@ -460,10 +453,14 @@ type ObjectSyncContentValue_FullSyncRequest struct { type ObjectSyncContentValue_FullSyncResponse struct { FullSyncResponse *ObjectFullSyncResponse `protobuf:"bytes,3,opt,name=fullSyncResponse,proto3,oneof" json:"fullSyncResponse,omitempty"` } +type ObjectSyncContentValue_ErrorResponse struct { + ErrorResponse *ObjectErrorResponse `protobuf:"bytes,4,opt,name=errorResponse,proto3,oneof" json:"errorResponse,omitempty"` +} func (*ObjectSyncContentValue_HeadUpdate) isObjectSyncContentValue_Value() {} func (*ObjectSyncContentValue_FullSyncRequest) isObjectSyncContentValue_Value() {} func (*ObjectSyncContentValue_FullSyncResponse) isObjectSyncContentValue_Value() {} +func (*ObjectSyncContentValue_ErrorResponse) isObjectSyncContentValue_Value() {} func (m *ObjectSyncContentValue) GetValue() isObjectSyncContentValue_Value { if m != nil { @@ -493,12 +490,20 @@ func (m *ObjectSyncContentValue) GetFullSyncResponse() *ObjectFullSyncResponse { return nil } +func (m *ObjectSyncContentValue) GetErrorResponse() *ObjectErrorResponse { + if x, ok := m.GetValue().(*ObjectSyncContentValue_ErrorResponse); ok { + return x.ErrorResponse + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*ObjectSyncContentValue) XXX_OneofWrappers() []interface{} { return []interface{}{ (*ObjectSyncContentValue_HeadUpdate)(nil), (*ObjectSyncContentValue_FullSyncRequest)(nil), (*ObjectSyncContentValue_FullSyncResponse)(nil), + (*ObjectSyncContentValue_ErrorResponse)(nil), } } @@ -685,6 +690,50 @@ func (m *ObjectFullSyncResponse) GetSnapshotPath() []string { return nil } +type ObjectErrorResponse struct { + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` +} + +func (m *ObjectErrorResponse) Reset() { *m = ObjectErrorResponse{} } +func (m *ObjectErrorResponse) String() string { return proto.CompactTextString(m) } +func (*ObjectErrorResponse) ProtoMessage() {} +func (*ObjectErrorResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_5855f4ef9cf24cdb, []int{10} +} +func (m *ObjectErrorResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ObjectErrorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ObjectErrorResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ObjectErrorResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ObjectErrorResponse.Merge(m, src) +} +func (m *ObjectErrorResponse) XXX_Size() int { + return m.Size() +} +func (m *ObjectErrorResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ObjectErrorResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ObjectErrorResponse proto.InternalMessageInfo + +func (m *ObjectErrorResponse) GetError() string { + if m != nil { + return m.Error + } + return "" +} + func init() { proto.RegisterEnum("anySpace.ErrCodes", ErrCodes_name, ErrCodes_value) proto.RegisterType((*HeadSyncRange)(nil), "anySpace.HeadSyncRange") @@ -697,6 +746,7 @@ func init() { proto.RegisterType((*ObjectHeadUpdate)(nil), "anySpace.ObjectHeadUpdate") proto.RegisterType((*ObjectFullSyncRequest)(nil), "anySpace.ObjectFullSyncRequest") proto.RegisterType((*ObjectFullSyncResponse)(nil), "anySpace.ObjectFullSyncResponse") + proto.RegisterType((*ObjectErrorResponse)(nil), "anySpace.ObjectErrorResponse") } func init() { @@ -704,51 +754,52 @@ func init() { } var fileDescriptor_5855f4ef9cf24cdb = []byte{ - // 695 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0xcb, 0x6f, 0xd3, 0x4e, - 0x10, 0x8e, 0xd3, 0xe6, 0xd1, 0x49, 0x1f, 0xf9, 0xad, 0x7e, 0x2d, 0x26, 0x48, 0x69, 0xb0, 0x38, - 0x44, 0x1c, 0x92, 0x2a, 0x5c, 0x10, 0xe5, 0x42, 0xab, 0x56, 0x89, 0x10, 0x0f, 0x6d, 0x28, 0x48, - 0x88, 0xcb, 0xd6, 0x9e, 0x26, 0x06, 0xc7, 0x36, 0xde, 0x0d, 0x25, 0x37, 0x2e, 0x70, 0x46, 0xfc, - 0x55, 0x1c, 0x7b, 0xe4, 0x88, 0xda, 0x7f, 0x81, 0x23, 0x07, 0xb4, 0x63, 0x3b, 0x2f, 0xd2, 0x72, - 0xeb, 0x21, 0xf6, 0xce, 0xec, 0xf7, 0xcd, 0x7e, 0x33, 0xb3, 0x13, 0xc3, 0x7d, 0x3b, 0x18, 0x0c, - 0x02, 0xbf, 0x19, 0xbf, 0x64, 0x28, 0x6c, 0x6c, 0xd2, 0x53, 0x8e, 0x7c, 0x3b, 0x8c, 0x02, 0x15, - 0x34, 0xe9, 0x29, 0x27, 0xde, 0x06, 0x39, 0x58, 0x51, 0xf8, 0xa3, 0xae, 0xf6, 0x55, 0x76, 0xc2, - 0x77, 0xbd, 0xa6, 0xb0, 0x3d, 0xfd, 0xb3, 0xfb, 0xc2, 0xef, 0xa1, 0xd4, 0xcb, 0xf0, 0x38, 0xa5, - 0x4e, 0xfc, 0x31, 0xd7, 0xea, 0xc0, 0x5a, 0x1b, 0x85, 0xd3, 0x1d, 0xf9, 0x36, 0xd7, 0x7e, 0xc6, - 0x60, 0xf9, 0x24, 0x0a, 0x06, 0xa6, 0x51, 0x33, 0xea, 0xcb, 0x9c, 0xd6, 0x6c, 0x1d, 0xb2, 0x2a, - 0x30, 0xb3, 0xe4, 0xc9, 0xaa, 0x80, 0xfd, 0x0f, 0x39, 0xcf, 0x1d, 0xb8, 0xca, 0x5c, 0xaa, 0x19, - 0xf5, 0x35, 0x1e, 0x1b, 0xd6, 0x29, 0xac, 0x8f, 0x43, 0xa1, 0x1c, 0x7a, 0x4a, 0xc7, 0xea, 0x0b, - 0xd9, 0xa7, 0x58, 0xab, 0x9c, 0xd6, 0x6c, 0x17, 0x8a, 0xe8, 0xe1, 0x00, 0x7d, 0x25, 0xcd, 0x6c, - 0x6d, 0xa9, 0x5e, 0x6a, 0x6d, 0x37, 0x52, 0xfd, 0x8d, 0x59, 0xfe, 0x41, 0x8c, 0xe3, 0x63, 0x82, - 0x3e, 0xd8, 0x0e, 0x86, 0xfe, 0xf8, 0x60, 0x32, 0xac, 0x5d, 0xd8, 0x5c, 0x48, 0xd4, 0xba, 0x5d, - 0x87, 0x4e, 0x5f, 0xe1, 0x59, 0xd7, 0x21, 0x3d, 0x28, 0x1c, 0xca, 0x64, 0x85, 0xd3, 0xda, 0x7a, - 0x03, 0x1b, 0x13, 0xf2, 0xfb, 0x21, 0x4a, 0xc5, 0x4c, 0x28, 0x50, 0x89, 0x3b, 0x29, 0x37, 0x35, - 0x59, 0x13, 0xf2, 0x11, 0x55, 0x2f, 0x91, 0x7e, 0x63, 0x81, 0x74, 0xbd, 0xcf, 0x13, 0x98, 0x75, - 0x08, 0xe5, 0x29, 0x69, 0x61, 0xe0, 0x4b, 0x64, 0x2d, 0x28, 0x44, 0x24, 0x53, 0x9a, 0x06, 0x45, - 0x31, 0x2f, 0x2b, 0x00, 0x4f, 0x81, 0xd6, 0x2f, 0x03, 0xfe, 0x7b, 0x76, 0xfc, 0x16, 0x6d, 0xa5, - 0x77, 0x9f, 0xa0, 0x94, 0xa2, 0x87, 0x57, 0x08, 0x7d, 0x00, 0x05, 0x3b, 0xf0, 0x15, 0xfa, 0x8a, - 0x92, 0x2d, 0xb5, 0x6a, 0x93, 0x33, 0x26, 0x71, 0xf6, 0x63, 0xc8, 0x4b, 0xe1, 0x0d, 0x91, 0xa7, - 0x04, 0xd6, 0x04, 0x50, 0x11, 0xa2, 0x96, 0x82, 0x11, 0x55, 0xba, 0xd4, 0xda, 0x68, 0x08, 0xdb, - 0x6b, 0xbc, 0x18, 0xbb, 0xf9, 0x14, 0x84, 0x6d, 0x41, 0x5e, 0x5b, 0x1d, 0xc7, 0x5c, 0x26, 0x15, - 0x89, 0xc5, 0x2a, 0x50, 0x74, 0x1d, 0xf4, 0x95, 0xab, 0x46, 0x66, 0x8e, 0x76, 0xc6, 0x36, 0xbb, - 0x03, 0x6b, 0x21, 0x62, 0xd4, 0x75, 0x7b, 0xbe, 0x50, 0xc3, 0x08, 0xcd, 0x3c, 0x01, 0x66, 0x9d, - 0xd6, 0x6f, 0x03, 0xb6, 0x16, 0xcb, 0x65, 0x0f, 0x01, 0x74, 0xff, 0x8e, 0x42, 0x47, 0x28, 0xa4, - 0xf4, 0x4b, 0xad, 0xca, 0x7c, 0x92, 0xed, 0x31, 0xa2, 0x9d, 0xe1, 0x53, 0x78, 0xf6, 0x18, 0x36, - 0x4e, 0x86, 0x9e, 0x37, 0xd5, 0xf5, 0xa4, 0x4e, 0xdb, 0xf3, 0x21, 0x0e, 0x67, 0x61, 0xed, 0x0c, - 0x9f, 0x67, 0xb2, 0xa7, 0x50, 0x9e, 0xb8, 0xe2, 0x26, 0x27, 0x65, 0xab, 0x5d, 0x1e, 0x2d, 0xc6, - 0xb5, 0x33, 0xfc, 0x2f, 0xee, 0x5e, 0x01, 0x72, 0x1f, 0x74, 0x8e, 0xd6, 0x27, 0x03, 0xca, 0xf3, - 0x89, 0xe8, 0x19, 0xd0, 0x89, 0xc4, 0x97, 0x67, 0x85, 0xc7, 0x86, 0xbe, 0x54, 0xc9, 0x60, 0x27, - 0x57, 0xd3, 0xa4, 0x8e, 0x71, 0x71, 0xaa, 0x9b, 0xb6, 0x4f, 0x5b, 0xaf, 0x5c, 0xd5, 0xef, 0x38, - 0x3c, 0x05, 0x32, 0x0b, 0x56, 0xa5, 0x2f, 0x42, 0xd9, 0x0f, 0xd4, 0x73, 0xa1, 0xfa, 0xe6, 0x12, - 0x05, 0x9c, 0xf1, 0x59, 0x9f, 0x0d, 0xd8, 0x5c, 0x58, 0x88, 0x6b, 0xd6, 0xf1, 0x65, 0x7c, 0x13, - 0xe6, 0x4b, 0x78, 0xbd, 0x42, 0xee, 0x56, 0xa0, 0x78, 0x10, 0x45, 0xfb, 0x81, 0x83, 0x92, 0xad, - 0x03, 0x1c, 0xf9, 0xf8, 0x31, 0x44, 0x5b, 0xa1, 0x53, 0xce, 0xb4, 0xbe, 0x19, 0x90, 0xa3, 0x6e, - 0xb3, 0x47, 0x50, 0x4c, 0x47, 0x99, 0xdd, 0x5c, 0x34, 0xde, 0x54, 0xc3, 0x4a, 0x65, 0xe1, 0xe4, - 0xc7, 0x69, 0x1d, 0x42, 0xbe, 0xab, 0x22, 0x14, 0x03, 0x76, 0x6b, 0xd1, 0xec, 0x26, 0xff, 0x01, - 0x95, 0xab, 0x36, 0xeb, 0xc6, 0x8e, 0xb1, 0xb7, 0xfb, 0xfd, 0xbc, 0x6a, 0x9c, 0x9d, 0x57, 0x8d, - 0x9f, 0xe7, 0x55, 0xe3, 0xeb, 0x45, 0x35, 0x73, 0x76, 0x51, 0xcd, 0xfc, 0xb8, 0xa8, 0x66, 0x5e, - 0xdf, 0xfe, 0xe7, 0x17, 0xe7, 0x38, 0x4f, 0xaf, 0x7b, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x57, - 0xb9, 0x93, 0x47, 0x9d, 0x06, 0x00, 0x00, + // 712 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0x4b, 0x6f, 0xd3, 0x40, + 0x10, 0xf6, 0xa6, 0x79, 0x75, 0xd2, 0x47, 0x58, 0x68, 0x31, 0x41, 0xa4, 0xc1, 0xa7, 0x08, 0xa4, + 0xa4, 0x0a, 0x17, 0x44, 0xb9, 0xd0, 0x2a, 0x55, 0x22, 0xc4, 0x43, 0x5b, 0x0a, 0x12, 0xe2, 0xb2, + 0xb5, 0xb7, 0x49, 0xa8, 0x63, 0x1b, 0xef, 0x86, 0xd2, 0x1b, 0x17, 0x38, 0x23, 0xfe, 0x12, 0x17, + 0x8e, 0x15, 0x27, 0x8e, 0xa8, 0xfd, 0x23, 0x68, 0xc7, 0x76, 0x5e, 0xb8, 0xe5, 0xd6, 0x43, 0xec, + 0x9d, 0xd9, 0xf9, 0xbe, 0xfd, 0xf6, 0x9b, 0xf5, 0x06, 0x1e, 0xda, 0xfe, 0x70, 0xe8, 0x7b, 0xcd, + 0xe8, 0x25, 0x03, 0x6e, 0x8b, 0x26, 0x3e, 0xe5, 0x89, 0x67, 0x07, 0xa1, 0xaf, 0xfc, 0x26, 0x3e, + 0xe5, 0x24, 0xdb, 0xc0, 0x04, 0x2d, 0x72, 0xef, 0x64, 0x4f, 0xe7, 0x2a, 0x9b, 0xc1, 0x51, 0xaf, + 0xc9, 0x6d, 0x57, 0xff, 0xec, 0x3e, 0xf7, 0x7a, 0x42, 0xea, 0x61, 0x70, 0x90, 0x40, 0x27, 0xf9, + 0x08, 0x6b, 0x75, 0x61, 0xb9, 0x23, 0xb8, 0xb3, 0x77, 0xe2, 0xd9, 0x4c, 0xe7, 0x29, 0x85, 0xec, + 0x61, 0xe8, 0x0f, 0x4d, 0x52, 0x23, 0xf5, 0x2c, 0xc3, 0x31, 0x5d, 0x81, 0x8c, 0xf2, 0xcd, 0x0c, + 0x66, 0x32, 0xca, 0xa7, 0x37, 0x20, 0xe7, 0x0e, 0x86, 0x03, 0x65, 0x2e, 0xd4, 0x48, 0x7d, 0x99, + 0x45, 0x81, 0x75, 0x0c, 0x2b, 0x63, 0x2a, 0x21, 0x47, 0xae, 0xd2, 0x5c, 0x7d, 0x2e, 0xfb, 0xc8, + 0xb5, 0xc4, 0x70, 0x4c, 0xb7, 0xa0, 0x28, 0x5c, 0x31, 0x14, 0x9e, 0x92, 0x66, 0xa6, 0xb6, 0x50, + 0x2f, 0xb5, 0x36, 0x1a, 0x89, 0xfe, 0xc6, 0x2c, 0xbe, 0x1d, 0xd5, 0xb1, 0x31, 0x40, 0x2f, 0x6c, + 0xfb, 0x23, 0x6f, 0xbc, 0x30, 0x06, 0xd6, 0x16, 0xac, 0xa5, 0x02, 0xb5, 0xee, 0x81, 0x83, 0xab, + 0x2f, 0xb2, 0xcc, 0xc0, 0x41, 0x3d, 0x82, 0x3b, 0xb8, 0x93, 0x45, 0x86, 0x63, 0xeb, 0x1d, 0xac, + 0x4e, 0xc0, 0x1f, 0x46, 0x42, 0x2a, 0x6a, 0x42, 0x01, 0x2d, 0xee, 0x26, 0xd8, 0x24, 0xa4, 0x4d, + 0xc8, 0x87, 0xe8, 0x5e, 0x2c, 0xfd, 0x66, 0x8a, 0x74, 0x3d, 0xcf, 0xe2, 0x32, 0x6b, 0x17, 0xca, + 0x53, 0xd2, 0x02, 0xdf, 0x93, 0x82, 0xb6, 0xa0, 0x10, 0xa2, 0x4c, 0x69, 0x12, 0x64, 0x31, 0x2f, + 0x32, 0x80, 0x25, 0x85, 0xd6, 0x2f, 0x02, 0xd7, 0x5e, 0x1c, 0xbc, 0x17, 0xb6, 0xd2, 0xb3, 0xcf, + 0x84, 0x94, 0xbc, 0x27, 0x2e, 0x11, 0xfa, 0x08, 0x0a, 0xb6, 0xef, 0x29, 0xe1, 0x29, 0xdc, 0x6c, + 0xa9, 0x55, 0x9b, 0xac, 0x31, 0xe1, 0xd9, 0x89, 0x4a, 0x5e, 0x73, 0x77, 0x24, 0x58, 0x02, 0xa0, + 0x4d, 0x00, 0x15, 0x0a, 0xa1, 0xa5, 0x88, 0x10, 0x9d, 0x2e, 0xb5, 0x56, 0x1b, 0xdc, 0x76, 0x1b, + 0xaf, 0xc6, 0x69, 0x36, 0x55, 0x42, 0xd7, 0x21, 0xaf, 0xa3, 0xae, 0x63, 0x66, 0x51, 0x45, 0x1c, + 0xd1, 0xaa, 0x26, 0xe2, 0xf6, 0xd1, 0xc0, 0xeb, 0x75, 0x1d, 0x33, 0x87, 0x73, 0x53, 0x19, 0xeb, + 0x47, 0x06, 0xd6, 0xd3, 0xc5, 0xd0, 0xc7, 0x00, 0xba, 0x3b, 0xfb, 0x81, 0xc3, 0x95, 0xc0, 0xcd, + 0x95, 0x5a, 0x95, 0xf9, 0x2d, 0x74, 0xc6, 0x15, 0x1d, 0x83, 0x4d, 0xd5, 0xd3, 0xa7, 0xb0, 0x7a, + 0x38, 0x72, 0xdd, 0xa9, 0x9e, 0xc6, 0x2e, 0x6c, 0xcc, 0x53, 0xec, 0xce, 0x96, 0x75, 0x0c, 0x36, + 0x8f, 0xa4, 0xcf, 0xa1, 0x3c, 0x49, 0x45, 0x2d, 0x8c, 0x4d, 0xa9, 0x5d, 0xcc, 0x16, 0xd5, 0x75, + 0x0c, 0xf6, 0x0f, 0x96, 0xb6, 0x61, 0x59, 0x84, 0xa1, 0x1f, 0x8e, 0xc9, 0xb2, 0x48, 0x76, 0x67, + 0x9e, 0xac, 0x3d, 0x5d, 0xd4, 0x31, 0xd8, 0x2c, 0x6a, 0xbb, 0x00, 0xb9, 0x8f, 0xda, 0x2a, 0xeb, + 0x33, 0x81, 0xf2, 0xbc, 0x1f, 0xfa, 0x43, 0xd1, 0x7e, 0x44, 0x27, 0x6c, 0x91, 0x45, 0x81, 0x3e, + 0x79, 0xf1, 0xd7, 0x1f, 0x9f, 0x5f, 0x13, 0xdb, 0xca, 0xf8, 0xb1, 0xee, 0xec, 0x0e, 0x4e, 0xbd, + 0x19, 0xa8, 0x7e, 0xd7, 0x61, 0x49, 0x21, 0xb5, 0x60, 0x49, 0x7a, 0x3c, 0x90, 0x7d, 0x5f, 0xbd, + 0xe4, 0xaa, 0x6f, 0x2e, 0x20, 0xe1, 0x4c, 0xce, 0xfa, 0x42, 0x60, 0x2d, 0xd5, 0xcf, 0x2b, 0xd6, + 0xf1, 0x95, 0x24, 0x07, 0x6a, 0xbe, 0x13, 0x57, 0x2c, 0xe4, 0x3e, 0x5c, 0x4f, 0x69, 0xa2, 0x16, + 0x81, 0x4d, 0x8c, 0xbf, 0xd6, 0x28, 0xb8, 0x57, 0x81, 0x62, 0x3b, 0x0c, 0x77, 0x7c, 0x47, 0x48, + 0xba, 0x02, 0xb0, 0xef, 0x89, 0x4f, 0x81, 0xb0, 0x95, 0x70, 0xca, 0x46, 0xeb, 0x3b, 0x81, 0x1c, + 0x1e, 0x0a, 0xfa, 0x04, 0x8a, 0xc9, 0xe5, 0x40, 0x6f, 0xa5, 0x5d, 0x18, 0x68, 0x78, 0xa5, 0x92, + 0x7a, 0x97, 0x44, 0xcb, 0xef, 0x42, 0x7e, 0x4f, 0x85, 0x82, 0x0f, 0xe9, 0xed, 0xb4, 0xdb, 0x20, + 0xbe, 0x55, 0x2a, 0x97, 0x4d, 0xd6, 0xc9, 0x26, 0xd9, 0xde, 0xfa, 0x79, 0x56, 0x25, 0xa7, 0x67, + 0x55, 0xf2, 0xe7, 0xac, 0x4a, 0xbe, 0x9d, 0x57, 0x8d, 0xd3, 0xf3, 0xaa, 0xf1, 0xfb, 0xbc, 0x6a, + 0xbc, 0xbd, 0xfb, 0xdf, 0xff, 0xb0, 0x83, 0x3c, 0xbe, 0x1e, 0xfc, 0x0d, 0x00, 0x00, 0xff, 0xff, + 0x93, 0x27, 0x56, 0x41, 0xef, 0x06, 0x00, 0x00, } func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { @@ -976,17 +1027,10 @@ func (m *ObjectSyncMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.PeerSignature) > 0 { - i -= len(m.PeerSignature) - copy(dAtA[i:], m.PeerSignature) - i = encodeVarintSpacesync(dAtA, i, uint64(len(m.PeerSignature))) - i-- - dAtA[i] = 0x32 - } - if len(m.Identity) > 0 { - i -= len(m.Identity) - copy(dAtA[i:], m.Identity) - i = encodeVarintSpacesync(dAtA, i, uint64(len(m.Identity))) + if len(m.TrackingId) > 0 { + i -= len(m.TrackingId) + copy(dAtA[i:], m.TrackingId) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.TrackingId))) i-- dAtA[i] = 0x2a } @@ -1126,6 +1170,27 @@ func (m *ObjectSyncContentValue_FullSyncResponse) MarshalToSizedBuffer(dAtA []by } return len(dAtA) - i, nil } +func (m *ObjectSyncContentValue_ErrorResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ObjectSyncContentValue_ErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.ErrorResponse != nil { + { + size, err := m.ErrorResponse.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSpacesync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} func (m *ObjectHeadUpdate) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1291,6 +1356,36 @@ func (m *ObjectFullSyncResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) return len(dAtA) - i, nil } +func (m *ObjectErrorResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ObjectErrorResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ObjectErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Error) > 0 { + i -= len(m.Error) + copy(dAtA[i:], m.Error) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.Error))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int { offset -= sovSpacesync(v) base := offset @@ -1415,11 +1510,7 @@ func (m *ObjectSyncMessage) Size() (n int) { if l > 0 { n += 1 + l + sovSpacesync(uint64(l)) } - l = len(m.Identity) - if l > 0 { - n += 1 + l + sovSpacesync(uint64(l)) - } - l = len(m.PeerSignature) + l = len(m.TrackingId) if l > 0 { n += 1 + l + sovSpacesync(uint64(l)) } @@ -1474,6 +1565,18 @@ func (m *ObjectSyncContentValue_FullSyncResponse) Size() (n int) { } return n } +func (m *ObjectSyncContentValue_ErrorResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.ErrorResponse != nil { + l = m.ErrorResponse.Size() + n += 1 + l + sovSpacesync(uint64(l)) + } + return n +} func (m *ObjectHeadUpdate) Size() (n int) { if m == nil { return 0 @@ -1555,6 +1658,19 @@ func (m *ObjectFullSyncResponse) Size() (n int) { return n } +func (m *ObjectErrorResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Error) + if l > 0 { + n += 1 + l + sovSpacesync(uint64(l)) + } + return n +} + func sovSpacesync(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -2286,7 +2402,7 @@ func (m *ObjectSyncMessage) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 5: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Identity", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TrackingId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -2314,39 +2430,7 @@ func (m *ObjectSyncMessage) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Identity = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 6: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field PeerSignature", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowSpacesync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthSpacesync - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthSpacesync - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.PeerSignature = string(dAtA[iNdEx:postIndex]) + m.TrackingId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2503,6 +2587,41 @@ func (m *ObjectSyncContentValue) Unmarshal(dAtA []byte) error { } m.Value = &ObjectSyncContentValue_FullSyncResponse{v} iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ErrorResponse", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &ObjectErrorResponse{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Value = &ObjectSyncContentValue_ErrorResponse{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipSpacesync(dAtA[iNdEx:]) @@ -2968,6 +3087,88 @@ func (m *ObjectFullSyncResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *ObjectErrorResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ObjectErrorResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ObjectErrorResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSpacesync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSpacesync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipSpacesync(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index cbfb7424..65197270 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/sec" "storj.io/drpc/drpcctx" "sync" + "sync/atomic" ) var ErrEmptyPeer = errors.New("don't have such a peer") @@ -17,17 +18,15 @@ const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { + SyncClient AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) HasStream(peerId string) bool - SyncClient Close() (err error) } type SyncClient interface { - SendSync(peerId string, - message *spacesyncproto.ObjectSyncMessage, - msgCheck func(syncMessage *spacesyncproto.ObjectSyncMessage) bool) (reply *spacesyncproto.ObjectSyncMessage, err error) + SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) } @@ -35,8 +34,7 @@ type SyncClient interface { type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) type responseWaiter struct { - ch chan *spacesyncproto.ObjectSyncMessage - msgCheck func(message *spacesyncproto.ObjectSyncMessage) bool + ch chan *spacesyncproto.ObjectSyncMessage } type streamPool struct { @@ -46,6 +44,7 @@ type streamPool struct { wg *sync.WaitGroup waiters map[string]responseWaiter waitersMx sync.Mutex + counter uint64 } func newStreamPool(messageHandler MessageHandler) StreamPool { @@ -63,36 +62,23 @@ func (s *streamPool) HasStream(peerId string) (res bool) { func (s *streamPool) SendSync( peerId string, - message *spacesyncproto.ObjectSyncMessage, - msgCheck func(syncMessage *spacesyncproto.ObjectSyncMessage) bool) (reply *spacesyncproto.ObjectSyncMessage, err error) { + msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + newCounter := atomic.AddUint64(&s.counter, 1) + msg.TrackingId = genStreamPoolKey(peerId, msg.TreeId, newCounter) - sendAndWait := func(waiter responseWaiter) (err error) { - err = s.SendAsync(peerId, message) - if err != nil { - return - } - - reply = <-waiter.ch - return - } - - key := fmt.Sprintf("%s.%s", peerId, message.TreeId) s.waitersMx.Lock() - waiter, exists := s.waiters[key] - if exists { - s.waitersMx.Unlock() + waiter := responseWaiter{ + ch: make(chan *spacesyncproto.ObjectSyncMessage), + } + s.waiters[msg.TrackingId] = waiter + s.waitersMx.Unlock() - err = sendAndWait(waiter) + err = s.SendAsync(peerId, msg) + if err != nil { return } - waiter = responseWaiter{ - ch: make(chan *spacesyncproto.ObjectSyncMessage), - msgCheck: msgCheck, - } - s.waiters[key] = waiter - s.waitersMx.Unlock() - err = sendAndWait(waiter) + reply = <-waiter.ch return } @@ -189,17 +175,21 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre } process := func(msg *spacesyncproto.ObjectSyncMessage) { - key := fmt.Sprintf("%s.%s", peerId, msg.TreeId) - s.waitersMx.Lock() - waiter, exists := s.waiters[key] + if msg.TrackingId == "" { + s.messageHandler(stream.Context(), peerId, msg) + return + } - if !exists || !waiter.msgCheck(msg) { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.TrackingId] + + if !exists { s.waitersMx.Unlock() s.messageHandler(stream.Context(), peerId, msg) return } - delete(s.waiters, key) + delete(s.waiters, msg.TrackingId) s.waitersMx.Unlock() waiter.ch <- msg } @@ -216,10 +206,8 @@ Loop: break Loop } go func() { - defer func() { - limiter <- struct{}{} - }() process(msg) + limiter <- struct{}{} }() } return s.removePeer(peerId) @@ -244,3 +232,7 @@ func GetPeerIdFromStreamContext(ctx context.Context) (string, error) { return conn.RemotePeer().String(), nil } + +func genStreamPoolKey(peerId, treeId string, counter uint64) string { + return fmt.Sprintf("%s.%s.%d", peerId, treeId, counter) +} diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index 9627f7a0..12798bc3 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -25,15 +25,15 @@ func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandl } } -func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error { - msg := message.GetContent() +func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) error { + content := msg.GetContent() switch { - case msg.GetFullSyncRequest() != nil: - return s.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), message.GetTreeHeader(), message.GetTreeId()) - case msg.GetFullSyncResponse() != nil: - return s.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), message.GetTreeHeader(), message.GetTreeId()) - case msg.GetHeadUpdate() != nil: - return s.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), message.GetTreeHeader(), message.GetTreeId()) + case content.GetFullSyncRequest() != nil: + return s.HandleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), msg) + case content.GetFullSyncResponse() != nil: + return s.HandleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse(), msg) + case content.GetHeadUpdate() != nil: + return s.HandleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), msg) } return nil } @@ -42,15 +42,13 @@ func (s *syncHandler) HandleHeadUpdate( ctx context.Context, senderId string, update *spacesyncproto.ObjectHeadUpdate, - header *aclpb.TreeHeader, - treeId string) (err error) { + msg *spacesyncproto.ObjectSyncMessage) (err error) { var ( fullRequest *spacesyncproto.ObjectFullSyncRequest result tree.AddResult ) - - res, err := s.treeCache.GetTree(ctx, treeId) + res, err := s.treeCache.GetTree(ctx, msg.TreeId) if err != nil { return } @@ -81,7 +79,8 @@ func (s *syncHandler) HandleHeadUpdate( }() if fullRequest != nil { - return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId)) + return s.syncClient.SendAsync(senderId, + spacesyncproto.WrapFullRequest(fullRequest, msg.TreeHeader, msg.TreeId, msg.TrackingId)) } return } @@ -90,12 +89,18 @@ func (s *syncHandler) HandleFullSyncRequest( ctx context.Context, senderId string, request *spacesyncproto.ObjectFullSyncRequest, - header *aclpb.TreeHeader, - treeId string) (err error) { + msg *spacesyncproto.ObjectSyncMessage) (err error) { + var ( + fullResponse *spacesyncproto.ObjectFullSyncResponse + header = msg.TreeHeader + ) + defer func() { + if err != nil { + s.syncClient.SendAsync(senderId, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId)) + } + }() - var fullResponse *spacesyncproto.ObjectFullSyncResponse - - res, err := s.treeCache.GetTree(ctx, treeId) + res, err := s.treeCache.GetTree(ctx, msg.TreeId) if err != nil { return } @@ -106,29 +111,32 @@ func (s *syncHandler) HandleFullSyncRequest( defer res.Release() defer objTree.Unlock() + if header == nil { + header = objTree.Header() + } + _, err = objTree.AddRawChanges(ctx, request.Changes...) if err != nil { return err } - fullResponse, err = s.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) + fullResponse, err = s.prepareFullSyncResponse(request.SnapshotPath, request.Heads, objTree) return err }() if err != nil { return } - return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId)) + return s.syncClient.SendAsync(senderId, + spacesyncproto.WrapFullResponse(fullResponse, header, msg.TreeId, msg.TrackingId)) } func (s *syncHandler) HandleFullSyncResponse( ctx context.Context, senderId string, response *spacesyncproto.ObjectFullSyncResponse, - header *aclpb.TreeHeader, - treeId string) (err error) { - - res, err := s.treeCache.GetTree(ctx, treeId) + msg *spacesyncproto.ObjectSyncMessage) (err error) { + res, err := s.treeCache.GetTree(ctx, msg.TreeId) if err != nil { return } @@ -173,7 +181,6 @@ func (s *syncHandler) prepareFullSyncRequest( } func (s *syncHandler) prepareFullSyncResponse( - treeId string, theirPath, theirHeads []string, t tree.ObjectTree) (*spacesyncproto.ObjectFullSyncResponse, error) { diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 2c155fea..736d40b7 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -12,39 +12,77 @@ import ( type SyncService interface { NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) StreamPool() StreamPool + + Init() Close() (err error) } +type HeadNotifiable interface { + UpdateHeads(id string, heads []string) +} + const respPeersStreamCheckInterval = time.Second * 10 type syncService struct { + spaceId string + syncHandler SyncHandler streamPool StreamPool + headNotifiable HeadNotifiable configuration nodeconf.Configuration - spaceId string + streamLoopCtx context.Context stopStreamLoop context.CancelFunc + streamLoopDone chan struct{} } -func (s *syncService) Run() { +func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService { + var syncHandler SyncHandler + streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + return syncHandler.HandleMessage(ctx, senderId, message) + }) + syncHandler = newSyncHandler(cache, streamPool) + return newSyncService(spaceId, headNotifiable, syncHandler, streamPool, configuration) +} + +func newSyncService( + spaceId string, + headNotifiable HeadNotifiable, + syncHandler SyncHandler, + streamPool StreamPool, + configuration nodeconf.Configuration) *syncService { + return &syncService{ + syncHandler: syncHandler, + streamPool: streamPool, + headNotifiable: headNotifiable, + configuration: configuration, + spaceId: spaceId, + streamLoopDone: make(chan struct{}), + } +} + +func (s *syncService) Init() { s.streamLoopCtx, s.stopStreamLoop = context.WithCancel(context.Background()) - s.streamCheckLoop(s.streamLoopCtx) + go s.responsibleStreamCheckLoop(s.streamLoopCtx) } func (s *syncService) Close() (err error) { s.stopStreamLoop() + <-s.streamLoopDone return s.streamPool.Close() } func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { - return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId)) + s.headNotifiable.UpdateHeads(treeId, update.Heads) + return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId, "")) } -func (s *syncService) streamCheckLoop(ctx context.Context) { - for { +func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { + defer close(s.streamLoopDone) + checkResponsiblePeers := func() { respPeers, err := s.configuration.ResponsiblePeers(ctx, s.spaceId) if err != nil { - continue + return } for _, peer := range respPeers { if s.streamPool.HasStream(peer.Id()) { @@ -58,11 +96,17 @@ func (s *syncService) streamCheckLoop(ctx context.Context) { s.streamPool.AddAndReadStreamAsync(stream) } + } + + checkResponsiblePeers() + ticker := time.NewTicker(respPeersStreamCheckInterval) + defer ticker.Stop() + for { select { - case <-time.After(respPeersStreamCheckInterval): - break - case <-ctx.Done(): + case <-s.streamLoopCtx.Done(): return + case <-ticker.C: + checkResponsiblePeers() } } } @@ -70,25 +114,3 @@ func (s *syncService) streamCheckLoop(ctx context.Context) { func (s *syncService) StreamPool() StreamPool { return s.streamPool } - -func NewSyncService(spaceId string, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService { - var syncHandler SyncHandler - streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - return syncHandler.HandleMessage(ctx, senderId, message) - }) - syncHandler = newSyncHandler(cache, streamPool) - return newSyncService(spaceId, syncHandler, streamPool, configuration) -} - -func newSyncService( - spaceId string, - syncHandler SyncHandler, - streamPool StreamPool, - configuration nodeconf.Configuration) *syncService { - return &syncService{ - syncHandler: syncHandler, - streamPool: streamPool, - configuration: configuration, - spaceId: spaceId, - } -}