From b59e2c259364970466d25ce944bbea615dd725b4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 17 Dec 2022 21:48:48 +0100 Subject: [PATCH] Added simple watcher and statusservice upgrades --- client/api/apiproto/api.pb.go | 842 +++++------------- client/api/apiproto/api_drpc.pb.go | 98 +- client/api/apiproto/protos/api.proto | 31 +- client/api/rpchandler.go | 97 +- client/api/service.go | 10 +- client/api/watcher.go | 37 + client/go.mod | 67 +- client/go.sum | 15 +- common/commonspace/service.go | 9 + common/commonspace/space.go | 16 + .../statusservice/statusservice.go | 33 +- common/commonspace/synctree/synctree.go | 23 +- .../commonspace/synctree/synctreehandler.go | 9 +- common/go.mod | 1 + common/go.sum | 2 + util/cmd/debug/commands/client/service.go | 21 +- util/cmd/debug/commands/clientcmds.go | 66 +- 17 files changed, 549 insertions(+), 828 deletions(-) create mode 100644 client/api/watcher.go diff --git a/client/api/apiproto/api.pb.go b/client/api/apiproto/api.pb.go index 7c11b5fa..cb7865a0 100644 --- a/client/api/apiproto/api.pb.go +++ b/client/api/apiproto/api.pb.go @@ -986,23 +986,23 @@ func (m *TreeParamsResponse) GetHeadIds() []string { return nil } -type PutFileRequest struct { - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - SpaceId string `protobuf:"bytes,2,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +type WatchRequest struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` + TreeId string `protobuf:"bytes,2,opt,name=treeId,proto3" json:"treeId,omitempty"` } -func (m *PutFileRequest) Reset() { *m = PutFileRequest{} } -func (m *PutFileRequest) String() string { return proto.CompactTextString(m) } -func (*PutFileRequest) ProtoMessage() {} -func (*PutFileRequest) Descriptor() ([]byte, []int) { +func (m *WatchRequest) Reset() { *m = WatchRequest{} } +func (m *WatchRequest) String() string { return proto.CompactTextString(m) } +func (*WatchRequest) ProtoMessage() {} +func (*WatchRequest) Descriptor() ([]byte, []int) { return fileDescriptor_fc31080c27db9707, []int{21} } -func (m *PutFileRequest) XXX_Unmarshal(b []byte) error { +func (m *WatchRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *PutFileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *WatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_PutFileRequest.Marshal(b, m, deterministic) + return xxx_messageInfo_WatchRequest.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -1012,48 +1012,47 @@ func (m *PutFileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro return b[:n], nil } } -func (m *PutFileRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_PutFileRequest.Merge(m, src) +func (m *WatchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WatchRequest.Merge(m, src) } -func (m *PutFileRequest) XXX_Size() int { +func (m *WatchRequest) XXX_Size() int { return m.Size() } -func (m *PutFileRequest) XXX_DiscardUnknown() { - xxx_messageInfo_PutFileRequest.DiscardUnknown(m) +func (m *WatchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WatchRequest.DiscardUnknown(m) } -var xxx_messageInfo_PutFileRequest proto.InternalMessageInfo +var xxx_messageInfo_WatchRequest proto.InternalMessageInfo -func (m *PutFileRequest) GetPath() string { - if m != nil { - return m.Path - } - return "" -} - -func (m *PutFileRequest) GetSpaceId() string { +func (m *WatchRequest) GetSpaceId() string { if m != nil { return m.SpaceId } return "" } -type PutFileResponse struct { - Hash string `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` +func (m *WatchRequest) GetTreeId() string { + if m != nil { + return m.TreeId + } + return "" } -func (m *PutFileResponse) Reset() { *m = PutFileResponse{} } -func (m *PutFileResponse) String() string { return proto.CompactTextString(m) } -func (*PutFileResponse) ProtoMessage() {} -func (*PutFileResponse) Descriptor() ([]byte, []int) { +type WatchResponse struct { +} + +func (m *WatchResponse) Reset() { *m = WatchResponse{} } +func (m *WatchResponse) String() string { return proto.CompactTextString(m) } +func (*WatchResponse) ProtoMessage() {} +func (*WatchResponse) Descriptor() ([]byte, []int) { return fileDescriptor_fc31080c27db9707, []int{22} } -func (m *PutFileResponse) XXX_Unmarshal(b []byte) error { +func (m *WatchResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *PutFileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *WatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_PutFileResponse.Marshal(b, m, deterministic) + return xxx_messageInfo_WatchResponse.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -1063,42 +1062,35 @@ func (m *PutFileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } -func (m *PutFileResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_PutFileResponse.Merge(m, src) +func (m *WatchResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_WatchResponse.Merge(m, src) } -func (m *PutFileResponse) XXX_Size() int { +func (m *WatchResponse) XXX_Size() int { return m.Size() } -func (m *PutFileResponse) XXX_DiscardUnknown() { - xxx_messageInfo_PutFileResponse.DiscardUnknown(m) +func (m *WatchResponse) XXX_DiscardUnknown() { + xxx_messageInfo_WatchResponse.DiscardUnknown(m) } -var xxx_messageInfo_PutFileResponse proto.InternalMessageInfo +var xxx_messageInfo_WatchResponse proto.InternalMessageInfo -func (m *PutFileResponse) GetHash() string { - if m != nil { - return m.Hash - } - return "" +type UnwatchRequest struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` + TreeId string `protobuf:"bytes,2,opt,name=treeId,proto3" json:"treeId,omitempty"` } -type GetFileRequest struct { - Hash string `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` - Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` -} - -func (m *GetFileRequest) Reset() { *m = GetFileRequest{} } -func (m *GetFileRequest) String() string { return proto.CompactTextString(m) } -func (*GetFileRequest) ProtoMessage() {} -func (*GetFileRequest) Descriptor() ([]byte, []int) { +func (m *UnwatchRequest) Reset() { *m = UnwatchRequest{} } +func (m *UnwatchRequest) String() string { return proto.CompactTextString(m) } +func (*UnwatchRequest) ProtoMessage() {} +func (*UnwatchRequest) Descriptor() ([]byte, []int) { return fileDescriptor_fc31080c27db9707, []int{23} } -func (m *GetFileRequest) XXX_Unmarshal(b []byte) error { +func (m *UnwatchRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *GetFileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *UnwatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_GetFileRequest.Marshal(b, m, deterministic) + return xxx_messageInfo_UnwatchRequest.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -1108,48 +1100,47 @@ func (m *GetFileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro return b[:n], nil } } -func (m *GetFileRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetFileRequest.Merge(m, src) +func (m *UnwatchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnwatchRequest.Merge(m, src) } -func (m *GetFileRequest) XXX_Size() int { +func (m *UnwatchRequest) XXX_Size() int { return m.Size() } -func (m *GetFileRequest) XXX_DiscardUnknown() { - xxx_messageInfo_GetFileRequest.DiscardUnknown(m) +func (m *UnwatchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnwatchRequest.DiscardUnknown(m) } -var xxx_messageInfo_GetFileRequest proto.InternalMessageInfo +var xxx_messageInfo_UnwatchRequest proto.InternalMessageInfo -func (m *GetFileRequest) GetHash() string { +func (m *UnwatchRequest) GetSpaceId() string { if m != nil { - return m.Hash + return m.SpaceId } return "" } -func (m *GetFileRequest) GetPath() string { +func (m *UnwatchRequest) GetTreeId() string { if m != nil { - return m.Path + return m.TreeId } return "" } -type GetFileResponse struct { - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` +type UnwatchResponse struct { } -func (m *GetFileResponse) Reset() { *m = GetFileResponse{} } -func (m *GetFileResponse) String() string { return proto.CompactTextString(m) } -func (*GetFileResponse) ProtoMessage() {} -func (*GetFileResponse) Descriptor() ([]byte, []int) { +func (m *UnwatchResponse) Reset() { *m = UnwatchResponse{} } +func (m *UnwatchResponse) String() string { return proto.CompactTextString(m) } +func (*UnwatchResponse) ProtoMessage() {} +func (*UnwatchResponse) Descriptor() ([]byte, []int) { return fileDescriptor_fc31080c27db9707, []int{24} } -func (m *GetFileResponse) XXX_Unmarshal(b []byte) error { +func (m *UnwatchResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *GetFileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *UnwatchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_GetFileResponse.Marshal(b, m, deterministic) + return xxx_messageInfo_UnwatchResponse.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -1159,104 +1150,17 @@ func (m *GetFileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } -func (m *GetFileResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetFileResponse.Merge(m, src) +func (m *UnwatchResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnwatchResponse.Merge(m, src) } -func (m *GetFileResponse) XXX_Size() int { +func (m *UnwatchResponse) XXX_Size() int { return m.Size() } -func (m *GetFileResponse) XXX_DiscardUnknown() { - xxx_messageInfo_GetFileResponse.DiscardUnknown(m) +func (m *UnwatchResponse) XXX_DiscardUnknown() { + xxx_messageInfo_UnwatchResponse.DiscardUnknown(m) } -var xxx_messageInfo_GetFileResponse proto.InternalMessageInfo - -func (m *GetFileResponse) GetPath() string { - if m != nil { - return m.Path - } - return "" -} - -type DeleteFileRequest struct { - Hash string `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` -} - -func (m *DeleteFileRequest) Reset() { *m = DeleteFileRequest{} } -func (m *DeleteFileRequest) String() string { return proto.CompactTextString(m) } -func (*DeleteFileRequest) ProtoMessage() {} -func (*DeleteFileRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_fc31080c27db9707, []int{25} -} -func (m *DeleteFileRequest) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *DeleteFileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_DeleteFileRequest.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *DeleteFileRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_DeleteFileRequest.Merge(m, src) -} -func (m *DeleteFileRequest) XXX_Size() int { - return m.Size() -} -func (m *DeleteFileRequest) XXX_DiscardUnknown() { - xxx_messageInfo_DeleteFileRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_DeleteFileRequest proto.InternalMessageInfo - -func (m *DeleteFileRequest) GetHash() string { - if m != nil { - return m.Hash - } - return "" -} - -type DeleteFileResponse struct { -} - -func (m *DeleteFileResponse) Reset() { *m = DeleteFileResponse{} } -func (m *DeleteFileResponse) String() string { return proto.CompactTextString(m) } -func (*DeleteFileResponse) ProtoMessage() {} -func (*DeleteFileResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_fc31080c27db9707, []int{26} -} -func (m *DeleteFileResponse) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *DeleteFileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_DeleteFileResponse.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *DeleteFileResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_DeleteFileResponse.Merge(m, src) -} -func (m *DeleteFileResponse) XXX_Size() int { - return m.Size() -} -func (m *DeleteFileResponse) XXX_DiscardUnknown() { - xxx_messageInfo_DeleteFileResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_DeleteFileResponse proto.InternalMessageInfo +var xxx_messageInfo_UnwatchResponse proto.InternalMessageInfo func init() { proto.RegisterType((*CreateSpaceRequest)(nil), "clientapi.CreateSpaceRequest") @@ -1280,65 +1184,60 @@ func init() { proto.RegisterType((*LoadSpaceResponse)(nil), "clientapi.LoadSpaceResponse") proto.RegisterType((*TreeParamsRequest)(nil), "clientapi.TreeParamsRequest") proto.RegisterType((*TreeParamsResponse)(nil), "clientapi.TreeParamsResponse") - proto.RegisterType((*PutFileRequest)(nil), "clientapi.PutFileRequest") - proto.RegisterType((*PutFileResponse)(nil), "clientapi.PutFileResponse") - proto.RegisterType((*GetFileRequest)(nil), "clientapi.GetFileRequest") - proto.RegisterType((*GetFileResponse)(nil), "clientapi.GetFileResponse") - proto.RegisterType((*DeleteFileRequest)(nil), "clientapi.DeleteFileRequest") - proto.RegisterType((*DeleteFileResponse)(nil), "clientapi.DeleteFileResponse") + proto.RegisterType((*WatchRequest)(nil), "clientapi.WatchRequest") + proto.RegisterType((*WatchResponse)(nil), "clientapi.WatchResponse") + proto.RegisterType((*UnwatchRequest)(nil), "clientapi.UnwatchRequest") + proto.RegisterType((*UnwatchResponse)(nil), "clientapi.UnwatchResponse") } func init() { proto.RegisterFile("api/apiproto/protos/api.proto", fileDescriptor_fc31080c27db9707) } var fileDescriptor_fc31080c27db9707 = []byte{ - // 746 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4f, 0x4f, 0xdb, 0x4e, - 0x10, 0xc5, 0x21, 0x10, 0x32, 0xfc, 0x94, 0x90, 0xe5, 0x8f, 0xfc, 0x73, 0x89, 0x95, 0xae, 0x04, - 0x8d, 0x54, 0x04, 0x2a, 0xbd, 0xb4, 0x97, 0x0a, 0x4a, 0x44, 0x15, 0x95, 0x4a, 0x34, 0xd0, 0x4b, - 0x6f, 0x5b, 0xbc, 0x52, 0x2c, 0x25, 0xb1, 0x1b, 0x3b, 0x15, 0xa7, 0x7e, 0x86, 0x7e, 0xac, 0x4a, - 0xbd, 0x70, 0xec, 0xb1, 0x82, 0x2f, 0x52, 0xad, 0xbd, 0xf6, 0xee, 0xac, 0x1d, 0x50, 0xc5, 0x05, - 0x76, 0x77, 0x66, 0xde, 0x7b, 0xeb, 0x9d, 0x79, 0x0a, 0xb4, 0x59, 0xe8, 0x1f, 0xb0, 0xd0, 0x0f, - 0xa7, 0x41, 0x1c, 0x1c, 0x24, 0x7f, 0x23, 0xb1, 0xdf, 0x4f, 0x96, 0xa4, 0x7e, 0x35, 0xf2, 0xf9, - 0x24, 0x66, 0xa1, 0x4f, 0x37, 0x80, 0x9c, 0x4c, 0x39, 0x8b, 0xf9, 0x45, 0xc8, 0xae, 0xf8, 0x80, - 0x7f, 0x9d, 0xf1, 0x28, 0xa6, 0x3b, 0xb0, 0x8e, 0x4e, 0xa3, 0x30, 0x98, 0x44, 0x9c, 0x34, 0xa0, - 0xe2, 0x7b, 0xb6, 0xd5, 0xb1, 0xba, 0xf5, 0x41, 0xc5, 0xf7, 0x44, 0x71, 0x8f, 0x4f, 0xfd, 0x6f, - 0x85, 0x62, 0x74, 0x3a, 0xa7, 0xf8, 0x05, 0x6c, 0xa6, 0x1c, 0xbd, 0xe0, 0x6a, 0x36, 0xe6, 0x93, - 0x58, 0xd6, 0x13, 0x1b, 0x6a, 0x91, 0xa8, 0xec, 0x67, 0xd9, 0xd9, 0x96, 0x76, 0x61, 0xcb, 0x2c, - 0x99, 0x03, 0xfe, 0x11, 0x36, 0x7b, 0x7c, 0xc4, 0xff, 0x01, 0x9c, 0xb8, 0x00, 0x9e, 0x4c, 0xee, - 0x7b, 0x76, 0x25, 0x09, 0x6a, 0x27, 0xd4, 0x86, 0x2d, 0x13, 0x32, 0x25, 0xa7, 0xdf, 0xa1, 0x71, - 0xec, 0x79, 0x97, 0xfc, 0xfa, 0xf1, 0x2c, 0x84, 0x40, 0x35, 0xe6, 0xd7, 0xb1, 0xbd, 0x98, 0x44, - 0x92, 0xb5, 0xa8, 0xf1, 0xa3, 0x8b, 0x09, 0x0b, 0xa3, 0x61, 0x10, 0xdb, 0xd5, 0x8e, 0xd5, 0x5d, - 0x19, 0x68, 0x27, 0x94, 0x41, 0x33, 0xe7, 0x97, 0xdf, 0x03, 0xd3, 0x58, 0x05, 0x9a, 0x2d, 0x58, - 0x1e, 0x72, 0xe6, 0xe5, 0x12, 0xe4, 0x4e, 0x9c, 0x4f, 0x83, 0x40, 0xd4, 0xa4, 0x02, 0xe4, 0x8e, - 0xbe, 0x87, 0x66, 0x6f, 0x36, 0x0e, 0x2f, 0xa7, 0x9c, 0x3f, 0xfe, 0x4b, 0xee, 0xc2, 0x9a, 0x02, - 0x93, 0x82, 0x09, 0x54, 0xbd, 0xd9, 0x38, 0x94, 0x50, 0xc9, 0x9a, 0x3e, 0x87, 0xe6, 0xf1, 0x68, - 0x24, 0xd2, 0xa2, 0x87, 0x7b, 0x63, 0x0f, 0xaa, 0x22, 0xd3, 0xec, 0x04, 0xb2, 0x01, 0x4b, 0xe2, - 0x6e, 0x91, 0x5d, 0xe9, 0x2c, 0x76, 0xeb, 0x83, 0x74, 0x43, 0x5f, 0xc3, 0x9a, 0x82, 0x96, 0x12, - 0x76, 0x60, 0x29, 0x16, 0x07, 0xb6, 0xd5, 0x59, 0xec, 0xae, 0x1e, 0x36, 0xf7, 0xf3, 0x29, 0xd9, - 0x4f, 0xa4, 0xa6, 0x51, 0x4a, 0x92, 0xd2, 0xa4, 0xb7, 0x33, 0x59, 0xf4, 0x00, 0x5a, 0xda, 0x99, - 0xc4, 0x73, 0x60, 0x45, 0x8a, 0x4b, 0x21, 0xeb, 0x83, 0x7c, 0x4f, 0xf7, 0x60, 0xed, 0x2c, 0x60, - 0x9e, 0x3e, 0x37, 0xf7, 0xdc, 0x6d, 0x1d, 0x5a, 0x5a, 0xb6, 0xec, 0xba, 0x0f, 0xd0, 0x12, 0xb2, - 0xce, 0xd9, 0x94, 0x8d, 0xa3, 0xc7, 0x3f, 0xca, 0x29, 0x10, 0x1d, 0x4e, 0xde, 0x41, 0xf5, 0x83, - 0xa5, 0xf7, 0x83, 0xe0, 0x49, 0x3b, 0x26, 0xfb, 0xae, 0xd9, 0x96, 0xbe, 0x81, 0xc6, 0xf9, 0x2c, - 0x3e, 0xf5, 0x47, 0xf9, 0xbd, 0x08, 0x54, 0x43, 0x16, 0x0f, 0xb3, 0xa7, 0x15, 0x6b, 0x5d, 0x67, - 0x05, 0xdf, 0x75, 0x07, 0x9a, 0x79, 0xbd, 0xea, 0x8d, 0x21, 0x8b, 0x72, 0x00, 0xb1, 0xa6, 0xaf, - 0xa0, 0xf1, 0x8e, 0x9b, 0x34, 0x66, 0x56, 0x4e, 0x5d, 0x51, 0xd4, 0x82, 0x20, 0xaf, 0x54, 0x04, - 0xa6, 0x42, 0xfa, 0x0c, 0x5a, 0xe9, 0xb8, 0x3f, 0xc0, 0x91, 0x9a, 0xa0, 0x4a, 0x4c, 0x21, 0x0f, - 0x7f, 0xd5, 0xa0, 0x7e, 0x92, 0xf4, 0xcf, 0x71, 0xe8, 0x93, 0x33, 0x58, 0xd5, 0xfc, 0x94, 0xb4, - 0xb5, 0xd6, 0x2a, 0xba, 0xaf, 0xe3, 0xce, 0x0b, 0x4b, 0xb9, 0x67, 0xb0, 0xaa, 0x19, 0x2c, 0x42, - 0x2b, 0xda, 0x31, 0x42, 0x2b, 0xf3, 0xe5, 0x4f, 0xd0, 0xc0, 0xa6, 0x4a, 0x3a, 0x05, 0x7e, 0xc3, - 0x45, 0x9d, 0xa7, 0xf7, 0x64, 0x28, 0x58, 0x6c, 0x97, 0x08, 0xb6, 0xd4, 0x9c, 0x11, 0x6c, 0xb9, - 0xd7, 0x92, 0x23, 0xa8, 0x49, 0xaf, 0x23, 0xff, 0x6b, 0xd9, 0xd8, 0x7f, 0x1d, 0xa7, 0x2c, 0x24, - 0x11, 0x4e, 0x60, 0x25, 0x73, 0x1f, 0xa2, 0xe7, 0x19, 0xfe, 0xe6, 0x3c, 0x29, 0x8d, 0x49, 0x90, - 0x3e, 0x80, 0x9a, 0x16, 0xb2, 0x6d, 0x58, 0x05, 0x9a, 0x49, 0xa7, 0x3d, 0x27, 0xaa, 0xf4, 0x64, - 0x56, 0x84, 0xf4, 0x18, 0xd6, 0x87, 0xf4, 0x14, 0xbc, 0xeb, 0x14, 0xea, 0xb9, 0x01, 0x11, 0x23, - 0x13, 0x59, 0x95, 0xb3, 0x5d, 0x1e, 0x54, 0x38, 0xb9, 0xd3, 0x20, 0x1c, 0xd3, 0xad, 0x10, 0x4e, - 0xc1, 0x9c, 0xc4, 0x33, 0xc9, 0x29, 0x46, 0xcf, 0x84, 0x9d, 0x01, 0x3d, 0x93, 0x39, 0xf4, 0x47, - 0x50, 0x93, 0x63, 0x8a, 0x10, 0xf0, 0xd0, 0x23, 0x04, 0x73, 0xaa, 0xfb, 0x00, 0x6a, 0x30, 0xd1, - 0x1b, 0x15, 0x06, 0xdb, 0x69, 0xcf, 0x89, 0xa6, 0x50, 0x6f, 0x77, 0x7f, 0xde, 0xba, 0xd6, 0xcd, - 0xad, 0x6b, 0xfd, 0xb9, 0x75, 0xad, 0x1f, 0x77, 0xee, 0xc2, 0xcd, 0x9d, 0xbb, 0xf0, 0xfb, 0xce, - 0x5d, 0xf8, 0xfc, 0x9f, 0xfe, 0x4b, 0xeb, 0xcb, 0x72, 0xf2, 0xef, 0xe5, 0xdf, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x0e, 0x8d, 0x9b, 0x46, 0x80, 0x09, 0x00, 0x00, + // 700 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xcd, 0x4e, 0xdb, 0x40, + 0x10, 0xc6, 0x21, 0x04, 0x3c, 0xd0, 0x98, 0x2c, 0x90, 0xba, 0x2e, 0x58, 0xe9, 0x4a, 0xa0, 0x48, + 0x45, 0xa0, 0xd2, 0x53, 0x7b, 0xe2, 0x27, 0x42, 0x42, 0x4d, 0xa5, 0x36, 0x80, 0x2a, 0xf5, 0xb6, + 0x8d, 0x57, 0x8a, 0xa5, 0x24, 0x76, 0x6d, 0xa7, 0xe5, 0xd4, 0x67, 0xe8, 0x63, 0xf5, 0xc8, 0xb1, + 0xc7, 0x2a, 0x79, 0x8b, 0x9e, 0xaa, 0x5d, 0xaf, 0xed, 0x5d, 0xdb, 0x01, 0x55, 0xb9, 0x80, 0xe7, + 0xef, 0x9b, 0x6f, 0x77, 0x67, 0x3e, 0x05, 0xf6, 0x88, 0xef, 0x1e, 0x13, 0xdf, 0xf5, 0x03, 0x2f, + 0xf2, 0x8e, 0xf9, 0xdf, 0x90, 0xd9, 0x47, 0xfc, 0x13, 0xe9, 0xfd, 0xa1, 0x4b, 0xc7, 0x11, 0xf1, + 0x5d, 0xbc, 0x0d, 0xe8, 0x22, 0xa0, 0x24, 0xa2, 0xd7, 0x3e, 0xe9, 0xd3, 0x1e, 0xfd, 0x3a, 0xa1, + 0x61, 0x84, 0xf7, 0x61, 0x4b, 0xf1, 0x86, 0xbe, 0x37, 0x0e, 0x29, 0xaa, 0x43, 0xc5, 0x75, 0x4c, + 0xad, 0xa5, 0xb5, 0xf5, 0x5e, 0xc5, 0x75, 0x58, 0x71, 0x87, 0x06, 0xee, 0xb7, 0x42, 0xb1, 0xe2, + 0x9d, 0x53, 0xfc, 0x0a, 0x76, 0xe2, 0x1e, 0x1d, 0xaf, 0x3f, 0x19, 0xd1, 0x71, 0x24, 0xea, 0x91, + 0x09, 0xab, 0x21, 0xab, 0xbc, 0x4a, 0xb2, 0x13, 0x13, 0xb7, 0xa1, 0x99, 0x2f, 0x99, 0x03, 0xfe, + 0x11, 0x76, 0x3a, 0x74, 0x48, 0xff, 0x03, 0x1c, 0xd9, 0x00, 0x8e, 0x48, 0xbe, 0x72, 0xcc, 0x0a, + 0x0f, 0x4a, 0x1e, 0x6c, 0x42, 0x33, 0x0f, 0x19, 0x37, 0xc7, 0x3f, 0xa0, 0x7e, 0xe6, 0x38, 0x37, + 0xf4, 0x6e, 0xf1, 0x2e, 0x08, 0x41, 0x35, 0xa2, 0x77, 0x91, 0xb9, 0xcc, 0x23, 0xfc, 0x9b, 0xd5, + 0xb8, 0xe1, 0xf5, 0x98, 0xf8, 0xe1, 0xc0, 0x8b, 0xcc, 0x6a, 0x4b, 0x6b, 0xaf, 0xf5, 0x24, 0x0f, + 0x26, 0x60, 0xa4, 0xfd, 0xc5, 0x7d, 0xa8, 0x6d, 0xb4, 0x42, 0x9b, 0x26, 0xd4, 0x06, 0x94, 0x38, + 0x29, 0x05, 0x61, 0x31, 0x7f, 0xe0, 0x79, 0xac, 0x26, 0x26, 0x20, 0x2c, 0xfc, 0x0e, 0x8c, 0xce, + 0x64, 0xe4, 0xdf, 0x04, 0x94, 0x2e, 0x7e, 0x93, 0x07, 0xb0, 0x99, 0x81, 0x09, 0xc2, 0x08, 0xaa, + 0xce, 0x64, 0xe4, 0x0b, 0x28, 0xfe, 0x8d, 0x5f, 0x82, 0x71, 0x36, 0x1c, 0xb2, 0xb4, 0xf0, 0xf1, + 0xd9, 0x38, 0x84, 0x2a, 0xcb, 0xcc, 0x4f, 0x02, 0xda, 0x86, 0x15, 0x76, 0xb6, 0xd0, 0xac, 0xb4, + 0x96, 0xdb, 0x7a, 0x2f, 0x36, 0xf0, 0x1b, 0xd8, 0xcc, 0xa0, 0x05, 0x85, 0x7d, 0x58, 0x89, 0x98, + 0xc3, 0xd4, 0x5a, 0xcb, 0xed, 0xf5, 0x13, 0xe3, 0x28, 0xdd, 0x92, 0x23, 0x4e, 0x35, 0x8e, 0x62, + 0xc4, 0x4b, 0xf9, 0x6c, 0x27, 0xb4, 0xf0, 0x31, 0x34, 0x24, 0x9f, 0xc0, 0xb3, 0x60, 0x4d, 0x90, + 0x8b, 0x21, 0xf5, 0x5e, 0x6a, 0xe3, 0x43, 0xd8, 0xec, 0x7a, 0xc4, 0x91, 0xf7, 0xe6, 0x81, 0xb3, + 0x6d, 0x41, 0x43, 0xca, 0x16, 0x53, 0xf7, 0x1e, 0x1a, 0x8c, 0xd6, 0x07, 0x12, 0x90, 0x51, 0xb8, + 0xf8, 0xa3, 0x5c, 0x02, 0x92, 0xe1, 0xc4, 0x19, 0xb2, 0x79, 0xd0, 0xe4, 0x79, 0x60, 0x7d, 0xe2, + 0x89, 0x49, 0xee, 0x35, 0x31, 0xf1, 0x29, 0x6c, 0x7c, 0x22, 0x51, 0x7f, 0xf0, 0x38, 0xa3, 0x26, + 0xd4, 0xd8, 0x8d, 0x66, 0x33, 0x18, 0x5b, 0xd8, 0x80, 0x27, 0x02, 0x41, 0x9c, 0xf4, 0x1c, 0xea, + 0xb7, 0xe3, 0xef, 0x8b, 0x81, 0x36, 0xc0, 0x48, 0x31, 0x62, 0xd8, 0x93, 0xbf, 0x35, 0xd0, 0x2f, + 0xf8, 0x13, 0x9f, 0xf9, 0x2e, 0xea, 0xc2, 0xba, 0x24, 0x79, 0x68, 0x4f, 0x7a, 0xfd, 0xa2, 0x40, + 0x5a, 0xf6, 0xbc, 0xb0, 0xb8, 0xb7, 0x2e, 0xac, 0x4b, 0x1a, 0xa8, 0xa0, 0x15, 0x15, 0x53, 0x41, + 0x2b, 0x93, 0xce, 0x5b, 0xa8, 0xab, 0xba, 0x87, 0x5a, 0x85, 0xfe, 0x39, 0xa1, 0xb3, 0x5e, 0x3c, + 0x90, 0x91, 0xc1, 0xaa, 0x8a, 0xa6, 0xc0, 0x96, 0xea, 0xa7, 0x02, 0x5b, 0x2e, 0x87, 0xe8, 0x14, + 0x56, 0x85, 0x1c, 0xa1, 0x67, 0x52, 0xb6, 0x2a, 0x91, 0x96, 0x55, 0x16, 0x12, 0x08, 0x17, 0xb0, + 0x96, 0x08, 0x04, 0x92, 0xf3, 0x72, 0x12, 0x64, 0x3d, 0x2f, 0x8d, 0x09, 0x90, 0x2b, 0x80, 0x6c, + 0xa0, 0xd1, 0x6e, 0x6e, 0x9b, 0x95, 0xb5, 0xb1, 0xf6, 0xe6, 0x44, 0x33, 0x3e, 0x89, 0x5a, 0x28, + 0x7c, 0x72, 0xea, 0xa4, 0xf0, 0x29, 0xc8, 0xcb, 0x25, 0xe8, 0xa9, 0x46, 0xa0, 0x5c, 0xa6, 0xa2, + 0x26, 0xd6, 0x6e, 0x79, 0x30, 0xc3, 0x49, 0xc5, 0x40, 0xc1, 0xc9, 0x0b, 0x8a, 0x82, 0x53, 0xd0, + 0x0f, 0xf4, 0x16, 0x56, 0xf8, 0x9a, 0xa1, 0xa7, 0x52, 0x9a, 0xbc, 0xba, 0x96, 0x59, 0x0c, 0x64, + 0x4f, 0x2c, 0xb6, 0x49, 0x79, 0x62, 0x75, 0x4b, 0x95, 0x27, 0xce, 0x2d, 0xdf, 0xf9, 0xc1, 0xaf, + 0xa9, 0xad, 0xdd, 0x4f, 0x6d, 0xed, 0xcf, 0xd4, 0xd6, 0x7e, 0xce, 0xec, 0xa5, 0xfb, 0x99, 0xbd, + 0xf4, 0x7b, 0x66, 0x2f, 0x7d, 0xde, 0x90, 0x7f, 0xbb, 0x7c, 0xa9, 0xf1, 0x7f, 0xaf, 0xff, 0x05, + 0x00, 0x00, 0xff, 0xff, 0x7d, 0xa4, 0xb1, 0xae, 0xd2, 0x08, 0x00, 0x00, } func (m *CreateSpaceRequest) Marshal() (dAtA []byte, err error) { @@ -2022,7 +1921,7 @@ func (m *TreeParamsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *PutFileRequest) Marshal() (dAtA []byte, err error) { +func (m *WatchRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -2032,34 +1931,34 @@ func (m *PutFileRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *PutFileRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *WatchRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *PutFileRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *WatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l + if len(m.TreeId) > 0 { + i -= len(m.TreeId) + copy(dAtA[i:], m.TreeId) + i = encodeVarintApi(dAtA, i, uint64(len(m.TreeId))) + i-- + dAtA[i] = 0x12 + } if len(m.SpaceId) > 0 { i -= len(m.SpaceId) copy(dAtA[i:], m.SpaceId) i = encodeVarintApi(dAtA, i, uint64(len(m.SpaceId))) i-- - dAtA[i] = 0x12 - } - if len(m.Path) > 0 { - i -= len(m.Path) - copy(dAtA[i:], m.Path) - i = encodeVarintApi(dAtA, i, uint64(len(m.Path))) - i-- dAtA[i] = 0xa } return len(dAtA) - i, nil } -func (m *PutFileResponse) Marshal() (dAtA []byte, err error) { +func (m *WatchResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -2069,27 +1968,20 @@ func (m *PutFileResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *PutFileResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *WatchResponse) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *PutFileResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *WatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Hash) > 0 { - i -= len(m.Hash) - copy(dAtA[i:], m.Hash) - i = encodeVarintApi(dAtA, i, uint64(len(m.Hash))) - i-- - dAtA[i] = 0xa - } return len(dAtA) - i, nil } -func (m *GetFileRequest) Marshal() (dAtA []byte, err error) { +func (m *UnwatchRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -2099,34 +1991,34 @@ func (m *GetFileRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *GetFileRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *UnwatchRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *GetFileRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *UnwatchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Path) > 0 { - i -= len(m.Path) - copy(dAtA[i:], m.Path) - i = encodeVarintApi(dAtA, i, uint64(len(m.Path))) + if len(m.TreeId) > 0 { + i -= len(m.TreeId) + copy(dAtA[i:], m.TreeId) + i = encodeVarintApi(dAtA, i, uint64(len(m.TreeId))) i-- dAtA[i] = 0x12 } - if len(m.Hash) > 0 { - i -= len(m.Hash) - copy(dAtA[i:], m.Hash) - i = encodeVarintApi(dAtA, i, uint64(len(m.Hash))) + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintApi(dAtA, i, uint64(len(m.SpaceId))) i-- dAtA[i] = 0xa } return len(dAtA) - i, nil } -func (m *GetFileResponse) Marshal() (dAtA []byte, err error) { +func (m *UnwatchResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -2136,72 +2028,12 @@ func (m *GetFileResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *GetFileResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *UnwatchResponse) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *GetFileResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Path) > 0 { - i -= len(m.Path) - copy(dAtA[i:], m.Path) - i = encodeVarintApi(dAtA, i, uint64(len(m.Path))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *DeleteFileRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *DeleteFileRequest) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *DeleteFileRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Hash) > 0 { - i -= len(m.Hash) - copy(dAtA[i:], m.Hash) - i = encodeVarintApi(dAtA, i, uint64(len(m.Hash))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *DeleteFileResponse) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *DeleteFileResponse) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *DeleteFileResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *UnwatchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int @@ -2520,80 +2352,50 @@ func (m *TreeParamsResponse) Size() (n int) { return n } -func (m *PutFileRequest) Size() (n int) { +func (m *WatchRequest) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Path) - if l > 0 { - n += 1 + l + sovApi(uint64(l)) - } l = len(m.SpaceId) if l > 0 { n += 1 + l + sovApi(uint64(l)) } + l = len(m.TreeId) + if l > 0 { + n += 1 + l + sovApi(uint64(l)) + } return n } -func (m *PutFileResponse) Size() (n int) { +func (m *WatchResponse) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Hash) - if l > 0 { - n += 1 + l + sovApi(uint64(l)) - } return n } -func (m *GetFileRequest) Size() (n int) { +func (m *UnwatchRequest) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Hash) + l = len(m.SpaceId) if l > 0 { n += 1 + l + sovApi(uint64(l)) } - l = len(m.Path) + l = len(m.TreeId) if l > 0 { n += 1 + l + sovApi(uint64(l)) } return n } -func (m *GetFileResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Path) - if l > 0 { - n += 1 + l + sovApi(uint64(l)) - } - return n -} - -func (m *DeleteFileRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Hash) - if l > 0 { - n += 1 + l + sovApi(uint64(l)) - } - return n -} - -func (m *DeleteFileResponse) Size() (n int) { +func (m *UnwatchResponse) Size() (n int) { if m == nil { return 0 } @@ -4480,7 +4282,7 @@ func (m *TreeParamsResponse) Unmarshal(dAtA []byte) error { } return nil } -func (m *PutFileRequest) Unmarshal(dAtA []byte) error { +func (m *WatchRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -4503,45 +4305,13 @@ func (m *PutFileRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: PutFileRequest: wiretype end group for non-group") + return fmt.Errorf("proto: WatchRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: PutFileRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: WatchRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowApi - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthApi - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthApi - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Path = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) } @@ -4573,59 +4343,9 @@ func (m *PutFileRequest) Unmarshal(dAtA []byte) error { } m.SpaceId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipApi(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthApi - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *PutFileResponse) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowApi - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: PutFileResponse: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: PutFileResponse: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: + case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TreeId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -4653,7 +4373,7 @@ func (m *PutFileResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Hash = string(dAtA[iNdEx:postIndex]) + m.TreeId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -4676,7 +4396,7 @@ func (m *PutFileResponse) Unmarshal(dAtA []byte) error { } return nil } -func (m *GetFileRequest) Unmarshal(dAtA []byte) error { +func (m *WatchResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -4699,15 +4419,65 @@ func (m *GetFileRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: GetFileRequest: wiretype end group for non-group") + return fmt.Errorf("proto: WatchResponse: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: GetFileRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: WatchResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *UnwatchRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UnwatchRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UnwatchRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -4735,11 +4505,11 @@ func (m *GetFileRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Hash = string(dAtA[iNdEx:postIndex]) + m.SpaceId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field TreeId", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -4767,7 +4537,7 @@ func (m *GetFileRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Path = string(dAtA[iNdEx:postIndex]) + m.TreeId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -4790,7 +4560,7 @@ func (m *GetFileRequest) Unmarshal(dAtA []byte) error { } return nil } -func (m *GetFileResponse) Unmarshal(dAtA []byte) error { +func (m *UnwatchResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -4813,174 +4583,10 @@ func (m *GetFileResponse) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: GetFileResponse: wiretype end group for non-group") + return fmt.Errorf("proto: UnwatchResponse: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: GetFileResponse: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowApi - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthApi - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthApi - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Path = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipApi(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthApi - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *DeleteFileRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowApi - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: DeleteFileRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: DeleteFileRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowApi - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthApi - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthApi - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Hash = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipApi(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthApi - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *DeleteFileResponse) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowApi - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: DeleteFileResponse: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: DeleteFileResponse: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: UnwatchResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { default: diff --git a/client/api/apiproto/api_drpc.pb.go b/client/api/apiproto/api_drpc.pb.go index e3cc1c63..6bf49f0c 100644 --- a/client/api/apiproto/api_drpc.pb.go +++ b/client/api/apiproto/api_drpc.pb.go @@ -50,9 +50,8 @@ type DRPCClientApiClient interface { AllTrees(ctx context.Context, in *AllTreesRequest) (*AllTreesResponse, error) AllSpaces(ctx context.Context, in *AllSpacesRequest) (*AllSpacesResponse, error) LoadSpace(ctx context.Context, in *LoadSpaceRequest) (*LoadSpaceResponse, error) - PutFile(ctx context.Context, in *PutFileRequest) (*PutFileResponse, error) - GetFile(ctx context.Context, in *GetFileRequest) (*GetFileResponse, error) - DeleteFile(ctx context.Context, in *DeleteFileRequest) (*DeleteFileResponse, error) + Watch(ctx context.Context, in *WatchRequest) (*WatchResponse, error) + Unwatch(ctx context.Context, in *UnwatchRequest) (*UnwatchResponse, error) } type drpcClientApiClient struct { @@ -155,27 +154,18 @@ func (c *drpcClientApiClient) LoadSpace(ctx context.Context, in *LoadSpaceReques return out, nil } -func (c *drpcClientApiClient) PutFile(ctx context.Context, in *PutFileRequest) (*PutFileResponse, error) { - out := new(PutFileResponse) - err := c.cc.Invoke(ctx, "/clientapi.ClientApi/PutFile", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) +func (c *drpcClientApiClient) Watch(ctx context.Context, in *WatchRequest) (*WatchResponse, error) { + out := new(WatchResponse) + err := c.cc.Invoke(ctx, "/clientapi.ClientApi/Watch", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) if err != nil { return nil, err } return out, nil } -func (c *drpcClientApiClient) GetFile(ctx context.Context, in *GetFileRequest) (*GetFileResponse, error) { - out := new(GetFileResponse) - err := c.cc.Invoke(ctx, "/clientapi.ClientApi/GetFile", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *drpcClientApiClient) DeleteFile(ctx context.Context, in *DeleteFileRequest) (*DeleteFileResponse, error) { - out := new(DeleteFileResponse) - err := c.cc.Invoke(ctx, "/clientapi.ClientApi/DeleteFile", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) +func (c *drpcClientApiClient) Unwatch(ctx context.Context, in *UnwatchRequest) (*UnwatchResponse, error) { + out := new(UnwatchResponse) + err := c.cc.Invoke(ctx, "/clientapi.ClientApi/Unwatch", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out) if err != nil { return nil, err } @@ -193,9 +183,8 @@ type DRPCClientApiServer interface { AllTrees(context.Context, *AllTreesRequest) (*AllTreesResponse, error) AllSpaces(context.Context, *AllSpacesRequest) (*AllSpacesResponse, error) LoadSpace(context.Context, *LoadSpaceRequest) (*LoadSpaceResponse, error) - PutFile(context.Context, *PutFileRequest) (*PutFileResponse, error) - GetFile(context.Context, *GetFileRequest) (*GetFileResponse, error) - DeleteFile(context.Context, *DeleteFileRequest) (*DeleteFileResponse, error) + Watch(context.Context, *WatchRequest) (*WatchResponse, error) + Unwatch(context.Context, *UnwatchRequest) (*UnwatchResponse, error) } type DRPCClientApiUnimplementedServer struct{} @@ -240,21 +229,17 @@ func (s *DRPCClientApiUnimplementedServer) LoadSpace(context.Context, *LoadSpace return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } -func (s *DRPCClientApiUnimplementedServer) PutFile(context.Context, *PutFileRequest) (*PutFileResponse, error) { +func (s *DRPCClientApiUnimplementedServer) Watch(context.Context, *WatchRequest) (*WatchResponse, error) { return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } -func (s *DRPCClientApiUnimplementedServer) GetFile(context.Context, *GetFileRequest) (*GetFileResponse, error) { - return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) -} - -func (s *DRPCClientApiUnimplementedServer) DeleteFile(context.Context, *DeleteFileRequest) (*DeleteFileResponse, error) { +func (s *DRPCClientApiUnimplementedServer) Unwatch(context.Context, *UnwatchRequest) (*UnwatchResponse, error) { return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } type DRPCClientApiDescription struct{} -func (DRPCClientApiDescription) NumMethods() int { return 13 } +func (DRPCClientApiDescription) NumMethods() int { return 12 } func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { switch n { @@ -349,32 +334,23 @@ func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Recei ) }, DRPCClientApiServer.LoadSpace, true case 10: - return "/clientapi.ClientApi/PutFile", drpcEncoding_File_api_apiproto_protos_api_proto{}, + return "/clientapi.ClientApi/Watch", drpcEncoding_File_api_apiproto_protos_api_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCClientApiServer). - PutFile( + Watch( ctx, - in1.(*PutFileRequest), + in1.(*WatchRequest), ) - }, DRPCClientApiServer.PutFile, true + }, DRPCClientApiServer.Watch, true case 11: - return "/clientapi.ClientApi/GetFile", drpcEncoding_File_api_apiproto_protos_api_proto{}, + return "/clientapi.ClientApi/Unwatch", drpcEncoding_File_api_apiproto_protos_api_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCClientApiServer). - GetFile( + Unwatch( ctx, - in1.(*GetFileRequest), + in1.(*UnwatchRequest), ) - }, DRPCClientApiServer.GetFile, true - case 12: - return "/clientapi.ClientApi/DeleteFile", drpcEncoding_File_api_apiproto_protos_api_proto{}, - func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { - return srv.(DRPCClientApiServer). - DeleteFile( - ctx, - in1.(*DeleteFileRequest), - ) - }, DRPCClientApiServer.DeleteFile, true + }, DRPCClientApiServer.Unwatch, true default: return "", nil, nil, nil, false } @@ -544,48 +520,32 @@ func (x *drpcClientApi_LoadSpaceStream) SendAndClose(m *LoadSpaceResponse) error return x.CloseSend() } -type DRPCClientApi_PutFileStream interface { +type DRPCClientApi_WatchStream interface { drpc.Stream - SendAndClose(*PutFileResponse) error + SendAndClose(*WatchResponse) error } -type drpcClientApi_PutFileStream struct { +type drpcClientApi_WatchStream struct { drpc.Stream } -func (x *drpcClientApi_PutFileStream) SendAndClose(m *PutFileResponse) error { +func (x *drpcClientApi_WatchStream) SendAndClose(m *WatchResponse) error { if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil { return err } return x.CloseSend() } -type DRPCClientApi_GetFileStream interface { +type DRPCClientApi_UnwatchStream interface { drpc.Stream - SendAndClose(*GetFileResponse) error + SendAndClose(*UnwatchResponse) error } -type drpcClientApi_GetFileStream struct { +type drpcClientApi_UnwatchStream struct { drpc.Stream } -func (x *drpcClientApi_GetFileStream) SendAndClose(m *GetFileResponse) error { - if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil { - return err - } - return x.CloseSend() -} - -type DRPCClientApi_DeleteFileStream interface { - drpc.Stream - SendAndClose(*DeleteFileResponse) error -} - -type drpcClientApi_DeleteFileStream struct { - drpc.Stream -} - -func (x *drpcClientApi_DeleteFileStream) SendAndClose(m *DeleteFileResponse) error { +func (x *drpcClientApi_UnwatchStream) SendAndClose(m *UnwatchResponse) error { if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil { return err } diff --git a/client/api/apiproto/protos/api.proto b/client/api/apiproto/protos/api.proto index 6b7e98dd..2eff85b8 100644 --- a/client/api/apiproto/protos/api.proto +++ b/client/api/apiproto/protos/api.proto @@ -14,9 +14,8 @@ service ClientApi { rpc AllTrees(AllTreesRequest) returns(AllTreesResponse); rpc AllSpaces(AllSpacesRequest) returns(AllSpacesResponse); rpc LoadSpace(LoadSpaceRequest) returns(LoadSpaceResponse); - rpc PutFile(PutFileRequest) returns(PutFileResponse); - rpc GetFile(GetFileRequest) returns(GetFileResponse); - rpc DeleteFile(DeleteFileRequest) returns(DeleteFileResponse); + rpc Watch(WatchRequest) returns(WatchResponse); + rpc Unwatch(UnwatchRequest) returns(UnwatchResponse); } message CreateSpaceRequest { @@ -108,26 +107,18 @@ message TreeParamsResponse { repeated string headIds = 2; } -message PutFileRequest { - string path = 1; - string spaceId = 2; +message WatchRequest { + string spaceId = 1; + string treeId = 2; } -message PutFileResponse { - string hash = 1; +message WatchResponse { } -message GetFileRequest { - string hash = 1; - string path = 2; +message UnwatchRequest { + string spaceId = 1; + string treeId = 2; } -message GetFileResponse { - string path = 1; -} - -message DeleteFileRequest { - string hash = 1; -} - -message DeleteFileResponse {} \ No newline at end of file +message UnwatchResponse { +} \ No newline at end of file diff --git a/client/api/rpchandler.go b/client/api/rpchandler.go index 8fabfdfb..3c3edcba 100644 --- a/client/api/rpchandler.go +++ b/client/api/rpchandler.go @@ -7,14 +7,10 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/client/document" "github.com/anytypeio/go-anytype-infrastructure-experiments/client/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric" - "github.com/ipfs/go-cid" - "go.uber.org/zap" - "io" "math/rand" - "os" + "sync" ) type rpcHandler struct { @@ -22,7 +18,49 @@ type rpcHandler struct { storageService storage.ClientStorage docService document.Service account account.Service - file fileservice.FileService + treeWatcher *watcher + sync.Mutex +} + +func (r *rpcHandler) Watch(ctx context.Context, request *apiproto.WatchRequest) (resp *apiproto.WatchResponse, err error) { + space, err := r.spaceService.GetSpace(context.Background(), request.SpaceId) + if err != nil { + return + } + r.Lock() + defer r.Unlock() + + ch := make(chan bool) + r.treeWatcher = newWatcher(request.SpaceId, request.TreeId, ch) + space.StatusService().Watch(request.TreeId, ch) + go r.treeWatcher.run() + resp = &apiproto.WatchResponse{} + return +} + +func (r *rpcHandler) Unwatch(ctx context.Context, request *apiproto.UnwatchRequest) (resp *apiproto.UnwatchResponse, err error) { + space, err := r.spaceService.GetSpace(context.Background(), request.SpaceId) + if err != nil { + return + } + var treeWatcher *watcher + space.StatusService().Unwatch(request.TreeId) + + r.Lock() + if r.treeWatcher != nil { + treeWatcher = r.treeWatcher + } + r.Unlock() + + treeWatcher.close() + + r.Lock() + if r.treeWatcher == treeWatcher { + r.treeWatcher = nil + } + r.Unlock() + resp = &apiproto.UnwatchResponse{} + return } func (r *rpcHandler) LoadSpace(ctx context.Context, request *apiproto.LoadSpaceRequest) (resp *apiproto.LoadSpaceResponse, err error) { @@ -150,50 +188,3 @@ func (r *rpcHandler) TreeParams(ctx context.Context, request *apiproto.TreeParam } return } - -func (r *rpcHandler) PutFile(ctx context.Context, request *apiproto.PutFileRequest) (*apiproto.PutFileResponse, error) { - f, err := os.Open(request.Path) - if err != nil { - return nil, err - } - defer f.Close() - n, err := r.file.AddFile(ctx, f) - if err != nil { - return nil, err - } - return &apiproto.PutFileResponse{ - Hash: n.Cid().String(), - }, nil -} - -func (r *rpcHandler) GetFile(ctx context.Context, request *apiproto.GetFileRequest) (*apiproto.GetFileResponse, error) { - c, err := cid.Parse(request.Hash) - if err != nil { - return nil, err - } - - f, err := os.Create(request.Path) - if err != nil { - return nil, err - } - defer f.Close() - - rd, err := r.file.GetFile(ctx, c) - if err != nil { - return nil, err - } - defer rd.Close() - wr, err := io.Copy(f, rd) - if err != nil && err != io.EOF { - return nil, err - } - log.Info("copied bytes", zap.Int64("size", wr)) - return &apiproto.GetFileResponse{ - Path: request.Path, - }, nil -} - -func (r *rpcHandler) DeleteFile(ctx context.Context, request *apiproto.DeleteFileRequest) (*apiproto.DeleteFileResponse, error) { - //TODO implement me - panic("implement me") -} diff --git a/client/api/service.go b/client/api/service.go index b0a7ec62..8e7ea5c9 100644 --- a/client/api/service.go +++ b/client/api/service.go @@ -9,7 +9,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonfile/fileservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" @@ -37,7 +36,6 @@ type service struct { storageService clientstorage.ClientStorage docService document.Service account account.Service - file fileservice.FileService *server.BaseDrpcServer } @@ -48,7 +46,6 @@ func (s *service) Init(a *app.App) (err error) { s.account = a.MustComponent(account.CName).(account.Service) s.cfg = a.MustComponent(config.CName).(*config.Config) s.transport = a.MustComponent(secure.CName).(secure.Service) - s.file = a.MustComponent(fileservice.CName).(fileservice.FileService) return nil } @@ -70,7 +67,12 @@ func (s *service) Run(ctx context.Context) (err error) { if err != nil { return } - return apiproto.DRPCRegisterClientApi(s, &rpcHandler{s.spaceService, s.storageService, s.docService, s.account, s.file}) + return apiproto.DRPCRegisterClientApi(s, &rpcHandler{ + spaceService: s.spaceService, + storageService: s.storageService, + docService: s.docService, + account: s.account, + }) } func (s *service) Close(ctx context.Context) (err error) { diff --git a/client/api/watcher.go b/client/api/watcher.go new file mode 100644 index 00000000..e7d13a9b --- /dev/null +++ b/client/api/watcher.go @@ -0,0 +1,37 @@ +package api + +import "go.uber.org/zap" + +type watcher struct { + spaceId string + treeId string + watcher chan bool + watcherDone chan struct{} +} + +func newWatcher(spaceId, treeId string, ch chan bool) *watcher { + return &watcher{ + spaceId: spaceId, + treeId: treeId, + watcher: ch, + watcherDone: make(chan struct{}), + } +} + +func (w *watcher) run() { + log := log.With(zap.String("spaceId", w.spaceId), zap.String("treeId", w.treeId)) + log.Debug("started watching") + defer close(w.watcherDone) + for { + synced, ok := <-w.watcher + if !ok { + log.Debug("stopped watching") + return + } + log.With(zap.Bool("synced", synced)).Debug("updated sync status") + } +} + +func (w *watcher) close() { + <-w.watcherDone +} diff --git a/client/go.mod b/client/go.mod index 949bcd38..44453872 100644 --- a/client/go.mod +++ b/client/go.mod @@ -2,10 +2,75 @@ module github.com/anytypeio/go-anytype-infrastructure-experiments/client replace github.com/anytypeio/go-anytype-infrastructure-experiments/common => ../common - go 1.19 require ( github.com/anytypeio/go-anytype-infrastructure-experiments/common v0.0.0-00010101000000-000000000000 github.com/dgraph-io/badger/v3 v3.2103.3 + github.com/gogo/protobuf v1.3.2 + github.com/stretchr/testify v1.8.0 + go.uber.org/zap v1.23.0 + storj.io/drpc v0.0.32 +) + +require ( + github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3 // indirect + github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/fogleman/gg v1.3.0 // indirect + github.com/goccy/go-graphviz v0.0.9 // indirect + github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.3 // indirect + github.com/google/flatbuffers v1.12.1 // indirect + github.com/huandu/skiplist v1.2.0 // indirect + github.com/ipfs/go-cid v0.3.2 // indirect + github.com/ipfs/go-log/v2 v2.5.1 // indirect + github.com/klauspost/compress v1.15.10 // indirect + github.com/klauspost/cpuid/v2 v2.1.1 // indirect + github.com/libp2p/go-buffer-pool v0.1.0 // indirect + github.com/libp2p/go-libp2p v0.23.2 // indirect + github.com/libp2p/go-openssl v0.1.0 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mattn/go-pointer v0.0.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/minio/sha256-simd v1.0.0 // indirect + github.com/mr-tron/base58 v1.2.0 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.1.0 // indirect + github.com/multiformats/go-multiaddr v0.7.0 // indirect + github.com/multiformats/go-multibase v0.1.1 // indirect + github.com/multiformats/go-multicodec v0.6.0 // indirect + github.com/multiformats/go-multihash v0.2.1 // indirect + github.com/multiformats/go-varint v0.0.6 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.13.0 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.37.0 // indirect + github.com/prometheus/procfs v0.8.0 // indirect + github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/zeebo/blake3 v0.2.3 // indirect + github.com/zeebo/errs v1.3.0 // indirect + go.opencensus.io v0.23.0 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.8.0 // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect + golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect + golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5 // indirect + golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect + google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/client/go.sum b/client/go.sum index b91db95f..cf6293a0 100644 --- a/client/go.sum +++ b/client/go.sum @@ -40,6 +40,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3 h1:yIyGIb7bRkEngKtQ0Ja5bome2SEnErwTaEvR8dA/WtU= +github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-20221217135026-4eba413631b3/go.mod h1:w0i62cRB2jVpjFb2CpPNj5J+ihKqqmBBG9X2+Odekjw= github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 h1:kMPPZYmJgbs4AJfodbg2OCXg5cp+9LPAJcLZJqmcghk= github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232/go.mod h1:+PeHBAWp7gUh/yw6uAauKc5ku0w4cFNg6DUddGxoGq0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -56,6 +58,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c h1:+bD75daSbsxyTzkKpNplC4xls+7/tGwty+zruzOnOmk= +github.com/cheggaaa/mb/v3 v3.0.0-20221122160120-e9034545510c/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -214,8 +218,6 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE= github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI= -github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw= -github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY= github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -305,12 +307,15 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -353,8 +358,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -365,6 +370,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg= diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 6b279140..30d2eff2 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" @@ -108,12 +109,20 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) + + var statusService statusservice.StatusService + // this will work only for clients, not the best solution, but... + if !lastConfiguration.IsResponsible(st.Id()) { + statusService = statusservice.NewStatusService(st.Id(), lastConfiguration) + } + diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, log) syncService := syncservice.NewSyncService(id, confConnector, s.config.SyncPeriod) sp := &space{ id: id, syncService: syncService, diffService: diffService, + statusService: statusService, cache: s.treeGetter, account: s.account, configuration: lastConfiguration, diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 9a656ee7..fdf6e633 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -9,6 +9,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsdocument/deletionstate" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/statusservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" @@ -80,6 +81,8 @@ type Space interface { BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (tree.ObjectTree, error) DeleteTree(ctx context.Context, id string) (err error) + StatusService() statusservice.StatusService + Close() error } @@ -92,6 +95,7 @@ type space struct { syncService syncservice.SyncService diffService diffservice.DiffService + statusService statusservice.StatusService storage storage.SpaceStorage cache treegetter.TreeGetter account account.Service @@ -202,6 +206,10 @@ func (s *space) DiffService() diffservice.DiffService { return s.diffService } +func (s *space) StatusService() statusservice.StatusService { + return s.statusService +} + func (s *space) StoredIds() []string { return s.diffService.AllIds() } @@ -222,6 +230,7 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay Configuration: s.configuration, AclList: s.aclList, SpaceStorage: s.storage, + StatusService: s.statusService, } return synctree.DeriveSyncTree(ctx, deps) } @@ -238,6 +247,7 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay Configuration: s.configuration, AclList: s.aclList, SpaceStorage: s.storage, + StatusService: s.statusService, } return synctree.CreateSyncTree(ctx, deps) } @@ -256,6 +266,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene AclList: s.aclList, SpaceStorage: s.storage, TreeUsage: &s.treesUsed, + StatusService: s.statusService, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } @@ -286,5 +297,10 @@ func (s *space) Close() error { if err := s.storage.Close(); err != nil { mError.Add(err) } + if s.statusService != nil { + if err := s.statusService.Close(); err != nil { + mError.Add(err) + } + } return mError.Err() } diff --git a/common/commonspace/statusservice/statusservice.go b/common/commonspace/statusservice/statusservice.go index 59e73637..02f803d8 100644 --- a/common/commonspace/statusservice/statusservice.go +++ b/common/commonspace/statusservice/statusservice.go @@ -17,6 +17,7 @@ type StatusService interface { Unwatch(treeId string) StateCounter() uint64 RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) + Close() error } type statusEntry struct { @@ -31,6 +32,7 @@ type statusService struct { watchers map[string]chan bool configuration nodeconf.Configuration stateCounter uint64 + closed bool } func NewStatusService(spaceId string, configuration nodeconf.Configuration) StatusService { @@ -46,6 +48,10 @@ func NewStatusService(spaceId string, configuration nodeconf.Configuration) Stat func (s *statusService) HeadsChange(treeId string, heads []string) { s.Lock() defer s.Unlock() + if s.closed { + return + } + s.treeHeads[treeId] = statusEntry{ head: heads[0], stateCounter: s.stateCounter, @@ -62,6 +68,10 @@ func (s *statusService) HeadsChange(treeId string, heads []string) { func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { s.Lock() defer s.Unlock() + if s.closed { + return + } + curHead, ok := s.treeHeads[treeId] if !ok { return @@ -84,13 +94,22 @@ func (s *statusService) HeadsReceive(senderId, treeId string, heads []string) { func (s *statusService) Watch(treeId string, ch chan bool) { s.Lock() defer s.Unlock() + if s.closed { + return + } s.watchers[treeId] = ch } func (s *statusService) Unwatch(treeId string) { s.Lock() defer s.Unlock() - delete(s.watchers, treeId) + if s.closed { + return + } + if ch, ok := s.watchers[treeId]; ok { + close(ch) + delete(s.watchers, treeId) + } } func (s *statusService) StateCounter() uint64 { @@ -99,6 +118,18 @@ func (s *statusService) StateCounter() uint64 { return s.stateCounter } +func (s *statusService) Close() (err error) { + s.Lock() + defer s.Unlock() + if s.closed { + return + } + for _, ch := range s.watchers { + close(ch) + } + return +} + func (s *statusService) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { return diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index ddbdd19e..2541f318 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -63,6 +63,7 @@ type CreateDeps struct { SyncService syncservice.SyncService AclList list.ACLList SpaceStorage spacestorage.SpaceStorage + StatusService statusservice.StatusService } type BuildDeps struct { @@ -75,9 +76,14 @@ type BuildDeps struct { SpaceStorage spacestorage.SpaceStorage TreeStorage storage.TreeStorage TreeUsage *atomic.Int32 + StatusService statusservice.StatusService } -func newWrappedSyncClient(spaceId string, factory RequestFactory, syncService syncservice.SyncService, configuration nodeconf.Configuration) SyncClient { +func newWrappedSyncClient( + spaceId string, + factory RequestFactory, + syncService syncservice.SyncService, + configuration nodeconf.Configuration) SyncClient { syncClient := newSyncClient(spaceId, syncService.StreamPool(), factory, configuration, syncService.StreamChecker()) return newQueuedClient(syncClient, syncService.ActionQueue()) } @@ -95,6 +101,7 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) + deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) syncClient.BroadcastAsync(headUpdate) id = objTree.ID() return @@ -112,6 +119,7 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (id string, err error) deps.Configuration) headUpdate := syncClient.CreateHeadUpdate(objTree, nil) + deps.StatusService.HeadsChange(objTree.ID(), objTree.Heads()) syncClient.BroadcastAsync(headUpdate) id = objTree.ID() return @@ -197,13 +205,14 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy deps.SyncService, deps.Configuration) syncTree := &syncTree{ - ObjectTree: objTree, - syncClient: syncClient, - notifiable: deps.HeadNotifiable, - treeUsage: deps.TreeUsage, - listener: deps.Listener, + ObjectTree: objTree, + syncClient: syncClient, + notifiable: deps.HeadNotifiable, + treeUsage: deps.TreeUsage, + listener: deps.Listener, + statusService: deps.StatusService, } - syncHandler := newSyncTreeHandler(syncTree, syncClient) + syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.StatusService) syncTree.SyncHandler = syncHandler t = syncTree syncTree.Lock() diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index 96b6bdcb..7994e1f1 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -23,11 +23,12 @@ type syncTreeHandler struct { const maxQueueSize = 5 -func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler { +func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient, statusService statusservice.StatusService) synchandler.SyncHandler { return &syncTreeHandler{ - objTree: objTree, - syncClient: syncClient, - queue: newReceiveQueue(maxQueueSize), + objTree: objTree, + syncClient: syncClient, + statusService: statusService, + queue: newReceiveQueue(maxQueueSize), } } diff --git a/common/go.mod b/common/go.mod index f8522376..59a109c7 100644 --- a/common/go.mod +++ b/common/go.mod @@ -57,6 +57,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/common/go.sum b/common/go.sum index 3359eaee..bfe4ab84 100644 --- a/common/go.sum +++ b/common/go.sum @@ -326,6 +326,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= +golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg= diff --git a/util/cmd/debug/commands/client/service.go b/util/cmd/debug/commands/client/service.go index 4eb321e7..778f4c5e 100644 --- a/util/cmd/debug/commands/client/service.go +++ b/util/cmd/debug/commands/client/service.go @@ -24,9 +24,8 @@ type Service interface { AllTrees(ctx context.Context, ip string, request *apiproto.AllTreesRequest) (resp *apiproto.AllTreesResponse, err error) AllSpaces(ctx context.Context, ip string, request *apiproto.AllSpacesRequest) (resp *apiproto.AllSpacesResponse, err error) LoadSpace(ctx context.Context, ip string, request *apiproto.LoadSpaceRequest) (res *apiproto.LoadSpaceResponse, err error) - PutFile(ctx context.Context, ip string, request *apiproto.PutFileRequest) (resp *apiproto.PutFileResponse, err error) - GetFile(ctx context.Context, ip string, request *apiproto.GetFileRequest) (resp *apiproto.GetFileResponse, err error) - DeleteFile(ctx context.Context, ip string, request *apiproto.DeleteFileRequest) (resp *apiproto.DeleteFileResponse, err error) + Watch(ctx context.Context, ip string, request *apiproto.WatchRequest) (res *apiproto.WatchResponse, err error) + Unwatch(ctx context.Context, ip string, request *apiproto.UnwatchRequest) (res *apiproto.UnwatchResponse, err error) } type service struct { @@ -126,26 +125,18 @@ func (s *service) LoadSpace(ctx context.Context, ip string, request *apiproto.Lo return cl.LoadSpace(ctx, request) } -func (s *service) PutFile(ctx context.Context, ip string, request *apiproto.PutFileRequest) (resp *apiproto.PutFileResponse, err error) { +func (s *service) Watch(ctx context.Context, ip string, request *apiproto.WatchRequest) (res *apiproto.WatchResponse, err error) { cl, err := s.client.GetClient(ctx, ip) if err != nil { return } - return cl.PutFile(ctx, request) + return cl.Watch(ctx, request) } -func (s *service) GetFile(ctx context.Context, ip string, request *apiproto.GetFileRequest) (resp *apiproto.GetFileResponse, err error) { +func (s *service) Unwatch(ctx context.Context, ip string, request *apiproto.UnwatchRequest) (res *apiproto.UnwatchResponse, err error) { cl, err := s.client.GetClient(ctx, ip) if err != nil { return } - return cl.GetFile(ctx, request) -} - -func (s *service) DeleteFile(ctx context.Context, ip string, request *apiproto.DeleteFileRequest) (resp *apiproto.DeleteFileResponse, err error) { - cl, err := s.client.GetClient(ctx, ip) - if err != nil { - return - } - return cl.DeleteFile(ctx, request) + return cl.Unwatch(ctx, request) } diff --git a/util/cmd/debug/commands/clientcmds.go b/util/cmd/debug/commands/clientcmds.go index e1315a9a..d7bf156b 100644 --- a/util/cmd/debug/commands/clientcmds.go +++ b/util/cmd/debug/commands/clientcmds.go @@ -291,57 +291,59 @@ func (s *service) registerClientCommands() { } s.clientCommands = append(s.clientCommands, cmdAllSpaces) - cmdPutFile := &cobra.Command{ - Use: "put-file", - Short: "put new file by path", + cmdTreeWatch := &cobra.Command{ + Use: "tree-watch [document]", + Short: "start watching the tree (prints in logs the status on the client side)", + Args: cobra.RangeArgs(1, 1), Run: func(cmd *cobra.Command, args []string) { cli, _ := cmd.Flags().GetString("client") + space, _ := cmd.Flags().GetString("space") addr, ok := s.peers[cli] if !ok { fmt.Println("no such client") return } - path, _ := cmd.Flags().GetString("path") - spaceId, _ := cmd.Flags().GetString("spaceId") - resp, err := s.client.PutFile(context.Background(), addr, &clientproto.PutFileRequest{ - Path: path, - SpaceId: spaceId, - }) - if err != nil { - fmt.Println("error:", err) - return - } - fmt.Println("hash:", resp.Hash) - }, - } - cmdPutFile.Flags().String("path", "", "path to file") - cmdPutFile.Flags().String("spaceId", "", "spaceId") - s.clientCommands = append(s.clientCommands, cmdPutFile) - cmdGetFile := &cobra.Command{ - Use: "get-file", - Short: "get file by hash and save", + _, err := s.client.Watch(context.Background(), addr, &clientproto.WatchRequest{ + SpaceId: space, + TreeId: args[0], + }) + if err != nil { + fmt.Println("couldn't start watching tree", err) + return + } + fmt.Println(args[0]) + }, + } + cmdTreeWatch.Flags().String("space", "", "the space where something is happening :-)") + cmdTreeWatch.MarkFlagRequired("space") + s.clientCommands = append(s.clientCommands, cmdTreeWatch) + + cmdTreeUnwatch := &cobra.Command{ + Use: "tree-unwatch [document]", + Short: "stop watching the tree (prints in logs the status on the client side)", + Args: cobra.RangeArgs(1, 1), Run: func(cmd *cobra.Command, args []string) { cli, _ := cmd.Flags().GetString("client") + space, _ := cmd.Flags().GetString("space") addr, ok := s.peers[cli] if !ok { fmt.Println("no such client") return } - hash, _ := cmd.Flags().GetString("hash") - path, _ := cmd.Flags().GetString("path") - resp, err := s.client.GetFile(context.Background(), addr, &clientproto.GetFileRequest{ - Hash: hash, - Path: path, + + _, err := s.client.Unwatch(context.Background(), addr, &clientproto.UnwatchRequest{ + SpaceId: space, + TreeId: args[0], }) if err != nil { - fmt.Println("error:", err) + fmt.Println("couldn't stop watching tree", err) return } - fmt.Println("path:", resp.Path) + fmt.Println(args[0]) }, } - cmdGetFile.Flags().String("path", "", "path to file") - cmdGetFile.Flags().String("hash", "", "CID") - s.clientCommands = append(s.clientCommands, cmdGetFile) + cmdTreeUnwatch.Flags().String("space", "", "the space where something is happening :-)") + cmdTreeUnwatch.MarkFlagRequired("space") + s.clientCommands = append(s.clientCommands, cmdTreeUnwatch) }