From 269f907d1df79ef04b6ff024a6650b090257348a Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 19 Jul 2022 00:47:22 +0200 Subject: [PATCH] Add message service to handle streams --- go.mod | 1 + service/sync/drpcserver/drpcserver.go | 72 ++++++++++++++++++++------- service/sync/message/service.go | 53 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 19 deletions(-) create mode 100644 service/sync/message/service.go diff --git a/go.mod b/go.mod index 91d42059..715f5f35 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/multiformats/go-multibase v0.0.3 github.com/multiformats/go-multihash v0.1.0 github.com/stretchr/testify v1.7.0 + github.com/cheggaaa/mb v1.0.3 go.uber.org/zap v1.21.0 gopkg.in/yaml.v3 v3.0.1 storj.io/drpc v0.0.32 diff --git a/service/sync/drpcserver/drpcserver.go b/service/sync/drpcserver/drpcserver.go index e3fe7bee..c341e326 100644 --- a/service/sync/drpcserver/drpcserver.go +++ b/service/sync/drpcserver/drpcserver.go @@ -5,8 +5,9 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/gogo/protobuf/proto" "go.uber.org/zap" "io" @@ -14,6 +15,7 @@ import ( "storj.io/drpc" "storj.io/drpc/drpcserver" "strings" + "sync" "time" ) @@ -27,19 +29,25 @@ func New() DRPCServer { type DRPCServer interface { app.ComponentRunnable + + SendMessage(peerId string, msg *syncpb.SyncContent) + BroadcastMessage(msg *syncpb.SyncContent) } type drpcServer struct { - config config.GrpcServer - drpcServer *drpcserver.Server - transport transport.Service - listeners []transport.ContextListener - cancel func() + config config.GrpcServer + drpcServer *drpcserver.Server + transport transport.Service + listeners []transport.ContextListener + messageService message.Service + + cancel func() } func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) { s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer s.transport = a.MustComponent(transport.CName).(transport.Service) + s.messageService = a.MustComponent(message.CName).(message.Service) return nil } @@ -114,23 +122,22 @@ func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) { if err != nil { return } - l := log.With(zap.String("peer", sc.RemotePeer().String())) + peerId := sc.RemotePeer().String() + l := log.With(zap.String("peer", peerId)) l.Info("stream opened") defer func() { l.Info("stream closed", zap.Error(err)) }() - for { - msg := &syncproto.SyncMessage{} - if err = stream.MsgRecv(msg, enc{}); err != nil { - if err == io.EOF { - return - } - } - //log.Debug("receive msg", zap.Int("seq", int(msg.Seq))) - if err = stream.MsgSend(msg, enc{}); err != nil { - return - } - } + + ch := s.messageService.RegisterMessageSender(peerId) + defer s.messageService.UnregisterMessageSender(peerId) + + wg := &sync.WaitGroup{} + wg.Add(2) + go s.sendMessages(stream, wg, ch) + go s.receiveMessages(stream, wg, peerId) + wg.Wait() + return nil } @@ -146,6 +153,33 @@ func (s *drpcServer) Close(ctx context.Context) (err error) { return } +func (s *drpcServer) sendMessages(stream drpc.Stream, wg *sync.WaitGroup, ch chan *syncpb.SyncContent) { + defer wg.Done() + for { + select { + case msg := <-ch: + if err := stream.MsgSend(msg, enc{}); err != nil { + return + } + case <-stream.Context().Done(): + return + } + } +} + +func (s *drpcServer) receiveMessages(stream drpc.Stream, wg *sync.WaitGroup, peerId string) { + defer wg.Done() + for { + msg := &syncpb.SyncContent{} + if err := stream.MsgRecv(msg, enc{}); err != nil { + if err == io.EOF { + return + } + } + s.messageService.HandleMessage(peerId, msg) + } +} + type enc struct{} func (e enc) Marshal(msg drpc.Message) ([]byte, error) { diff --git a/service/sync/message/service.go b/service/sync/message/service.go new file mode 100644 index 00000000..5ebc706d --- /dev/null +++ b/service/sync/message/service.go @@ -0,0 +1,53 @@ +package message + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" +) + +const CName = "Service" + +type service struct { +} + +func NewMessageService() app.Component { + return &service{} +} + +type Service interface { + RegisterMessageSender(peerId string) chan *syncpb.SyncContent + UnregisterMessageSender(peerId string) + HandleMessage(peerId string, msg *syncpb.SyncContent) +} + +func (c *service) Init(ctx context.Context, a *app.App) (err error) { + return nil +} + +func (c *service) Name() (name string) { + return CName +} + +func (c *service) Run(ctx context.Context) (err error) { + return nil +} + +func (c *service) Close(ctx context.Context) (err error) { + return nil +} + +func (c *service) RegisterMessageSender(peerId string) chan *syncpb.SyncContent { + //TODO implement me + panic("implement me") +} + +func (c *service) UnregisterMessageSender(peerId string) chan *syncpb.SyncContent { + //TODO implement me + panic("implement me") +} + +func (c *service) HandleMessage(peerId string, msg *syncpb.SyncContent) { + //TODO implement me + panic("implement me") +}