Move diff logic to separate service, simplify send sync

This commit is contained in:
mcrakhman 2022-09-17 20:54:23 +02:00 committed by Mikhail Iudin
parent ca1d0a43b1
commit 4754c9704b
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
13 changed files with 652 additions and 317 deletions

View File

@ -0,0 +1,155 @@
package diffservice
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
"go.uber.org/zap"
"strings"
"time"
)
type DiffService interface {
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
UpdateHeads(id string, heads []string)
RemoveObject(id string)
Init(objectIds []string)
Close() (err error)
}
type diffService struct {
spaceId string
periodicSync *periodicSync
storage storage.Storage
nconf nodeconf.Configuration
diff ldiff.Diff
cache cache.TreeCache
log *zap.Logger
syncPeriod int
}
func NewDiffService(
spaceId string,
syncPeriod int,
storage storage.Storage,
nconf nodeconf.Configuration,
cache cache.TreeCache,
log *zap.Logger) DiffService {
return &diffService{
spaceId: spaceId,
storage: storage,
nconf: nconf,
cache: cache,
log: log,
syncPeriod: syncPeriod,
}
}
func (d *diffService) Init(objectIds []string) {
d.periodicSync = newPeriodicSync(d.syncPeriod, d.sync, d.log.With(zap.String("spaceId", d.spaceId)))
d.diff = ldiff.New(16, 16)
d.fillDiff(objectIds)
}
func (d *diffService) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
return remotediff.HandleRangeRequest(ctx, d.diff, req)
}
func (d *diffService) UpdateHeads(id string, heads []string) {
d.diff.Set(ldiff.Element{
Id: id,
Head: concatStrings(heads),
})
}
func (d *diffService) RemoveObject(id string) {
// TODO: add space document to remove ids
d.diff.RemoveId(id)
}
func (d *diffService) Close() (err error) {
d.periodicSync.Close()
return nil
}
func (d *diffService) sync(ctx context.Context) error {
st := time.Now()
// diffing with responsible peers according to configuration
peers, err := d.nconf.ResponsiblePeers(ctx, d.spaceId)
if err != nil {
return err
}
for _, p := range peers {
if err := d.syncWithPeer(ctx, p); err != nil {
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
}
}
d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
return nil
}
func (d *diffService) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
cl := spacesyncproto.NewDRPCSpaceClient(p)
rdiff := remotediff.NewRemoteDiff(d.spaceId, cl)
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
if err != nil {
return nil
}
d.pingTreesInCache(ctx, newIds)
d.pingTreesInCache(ctx, changedIds)
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)),
zap.Int("removedIds", len(removedIds)))
return
}
func (d *diffService) pingTreesInCache(ctx context.Context, trees []string) {
for _, tId := range trees {
_, _ = d.cache.GetTree(ctx, tId)
}
}
func (d *diffService) fillDiff(objectIds []string) {
var els = make([]ldiff.Element, 0, len(objectIds))
for _, id := range objectIds {
st, err := d.storage.Storage(id)
if err != nil {
continue
}
heads, err := st.(treestorage.TreeStorage).Heads()
if err != nil {
continue
}
els = append(els, ldiff.Element{
Id: id,
Head: concatStrings(heads),
})
}
d.diff.Set(els...)
}
func concatStrings(strs []string) string {
var (
b strings.Builder
totalLen int
)
for _, s := range strs {
totalLen += len(s)
}
b.Grow(totalLen)
for _, s := range strs {
b.WriteString(s)
}
return b.String()
}

View File

@ -1,4 +1,4 @@
package commonspace package diffservice
import ( import (
"context" "context"

View File

@ -60,7 +60,7 @@ func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff
return return
} }
func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { func HandleRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
ranges := make([]ldiff.Range, 0, len(req.Ranges)) ranges := make([]ldiff.Range, 0, len(req.Ranges))
for _, reqRange := range req.Ranges { for _, reqRange := range req.Ranges {
ranges = append(ranges, ldiff.Range{ ranges = append(ranges, ldiff.Range{

View File

@ -37,5 +37,5 @@ type mockClient struct {
} }
func (m *mockClient) HeadSync(ctx context.Context, in *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { func (m *mockClient) HeadSync(ctx context.Context, in *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
return HandlerRangeRequest(ctx, m.l, in) return HandleRangeRequest(ctx, m.l, in)
} }

View File

@ -2,7 +2,6 @@ package commonspace
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
) )
@ -16,7 +15,7 @@ type rpcHandler struct {
} }
func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
return remotediff.HandlerRangeRequest(ctx, r.s.diff, req) return r.s.DiffService().HandleRangeRequest(ctx, req)
} }
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {

View File

@ -5,6 +5,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
@ -44,12 +45,14 @@ func (s *service) Name() (name string) {
} }
func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) { func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) {
syncService := syncservice.NewSyncService(id, nil, s.configurationService.GetLast()) lastConfiguration := s.configurationService.GetLast()
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, s.storage, lastConfiguration, s.cache, log)
syncService := syncservice.NewSyncService(id, diffService, s.cache, lastConfiguration)
sp := &space{ sp := &space{
id: id, id: id,
nconf: s.configurationService.GetLast(),
conf: s.config, conf: s.config,
syncService: syncService, syncService: syncService,
diffService: diffService,
cache: s.cache, cache: s.cache,
storage: s.storage, storage: s.storage,
} }

View File

@ -2,24 +2,17 @@ package commonspace
import ( import (
"context" "context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
"go.uber.org/zap"
"math/rand"
"sync" "sync"
"time"
) )
type Space interface { type Space interface {
@ -27,6 +20,7 @@ type Space interface {
SpaceSyncRpc() RpcHandler SpaceSyncRpc() RpcHandler
SyncService() syncservice.SyncService SyncService() syncservice.SyncService
DiffService() diffservice.DiffService
CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error)
BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error)
@ -35,18 +29,40 @@ type Space interface {
} }
type space struct { type space struct {
id string id string
nconf nodeconf.Configuration conf config.Space
conf config.Space mu sync.RWMutex
diff ldiff.Diff
mu sync.RWMutex
rpc *rpcHandler rpc *rpcHandler
periodicSync *periodicSync
syncService syncservice.SyncService syncService syncservice.SyncService
storage storage.Storage diffService diffservice.DiffService
cache cache.TreeCache storage storage.Storage
aclList list.ACLList cache cache.TreeCache
aclList list.ACLList
}
func (s *space) Id() string {
return s.id
}
func (s *space) Init(ctx context.Context) error {
s.rpc = &rpcHandler{s: s}
s.diffService.Init(s.getObjectIds())
s.syncService.Init()
return nil
}
func (s *space) SpaceSyncRpc() RpcHandler {
return s.rpc
}
func (s *space) SyncService() syncservice.SyncService {
return s.syncService
}
func (s *space) DiffService() diffservice.DiffService {
return s.diffService
} }
func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) { func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) {
@ -63,10 +79,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTr
return s.syncService.StreamPool().SendSync( return s.syncService.StreamPool().SendSync(
peerId, peerId,
spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id), spacesyncproto.WrapFullRequest(&spacesyncproto.ObjectFullSyncRequest{}, nil, id, ""),
func(syncMessage *spacesyncproto.ObjectSyncMessage) bool {
return syncMessage.TreeId == id && syncMessage.GetContent().GetFullSyncResponse() != nil
},
) )
} }
@ -105,85 +118,12 @@ func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTr
return synctree.BuildSyncTree(s.syncService, store.(treestorage.TreeStorage), listener, s.aclList) return synctree.BuildSyncTree(s.syncService, store.(treestorage.TreeStorage), listener, s.aclList)
} }
func (s *space) Id() string { func (s *space) getObjectIds() []string {
return s.id // TODO: add space object logic
}
func (s *space) Init(ctx context.Context) error {
s.diff = ldiff.New(16, 16)
s.periodicSync = newPeriodicSync(s.conf.SyncPeriod, s.sync, log.With(zap.String("spaceId", s.id)))
s.rpc = &rpcHandler{s: s}
s.testFill()
return nil return nil
} }
func (s *space) SpaceSyncRpc() RpcHandler {
return s.rpc
}
func (s *space) SyncService() syncservice.SyncService {
return s.syncService
}
func (s *space) testFill() {
var n = 1000
var els = make([]ldiff.Element, 0, n)
rand.Seed(time.Now().UnixNano())
for i := 0; i < n; i++ {
if rand.Intn(n) > 2 {
id := fmt.Sprintf("%s.%d", s.id, i)
head := "head." + id
if rand.Intn(n) > n-10 {
head += ".modified"
}
el := ldiff.Element{
Id: id,
Head: head,
}
els = append(els, el)
}
}
s.diff.Set(els...)
}
func (s *space) sync(ctx context.Context) error {
st := time.Now()
// diffing with responsible peers according to configuration
peers, err := s.nconf.ResponsiblePeers(ctx, s.id)
if err != nil {
return err
}
for _, p := range peers {
if err := s.syncWithPeer(ctx, p); err != nil {
log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
}
}
log.Info("synced", zap.String("spaceId", s.id), zap.Duration("dur", time.Since(st)))
return nil
}
func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
cl := spacesyncproto.NewDRPCSpaceClient(p)
rdiff := remotediff.NewRemoteDiff(s.id, cl)
newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff)
if err != nil {
return nil
}
s.pingTreesInCache(ctx, newIds)
s.pingTreesInCache(ctx, changedIds)
log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds)))
return
}
func (s *space) pingTreesInCache(ctx context.Context, trees []string) {
for _, tId := range trees {
_, _ = s.cache.GetTree(ctx, tId)
}
}
func (s *space) Close() error { func (s *space) Close() error {
s.periodicSync.Close() s.diffService.Close()
return s.syncService.Close() return s.syncService.Close()
} }

