Added simple watcher and statusservice upgrades

This commit is contained in:
mcrakhman 2022-12-17 21:48:48 +01:00
parent 6444042a02
commit 364ec32a39
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
17 changed files with 1085 additions and 62 deletions

View File

@ -986,6 +986,182 @@ func (m *TreeParamsResponse) GetHeadIds() []string {
return nil
}
type WatchRequest struct {
SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"`
TreeId string `protobuf:"bytes,2,opt,name=treeId,proto3" json:"treeId,omitempty"`
}
func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc31080c27db9707, []int{21}
}
func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *WatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_WatchRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *WatchRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_WatchRequest.Merge(m, src)
}
func (m *WatchRequest) XXX_Size() int {
return m.Size()
}
func (m *WatchRequest) XXX_DiscardUnknown() {
xxx_messageInfo_WatchRequest.DiscardUnknown(m)
}
var xxx_messageInfo_WatchRequest proto.InternalMessageInfo
func (m *WatchRequest) GetSpaceId() string {
if m != nil {
return m.SpaceId
}
return ""
}
func (m *WatchRequest) GetTreeId() string {
if m != nil {
return m.TreeId
}
return ""
}
type WatchResponse struct {
}
func (m *WatchResponse) Reset() { *m = WatchResponse{} }
func (m *WatchResponse) String() string { return proto.CompactTextString(m) }
func (*WatchResponse) ProtoMessage() {}
func (*WatchResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_fc31080c27db9707, []int{22}
}
func (m *WatchResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *WatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_WatchResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *WatchResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_WatchResponse.Merge(m, src)
}
func (m *WatchResponse) XXX_Size() int {
return m.Size()
}
func (m *WatchResponse) XXX_DiscardUnknown() {
xxx_messageInfo_WatchResponse.DiscardUnknown(m)
}
var xxx_messageInfo_WatchResponse proto.InternalMessageInfo
type UnwatchRequest struct {
SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"`
TreeId string `protobuf:"bytes,2,opt,name=treeId,proto3" json:"treeId,omitempty"`
}
func (m *UnwatchRequest) Reset() { *m = UnwatchRequest{} }
func (m *UnwatchRequest) String() string { return proto.CompactTextString(m) }
func (*UnwatchRequest) ProtoMessage() {}
func (*UnwatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc31080c27db9707, []int{23}
}
func (m *UnwatchRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *UnwatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_UnwatchRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *UnwatchRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_UnwatchRequest.Merge(m, src)
}
func (m *UnwatchRequest) XXX_Size() int {
return m.Size()
}
func (m *UnwatchRequest) XXX_DiscardUnknown() {
xxx_messageInfo_UnwatchRequest.DiscardUnknown(m)
}
var xxx_messageInfo_UnwatchRequest proto.InternalMessageInfo
func (m *UnwatchRequest) GetSpaceId() string {
if m != nil {
return m.SpaceId
}
return ""
}
func (m *UnwatchRequest) GetTreeId() string {
if m != nil {
return m.TreeId
}
return ""
}
type UnwatchResponse struct {
}
func (m *UnwatchResponse) Reset() { *m = UnwatchResponse{} }
func (m *UnwatchResponse) String() string { return proto.CompactTextString(m) }
func (*UnwatchResponse) ProtoMessage() {}
func (*UnwatchResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_fc31080c27db9707, []int{24}
}
func (m *UnwatchResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *UnwatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_UnwatchResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *UnwatchResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_UnwatchResponse.Merge(m, src)
}
func (m *UnwatchResponse) XXX_Size() int {
return m.Size()
}
func (m *UnwatchResponse) XXX_DiscardUnknown() {
xxx_messageInfo_UnwatchResponse.DiscardUnknown(m)
}
var xxx_messageInfo_UnwatchResponse proto.InternalMessageInfo
func init() {
proto.RegisterType((*CreateSpaceRequest)(nil), "clientapi.CreateSpaceRequest")
proto.RegisterType((*CreateSpaceResponse)(nil), "clientapi.CreateSpaceResponse")
@ -1008,52 +1184,60 @@ func init() {
proto.RegisterType((*LoadSpaceResponse)(nil), "clientapi.LoadSpaceResponse")
proto.RegisterType((*TreeParamsRequest)(nil), "clientapi.TreeParamsRequest")
proto.RegisterType((*TreeParamsResponse)(nil), "clientapi.TreeParamsResponse")
proto.RegisterType((*WatchRequest)(nil), "clientapi.WatchRequest")
proto.RegisterType((*WatchResponse)(nil), "clientapi.WatchResponse")
proto.RegisterType((*UnwatchRequest)(nil), "clientapi.UnwatchRequest")
proto.RegisterType((*UnwatchResponse)(nil), "clientapi.UnwatchResponse")
}
func init() { proto.RegisterFile("api/apiproto/protos/api.proto", fileDescriptor_fc31080c27db9707) }
var fileDescriptor_fc31080c27db9707 = []byte{
// 630 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xda, 0x40,
0x10, 0x8e, 0x09, 0x49, 0xf0, 0x50, 0xf1, 0xb3, 0x49, 0x90, 0xeb, 0x06, 0x8b, 0xae, 0x94, 0x08,
0xa9, 0x11, 0xa8, 0xe9, 0xa9, 0xb7, 0x52, 0x50, 0x24, 0x54, 0x2a, 0xb5, 0x24, 0xbd, 0xf4, 0xb6,
0xc5, 0x2b, 0xc5, 0x12, 0x60, 0xd7, 0x36, 0x55, 0x4e, 0x7d, 0x86, 0x3e, 0x56, 0x8f, 0x39, 0xf6,
0x58, 0xc1, 0x6b, 0xf4, 0x50, 0xed, 0x7a, 0x6d, 0xef, 0xda, 0x26, 0x55, 0xc5, 0x05, 0x76, 0xfe,
0xbe, 0xf9, 0x76, 0x99, 0x6f, 0x80, 0x36, 0xf1, 0x9c, 0x3e, 0xf1, 0x1c, 0xcf, 0x77, 0x43, 0xb7,
0xcf, 0x3f, 0x03, 0x66, 0xf7, 0xf8, 0x11, 0xe9, 0xb3, 0xb9, 0x43, 0x97, 0x21, 0xf1, 0x1c, 0x7c,
0x02, 0x68, 0xe8, 0x53, 0x12, 0xd2, 0x1b, 0x8f, 0xcc, 0xe8, 0x94, 0x7e, 0x5d, 0xd1, 0x20, 0xc4,
0xe7, 0x70, 0xac, 0x78, 0x03, 0xcf, 0x5d, 0x06, 0x14, 0xd5, 0xa0, 0xe4, 0xd8, 0x86, 0xd6, 0xd1,
0xba, 0xfa, 0xb4, 0xe4, 0xd8, 0xac, 0x78, 0x44, 0x7d, 0xe7, 0x5b, 0xae, 0x58, 0xf1, 0x6e, 0x29,
0x7e, 0x09, 0xa7, 0x51, 0x8f, 0x91, 0x3b, 0x5b, 0x2d, 0xe8, 0x32, 0x14, 0xf5, 0xc8, 0x80, 0xa3,
0x80, 0x55, 0x8e, 0xe3, 0xec, 0xd8, 0xc4, 0x5d, 0x68, 0x65, 0x4b, 0xb6, 0x80, 0x7f, 0x84, 0xd3,
0x11, 0x9d, 0xd3, 0xff, 0x00, 0x47, 0x16, 0x80, 0x2d, 0x92, 0xc7, 0xb6, 0x51, 0xe2, 0x41, 0xc9,
0x83, 0x0d, 0x68, 0x65, 0x21, 0xa3, 0xe6, 0xf8, 0x3b, 0xd4, 0x06, 0xb6, 0x7d, 0x4b, 0xef, 0x77,
0xef, 0x82, 0x10, 0x94, 0x43, 0x7a, 0x1f, 0x1a, 0xfb, 0x3c, 0xc2, 0xcf, 0xac, 0xc6, 0x09, 0x6e,
0x96, 0xc4, 0x0b, 0xee, 0xdc, 0xd0, 0x28, 0x77, 0xb4, 0x6e, 0x65, 0x2a, 0x79, 0x30, 0x81, 0x7a,
0xd2, 0x5f, 0xbc, 0x87, 0xda, 0x46, 0xcb, 0xb5, 0x69, 0xc1, 0xe1, 0x1d, 0x25, 0x76, 0x42, 0x41,
0x58, 0xcc, 0xef, 0xbb, 0x2e, 0xab, 0x89, 0x08, 0x08, 0x0b, 0xbf, 0x83, 0xfa, 0x68, 0xb5, 0xf0,
0x6e, 0x7d, 0x4a, 0x77, 0x7f, 0xc9, 0x0b, 0x68, 0xa4, 0x60, 0x82, 0x30, 0x82, 0xb2, 0xbd, 0x5a,
0x78, 0x02, 0x8a, 0x9f, 0xf1, 0x0b, 0xa8, 0x0f, 0xe6, 0x73, 0x96, 0x16, 0xfc, 0x7b, 0x36, 0x2e,
0xa1, 0xcc, 0x32, 0xb3, 0x93, 0x80, 0x4e, 0xe0, 0x80, 0xdd, 0x2d, 0x30, 0x4a, 0x9d, 0xfd, 0xae,
0x3e, 0x8d, 0x0c, 0xfc, 0x1a, 0x1a, 0x29, 0xb4, 0xa0, 0x70, 0x0e, 0x07, 0x21, 0x73, 0x18, 0x5a,
0x67, 0xbf, 0x5b, 0xbd, 0xaa, 0xf7, 0x12, 0x95, 0xf4, 0x38, 0xd5, 0x28, 0x8a, 0x11, 0x2f, 0xe5,
0xb3, 0x1d, 0xd3, 0xc2, 0x7d, 0x68, 0x4a, 0x3e, 0x81, 0x67, 0x42, 0x45, 0x90, 0x8b, 0x20, 0xf5,
0x69, 0x62, 0xe3, 0x4b, 0x68, 0x4c, 0x5c, 0x62, 0xcb, 0xba, 0x79, 0xe4, 0x6e, 0xc7, 0xd0, 0x94,
0xb2, 0xc5, 0xd4, 0xbd, 0x87, 0x26, 0xa3, 0xf5, 0x81, 0xf8, 0x64, 0x11, 0xec, 0xfe, 0xa3, 0x5c,
0x03, 0x92, 0xe1, 0xc4, 0x1d, 0xd2, 0x79, 0xd0, 0xe4, 0x79, 0x60, 0x7d, 0xa2, 0x89, 0x89, 0xdf,
0x35, 0x36, 0xaf, 0xfe, 0x1c, 0x80, 0x3e, 0xe4, 0x0f, 0x37, 0xf0, 0x1c, 0x34, 0x81, 0xaa, 0xb4,
0x48, 0x50, 0x5b, 0x7a, 0xd3, 0xfc, 0xda, 0x31, 0xad, 0x6d, 0x61, 0xc1, 0x66, 0x02, 0x55, 0x69,
0xb3, 0x28, 0x68, 0xf9, 0x3d, 0xa4, 0xa0, 0x15, 0x2d, 0xa4, 0x4f, 0x50, 0x53, 0xb7, 0x09, 0xea,
0xe4, 0xfa, 0x67, 0xd6, 0x87, 0xf9, 0xfc, 0x91, 0x8c, 0x14, 0x56, 0xdd, 0x13, 0x0a, 0x6c, 0xe1,
0x56, 0x52, 0x60, 0x8b, 0x97, 0x0c, 0x7a, 0x03, 0x47, 0x42, 0xe4, 0xe8, 0xa9, 0x94, 0xad, 0x2e,
0x1e, 0xd3, 0x2c, 0x0a, 0x09, 0x84, 0x21, 0x54, 0x62, 0xd9, 0x21, 0x39, 0x2f, 0x23, 0x6c, 0xf3,
0x59, 0x61, 0x4c, 0x80, 0x8c, 0x01, 0xd2, 0x31, 0x41, 0x67, 0x19, 0x8d, 0x28, 0xc3, 0x68, 0xb6,
0xb7, 0x44, 0x53, 0x3e, 0xb1, 0x06, 0x15, 0x3e, 0x19, 0xcd, 0x2b, 0x7c, 0x72, 0xa2, 0xbd, 0x06,
0x3d, 0x51, 0x1e, 0xca, 0x64, 0x2a, 0x1a, 0x35, 0xcf, 0x8a, 0x83, 0x29, 0x4e, 0x22, 0x31, 0x05,
0x27, 0x2b, 0x53, 0x05, 0x27, 0xa7, 0xca, 0xb7, 0x17, 0x3f, 0xd7, 0x96, 0xf6, 0xb0, 0xb6, 0xb4,
0xdf, 0x6b, 0x4b, 0xfb, 0xb1, 0xb1, 0xf6, 0x1e, 0x36, 0xd6, 0xde, 0xaf, 0x8d, 0xb5, 0xf7, 0xf9,
0x89, 0xfc, 0x9f, 0xfc, 0xe5, 0x90, 0x7f, 0xbd, 0xfa, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x82, 0x87,
0xed, 0x38, 0xaa, 0x07, 0x00, 0x00,
// 700 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xcd, 0x4e, 0xdb, 0x40,
0x10, 0xc6, 0x21, 0x04, 0x3c, 0xd0, 0x98, 0x2c, 0x90, 0xba, 0x2e, 0x58, 0xe9, 0x4a, 0xa0, 0x48,
0x45, 0xa0, 0xd2, 0x53, 0x7b, 0xe2, 0x27, 0x42, 0x42, 0x4d, 0xa5, 0x36, 0x80, 0x2a, 0xf5, 0xb6,
0x8d, 0x57, 0x8a, 0xa5, 0x24, 0x76, 0x6d, 0xa7, 0xe5, 0xd4, 0x67, 0xe8, 0x63, 0xf5, 0xc8, 0xb1,
0xc7, 0x2a, 0x79, 0x8b, 0x9e, 0xaa, 0x5d, 0xaf, 0xed, 0x5d, 0xdb, 0x01, 0x55, 0xb9, 0x80, 0xe7,
0xef, 0x9b, 0x6f, 0x77, 0x67, 0x3e, 0x05, 0xf6, 0x88, 0xef, 0x1e, 0x13, 0xdf, 0xf5, 0x03, 0x2f,
0xf2, 0x8e, 0xf9, 0xdf, 0x90, 0xd9, 0x47, 0xfc, 0x13, 0xe9, 0xfd, 0xa1, 0x4b, 0xc7, 0x11, 0xf1,
0x5d, 0xbc, 0x0d, 0xe8, 0x22, 0xa0, 0x24, 0xa2, 0xd7, 0x3e, 0xe9, 0xd3, 0x1e, 0xfd, 0x3a, 0xa1,
0x61, 0x84, 0xf7, 0x61, 0x4b, 0xf1, 0x86, 0xbe, 0x37, 0x0e, 0x29, 0xaa, 0x43, 0xc5, 0x75, 0x4c,
0xad, 0xa5, 0xb5, 0xf5, 0x5e, 0xc5, 0x75, 0x58, 0x71, 0x87, 0x06, 0xee, 0xb7, 0x42, 0xb1, 0xe2,
0x9d, 0x53, 0xfc, 0x0a, 0x76, 0xe2, 0x1e, 0x1d, 0xaf, 0x3f, 0x19, 0xd1, 0x71, 0x24, 0xea, 0x91,
0x09, 0xab, 0x21, 0xab, 0xbc, 0x4a, 0xb2, 0x13, 0x13, 0xb7, 0xa1, 0x99, 0x2f, 0x99, 0x03, 0xfe,
0x11, 0x76, 0x3a, 0x74, 0x48, 0xff, 0x03, 0x1c, 0xd9, 0x00, 0x8e, 0x48, 0xbe, 0x72, 0xcc, 0x0a,
0x0f, 0x4a, 0x1e, 0x6c, 0x42, 0x33, 0x0f, 0x19, 0x37, 0xc7, 0x3f, 0xa0, 0x7e, 0xe6, 0x38, 0x37,
0xf4, 0x6e, 0xf1, 0x2e, 0x08, 0x41, 0x35, 0xa2, 0x77, 0x91, 0xb9, 0xcc, 0x23, 0xfc, 0x9b, 0xd5,
0xb8, 0xe1, 0xf5, 0x98, 0xf8, 0xe1, 0xc0, 0x8b, 0xcc, 0x6a, 0x4b, 0x6b, 0xaf, 0xf5, 0x24, 0x0f,
0x26, 0x60, 0xa4, 0xfd, 0xc5, 0x7d, 0xa8, 0x6d, 0xb4, 0x42, 0x9b, 0x26, 0xd4, 0x06, 0x94, 0x38,
0x29, 0x05, 0x61, 0x31, 0x7f, 0xe0, 0x79, 0xac, 0x26, 0x26, 0x20, 0x2c, 0xfc, 0x0e, 0x8c, 0xce,
0x64, 0xe4, 0xdf, 0x04, 0x94, 0x2e, 0x7e, 0x93, 0x07, 0xb0, 0x99, 0x81, 0x09, 0xc2, 0x08, 0xaa,
0xce, 0x64, 0xe4, 0x0b, 0x28, 0xfe, 0x8d, 0x5f, 0x82, 0x71, 0x36, 0x1c, 0xb2, 0xb4, 0xf0, 0xf1,
0xd9, 0x38, 0x84, 0x2a, 0xcb, 0xcc, 0x4f, 0x02, 0xda, 0x86, 0x15, 0x76, 0xb6, 0xd0, 0xac, 0xb4,
0x96, 0xdb, 0x7a, 0x2f, 0x36, 0xf0, 0x1b, 0xd8, 0xcc, 0xa0, 0x05, 0x85, 0x7d, 0x58, 0x89, 0x98,
0xc3, 0xd4, 0x5a, 0xcb, 0xed, 0xf5, 0x13, 0xe3, 0x28, 0xdd, 0x92, 0x23, 0x4e, 0x35, 0x8e, 0x62,
0xc4, 0x4b, 0xf9, 0x6c, 0x27, 0xb4, 0xf0, 0x31, 0x34, 0x24, 0x9f, 0xc0, 0xb3, 0x60, 0x4d, 0x90,
0x8b, 0x21, 0xf5, 0x5e, 0x6a, 0xe3, 0x43, 0xd8, 0xec, 0x7a, 0xc4, 0x91, 0xf7, 0xe6, 0x81, 0xb3,
0x6d, 0x41, 0x43, 0xca, 0x16, 0x53, 0xf7, 0x1e, 0x1a, 0x8c, 0xd6, 0x07, 0x12, 0x90, 0x51, 0xb8,
0xf8, 0xa3, 0x5c, 0x02, 0x92, 0xe1, 0xc4, 0x19, 0xb2, 0x79, 0xd0, 0xe4, 0x79, 0x60, 0x7d, 0xe2,
0x89, 0x49, 0xee, 0x35, 0x31, 0xf1, 0x29, 0x6c, 0x7c, 0x22, 0x51, 0x7f, 0xf0, 0x38, 0xa3, 0x26,
0xd4, 0xd8, 0x8d, 0x66, 0x33, 0x18, 0x5b, 0xd8, 0x80, 0x27, 0x02, 0x41, 0x9c, 0xf4, 0x1c, 0xea,
0xb7, 0xe3, 0xef, 0x8b, 0x81, 0x36, 0xc0, 0x48, 0x31, 0x62, 0xd8, 0x93, 0xbf, 0x35, 0xd0, 0x2f,
0xf8, 0x13, 0x9f, 0xf9, 0x2e, 0xea, 0xc2, 0xba, 0x24, 0x79, 0x68, 0x4f, 0x7a, 0xfd, 0xa2, 0x40,
0x5a, 0xf6, 0xbc, 0xb0, 0xb8, 0xb7, 0x2e, 0xac, 0x4b, 0x1a, 0xa8, 0xa0, 0x15, 0x15, 0x53, 0x41,
0x2b, 0x93, 0xce, 0x5b, 0xa8, 0xab, 0xba, 0x87, 0x5a, 0x85, 0xfe, 0x39, 0xa1, 0xb3, 0x5e, 0x3c,
0x90, 0x91, 0xc1, 0xaa, 0x8a, 0xa6, 0xc0, 0x96, 0xea, 0xa7, 0x02, 0x5b, 0x2e, 0x87, 0xe8, 0x14,
0x56, 0x85, 0x1c, 0xa1, 0x67, 0x52, 0xb6, 0x2a, 0x91, 0x96, 0x55, 0x16, 0x12, 0x08, 0x17, 0xb0,
0x96, 0x08, 0x04, 0x92, 0xf3, 0x72, 0x12, 0x64, 0x3d, 0x2f, 0x8d, 0x09, 0x90, 0x2b, 0x80, 0x6c,
0xa0, 0xd1, 0x6e, 0x6e, 0x9b, 0x95, 0xb5, 0xb1, 0xf6, 0xe6, 0x44, 0x33, 0x3e, 0x89, 0x5a, 0x28,
0x7c, 0x72, 0xea, 0xa4, 0xf0, 0x29, 0xc8, 0xcb, 0x25, 0xe8, 0xa9, 0x46, 0xa0, 0x5c, 0xa6, 0xa2,
0x26, 0xd6, 0x6e, 0x79, 0x30, 0xc3, 0x49, 0xc5, 0x40, 0xc1, 0xc9, 0x0b, 0x8a, 0x82, 0x53, 0xd0,
0x0f, 0xf4, 0x16, 0x56, 0xf8, 0x9a, 0xa1, 0xa7, 0x52, 0x9a, 0xbc, 0xba, 0x96, 0x59, 0x0c, 0x64,
0x4f, 0x2c, 0xb6, 0x49, 0x79, 0x62, 0x75, 0x4b, 0x95, 0x27, 0xce, 0x2d, 0xdf, 0xf9, 0xc1, 0xaf,
0xa9, 0xad, 0xdd, 0x4f, 0x6d, 0xed, 0xcf, 0xd4, 0xd6, 0x7e, 0xce, 0xec, 0xa5, 0xfb, 0x99, 0xbd,
0xf4, 0x7b, 0x66, 0x2f, 0x7d, 0xde, 0x90, 0x7f, 0xbb, 0x7c, 0xa9, 0xf1, 0x7f, 0xaf, 0xff, 0x05,
0x00, 0x00, 0xff, 0xff, 0x7d, 0xa4, 0xb1, 0xae, 0xd2, 0x08, 0x00, 0x00,
}
func (m *CreateSpaceRequest) Marshal() (dAtA []byte, err error) {
@ -1737,6 +1921,126 @@ func (m *TreeParamsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *WatchRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WatchRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *WatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.TreeId) > 0 {
i -= len(m.TreeId)
copy(dAtA[i:], m.TreeId)
i = encodeVarintApi(dAtA, i, uint64(len(m.TreeId)))
i--
dAtA[i] = 0x12
}
if len(m.SpaceId) > 0 {
i -= len(m.SpaceId)
copy(dAtA[i:], m.SpaceId)
i = encodeVarintApi(dAtA, i, uint64(len(m.SpaceId)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *WatchResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WatchResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *WatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}
func (m *UnwatchRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *UnwatchRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *UnwatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.TreeId) > 0 {
i -= len(m.TreeId)
copy(dAtA[i:], m.TreeId)
i = encodeVarintApi(dAtA, i, uint64(len(m.TreeId)))
i--
dAtA[i] = 0x12
}
if len(m.SpaceId) > 0 {
i -= len(m.SpaceId)
copy(dAtA[i:], m.SpaceId)
i = encodeVarintApi(dAtA, i, uint64(len(m.SpaceId)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *UnwatchResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *UnwatchResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *UnwatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}
func encodeVarintApi(dAtA []byte, offset int, v uint64) int {
offset -= sovApi(v)
base := offset
@ -2048,6 +2352,58 @@ func (m *TreeParamsResponse) Size() (n int) {
return n
}
func (m *WatchRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.SpaceId)
if l > 0 {
n += 1 + l + sovApi(uint64(l))
}
l = len(m.TreeId)
if l > 0 {
n += 1 + l + sovApi(uint64(l))
}
return n
}
func (m *WatchResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}
func (m *UnwatchRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.SpaceId)
if l > 0 {
n += 1 + l + sovApi(uint64(l))
}
l = len(m.TreeId)
if l > 0 {
n += 1 + l + sovApi(uint64(l))
}
return n
}
func (m *UnwatchResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}
func sovApi(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@ -3926,6 +4282,334 @@ func (m *TreeParamsResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *WatchRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WatchRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WatchRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SpaceId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TreeId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.TreeId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *WatchResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WatchResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WatchResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *UnwatchRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: UnwatchRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: UnwatchRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SpaceId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TreeId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.TreeId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *UnwatchResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: UnwatchResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: UnwatchResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipApi(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0

View File

@ -50,6 +50,8 @@ type DRPCClientApiClient interface {
AllTrees(ctx context.Context, in *AllTreesRequest) (*AllTreesResponse, error)
AllSpaces(ctx context.Context, in *AllSpacesRequest) (*AllSpacesResponse, error)
LoadSpace(ctx context.Context, in *LoadSpaceRequest) (*LoadSpaceResponse, error)
Watch(ctx context.Context, in *WatchRequest) (*WatchResponse, error)
Unwatch(ctx context.Context, in *UnwatchRequest) (*UnwatchResponse, error)
}
type drpcClientApiClient struct {
@ -152,6 +154,24 @@ func (c *drpcClientApiClient) LoadSpace(ctx context.Context, in *LoadSpaceReques
return out, nil
}
func (c *drpcClientApiClient) Watch(ctx context.Context, in *WatchRequest) (*WatchResponse, error) {
out := new(WatchResponse)
err := c.cc.Invoke(ctx, "/clientapi.ClientApi/Watch", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) Unwatch(ctx context.Context, in *UnwatchRequest) (*UnwatchResponse, error) {
out := new(UnwatchResponse)
err := c.cc.Invoke(ctx, "/clientapi.ClientApi/Unwatch", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCClientApiServer interface {
CreateSpace(context.Context, *CreateSpaceRequest) (*CreateSpaceResponse, error)
DeriveSpace(context.Context, *DeriveSpaceRequest) (*DeriveSpaceResponse, error)
@ -163,6 +183,8 @@ type DRPCClientApiServer interface {
AllTrees(context.Context, *AllTreesRequest) (*AllTreesResponse, error)
AllSpaces(context.Context, *AllSpacesRequest) (*AllSpacesResponse, error)
LoadSpace(context.Context, *LoadSpaceRequest) (*LoadSpaceResponse, error)
Watch(context.Context, *WatchRequest) (*WatchResponse, error)
Unwatch(context.Context, *UnwatchRequest) (*UnwatchResponse, error)
}
type DRPCClientApiUnimplementedServer struct{}
@ -207,9 +229,17 @@ func (s *DRPCClientApiUnimplementedServer) LoadSpace(context.Context, *LoadSpace
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) Watch(context.Context, *WatchRequest) (*WatchResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) Unwatch(context.Context, *UnwatchRequest) (*UnwatchResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCClientApiDescription struct{}
func (DRPCClientApiDescription) NumMethods() int { return 10 }
func (DRPCClientApiDescription) NumMethods() int { return 12 }
func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
@ -303,6 +333,24 @@ func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Recei
in1.(*LoadSpaceRequest),
)
}, DRPCClientApiServer.LoadSpace, true
case 10:
return "/clientapi.ClientApi/Watch", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
Watch(
ctx,
in1.(*WatchRequest),
)
}, DRPCClientApiServer.Watch, true
case 11:
return "/clientapi.ClientApi/Unwatch", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
Unwatch(
ctx,
in1.(*UnwatchRequest),
)
}, DRPCClientApiServer.Unwatch, true
default:
return "", nil, nil, nil, false
}
@ -471,3 +519,35 @@ func (x *drpcClientApi_LoadSpaceStream) SendAndClose(m *LoadSpaceResponse) error
}
return x.CloseSend()
}
type DRPCClientApi_WatchStream interface {
drpc.Stream
SendAndClose(*WatchResponse) error
}
type drpcClientApi_WatchStream struct {
drpc.Stream
}
func (x *drpcClientApi_WatchStream) SendAndClose(m *WatchResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_UnwatchStream interface {
drpc.Stream
SendAndClose(*UnwatchResponse) error
}
type drpcClientApi_UnwatchStream struct {
drpc.Stream
}
func (x *drpcClientApi_UnwatchStream) SendAndClose(m *UnwatchResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}

View File

@ -14,6 +14,8 @@ service ClientApi {
rpc AllTrees(AllTreesRequest) returns(AllTreesResponse);
rpc AllSpaces(AllSpacesRequest) returns(AllSpacesResponse);
rpc LoadSpace(LoadSpaceRequest) returns(LoadSpaceResponse);
rpc Watch(WatchRequest) returns(WatchResponse);
rpc Unwatch(UnwatchRequest) returns(UnwatchResponse);
}
message CreateSpaceRequest {
@ -103,4 +105,20 @@ message TreeParamsRequest {
message TreeParamsResponse {
string rootId = 1;
repeated string headIds = 2;
}
message WatchRequest {
string spaceId = 1;
string treeId = 2;
}
message WatchResponse {
}
message UnwatchRequest {
string spaceId = 1;
string treeId = 2;
}
message UnwatchResponse {
}

View File

@ -10,6 +10,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric"
"math/rand"
"sync"
)
type rpcHandler struct {
@ -17,6 +18,49 @@ type rpcHandler struct {
storageService storage.ClientStorage
docService document.Service
account account.Service
treeWatcher *watcher
sync.Mutex
}
func (r *rpcHandler) Watch(ctx context.Context, request *apiproto.WatchRequest) (resp *apiproto.WatchResponse, err error) {
space, err := r.spaceService.GetSpace(context.Background(), request.SpaceId)
if err != nil {
return
}
r.Lock()
defer r.Unlock()
ch := make(chan bool)
r.treeWatcher = newWatcher(request.SpaceId, request.TreeId, ch)
space.StatusService().Watch(request.TreeId, ch)
go r.treeWatcher.run()
resp = &apiproto.WatchResponse{}
return
}
func (r *rpcHandler) Unwatch(ctx context.Context, request *apiproto.UnwatchRequest) (resp *apiproto.UnwatchResponse, err error) {
space, err := r.spaceService.GetSpace(context.Background(), request.SpaceId)
if err != nil {
return
}
var treeWatcher *watcher
space.StatusService().Unwatch(request.TreeId)
r.Lock()
if r.treeWatcher != nil {
treeWatcher = r.treeWatcher
}
r.Unlock()
treeWatcher.close()
r.Lock()
if r.treeWatcher == treeWatcher {
r.treeWatcher = nil
}
r.Unlock()
resp = &apiproto.UnwatchResponse{}
return
}
func (r *rpcHandler) LoadSpace(ctx context.Context, request *apiproto.LoadSpaceRequest) (resp *apiproto.LoadSpaceResponse, err error) {

View File

@ -67,7 +67,12 @@ func (s *service) Run(ctx context.Context) (err error) {
if err != nil {
return
}
return apiproto.DRPCRegisterClientApi(s, &rpcHandler{s.spaceService, s.storageService, s.docService, s.account})
return apiproto.DRPCRegisterClientApi(s, &rpcHandler{
spaceService: s.spaceService,
storageService: s.storageService,
docService: s.docService,
account: s.account,
})
}
func (s *service) Close(ctx context.Context) (err error) {

37
client/api/watcher.go Normal file
View File

@ -0,0 +1,37 @@
package api
import "go.uber.org/zap"
type watcher struct {
spaceId string
treeId string
watcher chan bool
watcherDone chan struct{}
}
func newWatcher(spaceId, treeId string, ch chan bool) *watcher {
return &watcher{
spaceId: spaceId,
treeId: treeId,
watcher: ch,
watcherDone: make(chan struct{}),
}
}
func (w *watcher) run() {
log := log.With(zap.String("spaceId", w.spaceId), zap.String("treeId", w.treeId))
log.Debug("started watching")
defer close(w.watcherDone)
for {
synced, ok := <-w.watcher
if !ok {
log.Debug("stopped watching")
return
}
log.With(zap.Bool("synced", synced)).Debug("updated sync status")
}
}
func (w *watcher) close() {
<-w.watcherDone
}

View File

@ -8,14 +8,19 @@ require (
github.com/anytypeio/go-anytype-infrastructure-experiments/common v0.0.0-00010101000000-000000000000
github.com/dgraph-io/badger/v3 v3.2103.3
github.com/gogo/protobuf v1.3.2
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.23.0
storj.io/drpc v0.0.32
)
require (
github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3 // indirect
github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
@ -34,7 +39,6 @@ require (
github.com/klauspost/cpuid/v2 v2.1.1 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.23.2 // indirect
github.com/libp2p/go-libp2p-core v0.20.1 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
@ -49,6 +53,7 @@ require (
github.com/multiformats/go-multihash v0.2.1 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
@ -60,12 +65,12 @@ require (
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5 // indirect
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
storj.io/drpc v0.0.32 // indirect
)

View File

@ -40,6 +40,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3 h1:yIyGIb7bRkEngKtQ0Ja5bome2SEnErwTaEvR8dA/WtU=
github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3/go.mod h1:w0i62cRB2jVpjFb2CpPNj5J+ihKqqmBBG9X2+Odekjw=
github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 h1:kMPPZYmJgbs4AJfodbg2OCXg5cp+9LPAJcLZJqmcghk=
github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232/go.mod h1:+PeHBAWp7gUh/yw6uAauKc5ku0w4cFNg6DUddGxoGq0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@ -56,6 +58,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c h1:+bD75daSbsxyTzkKpNplC4xls+7/tGwty+zruzOnOmk=
github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@ -214,8 +218,6 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE=
github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw=
github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@ -305,12 +307,15 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -353,8 +358,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -365,6 +370,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4=
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg=

View File

@ -7,6 +7,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter"
@ -108,12 +109,20 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast()
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
var statusService statusservice.StatusService
// this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) {
statusService = statusservice.NewStatusService(st.Id(), lastConfiguration)
}
diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log)
syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod)
sp := &space{
id: id,
syncService: syncService,
diffService: diffService,
statusService: statusService,
cache: s.treeGetter,
account: s.account,
configuration: lastConfiguration,

View File

@ -9,6 +9,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
@ -80,6 +81,8 @@ type Space interface {
BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (tree.ObjectTree, error)
DeleteTree(ctx context.Context, id string) (err error)
StatusService() statusservice.StatusService
Close() error
}
@ -92,6 +95,7 @@ type space struct {
syncService syncservice.SyncService
diffService diffservice.DiffService
statusService statusservice.StatusService
storage storage.SpaceStorage
cache treegetter.TreeGetter
account account.Service
@ -202,6 +206,10 @@ func (s *space) DiffService() diffservice.DiffService {
return s.diffService
}
func (s *space) StatusService() statusservice.StatusService {
return s.statusService
}
func (s *space) StoredIds() []string {
return s.diffService.AllIds()
}
@ -222,6 +230,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay
Configuration: s.configuration,
AclList: s.aclList,
SpaceStorage: s.storage,
StatusService: s.statusService,
}
return synctree.DeriveSyncTree(ctx, deps)
}
@ -238,6 +247,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay
Configuration: s.configuration,
AclList: s.aclList,
SpaceStorage: s.storage,
StatusService: s.statusService,
}
return synctree.CreateSyncTree(ctx, deps)
}
@ -256,6 +266,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene
AclList: s.aclList,
SpaceStorage: s.storage,
TreeUsage: &s.treesUsed,
StatusService: s.statusService,
}
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
}
@ -286,5 +297,10 @@ func (s *space) Close() error {
if err := s.storage.Close(); err != nil {
mError.Add(err)
}
if s.statusService != nil {
if err := s.statusService.Close(); err != nil {
mError.Add(err)
}
}
return mError.Err()
}

View File

@ -17,6 +17,7 @@ type StatusService interface {
Unwatch(treeId string)
StateCounter() uint64
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
Close() error
}
type statusEntry struct {
@ -31,6 +32,7 @@ type statusService struct {
watchers map[string]chan bool
configuration nodeconf.Configuration
stateCounter uint64
closed bool
}
func NewStatusService(spaceId string, configuration nodeconf.Configuration) StatusService {
@ -46,6 +48,10 @@ func NewStatusService(spaceId string, configuration nodeconf.Configuration) Stat
func (s *statusService) HeadsChange(treeId string, heads []string) {
s.Lock()
defer s.Unlock()
if s.closed {
return
}
s.treeHeads[treeId] = statusEntry{
head: heads[0],
stateCounter: s.stateCounter,
@ -62,6 +68,10 @@ func (s *statusService) HeadsChange(treeId string, heads []string) {
func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) {
s.Lock()
defer s.Unlock()
if s.closed {
return
}
curHead, ok := s.treeHeads[treeId]
if !ok {
return
@ -84,13 +94,22 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) {
func (s *statusService) Watch(treeId string, ch chan bool) {
s.Lock()
defer s.Unlock()
if s.closed {
return
}
s.watchers[treeId] = ch
}
func (s *statusService) Unwatch(treeId string) {
s.Lock()
defer s.Unlock()
delete(s.watchers, treeId)
if s.closed {
return
}
if ch, ok := s.watchers[treeId]; ok {
close(ch)
delete(s.watchers, treeId)
}
}
func (s *statusService) StateCounter() uint64 {
@ -99,6 +118,18 @@ func (s *statusService) StateCounter() uint64 {
return s.stateCounter
}
func (s *statusService) Close() (err error) {
s.Lock()
defer s.Unlock()
if s.closed {
return
}
for _, ch := range s.watchers {
close(ch)
}
return
}
func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) {
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) {
return

View File

@ -63,6 +63,7 @@ type CreateDeps struct {
SyncService syncservice.SyncService
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
StatusService statusservice.StatusService
}
type BuildDeps struct {
@ -75,9 +76,14 @@ type BuildDeps struct {
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
TreeUsage *atomic.Int32
StatusService statusservice.StatusService
}
func newWrappedSyncClient(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient {
func newWrappedSyncClient(
spaceId string,
factory RequestFactory,
syncService syncservice.SyncService,
configuration nodeconf.Configuration) SyncClient {
syncClient := newSyncClient(spaceId, syncService.StreamPool(), factory, configuration, syncService.StreamChecker())
return newQueuedClient(syncClient, syncService.ActionQueue())
}
@ -95,6 +101,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
deps.Configuration)
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads())
syncClient.BroadcastAsync(headUpdate)
id = objTree.ID()
return
@ -112,6 +119,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error)
deps.Configuration)
headUpdate := syncClient.CreateHeadUpdate(objTree, nil)
deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads())
syncClient.BroadcastAsync(headUpdate)
id = objTree.ID()
return
@ -197,13 +205,14 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
deps.SyncService,
deps.Configuration)
syncTree := &syncTree{
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
treeUsage: deps.TreeUsage,
listener: deps.Listener,
ObjectTree: objTree,
syncClient: syncClient,
notifiable: deps.HeadNotifiable,
treeUsage: deps.TreeUsage,
listener: deps.Listener,
statusService: deps.StatusService,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.StatusService)
syncTree.SyncHandler = syncHandler
t = syncTree
syncTree.Lock()

View File

@ -23,11 +23,12 @@ type syncTreeHandler struct {
const maxQueueSize = 5
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler {
func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient, statusService statusservice.StatusService) synchandler.SyncHandler {
return &syncTreeHandler{
objTree: objTree,
syncClient: syncClient,
queue: newReceiveQueue(maxQueueSize),
objTree: objTree,
syncClient: syncClient,
statusService: statusService,
queue: newReceiveQueue(maxQueueSize),
}
}

View File

@ -57,6 +57,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
google.golang.org/protobuf v1.28.1 // indirect

View File

@ -326,6 +326,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4=
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg=

View File

@ -24,6 +24,8 @@ type Service interface {
AllTrees(ctx context.Context, ip string, request *apiproto.AllTreesRequest) (resp *apiproto.AllTreesResponse, err error)
AllSpaces(ctx context.Context, ip string, request *apiproto.AllSpacesRequest) (resp *apiproto.AllSpacesResponse, err error)
LoadSpace(ctx context.Context, ip string, request *apiproto.LoadSpaceRequest) (res *apiproto.LoadSpaceResponse, err error)
Watch(ctx context.Context, ip string, request *apiproto.WatchRequest) (res *apiproto.WatchResponse, err error)
Unwatch(ctx context.Context, ip string, request *apiproto.UnwatchRequest) (res *apiproto.UnwatchResponse, err error)
}
type service struct {
@ -122,3 +124,19 @@ func (s *service) LoadSpace(ctx context.Context, ip string, request *apiproto.Lo
}
return cl.LoadSpace(ctx, request)
}
func (s *service) Watch(ctx context.Context, ip string, request *apiproto.WatchRequest) (res *apiproto.WatchResponse, err error) {
cl, err := s.client.GetClient(ctx, ip)
if err != nil {
return
}
return cl.Watch(ctx, request)
}
func (s *service) Unwatch(ctx context.Context, ip string, request *apiproto.UnwatchRequest) (res *apiproto.UnwatchResponse, err error) {
cl, err := s.client.GetClient(ctx, ip)
if err != nil {
return
}
return cl.Unwatch(ctx, request)
}

View File

@ -290,4 +290,60 @@ func (s *service) registerClientCommands() {
},
}
s.clientCommands = append(s.clientCommands, cmdAllSpaces)
cmdTreeWatch := &cobra.Command{
Use: "tree-watch [document]",
Short: "start watching the tree (prints in logs the status on the client side)",
Args: cobra.RangeArgs(1, 1),
Run: func(cmd *cobra.Command, args []string) {
cli, _ := cmd.Flags().GetString("client")
space, _ := cmd.Flags().GetString("space")
addr, ok := s.peers[cli]
if !ok {
fmt.Println("no such client")
return
}
_, err := s.client.Watch(context.Background(), addr, &clientproto.WatchRequest{
SpaceId: space,
TreeId: args[0],
})
if err != nil {
fmt.Println("couldn't start watching tree", err)
return
}
fmt.Println(args[0])
},
}
cmdTreeWatch.Flags().String("space", "", "the space where something is happening :-)")
cmdTreeWatch.MarkFlagRequired("space")
s.clientCommands = append(s.clientCommands, cmdTreeWatch)
cmdTreeUnwatch := &cobra.Command{
Use: "tree-unwatch [document]",
Short: "stop watching the tree (prints in logs the status on the client side)",
Args: cobra.RangeArgs(1, 1),
Run: func(cmd *cobra.Command, args []string) {
cli, _ := cmd.Flags().GetString("client")
space, _ := cmd.Flags().GetString("space")
addr, ok := s.peers[cli]
if !ok {
fmt.Println("no such client")
return
}
_, err := s.client.Unwatch(context.Background(), addr, &clientproto.UnwatchRequest{
SpaceId: space,
TreeId: args[0],
})
if err != nil {
fmt.Println("couldn't stop watching tree", err)
return
}
fmt.Println(args[0])
},
}
cmdTreeUnwatch.Flags().String("space", "", "the space where something is happening :-)")
cmdTreeUnwatch.MarkFlagRequired("space")
s.clientCommands = append(s.clientCommands, cmdTreeUnwatch)
}