WIP sync logic prototype with server

This commit is contained in:
mcrakhman 2022-07-19 10:40:31 +02:00 committed by Mikhail Iudin
parent 2ec62c6d77
commit b8016c1d44
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
4 changed files with 54 additions and 82 deletions

View File

@ -1,56 +0,0 @@
package client
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
)
const CName = "SyncClient"
type client struct {
handler requesthandler.RequestHandler
}
func NewClient() app.Component {
return &client{}
}
type Client interface {
NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error
RequestFullSync(id string, request *syncpb.SyncFullRequest) error
SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error
}
func (c *client) Init(ctx context.Context, a *app.App) (err error) {
c.handler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
return nil
}
func (c *client) Name() (name string) {
return CName
}
func (c *client) Run(ctx context.Context) (err error) {
return nil
}
func (c *client) Close(ctx context.Context) (err error) {
return nil
}
func (c *client) NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error {
//TODO implement me
panic("implement me")
}
func (c *client) RequestFullSync(id string, request *syncpb.SyncFullRequest) error {
//TODO implement me
panic("implement me")
}
func (c *client) SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error {
//TODO implement me
panic("implement me")
}

View File

@ -29,9 +29,6 @@ func New() DRPCServer {
type DRPCServer interface {
app.ComponentRunnable
SendMessage(peerId string, msg *syncpb.SyncContent)
BroadcastMessage(msg *syncpb.SyncContent)
}
type drpcServer struct {
@ -176,7 +173,10 @@ func (s *drpcServer) receiveMessages(stream drpc.Stream, wg *sync.WaitGroup, pee
return
}
}
s.messageService.HandleMessage(peerId, msg)
err := s.messageService.HandleMessage(peerId, msg)
if err != nil {
log.Error("error handling message", zap.Error(err))
}
}
}

View File

@ -35,8 +35,8 @@ func NewMessageService() app.Component {
type Service interface {
RegisterMessageSender(peerId string) chan *syncpb.SyncContent
UnregisterMessageSender(peerId string)
HandleMessage(peerId string, msg *syncpb.SyncContent)
SendMessage(peerId string, msg *syncpb.SyncContent)
HandleMessage(peerId string, msg *syncpb.SyncContent) error
SendMessage(peerId string, msg *syncpb.SyncContent) error
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
@ -82,15 +82,15 @@ func (s *service) UnregisterMessageSender(peerId string) {
delete(s.senderChannels, peerId)
}
func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) {
_ = s.receiveBatcher.Add(&message{
func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) error {
return s.receiveBatcher.Add(&message{
peerId: peerId,
content: msg,
})
}
func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) {
_ = s.sendBatcher.Add(&message{
func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) error {
return s.sendBatcher.Add(&message{
peerId: peerId,
content: msg,
})
@ -129,13 +129,23 @@ func (s *service) runSender(ctx context.Context) {
msgs := s.sendBatcher.WaitMinMax(1, 100)
s.RLock()
for _, msg := range msgs {
typedMsg := msg.(*message)
ch, exists := s.senderChannels[typedMsg.peerId]
if !exists {
continue
}
ch <- typedMsg.content
s.sendMessage(msg.(*message))
}
s.RUnlock()
}
}
func (s *service) sendMessage(typedMsg *message) {
// this should be done under lock
if typedMsg.content.GetMessage().GetHeadUpdate() != nil {
for _, ch := range s.senderChannels {
ch <- typedMsg.content
}
return
}
ch, exists := s.senderChannels[typedMsg.peerId]
if !exists {
return
}
ch <- typedMsg.content
}

View File

@ -8,16 +8,16 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/client"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type requestHandler struct {
treeCache treecache.Service
client client.Client
account account.Service
treeCache treecache.Service
account account.Service
messageService message.Service
}
func NewRequestHandler() app.Component {
@ -32,8 +32,8 @@ const CName = "SyncRequestHandler"
func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) {
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
r.client = a.MustComponent(client.CName).(client.Client)
r.account = a.MustComponent(account.CName).(account.Service)
r.messageService = a.MustComponent(message.CName).(message.Service)
return nil
}
@ -95,7 +95,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.client.RequestFullSync(senderId, fullRequest)
return r.messageService.SendMessage(senderId, wrapFullRequest(fullRequest))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
@ -109,7 +109,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
TreeId: update.TreeId,
TreeHeader: update.TreeHeader,
}
return r.client.NotifyHeadsChanged(newUpdate)
return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate))
}
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) {
@ -138,7 +138,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
if err != nil {
return err
}
err = r.client.SendFullSyncResponse(senderId, fullResponse)
err = r.messageService.SendMessage(senderId, wrapFullResponse(fullResponse))
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
@ -152,7 +152,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
TreeId: request.TreeId,
TreeHeader: request.TreeHeader,
}
return r.client.NotifyHeadsChanged(newUpdate)
return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate))
}
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncpb.SyncFullResponse) (err error) {
@ -188,7 +188,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
SnapshotPath: snapshotPath,
TreeId: response.TreeId,
}
return r.client.NotifyHeadsChanged(newUpdate)
return r.messageService.SendMessage("", wrapHeadUpdate(newUpdate))
}
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
@ -240,3 +240,21 @@ func (r *requestHandler) prepareFullSyncResponse(
func (r *requestHandler) createTree(ctx context.Context, response *syncpb.SyncFullResponse) error {
return r.treeCache.Add(ctx, response.TreeId, response.TreeHeader, response.Changes)
}
func wrapHeadUpdate(update *syncpb.SyncHeadUpdate) *syncpb.SyncContent {
return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{
Value: &syncpb.SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
}}
}
func wrapFullRequest(request *syncpb.SyncFullRequest) *syncpb.SyncContent {
return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{
Value: &syncpb.SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
}}
}
func wrapFullResponse(response *syncpb.SyncFullResponse) *syncpb.SyncContent {
return &syncpb.SyncContent{Message: &syncpb.SyncContentValue{
Value: &syncpb.SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
}}
}