From e1987a36bbbf1ccdc7c6f7c70b97ca7c74cd807a Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 6 Nov 2022 14:25:27 +0100 Subject: [PATCH] Add pull space method to proto and provide get space --- client/clientspace/rpchandler.go | 52 +- client/clientspace/service.go | 5 + common/commonspace/service.go | 79 ++- common/commonspace/space.go | 27 +- .../mock_spacesyncproto.go | 15 + .../spacesyncproto/protos/spacesync.proto | 14 + .../spacesyncproto/spacesync.pb.go | 552 ++++++++++++++++-- .../spacesyncproto/spacesync_drpc.pb.go | 42 +- .../syncservice/streampool_test.go | 4 + common/nodeconf/confconnector.go | 8 +- common/pkg/acl/list/list.go | 12 +- common/pkg/acl/list/mock_list/mock_list.go | 4 +- node/nodespace/rpchandler.go | 51 +- node/nodespace/service.go | 5 + 14 files changed, 754 insertions(+), 116 deletions(-) diff --git a/client/clientspace/rpchandler.go b/client/clientspace/rpchandler.go index 6c481d3f..8f6a7989 100644 --- a/client/clientspace/rpchandler.go +++ b/client/clientspace/rpchandler.go @@ -2,43 +2,43 @@ package clientspace import ( "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" ) type rpcHandler struct { s *service } -func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpaceRequest) (resp *spacesyncproto.PushSpaceResponse, err error) { - _, err = r.s.GetSpace(ctx, req.SpaceHeader.Id) - if err == nil { - err = spacesyncproto.ErrSpaceExists - return - } - if err != treegetter.ErrSpaceNotFound { - err = spacesyncproto.ErrUnexpected - return - } - - payload := storage.SpaceStorageCreatePayload{ - RecWithId: &aclrecordproto.RawACLRecordWithId{ - Payload: req.AclPayload, - Id: req.AclPayloadId, - }, - SpaceHeaderWithId: req.SpaceHeader, - } - st, err := r.s.spaceStorageProvider.CreateSpaceStorage(payload) +func (r *rpcHandler) PullSpace(ctx context.Context, request *spacesyncproto.PullSpaceRequest) (resp *spacesyncproto.PullSpaceResponse, err error) { + sp, err := r.s.GetSpace(ctx, request.Id) if err != nil { - err = spacesyncproto.ErrUnexpected - if err == storage.ErrSpaceStorageExists { - err = spacesyncproto.ErrSpaceExists + if err != spacesyncproto.ErrSpaceMissing { + err = spacesyncproto.ErrUnexpected } return } - st.Close() + + description := sp.Description() + resp = &spacesyncproto.PullSpaceResponse{ + SpaceHeader: description.SpaceHeader, + AclPayload: description.AclPayload, + AclPayloadId: description.AclId, + } + return +} + +func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpaceRequest) (resp *spacesyncproto.PushSpaceResponse, err error) { + description := commonspace.SpaceDescription{ + SpaceHeader: req.SpaceHeader, + AclId: req.AclPayloadId, + AclPayload: req.AclPayload, + } + err = r.s.AddSpace(ctx, description) + if err != nil { + return + } + resp = &spacesyncproto.PushSpaceResponse{} return } diff --git a/client/clientspace/service.go b/client/clientspace/service.go index 876dcd3f..26497d94 100644 --- a/client/clientspace/service.go +++ b/client/clientspace/service.go @@ -23,6 +23,7 @@ func New() Service { type Service interface { GetSpace(ctx context.Context, id string) (commonspace.Space, error) + AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) CreateSpace(ctx context.Context, payload commonspace.SpaceCreatePayload) (commonspace.Space, error) DeriveSpace(ctx context.Context, payload commonspace.SpaceDerivePayload) (commonspace.Space, error) app.ComponentRunnable @@ -92,6 +93,10 @@ func (s *service) GetSpace(ctx context.Context, id string) (container commonspac return v.(commonspace.Space), nil } +func (s *service) AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) { + return s.commonSpace.AddSpace(ctx, description) +} + func (s *service) Close(ctx context.Context) (err error) { return s.spaceCache.Close() } diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 8ab4d9f5..48d81ffc 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -6,12 +6,15 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" ) const CName = "common.commonspace" @@ -26,6 +29,7 @@ type Service interface { DeriveSpace(ctx context.Context, payload SpaceDerivePayload) (string, error) CreateSpace(ctx context.Context, payload SpaceCreatePayload) (string, error) GetSpace(ctx context.Context, id string) (sp Space, err error) + AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error) app.Component } @@ -82,10 +86,48 @@ func (s *service) DeriveSpace( return store.ID(), nil } +func (s *service) AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error) { + _, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id) + if err == nil { + err = spacesyncproto.ErrSpaceExists + return + } + if err != storage.ErrSpaceStorageMissing { + err = spacesyncproto.ErrUnexpected + return + } + + payload := storage.SpaceStorageCreatePayload{ + RecWithId: &aclrecordproto.RawACLRecordWithId{ + Payload: spaceDescription.AclPayload, + Id: spaceDescription.AclId, + }, + SpaceHeaderWithId: spaceDescription.SpaceHeader, + } + st, err := s.storageProvider.CreateSpaceStorage(payload) + if err != nil { + err = spacesyncproto.ErrUnexpected + if err == storage.ErrSpaceStorageExists { + err = spacesyncproto.ErrSpaceExists + } + return + } + err = st.Close() + return +} + func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { st, err := s.storageProvider.SpaceStorage(id) if err != nil { - return nil, err + if err != storage.ErrSpaceStorageMissing { + return nil, err + } + + st, err = s.getSpaceStorageFromRemote(ctx, id) + if err != nil { + err = storage.ErrSpaceStorageMissing + return nil, err + } } lastConfiguration := s.configurationService.GetLast() @@ -106,3 +148,38 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { } return sp, nil } + +func (s *service) getSpaceStorageFromRemote(ctx context.Context, id string) (st storage.SpaceStorage, err error) { + var p peer.Peer + peerId, err := syncservice.GetPeerIdFromStreamContext(ctx) + if err == nil { + p, err = s.pool.Dial(ctx, peerId) + if err != nil { + return + } + } else { + lastConfiguration := s.configurationService.GetLast() + // for nodes we always get remote space only if we have id in the context + if lastConfiguration.IsResponsible(id) { + err = spacesyncproto.ErrSpaceMissing + return + } + p, err = s.pool.DialOneOf(ctx, lastConfiguration.NodeIds(id)) + if err != nil { + return + } + } + cl := spacesyncproto.NewDRPCSpaceClient(p) + res, err := cl.PullSpace(ctx, &spacesyncproto.PullSpaceRequest{Id: id}) + if err != nil { + return + } + st, err = s.storageProvider.CreateSpaceStorage(storage.SpaceStorageCreatePayload{ + RecWithId: &aclrecordproto.RawACLRecordWithId{ + Payload: res.AclPayload, + Id: res.AclPayloadId, + }, + SpaceHeaderWithId: res.SpaceHeader, + }) + return +} diff --git a/common/commonspace/space.go b/common/commonspace/space.go index dd2541bd..367c1296 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" @@ -45,6 +46,12 @@ type SpaceDerivePayload struct { EncryptionKey encryptionkey.PrivKey } +type SpaceDescription struct { + SpaceHeader *spacesyncproto.RawSpaceHeaderWithId + AclId string + AclPayload []byte +} + func NewSpaceId(id string, repKey uint64) string { return fmt.Sprintf("%s.%d", id, repKey) } @@ -52,6 +59,7 @@ func NewSpaceId(id string, repKey uint64) string { type Space interface { Id() string StoredIds() []string + Description() SpaceDescription SpaceSyncRpc() RpcHandler @@ -63,8 +71,9 @@ type Space interface { } type space struct { - id string - mu sync.RWMutex + id string + mu sync.RWMutex + header *spacesyncproto.RawSpaceHeaderWithId rpc *rpcHandler @@ -87,7 +96,21 @@ func (s *space) Id() string { return s.id } +func (s *space) Description() SpaceDescription { + root := s.aclList.Root() + return SpaceDescription{ + SpaceHeader: s.header, + AclId: root.Id, + AclPayload: root.Payload, + } +} + func (s *space) Init(ctx context.Context) (err error) { + header, err := s.storage.SpaceHeader() + if err != nil { + return + } + s.header = header s.rpc = &rpcHandler{s: s} initialIds, err := s.storage.StoredIds() if err != nil { diff --git a/common/commonspace/spacesyncproto/mock_spacesyncproto/mock_spacesyncproto.go b/common/commonspace/spacesyncproto/mock_spacesyncproto/mock_spacesyncproto.go index 0d0d52c5..0df3a6ff 100644 --- a/common/commonspace/spacesyncproto/mock_spacesyncproto/mock_spacesyncproto.go +++ b/common/commonspace/spacesyncproto/mock_spacesyncproto/mock_spacesyncproto.go @@ -65,6 +65,21 @@ func (mr *MockDRPCSpaceClientMockRecorder) HeadSync(arg0, arg1 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadSync", reflect.TypeOf((*MockDRPCSpaceClient)(nil).HeadSync), arg0, arg1) } +// PullSpace mocks base method. +func (m *MockDRPCSpaceClient) PullSpace(arg0 context.Context, arg1 *spacesyncproto.PullSpaceRequest) (*spacesyncproto.PullSpaceResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PullSpace", arg0, arg1) + ret0, _ := ret[0].(*spacesyncproto.PullSpaceResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PullSpace indicates an expected call of PullSpace. +func (mr *MockDRPCSpaceClientMockRecorder) PullSpace(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullSpace", reflect.TypeOf((*MockDRPCSpaceClient)(nil).PullSpace), arg0, arg1) +} + // PushSpace mocks base method. func (m *MockDRPCSpaceClient) PushSpace(arg0 context.Context, arg1 *spacesyncproto.PushSpaceRequest) (*spacesyncproto.PushSpaceResponse, error) { m.ctrl.T.Helper() diff --git a/common/commonspace/spacesyncproto/protos/spacesync.proto b/common/commonspace/spacesyncproto/protos/spacesync.proto index 52924272..441c0992 100644 --- a/common/commonspace/spacesyncproto/protos/spacesync.proto +++ b/common/commonspace/spacesyncproto/protos/spacesync.proto @@ -15,6 +15,8 @@ service Space { rpc HeadSync(HeadSyncRequest) returns (HeadSyncResponse); // PushSpace sends new space to the node rpc PushSpace(PushSpaceRequest) returns (PushSpaceResponse); + // PullSpace gets space from the remote peer + rpc PullSpace(PullSpaceRequest) returns (PullSpaceResponse); // Stream opens object sync stream with node or client rpc Stream(stream ObjectSyncMessage) returns (stream ObjectSyncMessage); } @@ -70,6 +72,18 @@ message PushSpaceRequest { // PushSpaceResponse is an empty response message PushSpaceResponse {} +// PullSpaceRequest is a request to request a space on a node that doesn't have it +message PullSpaceRequest { + string id = 1; +} + +// PullSpaceResponse is a response with header and acl root +message PullSpaceResponse { + RawSpaceHeaderWithId spaceHeader = 1; + bytes aclPayload = 2; + string aclPayloadId = 3; +} + // SpaceHeader is a header for a space message SpaceHeader { bytes identity = 1; diff --git a/common/commonspace/spacesyncproto/spacesync.pb.go b/common/commonspace/spacesyncproto/spacesync.pb.go index 8579814c..9f73b956 100644 --- a/common/commonspace/spacesyncproto/spacesync.pb.go +++ b/common/commonspace/spacesyncproto/spacesync.pb.go @@ -493,6 +493,112 @@ func (m *PushSpaceResponse) XXX_DiscardUnknown() { var xxx_messageInfo_PushSpaceResponse proto.InternalMessageInfo +// PullSpaceRequest is a request to request a space on a node that doesn't have it +type PullSpaceRequest struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (m *PullSpaceRequest) Reset() { *m = PullSpaceRequest{} } +func (m *PullSpaceRequest) String() string { return proto.CompactTextString(m) } +func (*PullSpaceRequest) ProtoMessage() {} +func (*PullSpaceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_80e49f1f4ac27799, []int{8} +} +func (m *PullSpaceRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PullSpaceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PullSpaceRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PullSpaceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PullSpaceRequest.Merge(m, src) +} +func (m *PullSpaceRequest) XXX_Size() int { + return m.Size() +} +func (m *PullSpaceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PullSpaceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PullSpaceRequest proto.InternalMessageInfo + +func (m *PullSpaceRequest) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +// PullSpaceResponse is a response with header and acl root +type PullSpaceResponse struct { + SpaceHeader *RawSpaceHeaderWithId `protobuf:"bytes,1,opt,name=spaceHeader,proto3" json:"spaceHeader,omitempty"` + AclPayload []byte `protobuf:"bytes,2,opt,name=aclPayload,proto3" json:"aclPayload,omitempty"` + AclPayloadId string `protobuf:"bytes,3,opt,name=aclPayloadId,proto3" json:"aclPayloadId,omitempty"` +} + +func (m *PullSpaceResponse) Reset() { *m = PullSpaceResponse{} } +func (m *PullSpaceResponse) String() string { return proto.CompactTextString(m) } +func (*PullSpaceResponse) ProtoMessage() {} +func (*PullSpaceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_80e49f1f4ac27799, []int{9} +} +func (m *PullSpaceResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PullSpaceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PullSpaceResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PullSpaceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PullSpaceResponse.Merge(m, src) +} +func (m *PullSpaceResponse) XXX_Size() int { + return m.Size() +} +func (m *PullSpaceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PullSpaceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PullSpaceResponse proto.InternalMessageInfo + +func (m *PullSpaceResponse) GetSpaceHeader() *RawSpaceHeaderWithId { + if m != nil { + return m.SpaceHeader + } + return nil +} + +func (m *PullSpaceResponse) GetAclPayload() []byte { + if m != nil { + return m.AclPayload + } + return nil +} + +func (m *PullSpaceResponse) GetAclPayloadId() string { + if m != nil { + return m.AclPayloadId + } + return "" +} + // SpaceHeader is a header for a space type SpaceHeader struct { Identity []byte `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` @@ -506,7 +612,7 @@ func (m *SpaceHeader) Reset() { *m = SpaceHeader{} } func (m *SpaceHeader) String() string { return proto.CompactTextString(m) } func (*SpaceHeader) ProtoMessage() {} func (*SpaceHeader) Descriptor() ([]byte, []int) { - return fileDescriptor_80e49f1f4ac27799, []int{8} + return fileDescriptor_80e49f1f4ac27799, []int{10} } func (m *SpaceHeader) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -579,7 +685,7 @@ func (m *RawSpaceHeader) Reset() { *m = RawSpaceHeader{} } func (m *RawSpaceHeader) String() string { return proto.CompactTextString(m) } func (*RawSpaceHeader) ProtoMessage() {} func (*RawSpaceHeader) Descriptor() ([]byte, []int) { - return fileDescriptor_80e49f1f4ac27799, []int{9} + return fileDescriptor_80e49f1f4ac27799, []int{11} } func (m *RawSpaceHeader) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -631,7 +737,7 @@ func (m *RawSpaceHeaderWithId) Reset() { *m = RawSpaceHeaderWithId{} } func (m *RawSpaceHeaderWithId) String() string { return proto.CompactTextString(m) } func (*RawSpaceHeaderWithId) ProtoMessage() {} func (*RawSpaceHeaderWithId) Descriptor() ([]byte, []int) { - return fileDescriptor_80e49f1f4ac27799, []int{10} + return fileDescriptor_80e49f1f4ac27799, []int{12} } func (m *RawSpaceHeaderWithId) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -684,6 +790,8 @@ func init() { proto.RegisterType((*ObjectSyncMessage)(nil), "anySpace.ObjectSyncMessage") proto.RegisterType((*PushSpaceRequest)(nil), "anySpace.PushSpaceRequest") proto.RegisterType((*PushSpaceResponse)(nil), "anySpace.PushSpaceResponse") + proto.RegisterType((*PullSpaceRequest)(nil), "anySpace.PullSpaceRequest") + proto.RegisterType((*PullSpaceResponse)(nil), "anySpace.PullSpaceResponse") proto.RegisterType((*SpaceHeader)(nil), "anySpace.SpaceHeader") proto.RegisterType((*RawSpaceHeader)(nil), "anySpace.RawSpaceHeader") proto.RegisterType((*RawSpaceHeaderWithId)(nil), "anySpace.RawSpaceHeaderWithId") @@ -694,50 +802,53 @@ func init() { } var fileDescriptor_80e49f1f4ac27799 = []byte{ - // 685 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xcd, 0x4e, 0xdb, 0x4a, - 0x14, 0x8e, 0x4d, 0x80, 0xe4, 0x24, 0x84, 0x30, 0x97, 0xab, 0xeb, 0x9b, 0x56, 0x6e, 0xe4, 0x45, - 0x15, 0x75, 0x01, 0x6d, 0xda, 0x1d, 0x9b, 0xfe, 0x10, 0xd4, 0xa8, 0xa2, 0xa0, 0x49, 0xab, 0x4a, - 0x55, 0x37, 0x83, 0x3d, 0x24, 0x53, 0xc5, 0x1e, 0xd7, 0x33, 0x11, 0x78, 0xd1, 0x77, 0xe8, 0xb2, - 0xdb, 0xbe, 0x4d, 0x97, 0x2c, 0x59, 0x56, 0xf0, 0x22, 0xd5, 0x9c, 0xd8, 0x71, 0x02, 0x81, 0xcd, - 0x64, 0xce, 0x77, 0xfe, 0xbe, 0xf9, 0x72, 0x8e, 0xe1, 0x99, 0x2f, 0xc3, 0x50, 0x46, 0x2a, 0x66, - 0x3e, 0xdf, 0xc5, 0x53, 0xa5, 0x91, 0x1f, 0x27, 0x52, 0xcb, 0x5d, 0x3c, 0x55, 0x81, 0xee, 0x20, - 0x40, 0x2a, 0x2c, 0x4a, 0x07, 0x06, 0xf3, 0xfa, 0xb0, 0xf1, 0x96, 0xb3, 0x60, 0x90, 0x46, 0x3e, - 0x65, 0xd1, 0x90, 0x13, 0x02, 0xe5, 0xd3, 0x44, 0x86, 0x8e, 0xd5, 0xb6, 0x3a, 0x65, 0x8a, 0x77, - 0xd2, 0x00, 0x5b, 0x4b, 0xc7, 0x46, 0xc4, 0xd6, 0x92, 0x6c, 0xc3, 0xea, 0x58, 0x84, 0x42, 0x3b, - 0x2b, 0x6d, 0xab, 0xb3, 0x41, 0xa7, 0x86, 0x77, 0x06, 0x8d, 0x59, 0x29, 0xae, 0x26, 0x63, 0x6d, - 0x6a, 0x8d, 0x98, 0x1a, 0x61, 0xad, 0x3a, 0xc5, 0x3b, 0xd9, 0x83, 0x0a, 0x1f, 0xf3, 0x90, 0x47, - 0x5a, 0x39, 0x76, 0x7b, 0xa5, 0x53, 0xeb, 0x3e, 0xda, 0xc9, 0xd9, 0xec, 0x2c, 0xe6, 0xf7, 0xa6, - 0x71, 0x74, 0x96, 0x60, 0x1a, 0xfb, 0x72, 0x12, 0xcd, 0x1a, 0xa3, 0xe1, 0xed, 0xc1, 0xbf, 0x4b, - 0x13, 0x0d, 0x6f, 0x11, 0x60, 0xf7, 0x2a, 0xb5, 0x45, 0x80, 0x7c, 0x38, 0x0b, 0xf0, 0x25, 0x55, - 0x8a, 0x77, 0xef, 0x0b, 0x6c, 0x16, 0xc9, 0xdf, 0x26, 0x5c, 0x69, 0xe2, 0xc0, 0x3a, 0x0a, 0xd6, - 0xcf, 0x73, 0x73, 0x93, 0xec, 0xc2, 0x5a, 0x62, 0x54, 0xca, 0xa9, 0xff, 0xb7, 0x84, 0xba, 0xf1, - 0xd3, 0x2c, 0xcc, 0x3b, 0x80, 0xe6, 0x1c, 0xb5, 0x58, 0x46, 0x8a, 0x93, 0x2e, 0xac, 0x27, 0x48, - 0x53, 0x39, 0x16, 0x56, 0x71, 0xee, 0x12, 0x80, 0xe6, 0x81, 0xde, 0x77, 0xd8, 0x3a, 0x3a, 0xf9, - 0xca, 0x7d, 0x6d, 0x9c, 0x87, 0x5c, 0x29, 0x36, 0xe4, 0xf7, 0xf0, 0x74, 0x4c, 0x8b, 0x78, 0x9c, - 0xf6, 0xf3, 0xb7, 0xe6, 0xa6, 0xf1, 0xc4, 0x2c, 0x1d, 0x4b, 0x16, 0xa0, 0x86, 0x75, 0x9a, 0x9b, - 0xa4, 0x05, 0x15, 0x89, 0x2d, 0xfa, 0x81, 0x53, 0xc6, 0xa4, 0x99, 0xed, 0xfd, 0xb4, 0xa0, 0x79, - 0x3c, 0x51, 0x23, 0x24, 0x99, 0xcb, 0xf4, 0x12, 0x6a, 0xd8, 0xcf, 0x70, 0xe6, 0x09, 0x52, 0xa8, - 0x75, 0xdd, 0xe2, 0x2d, 0x94, 0x9d, 0x0d, 0x0a, 0xff, 0x27, 0xa1, 0x47, 0xfd, 0x80, 0xce, 0xa7, - 0x10, 0x17, 0x80, 0xf9, 0xe3, 0xe3, 0x8c, 0x8f, 0x8d, 0x7c, 0xe6, 0x10, 0xe2, 0x41, 0xbd, 0xb0, - 0xfa, 0x53, 0xc6, 0x55, 0xba, 0x80, 0x79, 0xff, 0xc0, 0xd6, 0x1c, 0xb3, 0xa9, 0xc4, 0xde, 0x2f, - 0x0b, 0x6a, 0x73, 0xbd, 0xcd, 0xdb, 0x44, 0xc0, 0x23, 0x2d, 0x74, 0x9a, 0x0d, 0xe3, 0xcc, 0x26, - 0x0f, 0xa1, 0xaa, 0x45, 0xc8, 0x95, 0x66, 0x61, 0x8c, 0x1c, 0x56, 0x68, 0x01, 0x18, 0x2f, 0x32, - 0xfe, 0x90, 0xc6, 0x3c, 0xeb, 0x5f, 0x00, 0xe4, 0x31, 0x34, 0x8c, 0xb0, 0xc2, 0x67, 0x5a, 0xc8, - 0xe8, 0x1d, 0x4f, 0x51, 0xb9, 0x32, 0xbd, 0x81, 0x9a, 0xc1, 0x53, 0x9c, 0x07, 0xce, 0xea, 0x74, - 0x11, 0xcc, 0xdd, 0x3b, 0x86, 0xc6, 0xa2, 0x42, 0xa4, 0x7d, 0x5b, 0xd0, 0xfa, 0xa2, 0x60, 0x86, - 0x8d, 0x18, 0x46, 0x4c, 0x4f, 0x12, 0x9e, 0xe9, 0x55, 0x00, 0xde, 0x3e, 0x6c, 0x2f, 0xd3, 0xdc, - 0x64, 0x25, 0xec, 0x6c, 0xa1, 0x6a, 0x01, 0x64, 0x4b, 0x62, 0xe7, 0x4b, 0xf2, 0xe4, 0x3d, 0x54, - 0x7a, 0x49, 0xf2, 0x46, 0x06, 0x5c, 0x91, 0x06, 0xc0, 0xc7, 0x88, 0x9f, 0xc7, 0xdc, 0xd7, 0x3c, - 0x68, 0x96, 0x48, 0x13, 0xea, 0x58, 0xfe, 0x50, 0x28, 0x25, 0xa2, 0x61, 0xd3, 0x22, 0x9b, 0x99, - 0xd0, 0xbd, 0x73, 0xa1, 0xb4, 0x6a, 0xda, 0x06, 0xe8, 0x25, 0x89, 0x4c, 0x8e, 0x4e, 0x4f, 0x15, - 0xd7, 0xcd, 0xa0, 0x7b, 0x69, 0xc1, 0x2a, 0x86, 0x90, 0x57, 0x50, 0xc9, 0xe7, 0x9b, 0xfc, 0xbf, - 0x6c, 0xe6, 0x71, 0xae, 0x5a, 0xad, 0xa5, 0xeb, 0x30, 0xdd, 0x9d, 0x7d, 0xa8, 0xce, 0xfe, 0x6d, - 0x32, 0x17, 0x78, 0x73, 0x38, 0x5b, 0x0f, 0x96, 0xfa, 0xb2, 0x2a, 0x07, 0xb0, 0x36, 0xd0, 0x09, - 0x67, 0x21, 0x99, 0x0b, 0xbb, 0xb5, 0x5f, 0xad, 0xfb, 0x9c, 0x1d, 0xeb, 0xa9, 0xf5, 0xfa, 0xc5, - 0xef, 0x2b, 0xd7, 0xba, 0xb8, 0x72, 0xad, 0x3f, 0x57, 0xae, 0xf5, 0xe3, 0xda, 0x2d, 0x5d, 0x5c, - 0xbb, 0xa5, 0xcb, 0x6b, 0xb7, 0xf4, 0xb9, 0x75, 0xf7, 0x37, 0xf9, 0x64, 0x0d, 0x7f, 0x9e, 0xff, - 0x0d, 0x00, 0x00, 0xff, 0xff, 0xc9, 0x45, 0x08, 0x76, 0xb8, 0x05, 0x00, 0x00, + // 721 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x55, 0xcd, 0x6e, 0xd3, 0x4a, + 0x14, 0x8e, 0xdd, 0xb4, 0x4d, 0x4e, 0xd2, 0x34, 0x9d, 0xdb, 0xab, 0xeb, 0x1b, 0x90, 0x89, 0xbc, + 0x40, 0x11, 0x8b, 0x16, 0x02, 0xbb, 0x6e, 0xf8, 0x69, 0x2a, 0x22, 0x54, 0x5a, 0x4d, 0x40, 0x48, + 0x88, 0xcd, 0xd4, 0x9e, 0x26, 0x46, 0xfe, 0xc3, 0x33, 0x51, 0xeb, 0x05, 0xef, 0xc0, 0x12, 0x36, + 0x48, 0xbc, 0x0d, 0xcb, 0x2e, 0x59, 0xa2, 0xf6, 0x45, 0xd0, 0x9c, 0xd8, 0xb1, 0x9d, 0xa6, 0x5d, + 0xb3, 0x71, 0xe7, 0x7c, 0xe7, 0xef, 0x9b, 0x6f, 0xce, 0x69, 0xe0, 0x91, 0x1d, 0xfa, 0x7e, 0x18, + 0x88, 0x88, 0xd9, 0x7c, 0x17, 0xbf, 0x22, 0x09, 0xec, 0x28, 0x0e, 0x65, 0xb8, 0x8b, 0x5f, 0x91, + 0xa3, 0x3b, 0x08, 0x90, 0x1a, 0x0b, 0x92, 0x91, 0xc2, 0xac, 0x21, 0x6c, 0xbc, 0xe4, 0xcc, 0x19, + 0x25, 0x81, 0x4d, 0x59, 0x30, 0xe6, 0x84, 0x40, 0xf5, 0x34, 0x0e, 0x7d, 0x43, 0xeb, 0x6a, 0xbd, + 0x2a, 0xc5, 0x33, 0x69, 0x81, 0x2e, 0x43, 0x43, 0x47, 0x44, 0x97, 0x21, 0xd9, 0x86, 0x55, 0xcf, + 0xf5, 0x5d, 0x69, 0xac, 0x74, 0xb5, 0xde, 0x06, 0x9d, 0x19, 0xd6, 0x19, 0xb4, 0xe6, 0xa5, 0xb8, + 0x98, 0x7a, 0x52, 0xd5, 0x9a, 0x30, 0x31, 0xc1, 0x5a, 0x4d, 0x8a, 0x67, 0xb2, 0x07, 0x35, 0xee, + 0x71, 0x9f, 0x07, 0x52, 0x18, 0x7a, 0x77, 0xa5, 0xd7, 0xe8, 0xdf, 0xdb, 0xc9, 0xd8, 0xec, 0x94, + 0xf3, 0x07, 0xb3, 0x38, 0x3a, 0x4f, 0x50, 0x8d, 0xed, 0x70, 0x1a, 0xcc, 0x1b, 0xa3, 0x61, 0xed, + 0xc1, 0xbf, 0x4b, 0x13, 0x15, 0x6f, 0xd7, 0xc1, 0xee, 0x75, 0xaa, 0xbb, 0x0e, 0xf2, 0xe1, 0xcc, + 0xc1, 0x9b, 0xd4, 0x29, 0x9e, 0xad, 0x0f, 0xb0, 0x99, 0x27, 0x7f, 0x9a, 0x72, 0x21, 0x89, 0x01, + 0xeb, 0x28, 0xd8, 0x30, 0xcb, 0xcd, 0x4c, 0xb2, 0x0b, 0x6b, 0xb1, 0x52, 0x29, 0xa3, 0xfe, 0xdf, + 0x12, 0xea, 0xca, 0x4f, 0xd3, 0x30, 0xeb, 0x00, 0xda, 0x05, 0x6a, 0x51, 0x18, 0x08, 0x4e, 0xfa, + 0xb0, 0x1e, 0x23, 0x4d, 0x61, 0x68, 0x58, 0xc5, 0xb8, 0x49, 0x00, 0x9a, 0x05, 0x5a, 0x9f, 0x61, + 0xeb, 0xe8, 0xe4, 0x23, 0xb7, 0xa5, 0x72, 0x1e, 0x72, 0x21, 0xd8, 0x98, 0xdf, 0xc2, 0xd3, 0x50, + 0x2d, 0x22, 0x2f, 0x19, 0x66, 0x77, 0xcd, 0x4c, 0xe5, 0x89, 0x58, 0xe2, 0x85, 0xcc, 0x41, 0x0d, + 0x9b, 0x34, 0x33, 0x49, 0x07, 0x6a, 0x21, 0xb6, 0x18, 0x3a, 0x46, 0x15, 0x93, 0xe6, 0xb6, 0xf5, + 0x55, 0x83, 0xf6, 0xf1, 0x54, 0x4c, 0x90, 0x64, 0x26, 0xd3, 0x53, 0x68, 0x60, 0x3f, 0xc5, 0x99, + 0xc7, 0x48, 0xa1, 0xd1, 0x37, 0xf3, 0xbb, 0x50, 0x76, 0x36, 0xca, 0xfd, 0xef, 0x5c, 0x39, 0x19, + 0x3a, 0xb4, 0x98, 0x42, 0x4c, 0x00, 0x66, 0x7b, 0xc7, 0x29, 0x1f, 0x1d, 0xf9, 0x14, 0x10, 0x62, + 0x41, 0x33, 0xb7, 0x86, 0x33, 0xc6, 0x75, 0x5a, 0xc2, 0xac, 0x7f, 0x60, 0xab, 0xc0, 0x6c, 0x26, + 0xb1, 0x65, 0x29, 0xba, 0x9e, 0x57, 0xa2, 0xbb, 0x30, 0x0c, 0xd6, 0x37, 0x4d, 0x65, 0xce, 0x83, + 0xd2, 0xc7, 0xf9, 0x3b, 0x2e, 0xf5, 0x43, 0x83, 0x46, 0xa1, 0x8d, 0x7a, 0x1b, 0xd7, 0xe1, 0x81, + 0x74, 0x65, 0x92, 0x2e, 0xd3, 0xdc, 0x26, 0x77, 0xa1, 0x2e, 0x5d, 0x9f, 0x0b, 0xc9, 0xfc, 0x08, + 0xdb, 0xad, 0xd0, 0x1c, 0x50, 0x5e, 0x24, 0xf7, 0x26, 0x89, 0x78, 0xda, 0x2a, 0x07, 0xc8, 0x7d, + 0x68, 0xa9, 0xc1, 0x70, 0x6d, 0x26, 0xdd, 0x30, 0x78, 0xc5, 0x13, 0x7c, 0xf9, 0x2a, 0x5d, 0x40, + 0xd5, 0xe2, 0x08, 0xce, 0x1d, 0x63, 0x75, 0xb6, 0xc8, 0xea, 0x6c, 0x1d, 0x43, 0xab, 0x2c, 0x06, + 0xe9, 0x5e, 0xd7, 0xae, 0x59, 0xd6, 0x46, 0xb1, 0x71, 0xc7, 0x01, 0x93, 0xd3, 0x98, 0xa7, 0xd2, + 0xe4, 0x80, 0xb5, 0x0f, 0xdb, 0xcb, 0xe4, 0x55, 0x59, 0x31, 0x3b, 0x2b, 0x55, 0xcd, 0x81, 0xf4, + 0x5d, 0xf5, 0xec, 0x5d, 0x1f, 0xbc, 0x86, 0xda, 0x20, 0x8e, 0x5f, 0x84, 0x0e, 0x17, 0xa4, 0x05, + 0xf0, 0x36, 0xe0, 0xe7, 0x11, 0xb7, 0x25, 0x77, 0xda, 0x15, 0xd2, 0x86, 0x26, 0x96, 0x3f, 0x74, + 0x85, 0x70, 0x83, 0x71, 0x5b, 0x23, 0x9b, 0xa9, 0xd0, 0x83, 0x73, 0x57, 0x48, 0xd1, 0xd6, 0x15, + 0x30, 0x88, 0xe3, 0x30, 0x3e, 0x3a, 0x3d, 0x15, 0x5c, 0xb6, 0x9d, 0xfe, 0x77, 0x1d, 0x56, 0x31, + 0x84, 0x3c, 0x83, 0x5a, 0xb6, 0x9f, 0xe4, 0xff, 0x65, 0x3b, 0x8b, 0x83, 0xd6, 0xe9, 0x2c, 0x5d, + 0xe7, 0xd9, 0x78, 0xed, 0x43, 0x7d, 0x3e, 0xad, 0xa4, 0x10, 0xb8, 0xb8, 0x5c, 0x9d, 0x3b, 0x4b, + 0x7d, 0xc5, 0x2a, 0xe9, 0xe4, 0x96, 0xab, 0x94, 0x67, 0xbe, 0x5c, 0x65, 0x71, 0xd4, 0x0f, 0x60, + 0x6d, 0x24, 0x63, 0xce, 0x7c, 0x52, 0x08, 0xbb, 0xf6, 0x5f, 0xa6, 0x73, 0x9b, 0xb3, 0xa7, 0x3d, + 0xd4, 0x9e, 0x3f, 0xf9, 0x79, 0x69, 0x6a, 0x17, 0x97, 0xa6, 0xf6, 0xfb, 0xd2, 0xd4, 0xbe, 0x5c, + 0x99, 0x95, 0x8b, 0x2b, 0xb3, 0xf2, 0xeb, 0xca, 0xac, 0xbc, 0xef, 0xdc, 0xfc, 0xcb, 0x74, 0xb2, + 0x86, 0x7f, 0x1e, 0xff, 0x09, 0x00, 0x00, 0xff, 0xff, 0xf7, 0x95, 0x5b, 0xbc, 0xbe, 0x06, 0x00, + 0x00, } func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { @@ -1068,6 +1179,85 @@ func (m *PushSpaceResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *PullSpaceRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PullSpaceRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PullSpaceRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *PullSpaceResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PullSpaceResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PullSpaceResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.AclPayloadId) > 0 { + i -= len(m.AclPayloadId) + copy(dAtA[i:], m.AclPayloadId) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.AclPayloadId))) + i-- + dAtA[i] = 0x1a + } + if len(m.AclPayload) > 0 { + i -= len(m.AclPayload) + copy(dAtA[i:], m.AclPayload) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.AclPayload))) + i-- + dAtA[i] = 0x12 + } + if m.SpaceHeader != nil { + { + size, err := m.SpaceHeader.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSpacesync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *SpaceHeader) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1353,6 +1543,40 @@ func (m *PushSpaceResponse) Size() (n int) { return n } +func (m *PullSpaceRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovSpacesync(uint64(l)) + } + return n +} + +func (m *PullSpaceResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SpaceHeader != nil { + l = m.SpaceHeader.Size() + n += 1 + l + sovSpacesync(uint64(l)) + } + l = len(m.AclPayload) + if l > 0 { + n += 1 + l + sovSpacesync(uint64(l)) + } + l = len(m.AclPayloadId) + if l > 0 { + n += 1 + l + sovSpacesync(uint64(l)) + } + return n +} + func (m *SpaceHeader) Size() (n int) { if m == nil { return 0 @@ -2360,6 +2584,240 @@ func (m *PushSpaceResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *PullSpaceRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PullSpaceRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PullSpaceRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSpacesync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSpacesync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PullSpaceResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PullSpaceResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PullSpaceResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceHeader", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SpaceHeader == nil { + m.SpaceHeader = &RawSpaceHeaderWithId{} + } + if err := m.SpaceHeader.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field AclPayload", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.AclPayload = append(m.AclPayload[:0], dAtA[iNdEx:postIndex]...) + if m.AclPayload == nil { + m.AclPayload = []byte{} + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field AclPayloadId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.AclPayloadId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSpacesync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSpacesync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *SpaceHeader) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/common/commonspace/spacesyncproto/spacesync_drpc.pb.go b/common/commonspace/spacesyncproto/spacesync_drpc.pb.go index c5af9cef..b1a0a929 100644 --- a/common/commonspace/spacesyncproto/spacesync_drpc.pb.go +++ b/common/commonspace/spacesyncproto/spacesync_drpc.pb.go @@ -42,6 +42,7 @@ type DRPCSpaceClient interface { HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error) PushSpace(ctx context.Context, in *PushSpaceRequest) (*PushSpaceResponse, error) + PullSpace(ctx context.Context, in *PullSpaceRequest) (*PullSpaceResponse, error) Stream(ctx context.Context) (DRPCSpace_StreamClient, error) } @@ -73,6 +74,15 @@ func (c *drpcSpaceClient) PushSpace(ctx context.Context, in *PushSpaceRequest) ( return out, nil } +func (c *drpcSpaceClient) PullSpace(ctx context.Context, in *PullSpaceRequest) (*PullSpaceResponse, error) { + out := new(PullSpaceResponse) + err := c.cc.Invoke(ctx, "/anySpace.Space/PullSpace", drpcEncoding_File_commonspace_spacesyncproto_protos_spacesync_proto{}, in, out) + if err != nil { + return nil, err + } + return out, nil +} + func (c *drpcSpaceClient) Stream(ctx context.Context) (DRPCSpace_StreamClient, error) { stream, err := c.cc.NewStream(ctx, "/anySpace.Space/Stream", drpcEncoding_File_commonspace_spacesyncproto_protos_spacesync_proto{}) if err != nil { @@ -111,6 +121,7 @@ func (x *drpcSpace_StreamClient) RecvMsg(m *ObjectSyncMessage) error { type DRPCSpaceServer interface { HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error) PushSpace(context.Context, *PushSpaceRequest) (*PushSpaceResponse, error) + PullSpace(context.Context, *PullSpaceRequest) (*PullSpaceResponse, error) Stream(DRPCSpace_StreamStream) error } @@ -124,13 +135,17 @@ func (s *DRPCSpaceUnimplementedServer) PushSpace(context.Context, *PushSpaceRequ return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } +func (s *DRPCSpaceUnimplementedServer) PullSpace(context.Context, *PullSpaceRequest) (*PullSpaceResponse, error) { + return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + func (s *DRPCSpaceUnimplementedServer) Stream(DRPCSpace_StreamStream) error { return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } type DRPCSpaceDescription struct{} -func (DRPCSpaceDescription) NumMethods() int { return 3 } +func (DRPCSpaceDescription) NumMethods() int { return 4 } func (DRPCSpaceDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { switch n { @@ -153,6 +168,15 @@ func (DRPCSpaceDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, ) }, DRPCSpaceServer.PushSpace, true case 2: + return "/anySpace.Space/PullSpace", drpcEncoding_File_commonspace_spacesyncproto_protos_spacesync_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return srv.(DRPCSpaceServer). + PullSpace( + ctx, + in1.(*PullSpaceRequest), + ) + }, DRPCSpaceServer.PullSpace, true + case 3: return "/anySpace.Space/Stream", drpcEncoding_File_commonspace_spacesyncproto_protos_spacesync_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return nil, srv.(DRPCSpaceServer). @@ -201,6 +225,22 @@ func (x *drpcSpace_PushSpaceStream) SendAndClose(m *PushSpaceResponse) error { return x.CloseSend() } +type DRPCSpace_PullSpaceStream interface { + drpc.Stream + SendAndClose(*PullSpaceResponse) error +} + +type drpcSpace_PullSpaceStream struct { + drpc.Stream +} + +func (x *drpcSpace_PullSpaceStream) SendAndClose(m *PullSpaceResponse) error { + if err := x.MsgSend(m, drpcEncoding_File_commonspace_spacesyncproto_protos_spacesync_proto{}); err != nil { + return err + } + return x.CloseSend() +} + type DRPCSpace_StreamStream interface { drpc.Stream Send(*ObjectSyncMessage) error diff --git a/common/commonspace/syncservice/streampool_test.go b/common/commonspace/syncservice/streampool_test.go index 47ed4ee0..a9cc2bae 100644 --- a/common/commonspace/syncservice/streampool_test.go +++ b/common/commonspace/syncservice/streampool_test.go @@ -27,6 +27,10 @@ func (t *testServer) PushSpace(ctx context.Context, request *spacesyncproto.Push panic("implement me") } +func (t *testServer) PullSpace(ctx context.Context, request *spacesyncproto.PullSpaceRequest) (*spacesyncproto.PullSpaceResponse, error) { + panic("implement me") +} + func (t *testServer) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { t.stream <- stream return <-t.releaseStream diff --git a/common/nodeconf/confconnector.go b/common/nodeconf/confconnector.go index e62be926..bb2c302a 100644 --- a/common/nodeconf/confconnector.go +++ b/common/nodeconf/confconnector.go @@ -21,19 +21,19 @@ func NewConfConnector(conf Configuration, pool pool.Pool) ConfConnector { } func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { - return s.dialOrConnect(ctx, spaceId, s.pool.Get, s.pool.GetOneOf) + return s.connectOneOrMany(ctx, spaceId, s.pool.Get, s.pool.GetOneOf) } func (s *confConnector) DialResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { - return s.dialOrConnect(ctx, spaceId, s.pool.Dial, s.pool.DialOneOf) + return s.connectOneOrMany(ctx, spaceId, s.pool.Dial, s.pool.DialOneOf) } -func (s *confConnector) dialOrConnect( +func (s *confConnector) connectOneOrMany( ctx context.Context, spaceId string, connectOne func(context.Context, string) (peer.Peer, error), connectOneOf func(context.Context, []string) (peer.Peer, error)) (peers []peer.Peer, err error) { allNodes := s.conf.NodeIds(spaceId) - if !s.conf.IsResponsible(spaceId) { + if s.conf.IsResponsible(spaceId) { for _, id := range allNodes { var p peer.Peer p, err = connectOne(ctx, id) diff --git a/common/pkg/acl/list/list.go b/common/pkg/acl/list/list.go index 50a7b59a..6045fec4 100644 --- a/common/pkg/acl/list/list.go +++ b/common/pkg/acl/list/list.go @@ -25,7 +25,7 @@ type RWLocker interface { type ACLList interface { RWLocker ID() string - Root() *aclrecordproto.ACLRoot + Root() *aclrecordproto.RawACLRecordWithId Records() []*ACLRecord ACLState() *ACLState IsAfter(first string, second string) (bool, error) @@ -38,7 +38,7 @@ type ACLList interface { } type aclList struct { - root *aclrecordproto.ACLRoot + root *aclrecordproto.RawACLRecordWithId records []*ACLRecord indexes map[string]int id string @@ -114,13 +114,9 @@ func build(id string, stateBuilder *aclStateBuilder, recBuilder ACLRecordBuilder if err != nil { return } - aclRecRoot, err := recBuilder.ConvertFromRaw(rootWithId) - if err != nil { - return - } list = &aclList{ - root: aclRecRoot.Model.(*aclrecordproto.ACLRoot), + root: rootWithId, records: records, indexes: indexes, stateBuilder: stateBuilder, @@ -141,7 +137,7 @@ func (a *aclList) ID() string { return a.id } -func (a *aclList) Root() *aclrecordproto.ACLRoot { +func (a *aclList) Root() *aclrecordproto.RawACLRecordWithId { return a.root } diff --git a/common/pkg/acl/list/mock_list/mock_list.go b/common/pkg/acl/list/mock_list/mock_list.go index a4aeb396..4e1c0762 100644 --- a/common/pkg/acl/list/mock_list/mock_list.go +++ b/common/pkg/acl/list/mock_list/mock_list.go @@ -211,10 +211,10 @@ func (mr *MockACLListMockRecorder) Records() *gomock.Call { } // Root mocks base method. -func (m *MockACLList) Root() *aclrecordproto.ACLRoot { +func (m *MockACLList) Root() *aclrecordproto.RawACLRecordWithId { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Root") - ret0, _ := ret[0].(*aclrecordproto.ACLRoot) + ret0, _ := ret[0].(*aclrecordproto.RawACLRecordWithId) return ret0 } diff --git a/node/nodespace/rpchandler.go b/node/nodespace/rpchandler.go index f0692c8a..da8deca1 100644 --- a/node/nodespace/rpchandler.go +++ b/node/nodespace/rpchandler.go @@ -2,42 +2,43 @@ package nodespace import ( "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" ) type rpcHandler struct { s *service } -func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpaceRequest) (resp *spacesyncproto.PushSpaceResponse, err error) { - _, err = r.s.GetSpace(ctx, req.SpaceHeader.Id) - if err == nil { - err = spacesyncproto.ErrSpaceExists - return - } - if err != storage.ErrSpaceStorageMissing { - err = spacesyncproto.ErrUnexpected - return - } - - payload := storage.SpaceStorageCreatePayload{ - RecWithId: &aclrecordproto.RawACLRecordWithId{ - Payload: req.AclPayload, - Id: req.AclPayloadId, - }, - SpaceHeaderWithId: req.SpaceHeader, - } - st, err := r.s.spaceStorageProvider.CreateSpaceStorage(payload) +func (r *rpcHandler) PullSpace(ctx context.Context, request *spacesyncproto.PullSpaceRequest) (resp *spacesyncproto.PullSpaceResponse, err error) { + sp, err := r.s.GetSpace(ctx, request.Id) if err != nil { - err = spacesyncproto.ErrUnexpected - if err == storage.ErrSpaceStorageExists { - err = spacesyncproto.ErrSpaceExists + if err != spacesyncproto.ErrSpaceMissing { + err = spacesyncproto.ErrUnexpected } return } - st.Close() + + description := sp.Description() + resp = &spacesyncproto.PullSpaceResponse{ + SpaceHeader: description.SpaceHeader, + AclPayload: description.AclPayload, + AclPayloadId: description.AclId, + } + return +} + +func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpaceRequest) (resp *spacesyncproto.PushSpaceResponse, err error) { + description := commonspace.SpaceDescription{ + SpaceHeader: req.SpaceHeader, + AclId: req.AclPayloadId, + AclPayload: req.AclPayload, + } + err = r.s.AddSpace(ctx, description) + if err != nil { + return + } + resp = &spacesyncproto.PushSpaceResponse{} return } diff --git a/node/nodespace/service.go b/node/nodespace/service.go index 3b4f8896..fdfa97a2 100644 --- a/node/nodespace/service.go +++ b/node/nodespace/service.go @@ -22,6 +22,7 @@ func New() Service { } type Service interface { + AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) GetSpace(ctx context.Context, id string) (commonspace.Space, error) app.ComponentRunnable } @@ -68,6 +69,10 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e return v.(commonspace.Space), nil } +func (s *service) AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) { + return s.commonSpace.AddSpace(ctx, description) +} + func (s *service) Close(ctx context.Context) (err error) { return s.spaceCache.Close() }