WIP stream logic

This commit is contained in:
mcrakhman 2022-07-19 01:43:21 +02:00
parent 269f907d1d
commit 16b3787258
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 165 additions and 68 deletions

View File

@ -3,12 +3,29 @@ package message
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/cheggaaa/mb"
"go.uber.org/zap"
"sync"
)
const CName = "Service"
var log = logger.NewNamed("messageservice")
const CName = "MessageService"
type service struct {
receiveBatcher *mb.MB
sendBatcher *mb.MB
senderChannels map[string]chan *syncpb.SyncContent
requestHandler requesthandler.RequestHandler
sync.RWMutex
}
type message struct {
peerId string
content *syncpb.SyncContent
}
func NewMessageService() app.Component {
@ -19,35 +36,106 @@ type Service interface {
RegisterMessageSender(peerId string) chan *syncpb.SyncContent
UnregisterMessageSender(peerId string)
HandleMessage(peerId string, msg *syncpb.SyncContent)
SendMessage(peerId string, msg *syncpb.SyncContent)
}
func (c *service) Init(ctx context.Context, a *app.App) (err error) {
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
s.receiveBatcher = mb.New(0)
s.sendBatcher = mb.New(0)
s.senderChannels = make(map[string]chan *syncpb.SyncContent)
s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
return nil
}
func (c *service) Name() (name string) {
func (s *service) Name() (name string) {
return CName
}
func (c *service) Run(ctx context.Context) (err error) {
func (s *service) Run(ctx context.Context) (err error) {
go s.runSender(ctx)
go s.runReceiver(ctx)
return nil
}
func (c *service) Close(ctx context.Context) (err error) {
func (s *service) Close(ctx context.Context) (err error) {
return nil
}
func (c *service) RegisterMessageSender(peerId string) chan *syncpb.SyncContent {
//TODO implement me
panic("implement me")
func (s *service) RegisterMessageSender(peerId string) chan *syncpb.SyncContent {
s.Lock()
defer s.Unlock()
if ch, exists := s.senderChannels[peerId]; !exists {
return ch
}
ch := make(chan *syncpb.SyncContent)
s.senderChannels[peerId] = ch
return ch
}
func (c *service) UnregisterMessageSender(peerId string) chan *syncpb.SyncContent {
//TODO implement me
panic("implement me")
func (s *service) UnregisterMessageSender(peerId string) {
s.Lock()
defer s.Unlock()
if _, exists := s.senderChannels[peerId]; !exists {
return
}
close(s.senderChannels[peerId])
delete(s.senderChannels, peerId)
}
func (c *service) HandleMessage(peerId string, msg *syncpb.SyncContent) {
//TODO implement me
panic("implement me")
func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) {
_ = s.receiveBatcher.Add(&message{
peerId: peerId,
content: msg,
})
}
func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) {
_ = s.sendBatcher.Add(&message{
peerId: peerId,
content: msg,
})
}
func (s *service) runReceiver(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
break
}
msgs := s.receiveBatcher.WaitMinMax(1, 100)
// TODO: this is bad to serve everything on a new goroutine, but very easy for prototyping :-)
for _, msg := range msgs {
typedMsg := msg.(*message)
go func(typedMsg *message) {
err := s.requestHandler.HandleFullSyncContent(ctx, typedMsg.peerId, typedMsg.content)
if err != nil {
log.Error("failed to handle content", zap.Error(err))
}
}(typedMsg)
}
}
}
func (s *service) runSender(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
break
}
msgs := s.sendBatcher.WaitMinMax(1, 100)
s.RLock()
for _, msg := range msgs {
typedMsg := msg.(*message)
ch, exists := s.senderChannels[typedMsg.peerId]
if !exists {
continue
}
ch <- typedMsg.content
}
s.RUnlock()
}
}

View File

@ -25,9 +25,7 @@ func NewRequestHandler() app.Component {
}
type RequestHandler interface {
HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error)
HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error)
HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error)
HandleFullSyncContent(ctx context.Context, senderId string, request *syncpb.SyncContent) (err error)
}
const CName = "SyncRequestHandler"
@ -51,6 +49,19 @@ func (r *requestHandler) Close(ctx context.Context) (err error) {
return nil
}
func (r *requestHandler) HandleFullSyncContent(ctx context.Context, senderId string, content *syncpb.SyncContent) error {
msg := content.GetMessage()
switch {
case msg.GetFullSyncRequest() != nil:
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest())
case msg.GetFullSyncResponse() != nil:
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse())
case msg.GetHeadUpdate() != nil:
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate())
}
return nil
}
func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) {
var (
fullRequest *syncpb.SyncFullRequest

View File

@ -7,7 +7,7 @@ import "pkg/acl/treestorage/treepb/protos/tree.proto";
message Sync {
message Content {
repeated ContentValue messages = 1;
ContentValue message = 1;
}
message ContentValue {

View File

@ -61,7 +61,7 @@ func (m *Sync) XXX_DiscardUnknown() {
var xxx_messageInfo_Sync proto.InternalMessageInfo
type SyncContent struct {
Messages []*SyncContentValue `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
Message *SyncContentValue `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (m *SyncContent) Reset() { *m = SyncContent{} }
@ -97,9 +97,9 @@ func (m *SyncContent) XXX_DiscardUnknown() {
var xxx_messageInfo_SyncContent proto.InternalMessageInfo
func (m *SyncContent) GetMessages() []*SyncContentValue {
func (m *SyncContent) GetMessage() *SyncContentValue {
if m != nil {
return m.Messages
return m.Message
}
return nil
}
@ -482,34 +482,34 @@ func init() {
}
var fileDescriptor_5f66cdd599c6466f = []byte{
// 426 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x53, 0xbd, 0x8e, 0xd3, 0x40,
0x10, 0xf6, 0xe6, 0x1c, 0xfb, 0x6e, 0xee, 0x04, 0xa7, 0x15, 0x3f, 0x2b, 0x17, 0x56, 0x74, 0x12,
0x52, 0x0a, 0xe4, 0x9c, 0x2e, 0x05, 0x1d, 0x45, 0x22, 0x45, 0xa6, 0x43, 0xcb, 0x4f, 0x41, 0xb7,
0x71, 0x06, 0x1b, 0x61, 0x6c, 0xe3, 0x5d, 0x07, 0xf1, 0x16, 0xbc, 0x0d, 0x88, 0x27, 0xa0, 0x4c,
0x49, 0x83, 0x04, 0xc9, 0x63, 0xd0, 0xa0, 0x5d, 0xdb, 0xb1, 0xb9, 0x3c, 0x41, 0x8a, 0x44, 0x33,
0xf3, 0x7d, 0xdf, 0xf8, 0x9b, 0x19, 0x2d, 0x3c, 0x92, 0x58, 0xae, 0xdf, 0x45, 0x38, 0x91, 0x9f,
0xb3, 0xc8, 0xfc, 0x15, 0xcb, 0x49, 0x51, 0xe6, 0x2a, 0x97, 0x26, 0x0b, 0x4c, 0x4c, 0x6d, 0x1d,
0x7b, 0xd7, 0xc5, 0xfb, 0x78, 0x22, 0xa2, 0x54, 0xff, 0xa2, 0x44, 0x64, 0x31, 0x4a, 0x1d, 0x76,
0x8a, 0xae, 0x5e, 0xeb, 0xbc, 0xc7, 0xad, 0x42, 0x95, 0x88, 0x52, 0xe5, 0xa5, 0x88, 0xd1, 0xc4,
0x9d, 0x46, 0x67, 0x35, 0xfb, 0xea, 0x8f, 0x03, 0xf6, 0x0b, 0xfd, 0xa1, 0xa7, 0xe0, 0xce, 0xf3,
0x4c, 0x61, 0xa6, 0xe8, 0x14, 0x4e, 0x3f, 0xa0, 0x94, 0x22, 0x46, 0xc9, 0xc8, 0xe8, 0x64, 0x7c,
0x7e, 0xf3, 0x30, 0x30, 0xc6, 0x34, 0x31, 0x68, 0x58, 0xaf, 0x45, 0x5a, 0x21, 0xdf, 0x13, 0xbd,
0x5f, 0x04, 0x2e, 0xfa, 0x10, 0x7d, 0x02, 0x90, 0xa0, 0x58, 0xbd, 0x2a, 0x56, 0x42, 0x21, 0x23,
0x23, 0x32, 0x3e, 0xbf, 0xb9, 0xdf, 0xeb, 0x13, 0xee, 0xc1, 0xd0, 0xe2, 0x3d, 0x2a, 0x9d, 0xc3,
0xdd, 0xb7, 0x55, 0x9a, 0x6a, 0x12, 0xc7, 0x8f, 0x15, 0x4a, 0xc5, 0x06, 0x46, 0xdd, 0x77, 0xb1,
0xa8, 0xd2, 0x34, 0x68, 0xe0, 0xd0, 0xe2, 0xb7, 0x15, 0x74, 0x01, 0x97, 0x5d, 0x49, 0x16, 0x79,
0x26, 0x91, 0x9d, 0x98, 0x2e, 0xec, 0xb0, 0x4b, 0x8d, 0x87, 0x16, 0x3f, 0xd0, 0xcc, 0x5c, 0x18,
0xae, 0xf5, 0x38, 0xde, 0x77, 0x02, 0xd0, 0x59, 0xa6, 0xf7, 0x60, 0xa8, 0x2d, 0xd7, 0x0b, 0x3a,
0xe3, 0x75, 0x42, 0xc7, 0xe0, 0x36, 0xc7, 0x60, 0x03, 0xb3, 0xb8, 0x3b, 0x81, 0x88, 0xd2, 0x80,
0x8b, 0x4f, 0x73, 0x53, 0xe6, 0x2d, 0x4c, 0x1f, 0x80, 0xa3, 0xaf, 0xf0, 0x6c, 0x65, 0x5c, 0x9d,
0xf1, 0x26, 0xa3, 0x57, 0x70, 0x21, 0x33, 0x51, 0xc8, 0x24, 0x57, 0xcf, 0x85, 0x4a, 0x98, 0x6d,
0xda, 0xff, 0x57, 0xa3, 0xd7, 0x00, 0x9a, 0xad, 0xdd, 0x60, 0xc9, 0x86, 0x66, 0xaa, 0xcb, 0xc0,
0x1c, 0xf5, 0xe5, 0xbe, 0xce, 0x7b, 0x1c, 0xef, 0xef, 0x00, 0x6c, 0x3d, 0xab, 0xf7, 0x95, 0x80,
0xdb, 0xae, 0xe8, 0xb8, 0x46, 0xf8, 0x46, 0xe0, 0xb4, 0xbd, 0xca, 0x71, 0x59, 0x9f, 0x8d, 0x7e,
0x6c, 0x7d, 0xb2, 0xd9, 0xfa, 0xe4, 0xf7, 0xd6, 0x27, 0x5f, 0x76, 0xbe, 0xb5, 0xd9, 0xf9, 0xd6,
0xcf, 0x9d, 0x6f, 0xbd, 0x71, 0xea, 0xd7, 0xbf, 0x74, 0xcc, 0x63, 0x9c, 0xfe, 0x0b, 0x00, 0x00,
0xff, 0xff, 0xbf, 0x4d, 0x45, 0xd4, 0x1b, 0x04, 0x00, 0x00,
// 421 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x53, 0xcd, 0xce, 0x93, 0x40,
0x14, 0x65, 0x5a, 0x0a, 0x7e, 0xf7, 0x6b, 0xb4, 0x99, 0xf8, 0x33, 0x61, 0x41, 0x48, 0x13, 0x13,
0x16, 0x86, 0xd6, 0xba, 0x70, 0xe3, 0xaa, 0x4d, 0x1a, 0xdc, 0x99, 0xf1, 0x67, 0xe1, 0x6e, 0x4a,
0x47, 0x30, 0x22, 0x20, 0x33, 0xd4, 0xf8, 0x16, 0xbe, 0x8d, 0xc6, 0x27, 0x70, 0xd9, 0xa5, 0x1b,
0xa3, 0x69, 0x1f, 0xc3, 0x8d, 0x99, 0x01, 0x0a, 0xda, 0x27, 0xe8, 0x02, 0x72, 0x7f, 0xce, 0x39,
0x9c, 0xcb, 0xcd, 0x85, 0xfb, 0x82, 0x97, 0xbb, 0xb7, 0x11, 0x9f, 0x89, 0x4f, 0x59, 0xa4, 0x5f,
0xc5, 0x66, 0x56, 0x94, 0xb9, 0xcc, 0x85, 0xce, 0x02, 0x1d, 0x63, 0x53, 0xc5, 0xce, 0xbc, 0x78,
0x17, 0xcf, 0x58, 0x94, 0xaa, 0x27, 0x4a, 0x58, 0x16, 0x73, 0xa1, 0xc2, 0x8e, 0xd1, 0xd5, 0x6b,
0x9e, 0xf3, 0xa0, 0x65, 0xc8, 0x92, 0x73, 0x21, 0xf3, 0x92, 0xc5, 0x5c, 0xc7, 0x1d, 0x47, 0x65,
0x35, 0x7a, 0xfa, 0xcb, 0x02, 0xf3, 0xb9, 0xfa, 0xd0, 0x13, 0xb0, 0x57, 0x79, 0x26, 0x79, 0x26,
0xf1, 0x43, 0xb0, 0xdf, 0x73, 0x21, 0x58, 0xcc, 0x09, 0xf2, 0x90, 0x7f, 0xbd, 0xb8, 0x17, 0x68,
0x5f, 0x0a, 0x17, 0x34, 0xa0, 0x57, 0x2c, 0xad, 0x38, 0x6d, 0x71, 0xce, 0x4f, 0x04, 0xe3, 0x7e,
0x07, 0x3f, 0x06, 0x48, 0x38, 0xdb, 0xbe, 0x2c, 0xb6, 0x4c, 0xb6, 0x32, 0x77, 0x7a, 0x32, 0xe1,
0xa9, 0x19, 0x1a, 0xb4, 0x07, 0xc5, 0x2b, 0xb8, 0xf5, 0xa6, 0x4a, 0x53, 0x05, 0xa2, 0xfc, 0x43,
0xc5, 0x85, 0x24, 0x83, 0x33, 0x13, 0xeb, 0x2a, 0x4d, 0x83, 0xa6, 0x1d, 0x1a, 0xf4, 0x7f, 0x06,
0x5e, 0xc3, 0xa4, 0x2b, 0x89, 0x22, 0xcf, 0x04, 0x27, 0x43, 0xad, 0x42, 0xce, 0x55, 0xea, 0x7e,
0x68, 0xd0, 0x33, 0xce, 0xd2, 0x86, 0xd1, 0x4e, 0x8d, 0xe3, 0x7c, 0x43, 0x00, 0x9d, 0x65, 0x7c,
0x1b, 0x46, 0xca, 0xb2, 0x20, 0xc8, 0x1b, 0xfa, 0x57, 0xb4, 0x4e, 0xb0, 0x0f, 0x76, 0xb3, 0x0a,
0x32, 0xf0, 0x86, 0xfe, 0xf5, 0xe2, 0x66, 0xc0, 0xa2, 0x34, 0xa0, 0xec, 0xe3, 0x4a, 0x97, 0x69,
0xdb, 0xc6, 0x77, 0xc1, 0x52, 0x3b, 0x78, 0xba, 0xd5, 0xae, 0xae, 0x68, 0x93, 0xe1, 0x29, 0x8c,
0x45, 0xc6, 0x0a, 0x91, 0xe4, 0xf2, 0x19, 0x93, 0x09, 0x31, 0xb5, 0xfc, 0x3f, 0x35, 0x3c, 0x07,
0x50, 0x68, 0xe5, 0x86, 0x97, 0x64, 0xa4, 0xa7, 0x9a, 0x04, 0x7a, 0xa5, 0x2f, 0x4e, 0x75, 0xda,
0xc3, 0x38, 0x7f, 0x06, 0x60, 0xaa, 0x59, 0x9d, 0x2f, 0x08, 0xec, 0xf6, 0x17, 0x5d, 0xd6, 0x08,
0x5f, 0x11, 0xdc, 0x68, 0xb7, 0x72, 0x59, 0xd6, 0x97, 0xde, 0xf7, 0x83, 0x8b, 0xf6, 0x07, 0x17,
0xfd, 0x3e, 0xb8, 0xe8, 0xf3, 0xd1, 0x35, 0xf6, 0x47, 0xd7, 0xf8, 0x71, 0x74, 0x8d, 0xd7, 0x56,
0x7d, 0xfb, 0x1b, 0x4b, 0x9f, 0xe2, 0xa3, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x3c, 0xed, 0x7d,
0xff, 0x19, 0x04, 0x00, 0x00,
}
func (m *Sync) Marshal() (dAtA []byte, err error) {
@ -555,19 +555,17 @@ func (m *SyncContent) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.Messages) > 0 {
for iNdEx := len(m.Messages) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Messages[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSync(dAtA, i, uint64(size))
if m.Message != nil {
{
size, err := m.Message.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i--
dAtA[i] = 0xa
i -= size
i = encodeVarintSync(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
@ -938,11 +936,9 @@ func (m *SyncContent) Size() (n int) {
}
var l int
_ = l
if len(m.Messages) > 0 {
for _, e := range m.Messages {
l = e.Size()
n += 1 + l + sovSync(uint64(l))
}
if m.Message != nil {
l = m.Message.Size()
n += 1 + l + sovSync(uint64(l))
}
return n
}
@ -1196,7 +1192,7 @@ func (m *SyncContent) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Messages", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
@ -1223,8 +1219,10 @@ func (m *SyncContent) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Messages = append(m.Messages, &SyncContentValue{})
if err := m.Messages[len(m.Messages)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
if m.Message == nil {
m.Message = &SyncContentValue{}
}
if err := m.Message.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex