diff --git a/client/api/apiproto/api.pb.go b/client/api/apiproto/api.pb.go index e0fd415f..cb7865a0 100644 --- a/client/api/apiproto/api.pb.go +++ b/client/api/apiproto/api.pb.go @@ -986,6 +986,182 @@ func (m *TreeParamsResponse) GetHeadIds() []string { return nil } +type WatchRequest struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` + TreeId string `protobuf:"bytes,2,opt,name=treeId,proto3" json:"treeId,omitempty"` +} + +func (m *WatchRequest) Reset() { *m = WatchRequest{} } +func (m *WatchRequest) String() string { return proto.CompactTextString(m) } +func (*WatchRequest) ProtoMessage() {} +func (*WatchRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_fc31080c27db9707, []int{21} +} +func (m *WatchRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *WatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_WatchRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *WatchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WatchRequest.Merge(m, src) +} +func (m *WatchRequest) XXX_Size() int { + return m.Size() +} +func (m *WatchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WatchRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WatchRequest proto.InternalMessageInfo + +func (m *WatchRequest) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + +func (m *WatchRequest) GetTreeId() string { + if m != nil { + return m.TreeId + } + return "" +} + +type WatchResponse struct { +} + +func (m *WatchResponse) Reset() { *m = WatchResponse{} } +func (m *WatchResponse) String() string { return proto.CompactTextString(m) } +func (*WatchResponse) ProtoMessage() {} +func (*WatchResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_fc31080c27db9707, []int{22} +} +func (m *WatchResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *WatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_WatchResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *WatchResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_WatchResponse.Merge(m, src) +} +func (m *WatchResponse) XXX_Size() int { + return m.Size() +} +func (m *WatchResponse) XXX_DiscardUnknown() { + xxx_messageInfo_WatchResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_WatchResponse proto.InternalMessageInfo + +type UnwatchRequest struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` + TreeId string `protobuf:"bytes,2,opt,name=treeId,proto3" json:"treeId,omitempty"` +} + +func (m *UnwatchRequest) Reset() { *m = UnwatchRequest{} } +func (m *UnwatchRequest) String() string { return proto.CompactTextString(m) } +func (*UnwatchRequest) ProtoMessage() {} +func (*UnwatchRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_fc31080c27db9707, []int{23} +} +func (m *UnwatchRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *UnwatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_UnwatchRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *UnwatchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnwatchRequest.Merge(m, src) +} +func (m *UnwatchRequest) XXX_Size() int { + return m.Size() +} +func (m *UnwatchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnwatchRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UnwatchRequest proto.InternalMessageInfo + +func (m *UnwatchRequest) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + +func (m *UnwatchRequest) GetTreeId() string { + if m != nil { + return m.TreeId + } + return "" +} + +type UnwatchResponse struct { +} + +func (m *UnwatchResponse) Reset() { *m = UnwatchResponse{} } +func (m *UnwatchResponse) String() string { return proto.CompactTextString(m) } +func (*UnwatchResponse) ProtoMessage() {} +func (*UnwatchResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_fc31080c27db9707, []int{24} +} +func (m *UnwatchResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *UnwatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_UnwatchResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *UnwatchResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnwatchResponse.Merge(m, src) +} +func (m *UnwatchResponse) XXX_Size() int { + return m.Size() +} +func (m *UnwatchResponse) XXX_DiscardUnknown() { + xxx_messageInfo_UnwatchResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_UnwatchResponse proto.InternalMessageInfo + func init() { proto.RegisterType((*CreateSpaceRequest)(nil), "clientapi.CreateSpaceRequest") proto.RegisterType((*CreateSpaceResponse)(nil), "clientapi.CreateSpaceResponse") @@ -1008,52 +1184,60 @@ func init() { proto.RegisterType((*LoadSpaceResponse)(nil), "clientapi.LoadSpaceResponse") proto.RegisterType((*TreeParamsRequest)(nil), "clientapi.TreeParamsRequest") proto.RegisterType((*TreeParamsResponse)(nil), "clientapi.TreeParamsResponse") + proto.RegisterType((*WatchRequest)(nil), "clientapi.WatchRequest") + proto.RegisterType((*WatchResponse)(nil), "clientapi.WatchResponse") + proto.RegisterType((*UnwatchRequest)(nil), "clientapi.UnwatchRequest") + proto.RegisterType((*UnwatchResponse)(nil), "clientapi.UnwatchResponse") } func init() { proto.RegisterFile("api/apiproto/protos/api.proto", fileDescriptor_fc31080c27db9707) } var fileDescriptor_fc31080c27db9707 = []byte{ - // 630 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xda, 0x40, - 0x10, 0x8e, 0x09, 0x49, 0xf0, 0x50, 0xf1, 0xb3, 0x49, 0x90, 0xeb, 0x06, 0x8b, 0xae, 0x94, 0x08, - 0xa9, 0x11, 0xa8, 0xe9, 0xa9, 0xb7, 0x52, 0x50, 0x24, 0x54, 0x2a, 0xb5, 0x24, 0xbd, 0xf4, 0xb6, - 0xc5, 0x2b, 0xc5, 0x12, 0x60, 0xd7, 0x36, 0x55, 0x4e, 0x7d, 0x86, 0x3e, 0x56, 0x8f, 0x39, 0xf6, - 0x58, 0xc1, 0x6b, 0xf4, 0x50, 0xed, 0x7a, 0x6d, 0xef, 0xda, 0x26, 0x55, 0xc5, 0x05, 0x76, 0xfe, - 0xbe, 0xf9, 0x76, 0x99, 0x6f, 0x80, 0x36, 0xf1, 0x9c, 0x3e, 0xf1, 0x1c, 0xcf, 0x77, 0x43, 0xb7, - 0xcf, 0x3f, 0x03, 0x66, 0xf7, 0xf8, 0x11, 0xe9, 0xb3, 0xb9, 0x43, 0x97, 0x21, 0xf1, 0x1c, 0x7c, - 0x02, 0x68, 0xe8, 0x53, 0x12, 0xd2, 0x1b, 0x8f, 0xcc, 0xe8, 0x94, 0x7e, 0x5d, 0xd1, 0x20, 0xc4, - 0xe7, 0x70, 0xac, 0x78, 0x03, 0xcf, 0x5d, 0x06, 0x14, 0xd5, 0xa0, 0xe4, 0xd8, 0x86, 0xd6, 0xd1, - 0xba, 0xfa, 0xb4, 0xe4, 0xd8, 0xac, 0x78, 0x44, 0x7d, 0xe7, 0x5b, 0xae, 0x58, 0xf1, 0x6e, 0x29, - 0x7e, 0x09, 0xa7, 0x51, 0x8f, 0x91, 0x3b, 0x5b, 0x2d, 0xe8, 0x32, 0x14, 0xf5, 0xc8, 0x80, 0xa3, - 0x80, 0x55, 0x8e, 0xe3, 0xec, 0xd8, 0xc4, 0x5d, 0x68, 0x65, 0x4b, 0xb6, 0x80, 0x7f, 0x84, 0xd3, - 0x11, 0x9d, 0xd3, 0xff, 0x00, 0x47, 0x16, 0x80, 0x2d, 0x92, 0xc7, 0xb6, 0x51, 0xe2, 0x41, 0xc9, - 0x83, 0x0d, 0x68, 0x65, 0x21, 0xa3, 0xe6, 0xf8, 0x3b, 0xd4, 0x06, 0xb6, 0x7d, 0x4b, 0xef, 0x77, - 0xef, 0x82, 0x10, 0x94, 0x43, 0x7a, 0x1f, 0x1a, 0xfb, 0x3c, 0xc2, 0xcf, 0xac, 0xc6, 0x09, 0x6e, - 0x96, 0xc4, 0x0b, 0xee, 0xdc, 0xd0, 0x28, 0x77, 0xb4, 0x6e, 0x65, 0x2a, 0x79, 0x30, 0x81, 0x7a, - 0xd2, 0x5f, 0xbc, 0x87, 0xda, 0x46, 0xcb, 0xb5, 0x69, 0xc1, 0xe1, 0x1d, 0x25, 0x76, 0x42, 0x41, - 0x58, 0xcc, 0xef, 0xbb, 0x2e, 0xab, 0x89, 0x08, 0x08, 0x0b, 0xbf, 0x83, 0xfa, 0x68, 0xb5, 0xf0, - 0x6e, 0x7d, 0x4a, 0x77, 0x7f, 0xc9, 0x0b, 0x68, 0xa4, 0x60, 0x82, 0x30, 0x82, 0xb2, 0xbd, 0x5a, - 0x78, 0x02, 0x8a, 0x9f, 0xf1, 0x0b, 0xa8, 0x0f, 0xe6, 0x73, 0x96, 0x16, 0xfc, 0x7b, 0x36, 0x2e, - 0xa1, 0xcc, 0x32, 0xb3, 0x93, 0x80, 0x4e, 0xe0, 0x80, 0xdd, 0x2d, 0x30, 0x4a, 0x9d, 0xfd, 0xae, - 0x3e, 0x8d, 0x0c, 0xfc, 0x1a, 0x1a, 0x29, 0xb4, 0xa0, 0x70, 0x0e, 0x07, 0x21, 0x73, 0x18, 0x5a, - 0x67, 0xbf, 0x5b, 0xbd, 0xaa, 0xf7, 0x12, 0x95, 0xf4, 0x38, 0xd5, 0x28, 0x8a, 0x11, 0x2f, 0xe5, - 0xb3, 0x1d, 0xd3, 0xc2, 0x7d, 0x68, 0x4a, 0x3e, 0x81, 0x67, 0x42, 0x45, 0x90, 0x8b, 0x20, 0xf5, - 0x69, 0x62, 0xe3, 0x4b, 0x68, 0x4c, 0x5c, 0x62, 0xcb, 0xba, 0x79, 0xe4, 0x6e, 0xc7, 0xd0, 0x94, - 0xb2, 0xc5, 0xd4, 0xbd, 0x87, 0x26, 0xa3, 0xf5, 0x81, 0xf8, 0x64, 0x11, 0xec, 0xfe, 0xa3, 0x5c, - 0x03, 0x92, 0xe1, 0xc4, 0x1d, 0xd2, 0x79, 0xd0, 0xe4, 0x79, 0x60, 0x7d, 0xa2, 0x89, 0x89, 0xdf, - 0x35, 0x36, 0xaf, 0xfe, 0x1c, 0x80, 0x3e, 0xe4, 0x0f, 0x37, 0xf0, 0x1c, 0x34, 0x81, 0xaa, 0xb4, - 0x48, 0x50, 0x5b, 0x7a, 0xd3, 0xfc, 0xda, 0x31, 0xad, 0x6d, 0x61, 0xc1, 0x66, 0x02, 0x55, 0x69, - 0xb3, 0x28, 0x68, 0xf9, 0x3d, 0xa4, 0xa0, 0x15, 0x2d, 0xa4, 0x4f, 0x50, 0x53, 0xb7, 0x09, 0xea, - 0xe4, 0xfa, 0x67, 0xd6, 0x87, 0xf9, 0xfc, 0x91, 0x8c, 0x14, 0x56, 0xdd, 0x13, 0x0a, 0x6c, 0xe1, - 0x56, 0x52, 0x60, 0x8b, 0x97, 0x0c, 0x7a, 0x03, 0x47, 0x42, 0xe4, 0xe8, 0xa9, 0x94, 0xad, 0x2e, - 0x1e, 0xd3, 0x2c, 0x0a, 0x09, 0x84, 0x21, 0x54, 0x62, 0xd9, 0x21, 0x39, 0x2f, 0x23, 0x6c, 0xf3, - 0x59, 0x61, 0x4c, 0x80, 0x8c, 0x01, 0xd2, 0x31, 0x41, 0x67, 0x19, 0x8d, 0x28, 0xc3, 0x68, 0xb6, - 0xb7, 0x44, 0x53, 0x3e, 0xb1, 0x06, 0x15, 0x3e, 0x19, 0xcd, 0x2b, 0x7c, 0x72, 0xa2, 0xbd, 0x06, - 0x3d, 0x51, 0x1e, 0xca, 0x64, 0x2a, 0x1a, 0x35, 0xcf, 0x8a, 0x83, 0x29, 0x4e, 0x22, 0x31, 0x05, - 0x27, 0x2b, 0x53, 0x05, 0x27, 0xa7, 0xca, 0xb7, 0x17, 0x3f, 0xd7, 0x96, 0xf6, 0xb0, 0xb6, 0xb4, - 0xdf, 0x6b, 0x4b, 0xfb, 0xb1, 0xb1, 0xf6, 0x1e, 0x36, 0xd6, 0xde, 0xaf, 0x8d, 0xb5, 0xf7, 0xf9, - 0x89, 0xfc, 0x9f, 0xfc, 0xe5, 0x90, 0x7f, 0xbd, 0xfa, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x82, 0x87, - 0xed, 0x38, 0xaa, 0x07, 0x00, 0x00, + // 700 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xcd, 0x4e, 0xdb, 0x40, + 0x10, 0xc6, 0x21, 0x04, 0x3c, 0xd0, 0x98, 0x2c, 0x90, 0xba, 0x2e, 0x58, 0xe9, 0x4a, 0xa0, 0x48, + 0x45, 0xa0, 0xd2, 0x53, 0x7b, 0xe2, 0x27, 0x42, 0x42, 0x4d, 0xa5, 0x36, 0x80, 0x2a, 0xf5, 0xb6, + 0x8d, 0x57, 0x8a, 0xa5, 0x24, 0x76, 0x6d, 0xa7, 0xe5, 0xd4, 0x67, 0xe8, 0x63, 0xf5, 0xc8, 0xb1, + 0xc7, 0x2a, 0x79, 0x8b, 0x9e, 0xaa, 0x5d, 0xaf, 0xed, 0x5d, 0xdb, 0x01, 0x55, 0xb9, 0x80, 0xe7, + 0xef, 0x9b, 0x6f, 0x77, 0x67, 0x3e, 0x05, 0xf6, 0x88, 0xef, 0x1e, 0x13, 0xdf, 0xf5, 0x03, 0x2f, + 0xf2, 0x8e, 0xf9, 0xdf, 0x90, 0xd9, 0x47, 0xfc, 0x13, 0xe9, 0xfd, 0xa1, 0x4b, 0xc7, 0x11, 0xf1, + 0x5d, 0xbc, 0x0d, 0xe8, 0x22, 0xa0, 0x24, 0xa2, 0xd7, 0x3e, 0xe9, 0xd3, 0x1e, 0xfd, 0x3a, 0xa1, + 0x61, 0x84, 0xf7, 0x61, 0x4b, 0xf1, 0x86, 0xbe, 0x37, 0x0e, 0x29, 0xaa, 0x43, 0xc5, 0x75, 0x4c, + 0xad, 0xa5, 0xb5, 0xf5, 0x5e, 0xc5, 0x75, 0x58, 0x71, 0x87, 0x06, 0xee, 0xb7, 0x42, 0xb1, 0xe2, + 0x9d, 0x53, 0xfc, 0x0a, 0x76, 0xe2, 0x1e, 0x1d, 0xaf, 0x3f, 0x19, 0xd1, 0x71, 0x24, 0xea, 0x91, + 0x09, 0xab, 0x21, 0xab, 0xbc, 0x4a, 0xb2, 0x13, 0x13, 0xb7, 0xa1, 0x99, 0x2f, 0x99, 0x03, 0xfe, + 0x11, 0x76, 0x3a, 0x74, 0x48, 0xff, 0x03, 0x1c, 0xd9, 0x00, 0x8e, 0x48, 0xbe, 0x72, 0xcc, 0x0a, + 0x0f, 0x4a, 0x1e, 0x6c, 0x42, 0x33, 0x0f, 0x19, 0x37, 0xc7, 0x3f, 0xa0, 0x7e, 0xe6, 0x38, 0x37, + 0xf4, 0x6e, 0xf1, 0x2e, 0x08, 0x41, 0x35, 0xa2, 0x77, 0x91, 0xb9, 0xcc, 0x23, 0xfc, 0x9b, 0xd5, + 0xb8, 0xe1, 0xf5, 0x98, 0xf8, 0xe1, 0xc0, 0x8b, 0xcc, 0x6a, 0x4b, 0x6b, 0xaf, 0xf5, 0x24, 0x0f, + 0x26, 0x60, 0xa4, 0xfd, 0xc5, 0x7d, 0xa8, 0x6d, 0xb4, 0x42, 0x9b, 0x26, 0xd4, 0x06, 0x94, 0x38, + 0x29, 0x05, 0x61, 0x31, 0x7f, 0xe0, 0x79, 0xac, 0x26, 0x26, 0x20, 0x2c, 0xfc, 0x0e, 0x8c, 0xce, + 0x64, 0xe4, 0xdf, 0x04, 0x94, 0x2e, 0x7e, 0x93, 0x07, 0xb0, 0x99, 0x81, 0x09, 0xc2, 0x08, 0xaa, + 0xce, 0x64, 0xe4, 0x0b, 0x28, 0xfe, 0x8d, 0x5f, 0x82, 0x71, 0x36, 0x1c, 0xb2, 0xb4, 0xf0, 0xf1, + 0xd9, 0x38, 0x84, 0x2a, 0xcb, 0xcc, 0x4f, 0x02, 0xda, 0x86, 0x15, 0x76, 0xb6, 0xd0, 0xac, 0xb4, + 0x96, 0xdb, 0x7a, 0x2f, 0x36, 0xf0, 0x1b, 0xd8, 0xcc, 0xa0, 0x05, 0x85, 0x7d, 0x58, 0x89, 0x98, + 0xc3, 0xd4, 0x5a, 0xcb, 0xed, 0xf5, 0x13, 0xe3, 0x28, 0xdd, 0x92, 0x23, 0x4e, 0x35, 0x8e, 0x62, + 0xc4, 0x4b, 0xf9, 0x6c, 0x27, 0xb4, 0xf0, 0x31, 0x34, 0x24, 0x9f, 0xc0, 0xb3, 0x60, 0x4d, 0x90, + 0x8b, 0x21, 0xf5, 0x5e, 0x6a, 0xe3, 0x43, 0xd8, 0xec, 0x7a, 0xc4, 0x91, 0xf7, 0xe6, 0x81, 0xb3, + 0x6d, 0x41, 0x43, 0xca, 0x16, 0x53, 0xf7, 0x1e, 0x1a, 0x8c, 0xd6, 0x07, 0x12, 0x90, 0x51, 0xb8, + 0xf8, 0xa3, 0x5c, 0x02, 0x92, 0xe1, 0xc4, 0x19, 0xb2, 0x79, 0xd0, 0xe4, 0x79, 0x60, 0x7d, 0xe2, + 0x89, 0x49, 0xee, 0x35, 0x31, 0xf1, 0x29, 0x6c, 0x7c, 0x22, 0x51, 0x7f, 0xf0, 0x38, 0xa3, 0x26, + 0xd4, 0xd8, 0x8d, 0x66, 0x33, 0x18, 0x5b, 0xd8, 0x80, 0x27, 0x02, 0x41, 0x9c, 0xf4, 0x1c, 0xea, + 0xb7, 0xe3, 0xef, 0x8b, 0x81, 0x36, 0xc0, 0x48, 0x31, 0x62, 0xd8, 0x93, 0xbf, 0x35, 0xd0, 0x2f, + 0xf8, 0x13, 0x9f, 0xf9, 0x2e, 0xea, 0xc2, 0xba, 0x24, 0x79, 0x68, 0x4f, 0x7a, 0xfd, 0xa2, 0x40, + 0x5a, 0xf6, 0xbc, 0xb0, 0xb8, 0xb7, 0x2e, 0xac, 0x4b, 0x1a, 0xa8, 0xa0, 0x15, 0x15, 0x53, 0x41, + 0x2b, 0x93, 0xce, 0x5b, 0xa8, 0xab, 0xba, 0x87, 0x5a, 0x85, 0xfe, 0x39, 0xa1, 0xb3, 0x5e, 0x3c, + 0x90, 0x91, 0xc1, 0xaa, 0x8a, 0xa6, 0xc0, 0x96, 0xea, 0xa7, 0x02, 0x5b, 0x2e, 0x87, 0xe8, 0x14, + 0x56, 0x85, 0x1c, 0xa1, 0x67, 0x52, 0xb6, 0x2a, 0x91, 0x96, 0x55, 0x16, 0x12, 0x08, 0x17, 0xb0, + 0x96, 0x08, 0x04, 0x92, 0xf3, 0x72, 0x12, 0x64, 0x3d, 0x2f, 0x8d, 0x09, 0x90, 0x2b, 0x80, 0x6c, + 0xa0, 0xd1, 0x6e, 0x6e, 0x9b, 0x95, 0xb5, 0xb1, 0xf6, 0xe6, 0x44, 0x33, 0x3e, 0x89, 0x5a, 0x28, + 0x7c, 0x72, 0xea, 0xa4, 0xf0, 0x29, 0xc8, 0xcb, 0x25, 0xe8, 0xa9, 0x46, 0xa0, 0x5c, 0xa6, 0xa2, + 0x26, 0xd6, 0x6e, 0x79, 0x30, 0xc3, 0x49, 0xc5, 0x40, 0xc1, 0xc9, 0x0b, 0x8a, 0x82, 0x53, 0xd0, + 0x0f, 0xf4, 0x16, 0x56, 0xf8, 0x9a, 0xa1, 0xa7, 0x52, 0x9a, 0xbc, 0xba, 0x96, 0x59, 0x0c, 0x64, + 0x4f, 0x2c, 0xb6, 0x49, 0x79, 0x62, 0x75, 0x4b, 0x95, 0x27, 0xce, 0x2d, 0xdf, 0xf9, 0xc1, 0xaf, + 0xa9, 0xad, 0xdd, 0x4f, 0x6d, 0xed, 0xcf, 0xd4, 0xd6, 0x7e, 0xce, 0xec, 0xa5, 0xfb, 0x99, 0xbd, + 0xf4, 0x7b, 0x66, 0x2f, 0x7d, 0xde, 0x90, 0x7f, 0xbb, 0x7c, 0xa9, 0xf1, 0x7f, 0xaf, 0xff, 0x05, + 0x00, 0x00, 0xff, 0xff, 0x7d, 0xa4, 0xb1, 0xae, 0xd2, 0x08, 0x00, 0x00, } func (m *CreateSpaceRequest) Marshal() (dAtA []byte, err error) { @@ -1737,6 +1921,126 @@ func (m *TreeParamsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *WatchRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WatchRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *WatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.TreeId) > 0 { + i -= len(m.TreeId) + copy(dAtA[i:], m.TreeId) + i = encodeVarintApi(dAtA, i, uint64(len(m.TreeId))) + i-- + dAtA[i] = 0x12 + } + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintApi(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *WatchResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WatchResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *WatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *UnwatchRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *UnwatchRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *UnwatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.TreeId) > 0 { + i -= len(m.TreeId) + copy(dAtA[i:], m.TreeId) + i = encodeVarintApi(dAtA, i, uint64(len(m.TreeId))) + i-- + dAtA[i] = 0x12 + } + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintApi(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *UnwatchResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *UnwatchResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *UnwatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + func encodeVarintApi(dAtA []byte, offset int, v uint64) int { offset -= sovApi(v) base := offset @@ -2048,6 +2352,58 @@ func (m *TreeParamsResponse) Size() (n int) { return n } +func (m *WatchRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.TreeId) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + return n +} + +func (m *WatchResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *UnwatchRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + l = len(m.TreeId) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } + return n +} + +func (m *UnwatchResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovApi(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -3926,6 +4282,334 @@ func (m *TreeParamsResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *WatchRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WatchRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WatchRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TreeId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TreeId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *WatchResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WatchResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WatchResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *UnwatchRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UnwatchRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UnwatchRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TreeId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TreeId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *UnwatchResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UnwatchResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UnwatchResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipApi(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/client/api/apiproto/api_drpc.pb.go b/client/api/apiproto/api_drpc.pb.go index adfe74ac..6bf49f0c 100644 --- a/client/api/apiproto/api_drpc.pb.go +++ b/client/api/apiproto/api_drpc.pb.go @@ -50,6 +50,8 @@ type DRPCClientApiClient interface { AllTrees(ctx context.Context, in *AllTreesRequest) (*AllTreesResponse, error) AllSpaces(ctx context.Context, in *AllSpacesRequest) (*AllSpacesResponse, error) LoadSpace(ctx context.Context, in *LoadSpaceRequest) (*LoadSpaceResponse, error) + Watch(ctx context.Context, in *WatchRequest) (*WatchResponse, error) + Unwatch(ctx context.Context, in *UnwatchRequest) (*UnwatchResponse, error) } type drpcClientApiClient struct { @@ -152,6 +154,24 @@ func (c *drpcClientApiClient) LoadSpace(ctx context.Context, in *LoadSpaceReques return out, nil } +func (c *drpcClientApiClient) Watch(ctx context.Context, in *WatchRequest) (*WatchResponse, error) { + out := new(WatchResponse) + err := c.cc.Invoke(ctx, "/clientapi.ClientApi/Watch", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *drpcClientApiClient) Unwatch(ctx context.Context, in *UnwatchRequest) (*UnwatchResponse, error) { + out := new(UnwatchResponse) + err := c.cc.Invoke(ctx, "/clientapi.ClientApi/Unwatch", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) + if err != nil { + return nil, err + } + return out, nil +} + type DRPCClientApiServer interface { CreateSpace(context.Context, *CreateSpaceRequest) (*CreateSpaceResponse, error) DeriveSpace(context.Context, *DeriveSpaceRequest) (*DeriveSpaceResponse, error) @@ -163,6 +183,8 @@ type DRPCClientApiServer interface { AllTrees(context.Context, *AllTreesRequest) (*AllTreesResponse, error) AllSpaces(context.Context, *AllSpacesRequest) (*AllSpacesResponse, error) LoadSpace(context.Context, *LoadSpaceRequest) (*LoadSpaceResponse, error) + Watch(context.Context, *WatchRequest) (*WatchResponse, error) + Unwatch(context.Context, *UnwatchRequest) (*UnwatchResponse, error) } type DRPCClientApiUnimplementedServer struct{} @@ -207,9 +229,17 @@ func (s *DRPCClientApiUnimplementedServer) LoadSpace(context.Context, *LoadSpace return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } +func (s *DRPCClientApiUnimplementedServer) Watch(context.Context, *WatchRequest) (*WatchResponse, error) { + return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + +func (s *DRPCClientApiUnimplementedServer) Unwatch(context.Context, *UnwatchRequest) (*UnwatchResponse, error) { + return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + type DRPCClientApiDescription struct{} -func (DRPCClientApiDescription) NumMethods() int { return 10 } +func (DRPCClientApiDescription) NumMethods() int { return 12 } func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { switch n { @@ -303,6 +333,24 @@ func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Recei in1.(*LoadSpaceRequest), ) }, DRPCClientApiServer.LoadSpace, true + case 10: + return "/clientapi.ClientApi/Watch", drpcEncoding_File_api_apiproto_protos_api_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return srv.(DRPCClientApiServer). + Watch( + ctx, + in1.(*WatchRequest), + ) + }, DRPCClientApiServer.Watch, true + case 11: + return "/clientapi.ClientApi/Unwatch", drpcEncoding_File_api_apiproto_protos_api_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return srv.(DRPCClientApiServer). + Unwatch( + ctx, + in1.(*UnwatchRequest), + ) + }, DRPCClientApiServer.Unwatch, true default: return "", nil, nil, nil, false } @@ -471,3 +519,35 @@ func (x *drpcClientApi_LoadSpaceStream) SendAndClose(m *LoadSpaceResponse) error } return x.CloseSend() } + +type DRPCClientApi_WatchStream interface { + drpc.Stream + SendAndClose(*WatchResponse) error +} + +type drpcClientApi_WatchStream struct { + drpc.Stream +} + +func (x *drpcClientApi_WatchStream) SendAndClose(m *WatchResponse) error { + if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil { + return err + } + return x.CloseSend() +} + +type DRPCClientApi_UnwatchStream interface { + drpc.Stream + SendAndClose(*UnwatchResponse) error +} + +type drpcClientApi_UnwatchStream struct { + drpc.Stream +} + +func (x *drpcClientApi_UnwatchStream) SendAndClose(m *UnwatchResponse) error { + if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil { + return err + } + return x.CloseSend() +} diff --git a/client/api/apiproto/protos/api.proto b/client/api/apiproto/protos/api.proto index 0c5b7256..2eff85b8 100644 --- a/client/api/apiproto/protos/api.proto +++ b/client/api/apiproto/protos/api.proto @@ -14,6 +14,8 @@ service ClientApi { rpc AllTrees(AllTreesRequest) returns(AllTreesResponse); rpc AllSpaces(AllSpacesRequest) returns(AllSpacesResponse); rpc LoadSpace(LoadSpaceRequest) returns(LoadSpaceResponse); + rpc Watch(WatchRequest) returns(WatchResponse); + rpc Unwatch(UnwatchRequest) returns(UnwatchResponse); } message CreateSpaceRequest { @@ -103,4 +105,20 @@ message TreeParamsRequest { message TreeParamsResponse { string rootId = 1; repeated string headIds = 2; +} + +message WatchRequest { + string spaceId = 1; + string treeId = 2; +} + +message WatchResponse { +} + +message UnwatchRequest { + string spaceId = 1; + string treeId = 2; +} + +message UnwatchResponse { } \ No newline at end of file diff --git a/client/api/rpchandler.go b/client/api/rpchandler.go index e1c85c2a..3c3edcba 100644 --- a/client/api/rpchandler.go +++ b/client/api/rpchandler.go @@ -10,6 +10,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric" "math/rand" + "sync" ) type rpcHandler struct { @@ -17,6 +18,49 @@ type rpcHandler struct { storageService storage.ClientStorage docService document.Service account account.Service + treeWatcher *watcher + sync.Mutex +} + +func (r *rpcHandler) Watch(ctx context.Context, request *apiproto.WatchRequest) (resp *apiproto.WatchResponse, err error) { + space, err := r.spaceService.GetSpace(context.Background(), request.SpaceId) + if err != nil { + return + } + r.Lock() + defer r.Unlock() + + ch := make(chan bool) + r.treeWatcher = newWatcher(request.SpaceId, request.TreeId, ch) + space.StatusService().Watch(request.TreeId, ch) + go r.treeWatcher.run() + resp = &apiproto.WatchResponse{} + return +} + +func (r *rpcHandler) Unwatch(ctx context.Context, request *apiproto.UnwatchRequest) (resp *apiproto.UnwatchResponse, err error) { + space, err := r.spaceService.GetSpace(context.Background(), request.SpaceId) + if err != nil { + return + } + var treeWatcher *watcher + space.StatusService().Unwatch(request.TreeId) + + r.Lock() + if r.treeWatcher != nil { + treeWatcher = r.treeWatcher + } + r.Unlock() + + treeWatcher.close() + + r.Lock() + if r.treeWatcher == treeWatcher { + r.treeWatcher = nil + } + r.Unlock() + resp = &apiproto.UnwatchResponse{} + return } func (r *rpcHandler) LoadSpace(ctx context.Context, request *apiproto.LoadSpaceRequest) (resp *apiproto.LoadSpaceResponse, err error) { diff --git a/client/api/service.go b/client/api/service.go index bd606651..8e7ea5c9 100644 --- a/client/api/service.go +++ b/client/api/service.go @@ -67,7 +67,12 @@ func (s *service) Run(ctx context.Context) (err error) { if err != nil { return } - return apiproto.DRPCRegisterClientApi(s, &rpcHandler{s.spaceService, s.storageService, s.docService, s.account}) + return apiproto.DRPCRegisterClientApi(s, &rpcHandler{ + spaceService: s.spaceService, + storageService: s.storageService, + docService: s.docService, + account: s.account, + }) } func (s *service) Close(ctx context.Context) (err error) { diff --git a/client/api/watcher.go b/client/api/watcher.go new file mode 100644 index 00000000..e7d13a9b --- /dev/null +++ b/client/api/watcher.go @@ -0,0 +1,37 @@ +package api + +import "go.uber.org/zap" + +type watcher struct { + spaceId string + treeId string + watcher chan bool + watcherDone chan struct{} +} + +func newWatcher(spaceId, treeId string, ch chan bool) *watcher { + return &watcher{ + spaceId: spaceId, + treeId: treeId, + watcher: ch, + watcherDone: make(chan struct{}), + } +} + +func (w *watcher) run() { + log := log.With(zap.String("spaceId", w.spaceId), zap.String("treeId", w.treeId)) + log.Debug("started watching") + defer close(w.watcherDone) + for { + synced, ok := <-w.watcher + if !ok { + log.Debug("stopped watching") + return + } + log.With(zap.Bool("synced", synced)).Debug("updated sync status") + } +} + +func (w *watcher) close() { + <-w.watcherDone +} diff --git a/client/go.mod b/client/go.mod index 8aeae7af..44453872 100644 --- a/client/go.mod +++ b/client/go.mod @@ -8,14 +8,19 @@ require ( github.com/anytypeio/go-anytype-infrastructure-experiments/common v0.0.0-00010101000000-000000000000 github.com/dgraph-io/badger/v3 v3.2103.3 github.com/gogo/protobuf v1.3.2 + github.com/stretchr/testify v1.8.0 go.uber.org/zap v1.23.0 + storj.io/drpc v0.0.32 ) require ( + github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3 // indirect github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -34,7 +39,6 @@ require ( github.com/klauspost/cpuid/v2 v2.1.1 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-libp2p v0.23.2 // indirect - github.com/libp2p/go-libp2p-core v0.20.1 // indirect github.com/libp2p/go-openssl v0.1.0 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-pointer v0.0.1 // indirect @@ -49,6 +53,7 @@ require ( github.com/multiformats/go-multihash v0.2.1 // indirect github.com/multiformats/go-varint v0.0.6 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -60,12 +65,12 @@ require ( go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect - golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5 // indirect golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect - storj.io/drpc v0.0.32 // indirect ) diff --git a/client/go.sum b/client/go.sum index b91db95f..cf6293a0 100644 --- a/client/go.sum +++ b/client/go.sum @@ -40,6 +40,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3 h1:yIyGIb7bRkEngKtQ0Ja5bome2SEnErwTaEvR8dA/WtU= +github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3/go.mod h1:w0i62cRB2jVpjFb2CpPNj5J+ihKqqmBBG9X2+Odekjw= github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 h1:kMPPZYmJgbs4AJfodbg2OCXg5cp+9LPAJcLZJqmcghk= github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232/go.mod h1:+PeHBAWp7gUh/yw6uAauKc5ku0w4cFNg6DUddGxoGq0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -56,6 +58,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c h1:+bD75daSbsxyTzkKpNplC4xls+7/tGwty+zruzOnOmk= +github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -214,8 +218,6 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE= github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI= -github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw= -github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY= github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -305,12 +307,15 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -353,8 +358,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -365,6 +370,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg= diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 6b279140..30d2eff2 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" @@ -108,12 +109,20 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) + + var statusService statusservice.StatusService + // this will work only for clients, not the best solution, but... + if !lastConfiguration.IsResponsible(st.Id()) { + statusService = statusservice.NewStatusService(st.Id(), lastConfiguration) + } + diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log) syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod) sp := &space{ id: id, syncService: syncService, diffService: diffService, + statusService: statusService, cache: s.treeGetter, account: s.account, configuration: lastConfiguration, diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 9a656ee7..fdf6e633 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -9,6 +9,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" @@ -80,6 +81,8 @@ type Space interface { BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (tree.ObjectTree, error) DeleteTree(ctx context.Context, id string) (err error) + StatusService() statusservice.StatusService + Close() error } @@ -92,6 +95,7 @@ type space struct { syncService syncservice.SyncService diffService diffservice.DiffService + statusService statusservice.StatusService storage storage.SpaceStorage cache treegetter.TreeGetter account account.Service @@ -202,6 +206,10 @@ func (s *space) DiffService() diffservice.DiffService { return s.diffService } +func (s *space) StatusService() statusservice.StatusService { + return s.statusService +} + func (s *space) StoredIds() []string { return s.diffService.AllIds() } @@ -222,6 +230,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay Configuration: s.configuration, AclList: s.aclList, SpaceStorage: s.storage, + StatusService: s.statusService, } return synctree.DeriveSyncTree(ctx, deps) } @@ -238,6 +247,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay Configuration: s.configuration, AclList: s.aclList, SpaceStorage: s.storage, + StatusService: s.statusService, } return synctree.CreateSyncTree(ctx, deps) } @@ -256,6 +266,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene AclList: s.aclList, SpaceStorage: s.storage, TreeUsage: &s.treesUsed, + StatusService: s.statusService, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } @@ -286,5 +297,10 @@ func (s *space) Close() error { if err := s.storage.Close(); err != nil { mError.Add(err) } + if s.statusService != nil { + if err := s.statusService.Close(); err != nil { + mError.Add(err) + } + } return mError.Err() } diff --git a/common/commonspace/statusservice/statusservice.go b/common/commonspace/statusservice/statusservice.go index 59e73637..02f803d8 100644 --- a/common/commonspace/statusservice/statusservice.go +++ b/common/commonspace/statusservice/statusservice.go @@ -17,6 +17,7 @@ type StatusService interface { Unwatch(treeId string) StateCounter() uint64 RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) + Close() error } type statusEntry struct { @@ -31,6 +32,7 @@ type statusService struct { watchers map[string]chan bool configuration nodeconf.Configuration stateCounter uint64 + closed bool } func NewStatusService(spaceId string, configuration nodeconf.Configuration) StatusService { @@ -46,6 +48,10 @@ func NewStatusService(spaceId string, configuration nodeconf.Configuration) Stat func (s *statusService) HeadsChange(treeId string, heads []string) { s.Lock() defer s.Unlock() + if s.closed { + return + } + s.treeHeads[treeId] = statusEntry{ head: heads[0], stateCounter: s.stateCounter, @@ -62,6 +68,10 @@ func (s *statusService) HeadsChange(treeId string, heads []string) { func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { s.Lock() defer s.Unlock() + if s.closed { + return + } + curHead, ok := s.treeHeads[treeId] if !ok { return @@ -84,13 +94,22 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { func (s *statusService) Watch(treeId string, ch chan bool) { s.Lock() defer s.Unlock() + if s.closed { + return + } s.watchers[treeId] = ch } func (s *statusService) Unwatch(treeId string) { s.Lock() defer s.Unlock() - delete(s.watchers, treeId) + if s.closed { + return + } + if ch, ok := s.watchers[treeId]; ok { + close(ch) + delete(s.watchers, treeId) + } } func (s *statusService) StateCounter() uint64 { @@ -99,6 +118,18 @@ func (s *statusService) StateCounter() uint64 { return s.stateCounter } +func (s *statusService) Close() (err error) { + s.Lock() + defer s.Unlock() + if s.closed { + return + } + for _, ch := range s.watchers { + close(ch) + } + return +} + func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { return diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index ddbdd19e..2541f318 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -63,6 +63,7 @@ type CreateDeps struct { SyncService syncservice.SyncService AclList list.ACLList SpaceStorage spacestorage.SpaceStorage + StatusService statusservice.StatusService } type BuildDeps struct { @@ -75,9 +76,14 @@ type BuildDeps struct { SpaceStorage spacestorage.SpaceStorage TreeStorage storage.TreeStorage TreeUsage *atomic.Int32 + StatusService statusservice.StatusService } -func newWrappedSyncClient(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient { +func newWrappedSyncClient( + spaceId string, + factory RequestFactory, + syncService syncservice.SyncService, + configuration nodeconf.Configuration) SyncClient { syncClient := newSyncClient(spaceId, syncService.StreamPool(), factory, configuration, syncService.StreamChecker()) return newQueuedClient(syncClient, syncService.ActionQueue()) } @@ -95,6 +101,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) + deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) syncClient.BroadcastAsync(headUpdate) id = objTree.ID() return @@ -112,6 +119,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) + deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) syncClient.BroadcastAsync(headUpdate) id = objTree.ID() return @@ -197,13 +205,14 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy deps.SyncService, deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - treeUsage: deps.TreeUsage, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + treeUsage: deps.TreeUsage, + listener: deps.Listener, + statusService: deps.StatusService, } - syncHandler := newSyncTreeHandler(syncTree, syncClient) + syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.StatusService) syncTree.SyncHandler = syncHandler t = syncTree syncTree.Lock() diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index 96b6bdcb..7994e1f1 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -23,11 +23,12 @@ type syncTreeHandler struct { const maxQueueSize = 5 -func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler { +func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient, statusService statusservice.StatusService) synchandler.SyncHandler { return &syncTreeHandler{ - objTree: objTree, - syncClient: syncClient, - queue: newReceiveQueue(maxQueueSize), + objTree: objTree, + syncClient: syncClient, + statusService: statusService, + queue: newReceiveQueue(maxQueueSize), } } diff --git a/common/go.mod b/common/go.mod index f8522376..59a109c7 100644 --- a/common/go.mod +++ b/common/go.mod @@ -57,6 +57,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/common/go.sum b/common/go.sum index 3359eaee..bfe4ab84 100644 --- a/common/go.sum +++ b/common/go.sum @@ -326,6 +326,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg= diff --git a/util/cmd/debug/commands/client/service.go b/util/cmd/debug/commands/client/service.go index e5817b0e..778f4c5e 100644 --- a/util/cmd/debug/commands/client/service.go +++ b/util/cmd/debug/commands/client/service.go @@ -24,6 +24,8 @@ type Service interface { AllTrees(ctx context.Context, ip string, request *apiproto.AllTreesRequest) (resp *apiproto.AllTreesResponse, err error) AllSpaces(ctx context.Context, ip string, request *apiproto.AllSpacesRequest) (resp *apiproto.AllSpacesResponse, err error) LoadSpace(ctx context.Context, ip string, request *apiproto.LoadSpaceRequest) (res *apiproto.LoadSpaceResponse, err error) + Watch(ctx context.Context, ip string, request *apiproto.WatchRequest) (res *apiproto.WatchResponse, err error) + Unwatch(ctx context.Context, ip string, request *apiproto.UnwatchRequest) (res *apiproto.UnwatchResponse, err error) } type service struct { @@ -122,3 +124,19 @@ func (s *service) LoadSpace(ctx context.Context, ip string, request *apiproto.Lo } return cl.LoadSpace(ctx, request) } + +func (s *service) Watch(ctx context.Context, ip string, request *apiproto.WatchRequest) (res *apiproto.WatchResponse, err error) { + cl, err := s.client.GetClient(ctx, ip) + if err != nil { + return + } + return cl.Watch(ctx, request) +} + +func (s *service) Unwatch(ctx context.Context, ip string, request *apiproto.UnwatchRequest) (res *apiproto.UnwatchResponse, err error) { + cl, err := s.client.GetClient(ctx, ip) + if err != nil { + return + } + return cl.Unwatch(ctx, request) +} diff --git a/util/cmd/debug/commands/clientcmds.go b/util/cmd/debug/commands/clientcmds.go index 88791ea4..d7bf156b 100644 --- a/util/cmd/debug/commands/clientcmds.go +++ b/util/cmd/debug/commands/clientcmds.go @@ -290,4 +290,60 @@ func (s *service) registerClientCommands() { }, } s.clientCommands = append(s.clientCommands, cmdAllSpaces) + + cmdTreeWatch := &cobra.Command{ + Use: "tree-watch [document]", + Short: "start watching the tree (prints in logs the status on the client side)", + Args: cobra.RangeArgs(1, 1), + Run: func(cmd *cobra.Command, args []string) { + cli, _ := cmd.Flags().GetString("client") + space, _ := cmd.Flags().GetString("space") + addr, ok := s.peers[cli] + if !ok { + fmt.Println("no such client") + return + } + + _, err := s.client.Watch(context.Background(), addr, &clientproto.WatchRequest{ + SpaceId: space, + TreeId: args[0], + }) + if err != nil { + fmt.Println("couldn't start watching tree", err) + return + } + fmt.Println(args[0]) + }, + } + cmdTreeWatch.Flags().String("space", "", "the space where something is happening :-)") + cmdTreeWatch.MarkFlagRequired("space") + s.clientCommands = append(s.clientCommands, cmdTreeWatch) + + cmdTreeUnwatch := &cobra.Command{ + Use: "tree-unwatch [document]", + Short: "stop watching the tree (prints in logs the status on the client side)", + Args: cobra.RangeArgs(1, 1), + Run: func(cmd *cobra.Command, args []string) { + cli, _ := cmd.Flags().GetString("client") + space, _ := cmd.Flags().GetString("space") + addr, ok := s.peers[cli] + if !ok { + fmt.Println("no such client") + return + } + + _, err := s.client.Unwatch(context.Background(), addr, &clientproto.UnwatchRequest{ + SpaceId: space, + TreeId: args[0], + }) + if err != nil { + fmt.Println("couldn't stop watching tree", err) + return + } + fmt.Println(args[0]) + }, + } + cmdTreeUnwatch.Flags().String("space", "", "the space where something is happening :-)") + cmdTreeUnwatch.MarkFlagRequired("space") + s.clientCommands = append(s.clientCommands, cmdTreeUnwatch) }