332 lines
9.8 KiB
Go
332 lines
9.8 KiB
Go
package requesthandler
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type requestHandler struct {
|
|
treeCache treecache.Service
|
|
account account.Service
|
|
messageService MessageSender
|
|
}
|
|
|
|
// log is the package-level logger, registered under the name "requesthandler".
var log = logger.NewNamed("requesthandler")
|
|
|
|
// ErrIncorrectDocType is the sentinel error for an object whose document type
// is not the expected one. Match it with errors.Is rather than by message.
var ErrIncorrectDocType = errors.New("incorrect doc type")
|
|
|
|
func New() app.Component {
|
|
return &requestHandler{}
|
|
}
|
|
|
|
type RequestHandler interface {
|
|
HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error)
|
|
}
|
|
|
|
type MessageSender interface {
|
|
SendMessageAsync(peerId string, msg *syncproto.Sync) error
|
|
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error
|
|
}
|
|
|
|
// CName is the component name returned by the request handler's Name method.
const CName = "SyncRequestHandler"
|
|
|
|
func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) {
|
|
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
|
|
r.account = a.MustComponent(account.CName).(account.Service)
|
|
r.messageService = a.MustComponent("MessageService").(MessageSender)
|
|
return nil
|
|
}
|
|
|
|
func (r *requestHandler) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (r *requestHandler) Run(ctx context.Context) (err error) {
|
|
return nil
|
|
}
|
|
|
|
func (r *requestHandler) Close(ctx context.Context) (err error) {
|
|
return nil
|
|
}
|
|
|
|
func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error {
|
|
msg := content.GetMessage()
|
|
switch {
|
|
case msg.GetFullSyncRequest() != nil:
|
|
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId())
|
|
case msg.GetFullSyncResponse() != nil:
|
|
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
|
|
case msg.GetHeadUpdate() != nil:
|
|
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *requestHandler) HandleHeadUpdate(
|
|
ctx context.Context,
|
|
senderId string,
|
|
update *syncproto.SyncHeadUpdate,
|
|
header *aclpb.Header,
|
|
treeId string) (err error) {
|
|
|
|
var (
|
|
fullRequest *syncproto.SyncFullRequest
|
|
snapshotPath []string
|
|
result tree.AddResult
|
|
)
|
|
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
|
|
Debug("processing head update")
|
|
|
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
|
docTree := obj.(tree.DocTree)
|
|
docTree.Lock()
|
|
defer docTree.Unlock()
|
|
|
|
if slice.UnsortedEquals(update.Heads, docTree.Heads()) {
|
|
return nil
|
|
}
|
|
|
|
return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error {
|
|
aclTree := obj.(list.ACLList)
|
|
aclTree.RLock()
|
|
defer aclTree.RUnlock()
|
|
|
|
// TODO: check if we already have those changes
|
|
result, err = docTree.AddRawChanges(ctx, aclTree, update.Changes...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", docTree.Heads())).
|
|
Debug("comparing heads after head update")
|
|
shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads())
|
|
snapshotPath = docTree.SnapshotPath()
|
|
if shouldFullSync {
|
|
fullRequest, err = r.prepareFullSyncRequest(update.SnapshotPath, docTree)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
|
|
// if there are no such tree
|
|
if err == storage.ErrUnknownTreeId {
|
|
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request
|
|
fullRequest = &syncproto.SyncFullRequest{}
|
|
}
|
|
// if we have incompatible heads, or we haven't seen the tree at all
|
|
if fullRequest != nil {
|
|
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId))
|
|
}
|
|
// if error or nothing has changed
|
|
if err != nil || len(result.Added) == 0 {
|
|
return err
|
|
}
|
|
log.Info("res", zap.Int("result added", len(result.Added)))
|
|
// otherwise sending heads update message
|
|
newUpdate := &syncproto.SyncHeadUpdate{
|
|
Heads: result.Heads,
|
|
Changes: result.Added,
|
|
SnapshotPath: snapshotPath,
|
|
}
|
|
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
|
}
|
|
|
|
func (r *requestHandler) HandleFullSyncRequest(
|
|
ctx context.Context,
|
|
senderId string,
|
|
request *syncproto.SyncFullRequest,
|
|
header *aclpb.Header,
|
|
treeId string) (err error) {
|
|
|
|
var (
|
|
fullResponse *syncproto.SyncFullResponse
|
|
snapshotPath []string
|
|
result tree.AddResult
|
|
)
|
|
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
|
|
Debug("processing full sync request")
|
|
|
|
log.Info("getting doc tree from treeCache", zap.String("treeId", treeId))
|
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
|
docTree := obj.(tree.DocTree)
|
|
docTree.Lock()
|
|
defer docTree.Unlock()
|
|
|
|
//if slice.UnsortedEquals(request.Heads, docTree.Heads()) {
|
|
// return nil
|
|
//}
|
|
log.Info("getting tree from treeCache", zap.String("aclId", docTree.Header().AclListId))
|
|
return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error {
|
|
aclTree := obj.(list.ACLList)
|
|
aclTree.RLock()
|
|
defer aclTree.RUnlock()
|
|
// TODO: check if we already have those changes
|
|
// if we have non-empty request
|
|
if len(request.Heads) != 0 {
|
|
result, err = docTree.AddRawChanges(ctx, aclTree, request.Changes...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
snapshotPath = docTree.SnapshotPath()
|
|
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Changes, docTree)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
|
|
// if error or nothing has changed
|
|
if err != nil || len(result.Added) == 0 {
|
|
return err
|
|
}
|
|
|
|
// otherwise sending heads update message
|
|
newUpdate := &syncproto.SyncHeadUpdate{
|
|
Heads: result.Heads,
|
|
Changes: result.Added,
|
|
SnapshotPath: snapshotPath,
|
|
}
|
|
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
|
}
|
|
|
|
func (r *requestHandler) HandleFullSyncResponse(
|
|
ctx context.Context,
|
|
senderId string,
|
|
response *syncproto.SyncFullResponse,
|
|
header *aclpb.Header,
|
|
treeId string) (err error) {
|
|
|
|
var (
|
|
snapshotPath []string
|
|
result tree.AddResult
|
|
)
|
|
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
|
|
Debug("processing full sync response")
|
|
|
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
|
docTree := obj.(tree.DocTree)
|
|
docTree.Lock()
|
|
defer docTree.Unlock()
|
|
|
|
if slice.UnsortedEquals(response.Heads, docTree.Heads()) {
|
|
return nil
|
|
}
|
|
|
|
return r.treeCache.Do(ctx, docTree.Header().AclListId, func(obj interface{}) error {
|
|
aclTree := obj.(list.ACLList)
|
|
aclTree.RLock()
|
|
defer aclTree.RUnlock()
|
|
// TODO: check if we already have those changes
|
|
result, err = docTree.AddRawChanges(ctx, aclTree, response.Changes...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
snapshotPath = docTree.SnapshotPath()
|
|
return nil
|
|
})
|
|
})
|
|
|
|
// if error or nothing has changed
|
|
if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
|
|
return err
|
|
}
|
|
// if we have a new tree
|
|
if err == storage.ErrUnknownTreeId {
|
|
err = r.createTree(ctx, response, header, treeId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
result = tree.AddResult{
|
|
OldHeads: []string{},
|
|
Heads: response.Heads,
|
|
Added: response.Changes,
|
|
}
|
|
}
|
|
// sending heads update message
|
|
newUpdate := &syncproto.SyncHeadUpdate{
|
|
Heads: result.Heads,
|
|
Changes: result.Added,
|
|
SnapshotPath: snapshotPath,
|
|
}
|
|
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
|
}
|
|
|
|
func (r *requestHandler) prepareFullSyncRequest(theirPath []string, t tree.CommonTree) (*syncproto.SyncFullRequest, error) {
|
|
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &syncproto.SyncFullRequest{
|
|
Heads: t.Heads(),
|
|
Changes: ourChanges,
|
|
SnapshotPath: t.SnapshotPath(),
|
|
}, nil
|
|
}
|
|
|
|
func (r *requestHandler) prepareFullSyncResponse(
|
|
treeId string,
|
|
theirPath []string,
|
|
theirChanges []*aclpb.RawChange,
|
|
t tree.CommonTree) (*syncproto.SyncFullResponse, error) {
|
|
// TODO: we can probably use the common snapshot calculated on the request step from previous peer
|
|
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
theirMap := make(map[string]struct{})
|
|
for _, ch := range theirChanges {
|
|
theirMap[ch.Id] = struct{}{}
|
|
}
|
|
|
|
// filtering our changes, so we will not send the same changes back
|
|
var final []*aclpb.RawChange
|
|
for _, ch := range ourChanges {
|
|
if _, exists := theirMap[ch.Id]; !exists {
|
|
final = append(final, ch)
|
|
}
|
|
}
|
|
log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)).
|
|
Debug("preparing changes for tree")
|
|
|
|
return &syncproto.SyncFullResponse{
|
|
Heads: t.Heads(),
|
|
Changes: final,
|
|
SnapshotPath: t.SnapshotPath(),
|
|
}, nil
|
|
}
|
|
|
|
func (r *requestHandler) createTree(
|
|
ctx context.Context,
|
|
response *syncproto.SyncFullResponse,
|
|
header *aclpb.Header,
|
|
treeId string) error {
|
|
|
|
return r.treeCache.Add(
|
|
ctx,
|
|
treeId,
|
|
header,
|
|
response.Changes,
|
|
func(obj interface{}) error {
|
|
return nil
|
|
})
|
|
}
|