View File

@ -51,9 +51,10 @@ message ObjectSyncMessage {
ObjectSyncContentValue content = 2; ObjectSyncContentValue content = 2;
acl.TreeHeader treeHeader = 3; acl.TreeHeader treeHeader = 3;
string treeId = 4; string treeId = 4;
string trackingId = 5;
string identity = 5; //
string peerSignature = 6; // string identity = 5;
// string peerSignature = 6;
} }
// ObjectSyncContentValue provides different types for object sync // ObjectSyncContentValue provides different types for object sync
@ -62,6 +63,7 @@ message ObjectSyncContentValue {
ObjectHeadUpdate headUpdate = 1; ObjectHeadUpdate headUpdate = 1;
ObjectFullSyncRequest fullSyncRequest = 2; ObjectFullSyncRequest fullSyncRequest = 2;
ObjectFullSyncResponse fullSyncResponse = 3; ObjectFullSyncResponse fullSyncResponse = 3;
ObjectErrorResponse errorResponse = 4;
} }
} }
@ -85,3 +87,7 @@ message ObjectFullSyncResponse {
repeated acl.RawTreeChangeWithId changes = 2; repeated acl.RawTreeChangeWithId changes = 2;
repeated string snapshotPath = 3; repeated string snapshotPath = 3;
} }
message ObjectErrorResponse {
string error = 1;
}

View File

@ -4,7 +4,7 @@ import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclch
type SpaceStream = DRPCSpace_StreamStream type SpaceStream = DRPCSpace_StreamStream
func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage { func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{ return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{ Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_HeadUpdate{HeadUpdate: update}, Value: &ObjectSyncContentValue_HeadUpdate{HeadUpdate: update},
@ -14,7 +14,7 @@ func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId s
} }
} }
func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage { func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{ return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{ Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncRequest{FullSyncRequest: request}, Value: &ObjectSyncContentValue_FullSyncRequest{FullSyncRequest: request},
@ -24,7 +24,7 @@ func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, t
} }
} }
func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage { func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{ return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{ Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncResponse{FullSyncResponse: response}, Value: &ObjectSyncContentValue_FullSyncResponse{FullSyncResponse: response},
@ -33,3 +33,13 @@ func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader
TreeId: treeId, TreeId: treeId,
} }
} }
func WrapError(err error, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_ErrorResponse{ErrorResponse: &ObjectErrorResponse{Error: err.Error()}},
},
TreeHeader: header,
TreeId: treeId,
}
}

View File

