diff --git a/client/api/service.go b/client/api/service.go index 587c756d..aed12e29 100644 --- a/client/api/service.go +++ b/client/api/service.go @@ -56,7 +56,8 @@ func (s *service) Name() (name string) { func (s *service) Run(ctx context.Context) (err error) { err = s.BaseDrpcServer.Run( ctx, - s.cfg.APIServer.ListenAddrs, func(handler drpc.Handler) drpc.Handler { + s.cfg.APIServer.ListenAddrs, + func(handler drpc.Handler) drpc.Handler { return handler }, s.transport.BasicListener) diff --git a/common/net/rpc/server/drpcserver.go b/common/net/rpc/server/drpcserver.go index 5d894607..73559cfe 100644 --- a/common/net/rpc/server/drpcserver.go +++ b/common/net/rpc/server/drpcserver.go @@ -61,7 +61,8 @@ func (s *drpcServer) Run(ctx context.Context) (err error) { if err = s.metric.Registry().Register(histVec); err != nil { return } - return s.BaseDrpcServer.Run(ctx, + return s.BaseDrpcServer.Run( + ctx, s.config.ListenAddrs, func(handler drpc.Handler) drpc.Handler { return &metric.PrometheusDRPC{ diff --git a/util/cmd/debug/api/service.go b/util/cmd/debug/api/service.go new file mode 100644 index 00000000..4c55aa83 --- /dev/null +++ b/util/cmd/debug/api/service.go @@ -0,0 +1,43 @@ +package api + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/client/api/apiproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/client" +) + +const CName = "debug.api" + +var log = logger.NewNamed(CName) + +type Service interface { + CreateSpace(ctx context.Context, ip string, request *apiproto.CreateSpaceRequest) (resp *apiproto.CreateSpaceResponse, err error) + app.Component +} + +type service struct { + client client.Service +} + +func New() Service { + return &service{} +} + +func (s *service) Init(a *app.App) (err error) { + s.client = a.MustComponent(client.CName).(client.Service) + return nil +} + +func (s *service) Name() (name string) { + return CName +} + +func (s *service) CreateSpace(ctx context.Context, ip string, request *apiproto.CreateSpaceRequest) (resp *apiproto.CreateSpaceResponse, err error) { + cl, err := s.client.Get(ctx, ip) + if err != nil { + return + } + return cl.CreateSpace(ctx, request) +} diff --git a/util/cmd/debug/client/service.go b/util/cmd/debug/client/service.go new file mode 100644 index 00000000..2c2993ec --- /dev/null +++ b/util/cmd/debug/client/service.go @@ -0,0 +1,73 @@ +package client + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/client/api/apiproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" + "net" + "storj.io/drpc/drpcconn" + "time" +) + +type Service interface { + Get(ctx context.Context, ip string) (apiproto.DRPCClientApiClient, error) + app.ComponentRunnable +} + +const CName = "debug.client" + +var log = logger.NewNamed(CName) + +type service struct { + cache ocache.OCache +} + +func New() Service { + return &service{} +} + +func (s *service) Init(a *app.App) (err error) { + s.cache = ocache.New( + func(ctx context.Context, ip string) (value ocache.Object, err error) { + conn, err := net.Dial("tcp", ip) + if err != nil { + return + } + value = drpcconn.New(conn) + return + }, + ocache.WithLogger(log.Sugar()), + ocache.WithGCPeriod(time.Minute), + ocache.WithTTL(time.Minute*5), + ) + return nil +} + +func (s *service) Name() (name string) { + return CName +} + +func (s *service) Run(ctx context.Context) (err error) { + return nil +} + +func (s *service) Get(ctx context.Context, ip string) (apiproto.DRPCClientApiClient, error) { + v, err := s.cache.Get(ctx, ip) + if err != nil { + return nil, err + } + conn := v.(*drpcconn.Conn) + select { + case <-conn.Closed(): + default: + return apiproto.NewDRPCClientApiClient(conn), nil + } + s.cache.Remove(ip) + return s.Get(ctx, ip) +} + +func (s *service) Close(ctx context.Context) (err error) { + return s.cache.Close() +} diff --git a/util/cmd/debug/debug.go b/util/cmd/debug/debug.go new file mode 100644 index 00000000..1910ca79 --- /dev/null +++ b/util/cmd/debug/debug.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "flag" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/client" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/stdin" + "go.uber.org/zap" + "net/http" + "os" + "os/signal" + "syscall" + "time" +) + +var log = logger.NewNamed("main") + +var ( + // we can't use "v" here because of glog init (through badger) setting flag.Bool with "v" + flagVersion = flag.Bool("ver", false, "show version and exit") + flagHelp = flag.Bool("h", false, "show help and exit") +) + +func main() { + flag.Parse() + + if *flagVersion { + fmt.Println(app.VersionDescription()) + return + } + if *flagHelp { + flag.PrintDefaults() + return + } + + if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" { + go func() { + http.ListenAndServe(debug, nil) + }() + } + + // create app + ctx := context.Background() + a := new(app.App) + Bootstrap(a) + // start app + if err := a.Start(ctx); err != nil { + log.Fatal("can't start app", zap.Error(err)) + } + log.Info("app started", zap.String("version", a.Version())) + + // wait exit signal + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) + sig := <-exit + log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig))) + + // close app + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + if err := a.Close(ctx); err != nil { + log.Fatal("close error", zap.Error(err)) + } else { + log.Info("goodbye!") + } + time.Sleep(time.Second / 3) +} + +func Bootstrap(a *app.App) { + a.Register(client.New()). + Register(api.New()). + Register(stdin.New()) +} diff --git a/util/cmd/debug/stdin/service.go b/util/cmd/debug/stdin/service.go new file mode 100644 index 00000000..a7c3e653 --- /dev/null +++ b/util/cmd/debug/stdin/service.go @@ -0,0 +1,69 @@ +package stdin + +import ( + "bufio" + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/client/api/apiproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cmd/debug/api" + "os" +) + +const CName = "debug.stdin" + +var log = logger.NewNamed(CName).Sugar() + +type Service interface { + app.ComponentRunnable +} + +type service struct { + api api.Service +} + +func New() Service { + return &service{} +} + +func (s *service) Init(a *app.App) (err error) { + s.api = a.MustComponent(api.CName).(api.Service) + return +} + +func (s *service) Name() (name string) { + return CName +} + +func (s *service) Run(ctx context.Context) (err error) { + go s.readStdin() + return nil +} + +func (s *service) Close(ctx context.Context) (err error) { + return nil +} + +func (s *service) readStdin() { + // create new reader from stdin + reader := bufio.NewReader(os.Stdin) + // start infinite loop to continuously listen to input + for { + // read by one line (enter pressed) + str, err := reader.ReadString('\n') + // check for errors + if err != nil { + // close channel just to inform others + log.Errorf("Error in read string: %s", err) + return + } + log.Debug(str) + + res, err := s.api.CreateSpace(context.Background(), "127.0.0.1:8090", &apiproto.CreateSpaceRequest{}) + if err != nil { + log.Errorf("Error in performing request: %s", err) + return + } + log.Debug(res) + } +}