package message import ( "context" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/gogo/protobuf/proto" "sync" "time" ) var log = logger.NewNamed("messageservice") const CName = "MessageService" type service struct { nodes []*node.Node requestHandler requesthandler.RequestHandler pool pool.Pool sync.RWMutex } func New() app.Component { return &service{} } type Service interface { SendMessageAsync(peerId string, msg *syncproto.Sync) error SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error } func (s *service) Init(ctx context.Context, a *app.App) (err error) { s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) s.nodes = a.MustComponent(node.CName).(node.Service).Nodes() s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage) return nil } func (s *service) Name() (name string) { return CName } func (s *service) Run(ctx context.Context) (err error) { return nil } func (s *service) Close(ctx context.Context) (err error) { return nil } func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) { defer func() { if err != nil { msg.AckError(syncproto.System_Error_UNKNOWN, err.Error()) } else { msg.Ack() } }() syncMsg := &syncproto.Sync{} err = proto.Unmarshal(msg.Data, syncMsg) if err != nil { return } timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() err = s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg) return } func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) { _, err = s.pool.DialAndAddPeer(context.Background(), peerId) if err != nil { return } marshalled, err := proto.Marshal(msg) if err != nil { return } go s.sendAsync(peerId, msgInfo(msg), marshalled) return } func (s *service) SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error { for _, rp := range s.nodes { s.SendMessageAsync(rp.PeerId, msg) } return nil } func (s *service) sendAsync(peerId string, msgTypeStr string, marshalled []byte) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() return s.pool.SendAndWait(ctx, peerId, &syncproto.Message{ Header: &syncproto.Header{ Type: syncproto.MessageType_MessageTypeSync, DebugInfo: msgTypeStr, }, Data: marshalled, }) } func msgInfo(content *syncproto.Sync) (syncMethod string) { msg := content.GetMessage() switch { case msg.GetFullSyncRequest() != nil: syncMethod = "FullSyncRequest" case msg.GetFullSyncResponse() != nil: syncMethod = "FullSyncResponse" case msg.GetHeadUpdate() != nil: syncMethod = "HeadUpdate" } syncMethod = fmt.Sprintf("method: %s, treeType: %s", syncMethod, content.TreeHeader.DocType.String()) return }