@ -320,12 +320,11 @@ func (m *HeadSyncResponse) GetResults() []*HeadSyncResult {
// ObjectSyncMessage is a message sent on object sync // ObjectSyncMessage is a message sent on object sync
type ObjectSyncMessage struct { type ObjectSyncMessage struct {
SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"`
Content *ObjectSyncContentValue `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` Content *ObjectSyncContentValue `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"`
TreeHeader *aclpb.TreeHeader `protobuf:"bytes,3,opt,name=treeHeader,proto3" json:"treeHeader,omitempty"` TreeHeader *aclpb.TreeHeader `protobuf:"bytes,3,opt,name=treeHeader,proto3" json:"treeHeader,omitempty"`
TreeId string `protobuf:"bytes,4,opt,name=treeId,proto3" json:"treeId,omitempty"` TreeId string `protobuf:"bytes,4,opt,name=treeId,proto3" json:"treeId,omitempty"`
Identity string `protobuf:"bytes,5,opt,name=identity,proto3" json:"identity,omitempty"` TrackingId string `protobuf:"bytes,5,opt,name=trackingId,proto3" json:"trackingId,omitempty"`
PeerSignature string `protobuf:"bytes,6,opt,name=peerSignature,proto3" json:"peerSignature,omitempty"`
} }
func (m *ObjectSyncMessage) Reset() { *m = ObjectSyncMessage{} } func (m *ObjectSyncMessage) Reset() { *m = ObjectSyncMessage{} }
@ -389,16 +388,9 @@ func (m *ObjectSyncMessage) GetTreeId() string {
return "" return ""
} }
func (m *ObjectSyncMessage) GetIdentity() string { func (m *ObjectSyncMessage) GetTrackingId() string {
if m != nil { if m != nil {
return m.Identity return m.TrackingId
}
return ""
}
func (m *ObjectSyncMessage) GetPeerSignature() string {
if m != nil {
return m.PeerSignature
} }
return "" return ""
} }
@ -409,6 +401,7 @@ type ObjectSyncContentValue struct {
// *ObjectSyncContentValue_HeadUpdate // *ObjectSyncContentValue_HeadUpdate
// *ObjectSyncContentValue_FullSyncRequest // *ObjectSyncContentValue_FullSyncRequest
// *ObjectSyncContentValue_FullSyncResponse // *ObjectSyncContentValue_FullSyncResponse
// *ObjectSyncContentValue_ErrorResponse
Value isObjectSyncContentValue_Value `protobuf_oneof:"value"` Value isObjectSyncContentValue_Value `protobuf_oneof:"value"`
} }
@ -460,10 +453,14 @@ type ObjectSyncContentValue_FullSyncRequest struct {
type ObjectSyncContentValue_FullSyncResponse struct { type ObjectSyncContentValue_FullSyncResponse struct {
FullSyncResponse *ObjectFullSyncResponse `protobuf:"bytes,3,opt,name=fullSyncResponse,proto3,oneof" json:"fullSyncResponse,omitempty"` FullSyncResponse *ObjectFullSyncResponse `protobuf:"bytes,3,opt,name=fullSyncResponse,proto3,oneof" json:"fullSyncResponse,omitempty"`
} }
type ObjectSyncContentValue_ErrorResponse struct {
ErrorResponse *ObjectErrorResponse `protobuf:"bytes,4,opt,name=errorResponse,proto3,oneof" json:"errorResponse,omitempty"`
}
func (*ObjectSyncContentValue_HeadUpdate) isObjectSyncContentValue_Value() {} func (*ObjectSyncContentValue_HeadUpdate) isObjectSyncContentValue_Value() {}
func (*ObjectSyncContentValue_FullSyncRequest) isObjectSyncContentValue_Value() {} func (*ObjectSyncContentValue_FullSyncRequest) isObjectSyncContentValue_Value() {}
func (*ObjectSyncContentValue_FullSyncResponse) isObjectSyncContentValue_Value() {} func (*ObjectSyncContentValue_FullSyncResponse) isObjectSyncContentValue_Value() {}
func (*ObjectSyncContentValue_ErrorResponse) isObjectSyncContentValue_Value() {}
func (m *ObjectSyncContentValue) GetValue() isObjectSyncContentValue_Value { func (m *ObjectSyncContentValue) GetValue() isObjectSyncContentValue_Value {
if m != nil { if m != nil {
@ -493,12 +490,20 @@ func (m *ObjectSyncContentValue) GetFullSyncResponse() *ObjectFullSyncResponse {
return nil return nil
} }
func (m *ObjectSyncContentValue) GetErrorResponse() *ObjectErrorResponse {
if x, ok := m.GetValue().(*ObjectSyncContentValue_ErrorResponse); ok {
return x.ErrorResponse
}
return nil
}
// XXX_OneofWrappers is for the internal use of the proto package. // XXX_OneofWrappers is for the internal use of the proto package.
func (*ObjectSyncContentValue) XXX_OneofWrappers() []interface{} { func (*ObjectSyncContentValue) XXX_OneofWrappers() []interface{} {
return []interface{}{ return []interface{}{
(*ObjectSyncContentValue_HeadUpdate)(nil), (*ObjectSyncContentValue_HeadUpdate)(nil),
(*ObjectSyncContentValue_FullSyncRequest)(nil), (*ObjectSyncContentValue_FullSyncRequest)(nil),
(*ObjectSyncContentValue_FullSyncResponse)(nil), (*ObjectSyncContentValue_FullSyncResponse)(nil),
(*ObjectSyncContentValue_ErrorResponse)(nil),
} }
} }
@ -685,6 +690,50 @@ func (m *ObjectFullSyncResponse) GetSnapshotPath() []string {
return nil return nil
} }
type ObjectErrorResponse struct {
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
}
func (m *ObjectErrorResponse) Reset() { *m = ObjectErrorResponse{} }
func (m *ObjectErrorResponse) String() string { return proto.CompactTextString(m) }
func (*ObjectErrorResponse) ProtoMessage() {}
func (*ObjectErrorResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_5855f4ef9cf24cdb, []int{10}
}
func (m *ObjectErrorResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ObjectErrorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ObjectErrorResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ObjectErrorResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ObjectErrorResponse.Merge(m, src)
}
func (m *ObjectErrorResponse) XXX_Size() int {
return m.Size()
}
func (m *ObjectErrorResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ObjectErrorResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ObjectErrorResponse proto.InternalMessageInfo
func (m *ObjectErrorResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func init() { func init() {
proto.RegisterEnum("anySpace.ErrCodes", ErrCodes_name, ErrCodes_value) proto.RegisterEnum("anySpace.ErrCodes", ErrCodes_name, ErrCodes_value)
proto.RegisterType((*HeadSyncRange)(nil), "anySpace.HeadSyncRange") proto.RegisterType((*HeadSyncRange)(nil), "anySpace.HeadSyncRange")
@ -697,6 +746,7 @@ func init() {
proto.RegisterType((*ObjectHeadUpdate)(nil), "anySpace.ObjectHeadUpdate") proto.RegisterType((*ObjectHeadUpdate)(nil), "anySpace.ObjectHeadUpdate")
proto.RegisterType((*ObjectFullSyncRequest)(nil), "anySpace.ObjectFullSyncRequest") proto.RegisterType((*ObjectFullSyncRequest)(nil), "anySpace.ObjectFullSyncRequest")
proto.RegisterType((*ObjectFullSyncResponse)(nil), "anySpace.ObjectFullSyncResponse") proto.RegisterType((*ObjectFullSyncResponse)(nil), "anySpace.ObjectFullSyncResponse")
proto.RegisterType((*ObjectErrorResponse)(nil), "anySpace.ObjectErrorResponse")
} }
func init() { func init() {
@ -704,51 +754,52 @@ func init() {
} }
var fileDescriptor_5855f4ef9cf24cdb = []byte{ var fileDescriptor_5855f4ef9cf24cdb = []byte{
// 695 bytes of a gzipped FileDescriptorProto // 712 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0xcb, 0x6f, 0xd3, 0x4e, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0x4b, 0x6f, 0xd3, 0x40,
0x10, 0x8e, 0xd3, 0xe6, 0xd1, 0x49, 0x1f, 0xf9, 0xad, 0x7e, 0x2d, 0x26, 0x48, 0x69, 0xb0, 0x38, 0x10, 0xf6, 0xa6, 0x79, 0x75, 0xd2, 0x47, 0x58, 0x68, 0x31, 0x41, 0xa4, 0xc1, 0xa7, 0x08, 0xa4,
0x44, 0x1c, 0x92, 0x2a, 0x5c, 0x10, 0xe5, 0x42, 0xab, 0x56, 0x89, 0x10, 0x0f, 0x6d, 0x28, 0x48, 0xa4, 0x0a, 0x17, 0x44, 0xb9, 0xd0, 0x2a, 0x55, 0x22, 0xc4, 0x43, 0x5b, 0x0a, 0x12, 0xe2, 0xb2,
0x88, 0xcb, 0xd6, 0x9e, 0x26, 0x06, 0xc7, 0x36, 0xde, 0x0d, 0x25, 0x37, 0x2e, 0x70, 0x46, 0xfc, 0xb5, 0xb7, 0x49, 0xa8, 0x63, 0x1b, 0xef, 0x86, 0xd2, 0x1b, 0x17, 0x38, 0x23, 0xfe, 0x12, 0x17,
0x55, 0x1c, 0x7b, 0xe4, 0x88, 0xda, 0x7f, 0x81, 0x23, 0x07, 0xb4, 0x63, 0x3b, 0x2f, 0xd2, 0x72, 0x8e, 0x15, 0x27, 0x8e, 0xa8, 0xfd, 0x23, 0x68, 0xc7, 0x76, 0x5e, 0xb8, 0xe5, 0xd6, 0x43, 0xec,
0xeb, 0x21, 0xf6, 0xce, 0xec, 0xf7, 0xcd, 0x7e, 0x33, 0xb3, 0x13, 0xc3, 0x7d, 0x3b, 0x18, 0x0c, 0x9d, 0xd9, 0xf9, 0xbe, 0xfd, 0xf6, 0x9b, 0xf5, 0x06, 0x1e, 0xda, 0xfe, 0x70, 0xe8, 0x7b, 0xcd,
0x02, 0xbf, 0x19, 0xbf, 0x64, 0x28, 0x6c, 0x6c, 0xd2, 0x53, 0x8e, 0x7c, 0x3b, 0x8c, 0x02, 0x15, 0xe8, 0x25, 0x03, 0x6e, 0x8b, 0x26, 0x3e, 0xe5, 0x89, 0x67, 0x07, 0xa1, 0xaf, 0xfc, 0x26, 0x3e,
0x34, 0xe9, 0x29, 0x27, 0xde, 0x06, 0x39, 0x58, 0x51, 0xf8, 0xa3, 0xae, 0xf6, 0x55, 0x76, 0xc2, 0xe5, 0x24, 0xdb, 0xc0, 0x04, 0x2d, 0x72, 0xef, 0x64, 0x4f, 0xe7, 0x2a, 0x9b, 0xc1, 0x51, 0xaf,
0x77, 0xbd, 0xa6, 0xb0, 0x3d, 0xfd, 0xb3, 0xfb, 0xc2, 0xef, 0xa1, 0xd4, 0xcb, 0xf0, 0x38, 0xa5, 0xc9, 0x6d, 0x57, 0xff, 0xec, 0x3e, 0xf7, 0x7a, 0x42, 0xea, 0x61, 0x70, 0x90, 0x40, 0x27, 0xf9,
0x4e, 0xfc, 0x31, 0xd7, 0xea, 0xc0, 0x5a, 0x1b, 0x85, 0xd3, 0x1d, 0xf9, 0x36, 0xd7, 0x7e, 0xc6, 0x08, 0x6b, 0x75, 0x61, 0xb9, 0x23, 0xb8, 0xb3, 0x77, 0xe2, 0xd9, 0x4c, 0xe7, 0x29, 0x85, 0xec,
0x60, 0xf9, 0x24, 0x0a, 0x06, 0xa6, 0x51, 0x33, 0xea, 0xcb, 0x9c, 0xd6, 0x6c, 0x1d, 0xb2, 0x2a, 0x61, 0xe8, 0x0f, 0x4d, 0x52, 0x23, 0xf5, 0x2c, 0xc3, 0x31, 0x5d, 0x81, 0x8c, 0xf2, 0xcd, 0x0c,
0x30, 0xb3, 0xe4, 0xc9, 0xaa, 0x80, 0xfd, 0x0f, 0x39, 0xcf, 0x1d, 0xb8, 0xca, 0x5c, 0xaa, 0x19, 0x66, 0x32, 0xca, 0xa7, 0x37, 0x20, 0xe7, 0x0e, 0x86, 0x03, 0x65, 0x2e, 0xd4, 0x48, 0x7d, 0x99,
0xf5, 0x35, 0x1e, 0x1b, 0xd6, 0x29, 0xac, 0x8f, 0x43, 0xa1, 0x1c, 0x7a, 0x4a, 0xc7, 0xea, 0x0b, 0x45, 0x81, 0x75, 0x0c, 0x2b, 0x63, 0x2a, 0x21, 0x47, 0xae, 0xd2, 0x5c, 0x7d, 0x2e, 0xfb, 0xc8,
0xd9, 0xa7, 0x58, 0xab, 0x9c, 0xd6, 0x6c, 0x17, 0x8a, 0xe8, 0xe1, 0x00, 0x7d, 0x25, 0xcd, 0x6c, 0xb5, 0xc4, 0x70, 0x4c, 0xb7, 0xa0, 0x28, 0x5c, 0x31, 0x14, 0x9e, 0x92, 0x66, 0xa6, 0xb6, 0x50,
0x6d, 0xa9, 0x5e, 0x6a, 0x6d, 0x37, 0x52, 0xfd, 0x8d, 0x59, 0xfe, 0x41, 0x8c, 0xe3, 0x63, 0x82, 0x2f, 0xb5, 0x36, 0x1a, 0x89, 0xfe, 0xc6, 0x2c, 0xbe, 0x1d, 0xd5, 0xb1, 0x31, 0x40, 0x2f, 0x6c,
0x3e, 0xd8, 0x0e, 0x86, 0xfe, 0xf8, 0x60, 0x32, 0xac, 0x5d, 0xd8, 0x5c, 0x48, 0xd4, 0xba, 0x5d, 0xfb, 0x23, 0x6f, 0xbc, 0x30, 0x06, 0xd6, 0x16, 0xac, 0xa5, 0x02, 0xb5, 0xee, 0x81, 0x83, 0xab,
0x87, 0x4e, 0x5f, 0xe1, 0x59, 0xd7, 0x21, 0x3d, 0x28, 0x1c, 0xca, 0x64, 0x85, 0xd3, 0xda, 0x7a, 0x2f, 0xb2, 0xcc, 0xc0, 0x41, 0x3d, 0x82, 0x3b, 0xb8, 0x93, 0x45, 0x86, 0x63, 0xeb, 0x1d, 0xac,
0x03, 0x1b, 0x13, 0xf2, 0xfb, 0x21, 0x4a, 0xc5, 0x4c, 0x28, 0x50, 0x89, 0x3b, 0x29, 0x37, 0x35, 0x4e, 0xc0, 0x1f, 0x46, 0x42, 0x2a, 0x6a, 0x42, 0x01, 0x2d, 0xee, 0x26, 0xd8, 0x24, 0xa4, 0x4d,
0x59, 0x13, 0xf2, 0x11, 0x55, 0x2f, 0x91, 0x7e, 0x63, 0x81, 0x74, 0xbd, 0xcf, 0x13, 0x98, 0x75, 0xc8, 0x87, 0xe8, 0x5e, 0x2c, 0xfd, 0x66, 0x8a, 0x74, 0x3d, 0xcf, 0xe2, 0x32, 0x6b, 0x17, 0xca,
0x08, 0xe5, 0x29, 0x69, 0x61, 0xe0, 0x4b, 0x64, 0x2d, 0x28, 0x44, 0x24, 0x53, 0x9a, 0x06, 0x45, 0x53, 0xd2, 0x02, 0xdf, 0x93, 0x82, 0xb6, 0xa0, 0x10, 0xa2, 0x4c, 0x69, 0x12, 0x64, 0x31, 0x2f,
0x31, 0x2f, 0x2b, 0x00, 0x4f, 0x81, 0xd6, 0x2f, 0x03, 0xfe, 0x7b, 0x76, 0xfc, 0x16, 0x6d, 0xa5, 0x32, 0x80, 0x25, 0x85, 0xd6, 0x2f, 0x02, 0xd7, 0x5e, 0x1c, 0xbc, 0x17, 0xb6, 0xd2, 0xb3, 0xcf,
0x77, 0x9f, 0xa0, 0x94, 0xa2, 0x87, 0x57, 0x08, 0x7d, 0x00, 0x05, 0x3b, 0xf0, 0x15, 0xfa, 0x8a, 0x84, 0x94, 0xbc, 0x27, 0x2e, 0x11, 0xfa, 0x08, 0x0a, 0xb6, 0xef, 0x29, 0xe1, 0x29, 0xdc, 0x6c,
0x92, 0x2d, 0xb5, 0x6a, 0x93, 0x33, 0x26, 0x71, 0xf6, 0x63, 0xc8, 0x4b, 0xe1, 0x0d, 0x91, 0xa7, 0xa9, 0x55, 0x9b, 0xac, 0x31, 0xe1, 0xd9, 0x89, 0x4a, 0x5e, 0x73, 0x77, 0x24, 0x58, 0x02, 0xa0,
0x04, 0xd6, 0x04, 0x50, 0x11, 0xa2, 0x96, 0x82, 0x11, 0x55, 0xba, 0xd4, 0xda, 0x68, 0x08, 0xdb, 0x4d, 0x00, 0x15, 0x0a, 0xa1, 0xa5, 0x88, 0x10, 0x9d, 0x2e, 0xb5, 0x56, 0x1b, 0xdc, 0x76, 0x1b,
0x6b, 0xbc, 0x18, 0xbb, 0xf9, 0x14, 0x84, 0x6d, 0x41, 0x5e, 0x5b, 0x1d, 0xc7, 0x5c, 0x26, 0x15, 0xaf, 0xc6, 0x69, 0x36, 0x55, 0x42, 0xd7, 0x21, 0xaf, 0xa3, 0xae, 0x63, 0x66, 0x51, 0x45, 0x1c,
0x89, 0xc5, 0x2a, 0x50, 0x74, 0x1d, 0xf4, 0x95, 0xab, 0x46, 0x66, 0x8e, 0x76, 0xc6, 0x36, 0xbb, 0xd1, 0xaa, 0x26, 0xe2, 0xf6, 0xd1, 0xc0, 0xeb, 0x75, 0x1d, 0x33, 0x87, 0x73, 0x53, 0x19, 0xeb,
0x03, 0x6b, 0x21, 0x62, 0xd4, 0x75, 0x7b, 0xbe, 0x50, 0xc3, 0x08, 0xcd, 0x3c, 0x01, 0x66, 0x9d, 0x47, 0x06, 0xd6, 0xd3, 0xc5, 0xd0, 0xc7, 0x00, 0xba, 0x3b, 0xfb, 0x81, 0xc3, 0x95, 0xc0, 0xcd,
0xd6, 0x6f, 0x03, 0xb6, 0x16, 0xcb, 0x65, 0x0f, 0x01, 0x74, 0xff, 0x8e, 0x42, 0x47, 0x28, 0xa4, 0x95, 0x5a, 0x95, 0xf9, 0x2d, 0x74, 0xc6, 0x15, 0x1d, 0x83, 0x4d, 0xd5, 0xd3, 0xa7, 0xb0, 0x7a,
0xf4, 0x4b, 0xad, 0xca, 0x7c, 0x92, 0xed, 0x31, 0xa2, 0x9d, 0xe1, 0x53, 0x78, 0xf6, 0x18, 0x36, 0x38, 0x72, 0xdd, 0xa9, 0x9e, 0xc6, 0x2e, 0x6c, 0xcc, 0x53, 0xec, 0xce, 0x96, 0x75, 0x0c, 0x36,
0x4e, 0x86, 0x9e, 0x37, 0xd5, 0xf5, 0xa4, 0x4e, 0xdb, 0xf3, 0x21, 0x0e, 0x67, 0x61, 0xed, 0x0c, 0x8f, 0xa4, 0xcf, 0xa1, 0x3c, 0x49, 0x45, 0x2d, 0x8c, 0x4d, 0xa9, 0x5d, 0xcc, 0x16, 0xd5, 0x75,
0x9f, 0x67, 0xb2, 0xa7, 0x50, 0x9e, 0xb8, 0xe2, 0x26, 0x27, 0x65, 0xab, 0x5d, 0x1e, 0x2d, 0xc6, 0x0c, 0xf6, 0x0f, 0x96, 0xb6, 0x61, 0x59, 0x84, 0xa1, 0x1f, 0x8e, 0xc9, 0xb2, 0x48, 0x76, 0x67,
0xb5, 0x33, 0xfc, 0x2f, 0xee, 0x5e, 0x01, 0x72, 0x1f, 0x74, 0x8e, 0xd6, 0x27, 0x03, 0xca, 0xf3, 0x9e, 0xac, 0x3d, 0x5d, 0xd4, 0x31, 0xd8, 0x2c, 0x6a, 0xbb, 0x00, 0xb9, 0x8f, 0xda, 0x2a, 0xeb,
0x89, 0xe8, 0x19, 0xd0, 0x89, 0xc4, 0x97, 0x67, 0x85, 0xc7, 0x86, 0xbe, 0x54, 0xc9, 0x60, 0x27, 0x33, 0x81, 0xf2, 0xbc, 0x1f, 0xfa, 0x43, 0xd1, 0x7e, 0x44, 0x27, 0x6c, 0x91, 0x45, 0x81, 0x3e,
0x57, 0xd3, 0xa4, 0x8e, 0x71, 0x71, 0xaa, 0x9b, 0xb6, 0x4f, 0x5b, 0xaf, 0x5c, 0xd5, 0xef, 0x38, 0x79, 0xf1, 0xd7, 0x1f, 0x9f, 0x5f, 0x13, 0xdb, 0xca, 0xf8, 0xb1, 0xee, 0xec, 0x0e, 0x4e, 0xbd,
0x3c, 0x05, 0x32, 0x0b, 0x56, 0xa5, 0x2f, 0x42, 0xd9, 0x0f, 0xd4, 0x73, 0xa1, 0xfa, 0xe6, 0x12, 0x19, 0xa8, 0x7e, 0xd7, 0x61, 0x49, 0x21, 0xb5, 0x60, 0x49, 0x7a, 0x3c, 0x90, 0x7d, 0x5f, 0xbd,
0x05, 0x9c, 0xf1, 0x59, 0x9f, 0x0d, 0xd8, 0x5c, 0x58, 0x88, 0x6b, 0xd6, 0xf1, 0x65, 0x7c, 0x13, 0xe4, 0xaa, 0x6f, 0x2e, 0x20, 0xe1, 0x4c, 0xce, 0xfa, 0x42, 0x60, 0x2d, 0xd5, 0xcf, 0x2b, 0xd6,
0xe6, 0x4b, 0x78, 0xbd, 0x42, 0xee, 0x56, 0xa0, 0x78, 0x10, 0x45, 0xfb, 0x81, 0x83, 0x92, 0xad, 0xf1, 0x95, 0x24, 0x07, 0x6a, 0xbe, 0x13, 0x57, 0x2c, 0xe4, 0x3e, 0x5c, 0x4f, 0x69, 0xa2, 0x16,
0x03, 0x1c, 0xf9, 0xf8, 0x31, 0x44, 0x5b, 0xa1, 0x53, 0xce, 0xb4, 0xbe, 0x19, 0x90, 0xa3, 0x6e, 0x81, 0x4d, 0x8c, 0xbf, 0xd6, 0x28, 0xb8, 0x57, 0x81, 0x62, 0x3b, 0x0c, 0x77, 0x7c, 0x47, 0x48,
0xb3, 0x47, 0x50, 0x4c, 0x47, 0x99, 0xdd, 0x5c, 0x34, 0xde, 0x54, 0xc3, 0x4a, 0x65, 0xe1, 0xe4, 0xba, 0x02, 0xb0, 0xef, 0x89, 0x4f, 0x81, 0xb0, 0x95, 0x70, 0xca, 0x46, 0xeb, 0x3b, 0x81, 0x1c,
0xc7, 0x69, 0x1d, 0x42, 0xbe, 0xab, 0x22, 0x14, 0x03, 0x76, 0x6b, 0xd1, 0xec, 0x26, 0xff, 0x01, 0x1e, 0x0a, 0xfa, 0x04, 0x8a, 0xc9, 0xe5, 0x40, 0x6f, 0xa5, 0x5d, 0x18, 0x68, 0x78, 0xa5, 0x92,
0x95, 0xab, 0x36, 0xeb, 0xc6, 0x8e, 0xb1, 0xb7, 0xfb, 0xfd, 0xbc, 0x6a, 0x9c, 0x9d, 0x57, 0x8d, 0x7a, 0x97, 0x44, 0xcb, 0xef, 0x42, 0x7e, 0x4f, 0x85, 0x82, 0x0f, 0xe9, 0xed, 0xb4, 0xdb, 0x20,
0x9f, 0xe7, 0x55, 0xe3, 0xeb, 0x45, 0x35, 0x73, 0x76, 0x51, 0xcd, 0xfc, 0xb8, 0xa8, 0x66, 0x5e, 0xbe, 0x55, 0x2a, 0x97, 0x4d, 0xd6, 0xc9, 0x26, 0xd9, 0xde, 0xfa, 0x79, 0x56, 0x25, 0xa7, 0x67,
0xdf, 0xfe, 0xe7, 0x17, 0xe7, 0x38, 0x4f, 0xaf, 0x7b, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x57, 0x55, 0xf2, 0xe7, 0xac, 0x4a, 0xbe, 0x9d, 0x57, 0x8d, 0xd3, 0xf3, 0xaa, 0xf1, 0xfb, 0xbc, 0x6a,
0xb9, 0x93, 0x47, 0x9d, 0x06, 0x00, 0x00, 0xbc, 0xbd, 0xfb, 0xdf, 0xff, 0xb0, 0x83, 0x3c, 0xbe, 0x1e, 0xfc, 0x0d, 0x00, 0x00, 0xff, 0xff,
0x93, 0x27, 0x56, 0x41, 0xef, 0x06, 0x00, 0x00,
} }
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {
@ -976,17 +1027,10 @@ func (m *ObjectSyncMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.PeerSignature) > 0 { if len(m.TrackingId) > 0 {
i -= len(m.PeerSignature) i -= len(m.TrackingId)
copy(dAtA[i:], m.PeerSignature) copy(dAtA[i:], m.TrackingId)
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.PeerSignature))) i = encodeVarintSpacesync(dAtA, i, uint64(len(m.TrackingId)))
i--
dAtA[i] = 0x32
}
if len(m.Identity) > 0 {
i -= len(m.Identity)
copy(dAtA[i:], m.Identity)
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.Identity)))
i-- i--
dAtA[i] = 0x2a dAtA[i] = 0x2a
} }
@ -1126,6 +1170,27 @@ func (m *ObjectSyncContentValue_FullSyncResponse) MarshalToSizedBuffer(dAtA []by
} }
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *ObjectSyncContentValue_ErrorResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ObjectSyncContentValue_ErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.ErrorResponse != nil {
{
size, err := m.ErrorResponse.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSpacesync(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
return len(dAtA) - i, nil
}
func (m *ObjectHeadUpdate) Marshal() (dAtA []byte, err error) { func (m *ObjectHeadUpdate) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
@ -1291,6 +1356,36 @@ func (m *ObjectFullSyncResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *ObjectErrorResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ObjectErrorResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ObjectErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Error) > 0 {
i -= len(m.Error)
copy(dAtA[i:], m.Error)
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.Error)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int { func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int {
offset -= sovSpacesync(v) offset -= sovSpacesync(v)
base := offset base := offset
@ -1415,11 +1510,7 @@ func (m *ObjectSyncMessage) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovSpacesync(uint64(l)) n += 1 + l + sovSpacesync(uint64(l))
} }
l = len(m.Identity) l = len(m.TrackingId)
if l > 0 {
n += 1 + l + sovSpacesync(uint64(l))
}
l = len(m.PeerSignature)
if l > 0 { if l > 0 {
n += 1 + l + sovSpacesync(uint64(l)) n += 1 + l + sovSpacesync(uint64(l))
} }
@ -1474,6 +1565,18 @@ func (m *ObjectSyncContentValue_FullSyncResponse) Size() (n int) {
} }
return n return n
} }
func (m *ObjectSyncContentValue_ErrorResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.ErrorResponse != nil {
l = m.ErrorResponse.Size()
n += 1 + l + sovSpacesync(uint64(l))
}
return n
}
func (m *ObjectHeadUpdate) Size() (n int) { func (m *ObjectHeadUpdate) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
@ -1555,6 +1658,19 @@ func (m *ObjectFullSyncResponse) Size() (n int) {
return n return n
} }
func (m *ObjectErrorResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Error)
if l > 0 {
n += 1 + l + sovSpacesync(uint64(l))
}
return n
}
func sovSpacesync(x uint64) (n int) { func sovSpacesync(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7 return (math_bits.Len64(x|1) + 6) / 7
} }
@ -2286,7 +2402,7 @@ func (m *ObjectSyncMessage) Unmarshal(dAtA []byte) error {
iNdEx = postIndex iNdEx = postIndex
case 5: case 5:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Identity", wireType) return fmt.Errorf("proto: wrong wireType = %d for field TrackingId", wireType)
} }
var stringLen uint64 var stringLen uint64
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -2314,39 +2430,7 @@ func (m *ObjectSyncMessage) Unmarshal(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Identity = string(dAtA[iNdEx:postIndex]) m.TrackingId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PeerSignature", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpacesync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSpacesync
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSpacesync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.PeerSignature = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
@ -2503,6 +2587,41 @@ func (m *ObjectSyncContentValue) Unmarshal(dAtA []byte) error {
} }
m.Value = &ObjectSyncContentValue_FullSyncResponse{v} m.Value = &ObjectSyncContentValue_FullSyncResponse{v}
iNdEx = postIndex iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ErrorResponse", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpacesync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSpacesync
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSpacesync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &ObjectErrorResponse{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Value = &ObjectSyncContentValue_ErrorResponse{v}
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipSpacesync(dAtA[iNdEx:]) skippy, err := skipSpacesync(dAtA[iNdEx:])
@ -2968,6 +3087,88 @@ func (m *ObjectFullSyncResponse) Unmarshal(dAtA []byte) error {
} }
return nil return nil
} }
func (m *ObjectErrorResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpacesync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ObjectErrorResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ObjectErrorResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpacesync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSpacesync
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSpacesync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Error = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSpacesync(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSpacesync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipSpacesync(dAtA []byte) (n int, err error) { func skipSpacesync(dAtA []byte) (n int, err error) {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0

View File

@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/sec" "github.com/libp2p/go-libp2p-core/sec"
"storj.io/drpc/drpcctx" "storj.io/drpc/drpcctx"
"sync" "sync"
"sync/atomic"
) )
var ErrEmptyPeer = errors.New("don't have such a peer") var ErrEmptyPeer = errors.New("don't have such a peer")
@ -17,17 +18,15 @@ const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams // StreamPool can be made generic to work with different streams
type StreamPool interface { type StreamPool interface {
SyncClient
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
HasStream(peerId string) bool HasStream(peerId string) bool
SyncClient
Close() (err error) Close() (err error)
} }
type SyncClient interface { type SyncClient interface {
SendSync(peerId string, SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
message *spacesyncproto.ObjectSyncMessage,
msgCheck func(syncMessage *spacesyncproto.ObjectSyncMessage) bool) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
} }
@ -35,8 +34,7 @@ type SyncClient interface {
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
type responseWaiter struct { type responseWaiter struct {
ch chan *spacesyncproto.ObjectSyncMessage ch chan *spacesyncproto.ObjectSyncMessage
msgCheck func(message *spacesyncproto.ObjectSyncMessage) bool
} }
type streamPool struct { type streamPool struct {
@ -46,6 +44,7 @@ type streamPool struct {
wg *sync.WaitGroup wg *sync.WaitGroup
waiters map[string]responseWaiter waiters map[string]responseWaiter
waitersMx sync.Mutex waitersMx sync.Mutex
counter uint64
} }
func newStreamPool(messageHandler MessageHandler) StreamPool { func newStreamPool(messageHandler MessageHandler) StreamPool {
@ -63,36 +62,23 @@ func (s *streamPool) HasStream(peerId string) (res bool) {
func (s *streamPool) SendSync( func (s *streamPool) SendSync(
peerId string, peerId string,
message *spacesyncproto.ObjectSyncMessage, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
msgCheck func(syncMessage *spacesyncproto.ObjectSyncMessage) bool) (reply *spacesyncproto.ObjectSyncMessage, err error) { newCounter := atomic.AddUint64(&s.counter, 1)
msg.TrackingId = genStreamPoolKey(peerId, msg.TreeId, newCounter)
sendAndWait := func(waiter responseWaiter) (err error) {
err = s.SendAsync(peerId, message)
if err != nil {
return
}
reply = <-waiter.ch
return
}
key := fmt.Sprintf("%s.%s", peerId, message.TreeId)
s.waitersMx.Lock() s.waitersMx.Lock()
waiter, exists := s.waiters[key] waiter := responseWaiter{
if exists { ch: make(chan *spacesyncproto.ObjectSyncMessage),
s.waitersMx.Unlock() }
s.waiters[msg.TrackingId] = waiter
s.waitersMx.Unlock()
err = sendAndWait(waiter) err = s.SendAsync(peerId, msg)
if err != nil {
return return
} }
waiter = responseWaiter{ reply = <-waiter.ch
ch: make(chan *spacesyncproto.ObjectSyncMessage),
msgCheck: msgCheck,
}
s.waiters[key] = waiter
s.waitersMx.Unlock()
err = sendAndWait(waiter)
return return
} }
@ -189,17 +175,21 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
} }
process := func(msg *spacesyncproto.ObjectSyncMessage) { process := func(msg *spacesyncproto.ObjectSyncMessage) {
key := fmt.Sprintf("%s.%s", peerId, msg.TreeId) if msg.TrackingId == "" {
s.waitersMx.Lock() s.messageHandler(stream.Context(), peerId, msg)
waiter, exists := s.waiters[key] return
}
if !exists || !waiter.msgCheck(msg) { s.waitersMx.Lock()
waiter, exists := s.waiters[msg.TrackingId]
if !exists {
s.waitersMx.Unlock() s.waitersMx.Unlock()
s.messageHandler(stream.Context(), peerId, msg) s.messageHandler(stream.Context(), peerId, msg)
return return
} }
delete(s.waiters, key) delete(s.waiters, msg.TrackingId)
s.waitersMx.Unlock() s.waitersMx.Unlock()
waiter.ch <- msg waiter.ch <- msg
} }
@ -216,10 +206,8 @@ Loop:
break Loop break Loop
} }
go func() { go func() {
defer func() {
limiter <- struct{}{}
}()
process(msg) process(msg)
limiter <- struct{}{}
}() }()
} }
return s.removePeer(peerId) return s.removePeer(peerId)
@ -244,3 +232,7 @@ func GetPeerIdFromStreamContext(ctx context.Context) (string, error) {
return conn.RemotePeer().String(), nil return conn.RemotePeer().String(), nil
} }
func genStreamPoolKey(peerId, treeId string, counter uint64) string {
return fmt.Sprintf("%s.%s.%d", peerId, treeId, counter)
}

