diff --git a/cmd/node/node.go b/cmd/node/node.go index 69c5c601..d65f7a10 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -7,11 +7,17 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/example" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/api" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/document" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "go.uber.org/zap" "net/http" _ "net/http/pprof" @@ -85,9 +91,16 @@ func main() { } func Bootstrap(a *app.App) { - a.Register(secure.New()). + a.Register(account.New()). + Register(secure.New()). Register(server.New()). Register(dialer.New()). Register(pool.NewPool()). - Register(&example.Example{}) + //Register(&example.Example{}) + Register(node.New()). + Register(document.New()). + Register(message.New()). + Register(requesthandler.New()). + Register(treecache.New()). + Register(api.New()) } diff --git a/cmd/nodesgen/gen.go b/cmd/nodesgen/gen.go index 7532f12c..069c3c59 100644 --- a/cmd/nodesgen/gen.go +++ b/cmd/nodesgen/gen.go @@ -13,9 +13,8 @@ import ( ) var ( - flagNodeMap = flag.String("n", "cmd/nodesgen/nodemap.yml", "path to nodes map file") - flagAccountConfigFile = flag.String("a", "etc/nodes.yml", "path to account file") - flagEtcPath = flag.String("e", "etc", "path to etc directory") + flagNodeMap = flag.String("n", "cmd/nodesgen/nodemap.yml", "path to nodes map file") + flagEtcPath = flag.String("e", "etc", "path to etc directory") ) type NodesMap struct { diff --git a/service/net/dialer/dialer.go b/service/net/dialer/dialer.go index a1cb8a31..b90bf872 100644 --- a/service/net/dialer/dialer.go +++ b/service/net/dialer/dialer.go @@ -42,10 +42,10 @@ type dialer struct { func (d *dialer) Init(ctx context.Context, a *app.App) (err error) { d.transport = a.MustComponent(secure.CName).(secure.Service) - peerConf := a.MustComponent(config.CName).(*config.Config).PeerList.Remote + nodes := a.MustComponent(config.CName).(*config.Config).Nodes d.peerAddrs = map[string][]string{} - for _, rp := range peerConf { - d.peerAddrs[rp.PeerId] = []string{rp.Addr} + for _, n := range nodes { + d.peerAddrs[n.PeerId] = []string{n.Address} } return } diff --git a/service/sync/document/service.go b/service/sync/document/service.go index 8a4ae6c5..8aae859b 100644 --- a/service/sync/document/service.go +++ b/service/sync/document/service.go @@ -11,8 +11,8 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/gogo/protobuf/proto" "go.uber.org/zap" ) @@ -100,10 +100,10 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro zap.String("header", header.String())). Debug("document updated in the database") - return s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{ + return s.messageService.SendToSpace("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, - TreeId: "", + TreeId: id, SnapshotPath: snapshotPath, TreeHeader: header, })) @@ -155,10 +155,10 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e return "", err } - err = s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{ + err = s.messageService.SendToSpace("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, - TreeId: "", + TreeId: id, SnapshotPath: snapshotPath, TreeHeader: header, })) diff --git a/service/sync/message/service.go b/service/sync/message/service.go index e3061657..c78b0e1c 100644 --- a/service/sync/message/service.go +++ b/service/sync/message/service.go @@ -4,9 +4,11 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" - "github.com/cheggaaa/mb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "github.com/gogo/protobuf/proto" "go.uber.org/zap" "sync" ) @@ -16,35 +18,25 @@ var log = logger.NewNamed("messageservice") const CName = "MessageService" type service struct { - receiveBatcher *mb.MB - sendBatcher *mb.MB - senderChannels map[string]chan *syncpb.SyncContent + nodes []config.Node requestHandler requesthandler.RequestHandler + pool pool.Pool sync.RWMutex } -type message struct { - peerId string - content *syncpb.SyncContent -} - func New() app.Component { return &service{} } type Service interface { - RegisterMessageSender(peerId string) chan *syncpb.SyncContent - UnregisterMessageSender(peerId string) - - HandleMessage(peerId string, msg *syncpb.SyncContent) error - SendMessage(peerId string, msg *syncpb.SyncContent) error + SendMessage(peerId string, msg *syncproto.Sync) error + SendToSpace(spaceId string, msg *syncproto.Sync) error } func (s *service) Init(ctx context.Context, a *app.App) (err error) { - s.receiveBatcher = mb.New(0) - s.sendBatcher = mb.New(0) - s.senderChannels = make(map[string]chan *syncpb.SyncContent) s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) + s.nodes = a.MustComponent(config.CName).(*config.Config).Nodes + s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -53,8 +45,16 @@ func (s *service) Name() (name string) { } func (s *service) Run(ctx context.Context) (err error) { - //go s.runSender(ctx) - //go s.runReceiver(ctx) + // dial manually to all peers + for _, rp := range s.nodes { + if er := s.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil { + log.Info("can't dial to peer", zap.Error(er)) + } else { + log.Info("connected with peer", zap.String("peerId", rp.PeerId)) + } + } + s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage) + return nil } @@ -62,104 +62,74 @@ func (s *service) Close(ctx context.Context) (err error) { return nil } -func (s *service) RegisterMessageSender(peerId string) chan *syncpb.SyncContent { - s.Lock() - defer s.Unlock() - if ch, exists := s.senderChannels[peerId]; !exists { - return ch - } - ch := make(chan *syncpb.SyncContent) - s.senderChannels[peerId] = ch - return ch -} - -func (s *service) UnregisterMessageSender(peerId string) { - s.Lock() - defer s.Unlock() - if _, exists := s.senderChannels[peerId]; !exists { - return - } - close(s.senderChannels[peerId]) - delete(s.senderChannels, peerId) -} - -func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) error { +func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) { log.With( - zap.String("peerId", peerId), - zap.String("message", msgType(msg))). + zap.String("peerId", msg.Peer().Id())). Debug("handling message from peer") - return s.receiveBatcher.Add(&message{ - peerId: peerId, - content: msg, - }) + + var syncMsg *syncproto.Sync + err = proto.Unmarshal(msg.Data, syncMsg) + if err != nil { + return err + } + + return s.requestHandler.HandleSyncMessage(ctx, msg.Peer().Id(), syncMsg) } -func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) error { +func (s *service) SendMessage(peerId string, msg *syncproto.Sync) error { log.With( zap.String("peerId", peerId), zap.String("message", msgType(msg))). Debug("sending message to peer") - return s.sendBatcher.Add(&message{ - peerId: peerId, - content: msg, + + marshalled, err := proto.Marshal(msg) + if err != nil { + return err + } + + err = s.pool.SendAndWait(context.Background(), peerId, &syncproto.Message{ + Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync}, + Data: marshalled, }) + if err != nil { + log.With( + zap.String("peerId", peerId), + zap.String("message", msgType(msg)), + zap.Error(err)). + Error("failed to send message to peer") + } + return err } -func (s *service) runReceiver(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - break - } - msgs := s.receiveBatcher.WaitMinMax(1, 100) - // TODO: this is bad to serve everything on a new goroutine, but very easy for prototyping :-) - for _, msg := range msgs { - typedMsg := msg.(*message) - go func(typedMsg *message) { - err := s.requestHandler.HandleFullSyncContent(ctx, typedMsg.peerId, typedMsg.content) - if err != nil { - log.Error("failed to handle content", zap.Error(err)) - } - }(typedMsg) +func (s *service) SendToSpace(spaceId string, msg *syncproto.Sync) error { + log.With( + zap.String("message", msgType(msg))). + Debug("sending message to all") + + marshalled, err := proto.Marshal(msg) + if err != nil { + return err + } + + // TODO: use Broadcast method here when it is ready + for _, n := range s.nodes { + err := s.pool.SendAndWait(context.Background(), n.PeerId, &syncproto.Message{ + Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync}, + Data: marshalled, + }) + if err != nil { + log.With( + zap.String("peerId", n.PeerId), + zap.String("message", msgType(msg)), + zap.Error(err)). + Error("failed to send message to peer") } } + + return nil } -func (s *service) runSender(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - break - } - msgs := s.sendBatcher.WaitMinMax(1, 100) - s.RLock() - for _, msg := range msgs { - s.sendMessage(msg.(*message)) - } - s.RUnlock() - } -} - -func (s *service) sendMessage(typedMsg *message) { - // this should be done under lock - if typedMsg.content.GetMessage().GetHeadUpdate() != nil { - for _, ch := range s.senderChannels { - ch <- typedMsg.content - } - return - } - ch, exists := s.senderChannels[typedMsg.peerId] - if !exists { - return - } - ch <- typedMsg.content -} - -func msgType(content *syncpb.SyncContent) string { +func msgType(content *syncproto.Sync) string { msg := content.GetMessage() switch { case msg.GetFullSyncRequest() != nil: diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index 35456cce..724be478 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -24,12 +24,12 @@ func New() app.Component { } type RequestHandler interface { - HandleFullSyncContent(ctx context.Context, senderId string, request *syncproto.Sync) (err error) + HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error) } type MessageSender interface { SendMessage(peerId string, msg *syncproto.Sync) error - SendSpace(spaceId string, msg *syncproto.Sync) error + SendToSpace(spaceId string, msg *syncproto.Sync) error } const CName = "SyncRequestHandler" @@ -53,7 +53,7 @@ func (r *requestHandler) Close(ctx context.Context) (err error) { return nil } -func (r *requestHandler) HandleFullSyncContent(ctx context.Context, senderId string, content *syncproto.Sync) error { +func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error { msg := content.GetMessage() switch { case msg.GetFullSyncRequest() != nil: @@ -113,7 +113,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, TreeId: update.TreeId, TreeHeader: update.TreeHeader, } - return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpace("", syncproto.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.SyncFullRequest) (err error) { @@ -156,7 +156,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str TreeId: request.TreeId, TreeHeader: request.TreeHeader, } - return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpace("", syncproto.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) { @@ -192,7 +192,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st SnapshotPath: snapshotPath, TreeId: response.TreeId, } - return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpace("", syncproto.WrapHeadUpdate(newUpdate)) } func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) {