Send error response on sync (wip)
This commit is contained in:
parent
07b16554cd
commit
c64921a249
@ -20,7 +20,7 @@ type syncClient struct {
|
|||||||
spaceId string
|
spaceId string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSyncClient(
|
func NewSyncClient(
|
||||||
spaceId string,
|
spaceId string,
|
||||||
peerManager peermanager.PeerManager,
|
peerManager peermanager.PeerManager,
|
||||||
factory RequestFactory) SyncClient {
|
factory RequestFactory) SyncClient {
|
||||||
|
|||||||
@ -54,7 +54,7 @@ type syncTree struct {
|
|||||||
|
|
||||||
var log = logger.NewNamed("common.commonspace.synctree")
|
var log = logger.NewNamed("common.commonspace.synctree")
|
||||||
|
|
||||||
var createSyncClient = newSyncClient
|
var createSyncClient = NewSyncClient
|
||||||
|
|
||||||
type ResponsiblePeersGetter interface {
|
type ResponsiblePeersGetter interface {
|
||||||
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
|
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anytypeio/any-sync/net/peer"
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
|
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"time"
|
"time"
|
||||||
@ -117,9 +118,16 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if resp.GetContent().GetFullSyncResponse() == nil {
|
switch {
|
||||||
|
case resp.GetContent().GetErrorResponse() != nil:
|
||||||
|
errResp := resp.GetContent().GetErrorResponse()
|
||||||
|
err = rpcerr.Err(errResp.ErrCode)
|
||||||
|
return
|
||||||
|
case resp.GetContent().GetFullSyncResponse() == nil:
|
||||||
err = fmt.Errorf("expected to get full sync response, but got something else")
|
err = fmt.Errorf("expected to get full sync response, but got something else")
|
||||||
return
|
return
|
||||||
|
default:
|
||||||
|
break
|
||||||
}
|
}
|
||||||
fullSyncResp := resp.GetContent().GetFullSyncResponse()
|
fullSyncResp := resp.GetContent().GetFullSyncResponse()
|
||||||
|
|
||||||
|
|||||||
@ -244,7 +244,7 @@ func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.Ra
|
|||||||
|
|
||||||
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler {
|
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler {
|
||||||
factory := GetRequestFactory()
|
factory := GetRequestFactory()
|
||||||
syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
|
syncClient := NewSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
|
||||||
netTree := &broadcastTree{
|
netTree := &broadcastTree{
|
||||||
ObjectTree: objTree,
|
ObjectTree: objTree,
|
||||||
SyncClient: syncClient,
|
SyncClient: syncClient,
|
||||||
@ -255,7 +255,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo
|
|||||||
|
|
||||||
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler {
|
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler {
|
||||||
factory := GetRequestFactory()
|
factory := GetRequestFactory()
|
||||||
syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
|
syncClient := NewSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
|
||||||
|
|
||||||
batcher := mb.New[processMsg](0)
|
batcher := mb.New[processMsg](0)
|
||||||
return &processSyncHandler{
|
return &processSyncHandler{
|
||||||
|
|||||||
@ -94,7 +94,7 @@ message TreeFullSyncResponse {
|
|||||||
|
|
||||||
// TreeErrorResponse is an error sent as a response for a full sync request
|
// TreeErrorResponse is an error sent as a response for a full sync request
|
||||||
message TreeErrorResponse {
|
message TreeErrorResponse {
|
||||||
string error = 1;
|
uint64 errCode = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeChangeInfo is used internally in Tree implementation for ease of marshalling
|
// TreeChangeInfo is used internally in Tree implementation for ease of marshalling
|
||||||
|
|||||||
@ -691,7 +691,7 @@ func (m *TreeFullSyncResponse) GetSnapshotPath() []string {
|
|||||||
|
|
||||||
// TreeErrorResponse is an error sent as a response for a full sync request
|
// TreeErrorResponse is an error sent as a response for a full sync request
|
||||||
type TreeErrorResponse struct {
|
type TreeErrorResponse struct {
|
||||||
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
|
ErrCode uint64 `protobuf:"varint,1,opt,name=errCode,proto3" json:"errCode,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TreeErrorResponse) Reset() { *m = TreeErrorResponse{} }
|
func (m *TreeErrorResponse) Reset() { *m = TreeErrorResponse{} }
|
||||||
@ -727,11 +727,11 @@ func (m *TreeErrorResponse) XXX_DiscardUnknown() {
|
|||||||
|
|
||||||
var xxx_messageInfo_TreeErrorResponse proto.InternalMessageInfo
|
var xxx_messageInfo_TreeErrorResponse proto.InternalMessageInfo
|
||||||
|
|
||||||
func (m *TreeErrorResponse) GetError() string {
|
func (m *TreeErrorResponse) GetErrCode() uint64 {
|
||||||
if m != nil {
|
if m != nil {
|
||||||
return m.Error
|
return m.ErrCode
|
||||||
}
|
}
|
||||||
return ""
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeChangeInfo is used internally in Tree implementation for ease of marshalling
|
// TreeChangeInfo is used internally in Tree implementation for ease of marshalling
|
||||||
@ -806,50 +806,50 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var fileDescriptor_5033f0301ef9b772 = []byte{
|
var fileDescriptor_5033f0301ef9b772 = []byte{
|
||||||
// 677 bytes of a gzipped FileDescriptorProto
|
// 681 bytes of a gzipped FileDescriptorProto
|
||||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x55, 0xcf, 0x4f, 0xd4, 0x40,
|
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x55, 0x4d, 0x6f, 0xd3, 0x40,
|
||||||
0x14, 0xee, 0x74, 0x81, 0xb2, 0x8f, 0x05, 0x74, 0xe0, 0xd0, 0x10, 0xad, 0x4d, 0x63, 0x74, 0xbd,
|
0x10, 0xf5, 0x3a, 0x69, 0xdd, 0x4c, 0xd3, 0x16, 0xb6, 0x3d, 0x58, 0x15, 0x18, 0xcb, 0x42, 0x90,
|
||||||
0x40, 0x82, 0x27, 0x8d, 0x09, 0x11, 0x04, 0x77, 0x43, 0x34, 0x64, 0x40, 0x4c, 0xbc, 0x0d, 0xed,
|
0x0b, 0xad, 0x54, 0x4e, 0x20, 0xa4, 0x8a, 0x86, 0x96, 0x44, 0x15, 0xa8, 0xda, 0x96, 0x22, 0x71,
|
||||||
0xc0, 0xd6, 0xec, 0x76, 0x6a, 0x67, 0x56, 0xb2, 0x7f, 0x80, 0x17, 0x4d, 0x88, 0xff, 0x92, 0x37,
|
0xdb, 0xda, 0xdb, 0xc6, 0x28, 0xf1, 0x1a, 0xef, 0x86, 0x2a, 0x3f, 0x80, 0x0b, 0x48, 0x15, 0x7f,
|
||||||
0x8f, 0x1c, 0x39, 0x1a, 0xf6, 0x1f, 0x31, 0x9d, 0x69, 0xb7, 0x3f, 0x76, 0x0f, 0xdc, 0xb8, 0x74,
|
0x89, 0x1b, 0xc7, 0x1e, 0x7b, 0x44, 0xcd, 0x1f, 0x41, 0xde, 0x8d, 0xe3, 0x8f, 0xe4, 0xd0, 0x5b,
|
||||||
0xf7, 0x7d, 0x7d, 0xef, 0x7b, 0xdf, 0xfb, 0xe6, 0x47, 0x61, 0xc7, 0xe7, 0x83, 0x01, 0x8f, 0x44,
|
0x2f, 0x4e, 0xe6, 0x79, 0xe6, 0xcd, 0x9b, 0xb7, 0x1f, 0x86, 0x5d, 0x9f, 0x0f, 0x06, 0x3c, 0x12,
|
||||||
0x4c, 0x7d, 0xb6, 0xc5, 0xcf, 0xbe, 0x32, 0x5f, 0x6e, 0xc9, 0x84, 0x31, 0xf5, 0xf0, 0x7b, 0x34,
|
0x31, 0xf5, 0xd9, 0x36, 0x3f, 0xfb, 0xca, 0x7c, 0xb9, 0x2d, 0x13, 0xc6, 0xd4, 0xc3, 0xef, 0xd1,
|
||||||
0xba, 0x60, 0x71, 0xc2, 0x25, 0xdf, 0x52, 0x4f, 0x51, 0x82, 0x37, 0x15, 0x82, 0xa1, 0x40, 0xbc,
|
0xe8, 0x82, 0xc5, 0x09, 0x97, 0x7c, 0x5b, 0x3d, 0x45, 0x01, 0xde, 0x52, 0x08, 0x86, 0x1c, 0xf1,
|
||||||
0x1b, 0x04, 0x40, 0x38, 0x97, 0x7b, 0x2a, 0xc4, 0x8f, 0xa0, 0x49, 0xfd, 0x7e, 0x87, 0xd1, 0xa0,
|
0x6e, 0x10, 0x00, 0xe1, 0x5c, 0xb6, 0x55, 0x88, 0x1f, 0x41, 0x83, 0xfa, 0xfd, 0x0e, 0xa3, 0x41,
|
||||||
0x1b, 0xd8, 0xc8, 0x45, 0xed, 0x26, 0x29, 0x00, 0x6c, 0x83, 0xa5, 0xba, 0x76, 0x03, 0xdb, 0x54,
|
0x37, 0xb0, 0x91, 0x8b, 0x5a, 0x0d, 0x92, 0x03, 0xd8, 0x06, 0x4b, 0x75, 0xed, 0x06, 0xb6, 0xa9,
|
||||||
0xef, 0xf2, 0x10, 0x3b, 0x00, 0x9a, 0xf0, 0x64, 0x14, 0x33, 0xbb, 0xa1, 0x5e, 0x96, 0x90, 0x94,
|
0xde, 0x65, 0x21, 0x76, 0x00, 0x34, 0xe1, 0xc9, 0x28, 0x66, 0x76, 0x4d, 0xbd, 0x2c, 0x20, 0x29,
|
||||||
0x57, 0x86, 0x03, 0x26, 0x24, 0x1d, 0xc4, 0xf6, 0x9c, 0x8b, 0xda, 0x0d, 0x52, 0x00, 0x18, 0xc3,
|
0xaf, 0x0c, 0x07, 0x4c, 0x48, 0x3a, 0x88, 0xed, 0xba, 0x8b, 0x5a, 0x35, 0x92, 0x03, 0x18, 0x43,
|
||||||
0x9c, 0x60, 0x2c, 0xb0, 0xe7, 0x5d, 0xd4, 0x6e, 0x11, 0xf5, 0x1f, 0x6f, 0xc0, 0x62, 0x18, 0xb0,
|
0x5d, 0x30, 0x16, 0xd8, 0x0b, 0x2e, 0x6a, 0x35, 0x89, 0xfa, 0x8f, 0x37, 0x61, 0x29, 0x0c, 0x58,
|
||||||
0x48, 0x86, 0x72, 0x64, 0x2f, 0x28, 0x7c, 0x12, 0xe3, 0xa7, 0xb0, 0xac, 0xb9, 0x8f, 0xe8, 0xa8,
|
0x24, 0x43, 0x39, 0xb2, 0x17, 0x15, 0x3e, 0x8d, 0xf1, 0x53, 0x58, 0xd1, 0xdc, 0x47, 0x74, 0xd4,
|
||||||
0xcf, 0x69, 0x60, 0x5b, 0x2a, 0xa1, 0x0a, 0x7a, 0x57, 0x26, 0xc0, 0x49, 0xc2, 0x58, 0x36, 0x9a,
|
0xe7, 0x34, 0xb0, 0x2d, 0x95, 0x50, 0x06, 0xbd, 0x2b, 0x13, 0xe0, 0x24, 0x61, 0x6c, 0x32, 0x9a,
|
||||||
0x0b, 0x4b, 0xe9, 0xdc, 0x7a, 0x14, 0x61, 0x23, 0xb7, 0xd1, 0x6e, 0x92, 0x32, 0x54, 0x1d, 0xde,
|
0x0b, 0xcb, 0xe9, 0xdc, 0x7a, 0x14, 0x61, 0x23, 0xb7, 0xd6, 0x6a, 0x90, 0x22, 0x54, 0x1e, 0xde,
|
||||||
0xac, 0x0f, 0xff, 0x0c, 0x56, 0x44, 0x44, 0x63, 0xd1, 0xe3, 0x72, 0x97, 0x8a, 0xd4, 0x03, 0x3d,
|
0xac, 0x0e, 0xff, 0x0c, 0x56, 0x45, 0x44, 0x63, 0xd1, 0xe3, 0x72, 0x8f, 0x8a, 0xd4, 0x03, 0x3d,
|
||||||
0x66, 0x0d, 0x4d, 0xfb, 0x68, 0x1d, 0xe2, 0x1d, 0x95, 0x54, 0x0d, 0xdb, 0x22, 0x65, 0x28, 0xed,
|
0x66, 0x05, 0x4d, 0xfb, 0x68, 0x1d, 0xe2, 0x1d, 0x95, 0x54, 0x0d, 0xdb, 0x24, 0x45, 0x28, 0xed,
|
||||||
0x93, 0x30, 0x1a, 0x1c, 0xb2, 0x51, 0x57, 0xcf, 0xdc, 0x24, 0x05, 0x50, 0xb5, 0x6a, 0xa1, 0x6e,
|
0x93, 0x30, 0x1a, 0x1c, 0xb2, 0x51, 0x57, 0xcf, 0xdc, 0x20, 0x39, 0x50, 0xb6, 0x6a, 0xb1, 0x6a,
|
||||||
0x55, 0xd9, 0x16, 0xab, 0x66, 0x8b, 0x03, 0x10, 0x8a, 0xe3, 0x4c, 0x8d, 0xbd, 0xe8, 0xa2, 0xf6,
|
0x55, 0xd1, 0x16, 0xab, 0x62, 0x8b, 0x03, 0x10, 0x8a, 0xe3, 0x89, 0x1a, 0x7b, 0xc9, 0x45, 0xad,
|
||||||
0x22, 0x29, 0x21, 0xde, 0x7b, 0x58, 0x26, 0xf4, 0xb2, 0x64, 0x89, 0x0d, 0x56, 0x9c, 0x39, 0x88,
|
0x25, 0x52, 0x40, 0xbc, 0xf7, 0xb0, 0x42, 0xe8, 0x65, 0xc1, 0x12, 0x1b, 0xac, 0x78, 0xe2, 0x20,
|
||||||
0x14, 0x57, 0x1e, 0xa6, 0x22, 0x44, 0x78, 0x11, 0x51, 0x39, 0x4c, 0x98, 0xb2, 0xa2, 0x45, 0x0a,
|
0x52, 0x5c, 0x59, 0x98, 0x8a, 0x10, 0xe1, 0x45, 0x44, 0xe5, 0x30, 0x61, 0xca, 0x8a, 0x26, 0xc9,
|
||||||
0xc0, 0xdb, 0x83, 0xb5, 0x0a, 0xd1, 0xe7, 0x50, 0xf6, 0xb4, 0xf2, 0x84, 0x5e, 0x6a, 0x28, 0x23,
|
0x01, 0xaf, 0x0d, 0xeb, 0x25, 0xa2, 0xcf, 0xa1, 0xec, 0x69, 0xe5, 0x09, 0xbd, 0xd4, 0xd0, 0x84,
|
||||||
0x2c, 0x00, 0xbc, 0x02, 0x66, 0x98, 0xdb, 0x6a, 0x86, 0x81, 0x77, 0x85, 0x60, 0x35, 0xa5, 0x38,
|
0x30, 0x07, 0xf0, 0x2a, 0x98, 0x61, 0x66, 0xab, 0x19, 0x06, 0xde, 0x15, 0x82, 0xb5, 0x94, 0xe2,
|
||||||
0x1e, 0x45, 0xfe, 0x07, 0x26, 0x04, 0xbd, 0x60, 0xf8, 0x35, 0x58, 0x3e, 0x8f, 0x24, 0x8b, 0xa4,
|
0x78, 0x14, 0xf9, 0x1f, 0x98, 0x10, 0xf4, 0x82, 0xe1, 0xd7, 0x60, 0xf9, 0x3c, 0x92, 0x2c, 0x92,
|
||||||
0xaa, 0x5f, 0xda, 0x76, 0x37, 0x4b, 0xbb, 0x37, 0xcf, 0xde, 0xd3, 0x29, 0xa7, 0xb4, 0x3f, 0x64,
|
0xaa, 0x7e, 0x79, 0xc7, 0xdd, 0x2a, 0xec, 0xde, 0x2c, 0xbb, 0xad, 0x53, 0x4e, 0x69, 0x7f, 0xc8,
|
||||||
0x24, 0x2f, 0xc0, 0x3b, 0x00, 0xc9, 0x64, 0x23, 0xab, 0x3e, 0x4b, 0xdb, 0x4f, 0xca, 0xe5, 0x33,
|
0x48, 0x56, 0x80, 0x77, 0x01, 0x92, 0xe9, 0x46, 0x56, 0x7d, 0x96, 0x77, 0x9e, 0x14, 0xcb, 0xe7,
|
||||||
0x24, 0x93, 0x52, 0x89, 0xf7, 0xc7, 0x84, 0xf5, 0x59, 0x2d, 0xf0, 0x1b, 0x80, 0x1e, 0xa3, 0xc1,
|
0x48, 0x26, 0x85, 0x12, 0xef, 0x8f, 0x09, 0x1b, 0xf3, 0x5a, 0xe0, 0x37, 0x00, 0x3d, 0x46, 0x83,
|
||||||
0xa7, 0x38, 0xa0, 0x92, 0x65, 0xc2, 0x36, 0xea, 0xc2, 0x3a, 0x93, 0x8c, 0x8e, 0x41, 0x4a, 0xf9,
|
0x4f, 0x71, 0x40, 0x25, 0x9b, 0x08, 0xdb, 0xac, 0x0a, 0xeb, 0x4c, 0x33, 0x3a, 0x06, 0x29, 0xe4,
|
||||||
0xf8, 0x10, 0x56, 0xcf, 0x87, 0xfd, 0x7e, 0xca, 0x4a, 0xd8, 0xb7, 0x21, 0x13, 0x72, 0x96, 0xb8,
|
0xe3, 0x43, 0x58, 0x3b, 0x1f, 0xf6, 0xfb, 0x29, 0x2b, 0x61, 0xdf, 0x86, 0x4c, 0xc8, 0x79, 0xe2,
|
||||||
0x94, 0xe2, 0xa0, 0x9a, 0xd6, 0x31, 0x48, 0xbd, 0x12, 0x7f, 0x84, 0x07, 0x05, 0x24, 0x62, 0x1e,
|
0x52, 0x8a, 0x83, 0x72, 0x5a, 0xc7, 0x20, 0xd5, 0x4a, 0xfc, 0x11, 0x1e, 0xe4, 0x90, 0x88, 0x79,
|
||||||
0x09, 0x7d, 0xda, 0x66, 0x38, 0x75, 0x50, 0xcb, 0xeb, 0x18, 0x64, 0xaa, 0x16, 0xef, 0xc3, 0x32,
|
0x24, 0xf4, 0x69, 0x9b, 0xe3, 0xd4, 0x41, 0x25, 0xaf, 0x63, 0x90, 0x99, 0x5a, 0xbc, 0x0f, 0x2b,
|
||||||
0x4b, 0x12, 0x9e, 0x4c, 0xc8, 0xe6, 0x14, 0xd9, 0xe3, 0x3a, 0xd9, 0x7e, 0x39, 0xa9, 0x63, 0x90,
|
0x2c, 0x49, 0x78, 0x32, 0x25, 0xab, 0x2b, 0xb2, 0xc7, 0x55, 0xb2, 0xfd, 0x62, 0x52, 0xc7, 0x20,
|
||||||
0x6a, 0xd5, 0xae, 0x05, 0xf3, 0xdf, 0x53, 0xab, 0xbc, 0x1f, 0x08, 0x56, 0xaa, 0x6e, 0xe0, 0x75,
|
0xe5, 0xaa, 0x3d, 0x0b, 0x16, 0xbe, 0xa7, 0x56, 0x79, 0x3f, 0x10, 0xac, 0x96, 0xdd, 0xc0, 0x1b,
|
||||||
0x98, 0x4f, 0xdd, 0xc8, 0x4f, 0x9c, 0x0e, 0xf0, 0x2b, 0xb0, 0xb2, 0x23, 0x61, 0x9b, 0x6e, 0xe3,
|
0xb0, 0x90, 0xba, 0x91, 0x9d, 0x38, 0x1d, 0xe0, 0x57, 0x60, 0x4d, 0x8e, 0x84, 0x6d, 0xba, 0xb5,
|
||||||
0x2e, 0x4b, 0x95, 0xe7, 0x63, 0x0f, 0x5a, 0xf9, 0x91, 0x3b, 0xa2, 0xb2, 0x67, 0x37, 0x14, 0x6f,
|
0xbb, 0x2c, 0x55, 0x96, 0x8f, 0x3d, 0x68, 0x66, 0x47, 0xee, 0x88, 0xca, 0x9e, 0x5d, 0x53, 0xbc,
|
||||||
0x05, 0xf3, 0x7e, 0x22, 0x58, 0x9b, 0x61, 0xe9, 0xfd, 0x88, 0xf9, 0x85, 0xf4, 0xc6, 0xaa, 0xaf,
|
0x25, 0xcc, 0xfb, 0x89, 0x60, 0x7d, 0x8e, 0xa5, 0xf7, 0x23, 0xe6, 0x17, 0xd2, 0x1b, 0xab, 0xba,
|
||||||
0xc8, 0xfd, 0xa8, 0x79, 0x01, 0x0f, 0xa7, 0x56, 0x34, 0x55, 0xa2, 0x56, 0x34, 0xbb, 0xf3, 0x75,
|
0x22, 0xf7, 0xa3, 0xe6, 0x05, 0x3c, 0x9c, 0x59, 0xd1, 0xf4, 0x26, 0x60, 0x49, 0xd2, 0xe6, 0x81,
|
||||||
0xe0, 0x9d, 0xea, 0xc5, 0xd4, 0xbd, 0xba, 0xd1, 0x39, 0xaf, 0xdd, 0xf3, 0x68, 0xea, 0x9e, 0x9f,
|
0xde, 0xdf, 0x75, 0x92, 0x85, 0xde, 0xa9, 0x5e, 0x50, 0xdd, 0xaf, 0x1b, 0x9d, 0xf3, 0xca, 0x5d,
|
||||||
0xba, 0x99, 0xcd, 0x19, 0x37, 0xf3, 0xee, 0xdb, 0xbf, 0xb7, 0x0e, 0xba, 0xbe, 0x75, 0xd0, 0xbf,
|
0x8f, 0x66, 0xee, 0xfa, 0x99, 0xdb, 0xd9, 0x9c, 0x73, 0x3b, 0xef, 0xbd, 0xfd, 0x7b, 0xeb, 0xa0,
|
||||||
0x5b, 0x07, 0xfd, 0x1e, 0x3b, 0xc6, 0xf5, 0xd8, 0x31, 0x6e, 0xc6, 0x8e, 0xf1, 0xe5, 0xf9, 0x1d,
|
0xeb, 0x5b, 0x07, 0xfd, 0xbb, 0x75, 0xd0, 0xef, 0xb1, 0x63, 0x5c, 0x8f, 0x1d, 0xe3, 0x66, 0xec,
|
||||||
0xbf, 0x6d, 0x67, 0x0b, 0xea, 0xe7, 0xe5, 0xff, 0x00, 0x00, 0x00, 0xff, 0xff, 0xbc, 0xc0, 0xf7,
|
0x18, 0x5f, 0x9e, 0xdf, 0xf1, 0xfb, 0x76, 0xb6, 0xa8, 0x7e, 0x5e, 0xfe, 0x0f, 0x00, 0x00, 0xff,
|
||||||
0x30, 0x0d, 0x07, 0x00, 0x00,
|
0xff, 0x28, 0x34, 0x11, 0x90, 0x11, 0x07, 0x00, 0x00,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *RootChange) Marshal() (dAtA []byte, err error) {
|
func (m *RootChange) Marshal() (dAtA []byte, err error) {
|
||||||
@ -1426,12 +1426,10 @@ func (m *TreeErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|||||||
_ = i
|
_ = i
|
||||||
var l int
|
var l int
|
||||||
_ = l
|
_ = l
|
||||||
if len(m.Error) > 0 {
|
if m.ErrCode != 0 {
|
||||||
i -= len(m.Error)
|
i = encodeVarintTreechange(dAtA, i, uint64(m.ErrCode))
|
||||||
copy(dAtA[i:], m.Error)
|
|
||||||
i = encodeVarintTreechange(dAtA, i, uint64(len(m.Error)))
|
|
||||||
i--
|
i--
|
||||||
dAtA[i] = 0xa
|
dAtA[i] = 0x8
|
||||||
}
|
}
|
||||||
return len(dAtA) - i, nil
|
return len(dAtA) - i, nil
|
||||||
}
|
}
|
||||||
@ -1759,9 +1757,8 @@ func (m *TreeErrorResponse) Size() (n int) {
|
|||||||
}
|
}
|
||||||
var l int
|
var l int
|
||||||
_ = l
|
_ = l
|
||||||
l = len(m.Error)
|
if m.ErrCode != 0 {
|
||||||
if l > 0 {
|
n += 1 + sovTreechange(uint64(m.ErrCode))
|
||||||
n += 1 + l + sovTreechange(uint64(l))
|
|
||||||
}
|
}
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
@ -3361,10 +3358,10 @@ func (m *TreeErrorResponse) Unmarshal(dAtA []byte) error {
|
|||||||
}
|
}
|
||||||
switch fieldNum {
|
switch fieldNum {
|
||||||
case 1:
|
case 1:
|
||||||
if wireType != 2 {
|
if wireType != 0 {
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
|
return fmt.Errorf("proto: wrong wireType = %d for field ErrCode", wireType)
|
||||||
}
|
}
|
||||||
var stringLen uint64
|
m.ErrCode = 0
|
||||||
for shift := uint(0); ; shift += 7 {
|
for shift := uint(0); ; shift += 7 {
|
||||||
if shift >= 64 {
|
if shift >= 64 {
|
||||||
return ErrIntOverflowTreechange
|
return ErrIntOverflowTreechange
|
||||||
@ -3374,24 +3371,11 @@ func (m *TreeErrorResponse) Unmarshal(dAtA []byte) error {
|
|||||||
}
|
}
|
||||||
b := dAtA[iNdEx]
|
b := dAtA[iNdEx]
|
||||||
iNdEx++
|
iNdEx++
|
||||||
stringLen |= uint64(b&0x7F) << shift
|
m.ErrCode |= uint64(b&0x7F) << shift
|
||||||
if b < 0x80 {
|
if b < 0x80 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
intStringLen := int(stringLen)
|
|
||||||
if intStringLen < 0 {
|
|
||||||
return ErrInvalidLengthTreechange
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + intStringLen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthTreechange
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Error = string(dAtA[iNdEx:postIndex])
|
|
||||||
iNdEx = postIndex
|
|
||||||
default:
|
default:
|
||||||
iNdEx = preIndex
|
iNdEx = preIndex
|
||||||
skippy, err := skipTreechange(dAtA[iNdEx:])
|
skippy, err := skipTreechange(dAtA[iNdEx:])
|
||||||
|
|||||||
@ -2,6 +2,9 @@ package objectsync
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -23,7 +26,6 @@ type ObjectSync interface {
|
|||||||
synchandler.SyncHandler
|
synchandler.SyncHandler
|
||||||
MessagePool() MessagePool
|
MessagePool() MessagePool
|
||||||
|
|
||||||
Init()
|
|
||||||
Close() (err error)
|
Close() (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,6 +33,7 @@ type objectSync struct {
|
|||||||
spaceId string
|
spaceId string
|
||||||
|
|
||||||
messagePool MessagePool
|
messagePool MessagePool
|
||||||
|
syncClient synctree.SyncClient
|
||||||
objectGetter syncobjectgetter.SyncObjectGetter
|
objectGetter syncobjectgetter.SyncObjectGetter
|
||||||
configuration nodeconf.Configuration
|
configuration nodeconf.Configuration
|
||||||
spaceStorage spacestorage.SpaceStorage
|
spaceStorage spacestorage.SpaceStorage
|
||||||
@ -48,40 +51,18 @@ func NewObjectSync(
|
|||||||
objectGetter syncobjectgetter.SyncObjectGetter,
|
objectGetter syncobjectgetter.SyncObjectGetter,
|
||||||
storage spacestorage.SpaceStorage) ObjectSync {
|
storage spacestorage.SpaceStorage) ObjectSync {
|
||||||
syncCtx, cancel := context.WithCancel(context.Background())
|
syncCtx, cancel := context.WithCancel(context.Background())
|
||||||
os := newObjectSync(
|
os := &objectSync{
|
||||||
spaceId,
|
|
||||||
spaceIsDeleted,
|
|
||||||
configuration,
|
|
||||||
objectGetter,
|
|
||||||
storage,
|
|
||||||
syncCtx,
|
|
||||||
cancel)
|
|
||||||
msgPool := newMessagePool(peerManager, os.handleMessage)
|
|
||||||
os.messagePool = msgPool
|
|
||||||
return os
|
|
||||||
}
|
|
||||||
|
|
||||||
func newObjectSync(
|
|
||||||
spaceId string,
|
|
||||||
spaceIsDeleted *atomic.Bool,
|
|
||||||
configuration nodeconf.Configuration,
|
|
||||||
objectGetter syncobjectgetter.SyncObjectGetter,
|
|
||||||
spaceStorage spacestorage.SpaceStorage,
|
|
||||||
syncCtx context.Context,
|
|
||||||
cancel context.CancelFunc,
|
|
||||||
) *objectSync {
|
|
||||||
return &objectSync{
|
|
||||||
objectGetter: objectGetter,
|
objectGetter: objectGetter,
|
||||||
spaceStorage: spaceStorage,
|
spaceStorage: storage,
|
||||||
spaceId: spaceId,
|
spaceId: spaceId,
|
||||||
syncCtx: syncCtx,
|
syncCtx: syncCtx,
|
||||||
cancelSync: cancel,
|
cancelSync: cancel,
|
||||||
spaceIsDeleted: spaceIsDeleted,
|
spaceIsDeleted: spaceIsDeleted,
|
||||||
configuration: configuration,
|
configuration: configuration,
|
||||||
|
syncClient: synctree.NewSyncClient(spaceId, peerManager, synctree.GetRequestFactory()),
|
||||||
}
|
}
|
||||||
}
|
os.messagePool = newMessagePool(peerManager, os.handleMessage)
|
||||||
|
return os
|
||||||
func (s *objectSync) Init() {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSync) Close() (err error) {
|
func (s *objectSync) Close() (err error) {
|
||||||
@ -109,6 +90,10 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
|||||||
log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)).DebugCtx(ctx, "handling message")
|
log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)).DebugCtx(ctx, "handling message")
|
||||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
respErr := s.sendErrorResponse(ctx, msg, senderId, err)
|
||||||
|
if respErr != nil {
|
||||||
|
log.Debug("failed to send error response", zap.Error(respErr))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return obj.HandleMessage(ctx, senderId, msg)
|
return obj.HandleMessage(ctx, senderId, msg)
|
||||||
@ -117,3 +102,13 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
|||||||
func (s *objectSync) MessagePool() MessagePool {
|
func (s *objectSync) MessagePool() MessagePool {
|
||||||
return s.messagePool
|
return s.messagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *objectSync) sendErrorResponse(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, senderId string, respErr error) (err error) {
|
||||||
|
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
||||||
|
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp := treechangeproto.WrapError(respErr, unmarshalled.RootChange)
|
||||||
|
return s.syncClient.SendWithReply(ctx, senderId, resp, msg.ReplyId)
|
||||||
|
}
|
||||||
|
|||||||
@ -204,7 +204,6 @@ func (s *space) Init(ctx context.Context) (err error) {
|
|||||||
OnSpaceDelete: s.onSpaceDelete,
|
OnSpaceDelete: s.onSpaceDelete,
|
||||||
}
|
}
|
||||||
s.settingsObject = settings.NewSettingsObject(deps, s.id)
|
s.settingsObject = settings.NewSettingsObject(deps, s.id)
|
||||||
s.objectSync.Init()
|
|
||||||
s.headSync.Init(initialIds, deletionState)
|
s.headSync.Init(initialIds, deletionState)
|
||||||
err = s.settingsObject.Init(ctx)
|
err = s.settingsObject.Init(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user