View File

@ -25,15 +25,15 @@ func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandl
} }
} }
func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error { func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) error {
msg := message.GetContent() content := msg.GetContent()
switch { switch {
case msg.GetFullSyncRequest() != nil: case content.GetFullSyncRequest() != nil:
return s.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), message.GetTreeHeader(), message.GetTreeId()) return s.HandleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), msg)
case msg.GetFullSyncResponse() != nil: case content.GetFullSyncResponse() != nil:
return s.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), message.GetTreeHeader(), message.GetTreeId()) return s.HandleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse(), msg)
case msg.GetHeadUpdate() != nil: case content.GetHeadUpdate() != nil:
return s.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), message.GetTreeHeader(), message.GetTreeId()) return s.HandleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), msg)
} }
return nil return nil
} }
@ -42,15 +42,13 @@ func (s *syncHandler) HandleHeadUpdate(
ctx context.Context, ctx context.Context,
senderId string, senderId string,
update *spacesyncproto.ObjectHeadUpdate, update *spacesyncproto.ObjectHeadUpdate,
header *aclpb.TreeHeader, msg *spacesyncproto.ObjectSyncMessage) (err error) {
treeId string) (err error) {
var ( var (
fullRequest *spacesyncproto.ObjectFullSyncRequest fullRequest *spacesyncproto.ObjectFullSyncRequest
result tree.AddResult result tree.AddResult
) )
res, err := s.treeCache.GetTree(ctx, msg.TreeId)
res, err := s.treeCache.GetTree(ctx, treeId)
if err != nil { if err != nil {
return return
} }
@ -81,7 +79,8 @@ func (s *syncHandler) HandleHeadUpdate(
}() }()
if fullRequest != nil { if fullRequest != nil {
return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId)) return s.syncClient.SendAsync(senderId,
spacesyncproto.WrapFullRequest(fullRequest, msg.TreeHeader, msg.TreeId, msg.TrackingId))
} }
return return
} }
@ -90,12 +89,18 @@ func (s *syncHandler) HandleFullSyncRequest(
ctx context.Context, ctx context.Context,
senderId string, senderId string,
request *spacesyncproto.ObjectFullSyncRequest, request *spacesyncproto.ObjectFullSyncRequest,
header *aclpb.TreeHeader, msg *spacesyncproto.ObjectSyncMessage) (err error) {
treeId string) (err error) { var (
fullResponse *spacesyncproto.ObjectFullSyncResponse
header = msg.TreeHeader
)
defer func() {
if err != nil {
s.syncClient.SendAsync(senderId, spacesyncproto.WrapError(err, header, msg.TreeId, msg.TrackingId))
}
}()
var fullResponse *spacesyncproto.ObjectFullSyncResponse res, err := s.treeCache.GetTree(ctx, msg.TreeId)
res, err := s.treeCache.GetTree(ctx, treeId)
if err != nil { if err != nil {
return return
} }
@ -106,29 +111,32 @@ func (s *syncHandler) HandleFullSyncRequest(
defer res.Release() defer res.Release()
defer objTree.Unlock() defer objTree.Unlock()
if header == nil {
header = objTree.Header()
}
_, err = objTree.AddRawChanges(ctx, request.Changes...) _, err = objTree.AddRawChanges(ctx, request.Changes...)
if err != nil { if err != nil {
return err return err
} }
fullResponse, err = s.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) fullResponse, err = s.prepareFullSyncResponse(request.SnapshotPath, request.Heads, objTree)
return err return err
}() }()
if err != nil { if err != nil {
return return
} }
return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId)) return s.syncClient.SendAsync(senderId,
spacesyncproto.WrapFullResponse(fullResponse, header, msg.TreeId, msg.TrackingId))
} }
func (s *syncHandler) HandleFullSyncResponse( func (s *syncHandler) HandleFullSyncResponse(
ctx context.Context, ctx context.Context,
senderId string, senderId string,
response *spacesyncproto.ObjectFullSyncResponse, response *spacesyncproto.ObjectFullSyncResponse,
header *aclpb.TreeHeader, msg *spacesyncproto.ObjectSyncMessage) (err error) {
treeId string) (err error) { res, err := s.treeCache.GetTree(ctx, msg.TreeId)
res, err := s.treeCache.GetTree(ctx, treeId)
if err != nil { if err != nil {
return return
} }
@ -173,7 +181,6 @@ func (s *syncHandler) prepareFullSyncRequest(
} }
func (s *syncHandler) prepareFullSyncResponse( func (s *syncHandler) prepareFullSyncResponse(
treeId string,
theirPath, theirPath,
theirHeads []string, theirHeads []string,
t tree.ObjectTree) (*spacesyncproto.ObjectFullSyncResponse, error) { t tree.ObjectTree) (*spacesyncproto.ObjectFullSyncResponse, error) {

View File

@ -12,39 +12,77 @@ import (
type SyncService interface { type SyncService interface {
NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool StreamPool() StreamPool
Init()
Close() (err error) Close() (err error)
} }
type HeadNotifiable interface {
UpdateHeads(id string, heads []string)
}
const respPeersStreamCheckInterval = time.Second * 10 const respPeersStreamCheckInterval = time.Second * 10
type syncService struct { type syncService struct {
spaceId string
syncHandler SyncHandler syncHandler SyncHandler
streamPool StreamPool streamPool StreamPool
headNotifiable HeadNotifiable
configuration nodeconf.Configuration configuration nodeconf.Configuration
spaceId string
streamLoopCtx context.Context streamLoopCtx context.Context
stopStreamLoop context.CancelFunc stopStreamLoop context.CancelFunc
streamLoopDone chan struct{}
} }
func (s *syncService) Run() { func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService {
var syncHandler SyncHandler
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message)
})
syncHandler = newSyncHandler(cache, streamPool)
return newSyncService(spaceId, headNotifiable, syncHandler, streamPool, configuration)
}
func newSyncService(
spaceId string,
headNotifiable HeadNotifiable,
syncHandler SyncHandler,
streamPool StreamPool,
configuration nodeconf.Configuration) *syncService {
return &syncService{
syncHandler: syncHandler,
streamPool: streamPool,
headNotifiable: headNotifiable,
configuration: configuration,
spaceId: spaceId,
streamLoopDone: make(chan struct{}),
}
}
func (s *syncService) Init() {
s.streamLoopCtx, s.stopStreamLoop = context.WithCancel(context.Background()) s.streamLoopCtx, s.stopStreamLoop = context.WithCancel(context.Background())
s.streamCheckLoop(s.streamLoopCtx) go s.responsibleStreamCheckLoop(s.streamLoopCtx)
} }
func (s *syncService) Close() (err error) { func (s *syncService) Close() (err error) {
s.stopStreamLoop() s.stopStreamLoop()
<-s.streamLoopDone
return s.streamPool.Close() return s.streamPool.Close()
} }
func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId)) s.headNotifiable.UpdateHeads(treeId, update.Heads)
return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId, ""))
} }
func (s *syncService) streamCheckLoop(ctx context.Context) { func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
for { defer close(s.streamLoopDone)
checkResponsiblePeers := func() {
respPeers, err := s.configuration.ResponsiblePeers(ctx, s.spaceId) respPeers, err := s.configuration.ResponsiblePeers(ctx, s.spaceId)
if err != nil { if err != nil {
continue return
} }
for _, peer := range respPeers { for _, peer := range respPeers {
if s.streamPool.HasStream(peer.Id()) { if s.streamPool.HasStream(peer.Id()) {
@ -58,11 +96,17 @@ func (s *syncService) streamCheckLoop(ctx context.Context) {
s.streamPool.AddAndReadStreamAsync(stream) s.streamPool.AddAndReadStreamAsync(stream)
} }
}
checkResponsiblePeers()
ticker := time.NewTicker(respPeersStreamCheckInterval)
defer ticker.Stop()
for {
select { select {
case <-time.After(respPeersStreamCheckInterval): case <-s.streamLoopCtx.Done():
break
case <-ctx.Done():
return return
case <-ticker.C:
checkResponsiblePeers()
} }
} }
} }
@ -70,25 +114,3 @@ func (s *syncService) streamCheckLoop(ctx context.Context) {
func (s *syncService) StreamPool() StreamPool { func (s *syncService) StreamPool() StreamPool {
return s.streamPool return s.streamPool
} }
func NewSyncService(spaceId string, cache cache.TreeCache, configuration nodeconf.Configuration) SyncService {
var syncHandler SyncHandler
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return syncHandler.HandleMessage(ctx, senderId, message)
})
syncHandler = newSyncHandler(cache, streamPool)
return newSyncService(spaceId, syncHandler, streamPool, configuration)
}
func newSyncService(
spaceId string,
syncHandler SyncHandler,
streamPool StreamPool,
configuration nodeconf.Configuration) *syncService {
return &syncService{
syncHandler: syncHandler,
streamPool: streamPool,
configuration: configuration,
spaceId: spaceId,
}
}