Add message service to handle streams

This commit is contained in:
mcrakhman 2022-07-19 00:47:22 +02:00
parent 5f2144db0f
commit 269f907d1d
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 107 additions and 19 deletions

1
go.mod
View File

@ -13,6 +13,7 @@ require (
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.1.0
github.com/stretchr/testify v1.7.0
github.com/cheggaaa/mb v1.0.3
go.uber.org/zap v1.21.0
gopkg.in/yaml.v3 v3.0.1
storj.io/drpc v0.0.32

View File

@ -5,8 +5,9 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"io"
@ -14,6 +15,7 @@ import (
"storj.io/drpc"
"storj.io/drpc/drpcserver"
"strings"
"sync"
"time"
)
@ -27,6 +29,9 @@ func New() DRPCServer {
type DRPCServer interface {
app.ComponentRunnable
SendMessage(peerId string, msg *syncpb.SyncContent)
BroadcastMessage(msg *syncpb.SyncContent)
}
type drpcServer struct {
@ -34,12 +39,15 @@ type drpcServer struct {
drpcServer *drpcserver.Server
transport transport.Service
listeners []transport.ContextListener
messageService message.Service
cancel func()
}
func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) {
s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer
s.transport = a.MustComponent(transport.CName).(transport.Service)
s.messageService = a.MustComponent(message.CName).(message.Service)
return nil
}
@ -114,23 +122,22 @@ func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) {
if err != nil {
return
}
l := log.With(zap.String("peer", sc.RemotePeer().String()))
peerId := sc.RemotePeer().String()
l := log.With(zap.String("peer", peerId))
l.Info("stream opened")
defer func() {
l.Info("stream closed", zap.Error(err))
}()
for {
msg := &syncproto.SyncMessage{}
if err = stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
}
//log.Debug("receive msg", zap.Int("seq", int(msg.Seq)))
if err = stream.MsgSend(msg, enc{}); err != nil {
return
}
}
ch := s.messageService.RegisterMessageSender(peerId)
defer s.messageService.UnregisterMessageSender(peerId)
wg := &sync.WaitGroup{}
wg.Add(2)
go s.sendMessages(stream, wg, ch)
go s.receiveMessages(stream, wg, peerId)
wg.Wait()
return nil
}
@ -146,6 +153,33 @@ func (s *drpcServer) Close(ctx context.Context) (err error) {
return
}
func (s *drpcServer) sendMessages(stream drpc.Stream, wg *sync.WaitGroup, ch chan *syncpb.SyncContent) {
defer wg.Done()
for {
select {
case msg := <-ch:
if err := stream.MsgSend(msg, enc{}); err != nil {
return
}
case <-stream.Context().Done():
return
}
}
}
func (s *drpcServer) receiveMessages(stream drpc.Stream, wg *sync.WaitGroup, peerId string) {
defer wg.Done()
for {
msg := &syncpb.SyncContent{}
if err := stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
}
s.messageService.HandleMessage(peerId, msg)
}
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {

View File

@ -0,0 +1,53 @@
package message
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
)
const CName = "Service"
type service struct {
}
func NewMessageService() app.Component {
return &service{}
}
type Service interface {
RegisterMessageSender(peerId string) chan *syncpb.SyncContent
UnregisterMessageSender(peerId string)
HandleMessage(peerId string, msg *syncpb.SyncContent)
}
func (c *service) Init(ctx context.Context, a *app.App) (err error) {
return nil
}
func (c *service) Name() (name string) {
return CName
}
func (c *service) Run(ctx context.Context) (err error) {
return nil
}
func (c *service) Close(ctx context.Context) (err error) {
return nil
}
func (c *service) RegisterMessageSender(peerId string) chan *syncpb.SyncContent {
//TODO implement me
panic("implement me")
}
func (c *service) UnregisterMessageSender(peerId string) chan *syncpb.SyncContent {
//TODO implement me
panic("implement me")
}
func (c *service) HandleMessage(peerId string, msg *syncpb.SyncContent) {
//TODO implement me
panic("implement me")
}