diff --git a/service/sync/client/client.go b/service/sync/client/client.go deleted file mode 100644 index 8cc292de..00000000 --- a/service/sync/client/client.go +++ /dev/null @@ -1,56 +0,0 @@ -package client - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" -) - -const CName = "SyncClient" - -type client struct { - handler requesthandler.RequestHandler -} - -func NewClient() app.Component { - return &client{} -} - -type Client interface { - NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error - RequestFullSync(id string, request *syncpb.SyncFullRequest) error - SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error -} - -func (c *client) Init(ctx context.Context, a *app.App) (err error) { - c.handler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler) - return nil -} - -func (c *client) Name() (name string) { - return CName -} - -func (c *client) Run(ctx context.Context) (err error) { - return nil -} - -func (c *client) Close(ctx context.Context) (err error) { - return nil -} - -func (c *client) NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error { - //TODO implement me - panic("implement me") -} - -func (c *client) RequestFullSync(id string, request *syncpb.SyncFullRequest) error { - //TODO implement me - panic("implement me") -} - -func (c *client) SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error { - //TODO implement me - panic("implement me") -} diff --git a/service/sync/drpcserver/drpcserver.go b/service/sync/drpcserver/drpcserver.go index c341e326..90613d1a 100644 --- a/service/sync/drpcserver/drpcserver.go +++ b/service/sync/drpcserver/drpcserver.go @@ -29,9 +29,6 @@ func New() DRPCServer { type DRPCServer interface { app.ComponentRunnable - - SendMessage(peerId string, msg *syncpb.SyncContent) - BroadcastMessage(msg *syncpb.SyncContent) } type drpcServer struct { @@ -176,7 +173,10 @@ func (s *drpcServer) receiveMessages(stream drpc.Stream, wg *sync.WaitGroup, pee return } } - s.messageService.HandleMessage(peerId, msg) + err := s.messageService.HandleMessage(peerId, msg) + if err != nil { + log.Error("error handling message", zap.Error(err)) + } } } diff --git a/service/sync/message/service.go b/service/sync/message/service.go index 92ee6310..e6670c63 100644 --- a/service/sync/message/service.go +++ b/service/sync/message/service.go @@ -35,8 +35,8 @@ func NewMessageService() app.Component { type Service interface { RegisterMessageSender(peerId string) chan *syncpb.SyncContent UnregisterMessageSender(peerId string) - HandleMessage(peerId string, msg *syncpb.SyncContent) - SendMessage(peerId string, msg *syncpb.SyncContent) + HandleMessage(peerId string, msg *syncpb.SyncContent) error + SendMessage(peerId string, msg *syncpb.SyncContent) error } func (s *service) Init(ctx context.Context, a *app.App) (err error) { @@ -82,15 +82,15 @@ func (s *service) UnregisterMessageSender(peerId string) { delete(s.senderChannels, peerId) } -func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) { - _ = s.receiveBatcher.Add(&message{ +func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) error { + return s.receiveBatcher.Add(&message{ peerId: peerId, content: msg, }) } -func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) { - _ = s.sendBatcher.Add(&message{ +func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) error { + return s.sendBatcher.Add(&message{ peerId: peerId, content: msg, }) @@ -129,13 +129,23 @@ func (s *service) runSender(ctx context.Context) { msgs := s.sendBatcher.WaitMinMax(1, 100) s.RLock() for _, msg := range msgs { - typedMsg := msg.(*message) - ch, exists := s.senderChannels[typedMsg.peerId] - if !exists { - continue - } - ch <- typedMsg.content + s.sendMessage(msg.(*message)) } s.RUnlock() } } + +func (s *service) sendMessage(typedMsg *message) { + // this should be done under lock + if typedMsg.content.GetMessage().GetHeadUpdate() != nil { + for _, ch := range s.senderChannels { + ch <- typedMsg.content + } + return + } + ch, exists := s.senderChannels[typedMsg.peerId] + if !exists { + return + } + ch <- typedMsg.content +} diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index 36f7bf8b..f9d854a5 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -8,16 +8,16 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/client" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" ) type requestHandler struct { - treeCache treecache.Service - client client.Client - account account.Service + treeCache treecache.Service + account account.Service + messageService message.Service } func NewRequestHandler() app.Component { @@ -32,8 +32,8 @@ const CName = "SyncRequestHandler" func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) { r.treeCache = a.MustComponent(treecache.CName).(treecache.Service) - r.client = a.MustComponent(client.CName).(client.Client) r.account = a.MustComponent(account.CName).(account.Service) + r.messageService = a.MustComponent(message.CName).(message.Service) return nil } @@ -95,7 +95,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, } // if we have incompatible heads, or we haven't seen the tree at all if fullRequest != nil { - return r.client.RequestFullSync(senderId, fullRequest) + return r.messageService.SendMessage(senderId, wrapFullRequest(fullRequest)) } // if error or nothing has changed if err != nil || len(result.Added) == 0 { @@ -109,7 +109,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, TreeId: update.TreeId, TreeHeader: update.TreeHeader, } - return r.client.NotifyHeadsChanged(newUpdate) + return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) { @@ -138,7 +138,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str if err != nil { return err } - err = r.client.SendFullSyncResponse(senderId, fullResponse) + err = r.messageService.SendMessage(senderId, wrapFullResponse(fullResponse)) // if error or nothing has changed if err != nil || len(result.Added) == 0 { return err @@ -152,7 +152,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str TreeId: request.TreeId, TreeHeader: request.TreeHeader, } - return r.client.NotifyHeadsChanged(newUpdate) + return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate)) } func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncpb.SyncFullResponse) (err error) { @@ -188,7 +188,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st SnapshotPath: snapshotPath, TreeId: response.TreeId, } - return r.client.NotifyHeadsChanged(newUpdate) + return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate)) } func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { @@ -240,3 +240,21 @@ func (r *requestHandler) prepareFullSyncResponse( func (r *requestHandler) createTree(ctx context.Context, response *syncpb.SyncFullResponse) error { return r.treeCache.Add(ctx, response.TreeId, response.TreeHeader, response.Changes) } + +func wrapHeadUpdate(update *syncpb.SyncHeadUpdate) *syncpb.SyncContent { + return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{ + Value: &syncpb.SyncContentValueValueOfHeadUpdate{HeadUpdate: update}, + }} +} + +func wrapFullRequest(request *syncpb.SyncFullRequest) *syncpb.SyncContent { + return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{ + Value: &syncpb.SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request}, + }} +} + +func wrapFullResponse(response *syncpb.SyncFullResponse) *syncpb.SyncContent { + return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{ + Value: &syncpb.SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response}, + }} +}