Abstract drpc server and add client drpc api

This commit is contained in:
mcrakhman 2022-11-30 22:09:18 +01:00 committed by Mikhail Iudin
parent a1e4e4f714
commit d02169c585
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
8 changed files with 3780 additions and 86 deletions

View File

@ -15,6 +15,7 @@ export PATH=$(GOPATH)/bin:$(shell echo $$PATH)
proto:
$(MAKE) -C common proto
$(MAKE) -C consensus proto
$(MAKE) -C client proto
build:
$(MAKE) -C node build

6
client/Makefile Normal file
View File

@ -0,0 +1,6 @@
.PHONY: proto
export GOPRIVATE=github.com/anytypeio
proto:
@$(eval GOGO_START := GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1)
$(GOGO_START) protoc --gogofaster_out=:. --go-drpc_out=protolib=github.com/gogo/protobuf:. api/apiproto/protos/*.proto

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,393 @@
// Code generated by protoc-gen-go-drpc. DO NOT EDIT.
// protoc-gen-go-drpc version: v0.0.32
// source: api/apiproto/protos/api.proto
package apiproto
import (
bytes "bytes"
context "context"
errors "errors"
jsonpb "github.com/gogo/protobuf/jsonpb"
proto "github.com/gogo/protobuf/proto"
drpc "storj.io/drpc"
drpcerr "storj.io/drpc/drpcerr"
)
type drpcEncoding_File_api_apiproto_protos_api_proto struct{}
func (drpcEncoding_File_api_apiproto_protos_api_proto) Marshal(msg drpc.Message) ([]byte, error) {
return proto.Marshal(msg.(proto.Message))
}
func (drpcEncoding_File_api_apiproto_protos_api_proto) Unmarshal(buf []byte, msg drpc.Message) error {
return proto.Unmarshal(buf, msg.(proto.Message))
}
func (drpcEncoding_File_api_apiproto_protos_api_proto) JSONMarshal(msg drpc.Message) ([]byte, error) {
var buf bytes.Buffer
err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message))
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (drpcEncoding_File_api_apiproto_protos_api_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error {
return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message))
}
type DRPCClientApiClient interface {
DRPCConn() drpc.Conn
CreateSpace(ctx context.Context, in *CreateSpaceRequest) (*CreateSpaceResponse, error)
DeriveSpace(ctx context.Context, in *DeriveSpaceRequest) (*DeriveSpaceResponse, error)
CreateDocument(ctx context.Context, in *CreateDocumentRequest) (*CreateDocumentResponse, error)
DeleteDocument(ctx context.Context, in *DeleteDocumentRequest) (*DeleteDocumentResponse, error)
AddText(ctx context.Context, in *AddTextRequest) (*AddTextResponse, error)
DumpTree(ctx context.Context, in *DumpTreeRequest) (*DumpTreeResponse, error)
AllTrees(ctx context.Context, in *AllTreesRequest) (*AllTreesResponse, error)
AllSpaces(ctx context.Context, in *AllSpacesRequest) (*AllSpacesResponse, error)
}
type drpcClientApiClient struct {
cc drpc.Conn
}
func NewDRPCClientApiClient(cc drpc.Conn) DRPCClientApiClient {
return &drpcClientApiClient{cc}
}
func (c *drpcClientApiClient) DRPCConn() drpc.Conn { return c.cc }
func (c *drpcClientApiClient) CreateSpace(ctx context.Context, in *CreateSpaceRequest) (*CreateSpaceResponse, error) {
out := new(CreateSpaceResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/CreateSpace", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) DeriveSpace(ctx context.Context, in *DeriveSpaceRequest) (*DeriveSpaceResponse, error) {
out := new(DeriveSpaceResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/DeriveSpace", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) CreateDocument(ctx context.Context, in *CreateDocumentRequest) (*CreateDocumentResponse, error) {
out := new(CreateDocumentResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/CreateDocument", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) DeleteDocument(ctx context.Context, in *DeleteDocumentRequest) (*DeleteDocumentResponse, error) {
out := new(DeleteDocumentResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/DeleteDocument", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) AddText(ctx context.Context, in *AddTextRequest) (*AddTextResponse, error) {
out := new(AddTextResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/AddText", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) DumpTree(ctx context.Context, in *DumpTreeRequest) (*DumpTreeResponse, error) {
out := new(DumpTreeResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/DumpTree", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) AllTrees(ctx context.Context, in *AllTreesRequest) (*AllTreesResponse, error) {
out := new(AllTreesResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/AllTrees", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcClientApiClient) AllSpaces(ctx context.Context, in *AllSpacesRequest) (*AllSpacesResponse, error) {
out := new(AllSpacesResponse)
err := c.cc.Invoke(ctx, "/api.ClientApi/AllSpaces", drpcEncoding_File_api_apiproto_protos_api_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCClientApiServer interface {
CreateSpace(context.Context, *CreateSpaceRequest) (*CreateSpaceResponse, error)
DeriveSpace(context.Context, *DeriveSpaceRequest) (*DeriveSpaceResponse, error)
CreateDocument(context.Context, *CreateDocumentRequest) (*CreateDocumentResponse, error)
DeleteDocument(context.Context, *DeleteDocumentRequest) (*DeleteDocumentResponse, error)
AddText(context.Context, *AddTextRequest) (*AddTextResponse, error)
DumpTree(context.Context, *DumpTreeRequest) (*DumpTreeResponse, error)
AllTrees(context.Context, *AllTreesRequest) (*AllTreesResponse, error)
AllSpaces(context.Context, *AllSpacesRequest) (*AllSpacesResponse, error)
}
type DRPCClientApiUnimplementedServer struct{}
func (s *DRPCClientApiUnimplementedServer) CreateSpace(context.Context, *CreateSpaceRequest) (*CreateSpaceResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) DeriveSpace(context.Context, *DeriveSpaceRequest) (*DeriveSpaceResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) CreateDocument(context.Context, *CreateDocumentRequest) (*CreateDocumentResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) DeleteDocument(context.Context, *DeleteDocumentRequest) (*DeleteDocumentResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) AddText(context.Context, *AddTextRequest) (*AddTextResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) DumpTree(context.Context, *DumpTreeRequest) (*DumpTreeResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) AllTrees(context.Context, *AllTreesRequest) (*AllTreesResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCClientApiUnimplementedServer) AllSpaces(context.Context, *AllSpacesRequest) (*AllSpacesResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCClientApiDescription struct{}
func (DRPCClientApiDescription) NumMethods() int { return 8 }
func (DRPCClientApiDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
case 0:
return "/api.ClientApi/CreateSpace", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
CreateSpace(
ctx,
in1.(*CreateSpaceRequest),
)
}, DRPCClientApiServer.CreateSpace, true
case 1:
return "/api.ClientApi/DeriveSpace", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
DeriveSpace(
ctx,
in1.(*DeriveSpaceRequest),
)
}, DRPCClientApiServer.DeriveSpace, true
case 2:
return "/api.ClientApi/CreateDocument", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
CreateDocument(
ctx,
in1.(*CreateDocumentRequest),
)
}, DRPCClientApiServer.CreateDocument, true
case 3:
return "/api.ClientApi/DeleteDocument", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
DeleteDocument(
ctx,
in1.(*DeleteDocumentRequest),
)
}, DRPCClientApiServer.DeleteDocument, true
case 4:
return "/api.ClientApi/AddText", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
AddText(
ctx,
in1.(*AddTextRequest),
)
}, DRPCClientApiServer.AddText, true
case 5:
return "/api.ClientApi/DumpTree", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
DumpTree(
ctx,
in1.(*DumpTreeRequest),
)
}, DRPCClientApiServer.DumpTree, true
case 6:
return "/api.ClientApi/AllTrees", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
AllTrees(
ctx,
in1.(*AllTreesRequest),
)
}, DRPCClientApiServer.AllTrees, true
case 7:
return "/api.ClientApi/AllSpaces", drpcEncoding_File_api_apiproto_protos_api_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCClientApiServer).
AllSpaces(
ctx,
in1.(*AllSpacesRequest),
)
}, DRPCClientApiServer.AllSpaces, true
default:
return "", nil, nil, nil, false
}
}
func DRPCRegisterClientApi(mux drpc.Mux, impl DRPCClientApiServer) error {
return mux.Register(impl, DRPCClientApiDescription{})
}
type DRPCClientApi_CreateSpaceStream interface {
drpc.Stream
SendAndClose(*CreateSpaceResponse) error
}
type drpcClientApi_CreateSpaceStream struct {
drpc.Stream
}
func (x *drpcClientApi_CreateSpaceStream) SendAndClose(m *CreateSpaceResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_DeriveSpaceStream interface {
drpc.Stream
SendAndClose(*DeriveSpaceResponse) error
}
type drpcClientApi_DeriveSpaceStream struct {
drpc.Stream
}
func (x *drpcClientApi_DeriveSpaceStream) SendAndClose(m *DeriveSpaceResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_CreateDocumentStream interface {
drpc.Stream
SendAndClose(*CreateDocumentResponse) error
}
type drpcClientApi_CreateDocumentStream struct {
drpc.Stream
}
func (x *drpcClientApi_CreateDocumentStream) SendAndClose(m *CreateDocumentResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_DeleteDocumentStream interface {
drpc.Stream
SendAndClose(*DeleteDocumentResponse) error
}
type drpcClientApi_DeleteDocumentStream struct {
drpc.Stream
}
func (x *drpcClientApi_DeleteDocumentStream) SendAndClose(m *DeleteDocumentResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_AddTextStream interface {
drpc.Stream
SendAndClose(*AddTextResponse) error
}
type drpcClientApi_AddTextStream struct {
drpc.Stream
}
func (x *drpcClientApi_AddTextStream) SendAndClose(m *AddTextResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_DumpTreeStream interface {
drpc.Stream
SendAndClose(*DumpTreeResponse) error
}
type drpcClientApi_DumpTreeStream struct {
drpc.Stream
}
func (x *drpcClientApi_DumpTreeStream) SendAndClose(m *DumpTreeResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_AllTreesStream interface {
drpc.Stream
SendAndClose(*AllTreesResponse) error
}
type drpcClientApi_AllTreesStream struct {
drpc.Stream
}
func (x *drpcClientApi_AllTreesStream) SendAndClose(m *AllTreesResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCClientApi_AllSpacesStream interface {
drpc.Stream
SendAndClose(*AllSpacesResponse) error
}
type drpcClientApi_AllSpacesStream struct {
drpc.Stream
}
func (x *drpcClientApi_AllSpacesStream) SendAndClose(m *AllSpacesResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_api_apiproto_protos_api_proto{}); err != nil {
return err
}
return x.CloseSend()
}

View File

@ -0,0 +1,85 @@
syntax = "proto3";
package api;
option go_package = "api/apiproto";
service ClientApi {
rpc CreateSpace(CreateSpaceRequest) returns(CreateSpaceResponse);
rpc DeriveSpace(DeriveSpaceRequest) returns(DeriveSpaceResponse);
rpc CreateDocument(CreateDocumentRequest) returns(CreateDocumentResponse);
rpc DeleteDocument(DeleteDocumentRequest) returns(DeleteDocumentResponse);
rpc AddText(AddTextRequest) returns(AddTextResponse);
rpc DumpTree(DumpTreeRequest) returns(DumpTreeResponse);
rpc AllTrees(AllTreesRequest) returns(AllTreesResponse);
rpc AllSpaces(AllSpacesRequest) returns(AllSpacesResponse);
}
message CreateSpaceRequest {
}
message CreateSpaceResponse {
string id = 1;
}
message DeriveSpaceRequest {
}
message DeriveSpaceResponse {
string id = 1;
}
message CreateDocumentRequest {
string spaceId = 1;
}
message CreateDocumentResponse {
string id = 1;
}
message DeleteDocumentRequest {
string spaceId = 1;
string documentId = 2;
}
message DeleteDocumentResponse {
}
message AddTextRequest {
string spaceId = 1;
string documentId = 2;
string text = 3;
}
message AddTextResponse {
string documentId = 1;
string headId = 2;
}
message DumpTreeRequest {
string spaceId = 1;
string documentId = 2;
}
message DumpTreeResponse {
string dump = 1;
}
message AllTreesRequest {
string spaceId = 1;
}
message Tree {
string id = 1;
repeated string heads = 2;
}
message AllTreesResponse {
repeated Tree trees = 1;
}
message AllSpacesRequest {
}
message AllSpacesResponse {
repeated string spaceIds = 1;
}

View File

@ -3,7 +3,7 @@ export GOPRIVATE=github.com/anytypeio
proto:
@echo 'Generating protobuf packages (Go)...'
# Uncomment if needed
@$(eval GOGO_START := GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1)
@$(eval P_ACL_RECORDS_PATH_PB := pkg/acl/aclrecordproto)
@$(eval P_TREE_CHANGES_PATH_PB := pkg/acl/treechangeproto)

View File

@ -0,0 +1,105 @@
package server
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/zeebo/errs"
"go.uber.org/zap"
"io"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"time"
)
type BaseDrpcServer struct {
drpcServer *drpcserver.Server
transport secure.Service
listeners []secure.ContextListener
cancel func()
*drpcmux.Mux
}
type DRPCHandlerWrapper func(handler drpc.Handler) drpc.Handler
func NewBaseDrpcServer() *BaseDrpcServer {
return &BaseDrpcServer{Mux: drpcmux.New()}
}
func (s *BaseDrpcServer) Init(transport secure.Service) {
s.transport = transport
}
func (s *BaseDrpcServer) Run(ctx context.Context, listenAddrs []string, wrapper DRPCHandlerWrapper) (err error) {
s.drpcServer = drpcserver.New(wrapper(s.Mux))
ctx, s.cancel = context.WithCancel(ctx)
for _, addr := range listenAddrs {
tcpList, err := net.Listen("tcp", addr)
if err != nil {
return err
}
tlsList := s.transport.TLSListener(tcpList)
go s.serve(ctx, tlsList)
}
return
}
func (s *BaseDrpcServer) serve(ctx context.Context, lis secure.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
l.Debug("drpc listener stopped")
}()
for {
select {
case <-ctx.Done():
return
default:
}
ctx, conn, err := lis.Accept(ctx)
if err != nil {
if isTemporary(err) {
l.Debug("listener temporary accept error", zap.Error(err))
t := time.NewTimer(500 * time.Millisecond)
select {
case <-t.C:
case <-ctx.Done():
return
}
continue
}
if _, ok := err.(secure.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}
l.Error("listener accept error", zap.Error(err))
return
}
go s.serveConn(ctx, conn)
}
}
func (s *BaseDrpcServer) serveConn(ctx context.Context, conn net.Conn) {
l := log.With(zap.String("remoteAddr", conn.RemoteAddr().String())).With(zap.String("localAddr", conn.LocalAddr().String()))
l.Debug("connection opened")
if err := s.drpcServer.ServeOne(ctx, conn); err != nil {
if errs.Is(err, context.Canceled) || errs.Is(err, io.EOF) {
l.Debug("connection closed")
} else {
l.Warn("serve connection error", zap.Error(err))
}
}
}
func (s *BaseDrpcServer) Close(ctx context.Context) (err error) {
if s.cancel != nil {
s.cancel()
}
for _, l := range s.listeners {
if e := l.Close(); e != nil {
log.Warn("close listener error", zap.Error(e))
}
}
return
}

View File

@ -8,14 +8,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/metric"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/prometheus/client_golang/prometheus"
"github.com/zeebo/errs"
"go.uber.org/zap"
"io"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"time"
)
const CName = "common.net.drpcserver"
@ -23,7 +16,7 @@ const CName = "common.net.drpcserver"
var log = logger.NewNamed(CName)
func New() DRPCServer {
return &drpcServer{Mux: drpcmux.New()}
return &drpcServer{BaseDrpcServer: NewBaseDrpcServer()}
}
type DRPCServer interface {
@ -36,19 +29,15 @@ type configGetter interface {
}
type drpcServer struct {
config config.GrpcServer
drpcServer *drpcserver.Server
transport secure.Service
listeners []secure.ContextListener
metric metric.Metric
cancel func()
*drpcmux.Mux
config config.GrpcServer
metric metric.Metric
*BaseDrpcServer
}
func (s *drpcServer) Init(a *app.App) (err error) {
s.config = a.MustComponent(config.CName).(configGetter).GetGRPCServer()
s.transport = a.MustComponent(secure.CName).(secure.Service)
s.metric = a.MustComponent(metric.CName).(metric.Metric)
s.BaseDrpcServer.Init(a.MustComponent(secure.CName).(secure.Service))
return nil
}
@ -68,80 +57,17 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
0.99: 0.0001,
},
}, []string{"rpc"})
s.drpcServer = drpcserver.New(&metric.PrometheusDRPC{
Handler: s.Mux,
SummaryVec: histVec,
})
if err = s.metric.Registry().Register(histVec); err != nil {
return
}
ctx, s.cancel = context.WithCancel(ctx)
for _, addr := range s.config.ListenAddrs {
tcpList, err := net.Listen("tcp", addr)
if err != nil {
return err
return s.BaseDrpcServer.Run(ctx, s.config.ListenAddrs, func(handler drpc.Handler) drpc.Handler {
return &metric.PrometheusDRPC{
Handler: handler,
SummaryVec: histVec,
}
tlsList := s.transport.TLSListener(tcpList)
go s.serve(ctx, tlsList)
}
return
}
func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
l.Debug("drpc listener stopped")
}()
for {
select {
case <-ctx.Done():
return
default:
}
ctx, conn, err := lis.Accept(ctx)
if err != nil {
if isTemporary(err) {
l.Debug("listener temporary accept error", zap.Error(err))
t := time.NewTimer(500 * time.Millisecond)
select {
case <-t.C:
case <-ctx.Done():
return
}
continue
}
if _, ok := err.(secure.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}
l.Error("listener accept error", zap.Error(err))
return
}
go s.serveConn(ctx, conn)
}
}
func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) {
l := log.With(zap.String("remoteAddr", conn.RemoteAddr().String())).With(zap.String("localAddr", conn.LocalAddr().String()))
l.Debug("connection opened")
if err := s.drpcServer.ServeOne(ctx, conn); err != nil {
if errs.Is(err, context.Canceled) || errs.Is(err, io.EOF) {
l.Debug("connection closed")
} else {
l.Warn("serve connection error", zap.Error(err))
}
}
})
}
func (s *drpcServer) Close(ctx context.Context) (err error) {
if s.cancel != nil {
s.cancel()
}
for _, l := range s.listeners {
if e := l.Close(); e != nil {
log.Warn("close listener error", zap.Error(e))
}
}
return
return s.BaseDrpcServer.Close(ctx)
}