This commit is contained in:
mcrakhman 2022-09-14 18:54:10 +02:00
parent 206b695e62
commit f900cbff0b
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
6 changed files with 197 additions and 293 deletions

View File

@ -21,5 +21,6 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
}
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error {
return fmt.Errorf("not implemented")
}

View File

@ -0,0 +1,165 @@
package syncservice
import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/libp2p/go-libp2p-core/sec"
"storj.io/drpc"
"storj.io/drpc/drpcctx"
"sync"
)
var ErrEmptyPeer = errors.New("don't have such a peer")
var ErrStreamClosed = errors.New("stream is already closed")
const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams
type StreamPool interface {
AddStream(stream spacesyncproto.SpaceStream) (err error)
SyncClient
}
type SyncClient interface {
SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
}
type MessageHandler interface {
HandleMessage(peerId string, message *spacesyncproto.ObjectSyncMessage)
}
type streamPool struct {
sync.Mutex
peerStreams map[string]spacesyncproto.SpaceStream
messageHandler MessageHandler
}
func newStreamPool(messageHandler MessageHandler) StreamPool {
return &streamPool{
peerStreams: make(map[string]spacesyncproto.SpaceStream),
messageHandler: messageHandler,
}
}
func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
stream, err := s.getStream(peerId)
if err != nil {
return
}
return stream.Send(message)
}
func (s *streamPool) getStream(id string) (stream spacesyncproto.SpaceStream, err error) {
s.Lock()
defer s.Unlock()
stream, exists := s.peerStreams[id]
if !exists {
err = ErrEmptyPeer
return
}
select {
case <-stream.Context().Done():
delete(s.peerStreams, id)
err = ErrStreamClosed
default:
}
return
}
func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) {
s.Lock()
defer s.Unlock()
Loop:
for id, stream := range s.peerStreams {
select {
case <-stream.Context().Done():
delete(s.peerStreams, id)
continue Loop
default:
}
streams = append(streams, stream)
}
return
}
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
streams := s.getAllStreams()
for _, stream := range streams {
if err = stream.Send(message); err != nil {
// TODO: add logging
}
}
return nil
}
func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) {
s.Lock()
defer s.Unlock()
peerId, err := getPeerIdFromStream(stream)
if err != nil {
return
}
s.peerStreams[peerId] = stream
go s.readPeerLoop(peerId, stream)
return
}
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{}
}
Loop:
for {
msg, err := stream.Recv()
if err != nil {
break
}
select {
case <-limiter:
case <-stream.Context().Done():
break Loop
}
go func() {
defer func() {
limiter <- struct{}{}
}()
s.messageHandler.HandleMessage(peerId, msg)
}()
}
if err = s.removePeer(peerId); err != nil {
// TODO: log something
}
return
}
func (s *streamPool) removePeer(peerId string) (err error) {
s.Lock()
defer s.Unlock()
_, ok := s.peerStreams[peerId]
if !ok {
return ErrEmptyPeer
}
delete(s.peerStreams, peerId)
return
}
func getPeerIdFromStream(stream drpc.Stream) (string, error) {
ctx := stream.Context()
conn, ok := ctx.Value(drpcctx.TransportKey{}).(sec.SecureConn)
if !ok {
return "", fmt.Errorf("incorrect connection type in stream")
}
return conn.RemotePeer().String(), nil
}

View File

@ -19,10 +19,6 @@ type SyncHandler interface {
HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
}
type SyncClient interface {
SendSyncMessage(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
return &syncHandler{
treeCache: treeCache,
@ -96,7 +92,7 @@ func (s *syncHandler) HandleHeadUpdate(
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId))
return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
@ -109,7 +105,7 @@ func (s *syncHandler) HandleHeadUpdate(
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (s *syncHandler) HandleFullSyncRequest(
@ -142,7 +138,7 @@ func (s *syncHandler) HandleFullSyncRequest(
if err != nil {
return err
}
return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId))
return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId))
}
func (s *syncHandler) HandleFullSyncResponse(
@ -163,21 +159,24 @@ func (s *syncHandler) HandleFullSyncResponse(
}
err = func() error {
objTree := res.Tree
objTree.Lock()
syncTree := res.Tree
syncTree.Lock()
defer res.Release()
defer objTree.Unlock()
defer syncTree.Unlock()
// if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
if slice.UnsortedEquals(response.Heads, syncTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, response.Changes...)
// syncTree -> syncService: HeadUpdate()
// AddRawChanges -> syncTree.addRawChanges(); syncService.HeadUpdate()
result, err = syncTree.AddRawChanges(ctx, response.Changes...)
if err != nil {
return err
}
snapshotPath = objTree.SnapshotPath()
snapshotPath = syncTree.SnapshotPath()
return nil
}()
@ -203,7 +202,7 @@ func (s *syncHandler) HandleFullSyncResponse(
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) {

View File

@ -1,291 +1,32 @@
package syncservice
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
)
type syncService struct {
treeCache treecache.Service
account account.Service
messageService MessageSender
}
var log = logger.NewNamed("requesthandler")
func New() app.Component {
return &syncService{}
}
type SyncService interface {
HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error)
NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool
}
type MessageSender interface {
SendMessageAsync(peerId string, msg *syncproto.Sync) error
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error
type syncService struct {
syncHandler SyncHandler
streamPool StreamPool
configuration nodeconf.Configuration
}
const CName = "SyncRequestHandler"
func (r *syncService) Init(a *app.App) (err error) {
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
r.account = a.MustComponent(account.CName).(account.Service)
r.messageService = a.MustComponent("MessageService").(MessageSender)
return nil
func (s *syncService) NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
msg := spacesyncproto.WrapHeadUpdate(update, header, treeId)
all
return s.streamPool.BroadcastAsync(msg)
}
func (r *syncService) Name() (name string) {
return CName
func (s *syncService) StreamPool() StreamPool {
return s.streamPool
}
func (r *syncService) Run(ctx context.Context) (err error) {
return nil
}
func (r *syncService) Close(ctx context.Context) (err error) {
return nil
}
func (r *syncService) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error {
msg := content.GetMessage()
switch {
case msg.GetFullSyncRequest() != nil:
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetFullSyncResponse() != nil:
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetHeadUpdate() != nil:
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetAclList() != nil:
return r.HandleACLList(ctx, senderId, msg.GetAclList(), content.GetTreeHeader(), content.GetTreeId())
}
return nil
}
func (r *syncService) HandleHeadUpdate(
ctx context.Context,
senderId string,
update *syncproto.SyncHeadUpdate,
header *aclpb.Header,
treeId string) (err error) {
var (
fullRequest *syncproto.SyncFullRequest
snapshotPath []string
result tree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing head update")
err = r.treeCache.Do(ctx, treeId, func(obj any) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, update.Changes...)
if err != nil {
return err
}
// if we couldn't add all the changes
shouldFullSync := len(update.Changes) != len(result.Added)
snapshotPath = objTree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(objTree)
if err != nil {
return err
}
}
return nil
})
// if there are no such tree
if err == storage.ErrUnknownTreeId {
fullRequest = &syncproto.SyncFullRequest{}
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (r *syncService) HandleFullSyncRequest(
ctx context.Context,
senderId string,
request *syncproto.SyncFullRequest,
header *aclpb.Header,
treeId string) (err error) {
var fullResponse *syncproto.SyncFullResponse
err = r.treeCache.Do(ctx, treeId, func(obj any) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
}
func (r *syncService) HandleFullSyncResponse(
ctx context.Context,
senderId string,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) (err error) {
var (
snapshotPath []string
result tree.AddResult
)
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
// if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, response.Changes...)
if err != nil {
return err
}
snapshotPath = objTree.SnapshotPath()
return nil
})
// if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
return err
}
// if we have a new tree
if err == storage.ErrUnknownTreeId {
err = r.createTree(ctx, response, header, treeId)
if err != nil {
return err
}
result = tree.AddResult{
OldHeads: []string{},
Heads: response.Heads,
Added: response.Changes,
}
}
// sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (r *syncService) HandleACLList(
ctx context.Context,
senderId string,
req *syncproto.SyncACLList,
header *aclpb.Header,
id string) (err error) {
err = r.treeCache.Do(ctx, id, func(obj interface{}) error {
return nil
})
// do nothing if already added
if err == nil {
return nil
}
// if not found then add to storage
if err == storage.ErrUnknownTreeId {
return r.createACLList(ctx, req, header, id)
}
return err
}
func (r *syncService) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) {
return &syncproto.SyncFullRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (r *syncService) prepareFullSyncResponse(
treeId string,
theirPath, theirHeads []string,
t tree.ObjectTree) (*syncproto.SyncFullResponse, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
if err != nil {
return nil, err
}
return &syncproto.SyncFullResponse{
Heads: t.Heads(),
Changes: ourChanges,
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (r *syncService) createTree(
ctx context.Context,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.TreeStorageCreatePayload{
TreeId: treeId,
Header: header,
Changes: response.Changes,
Heads: response.Heads,
})
}
func (r *syncService) createACLList(
ctx context.Context,
req *syncproto.SyncACLList,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.ACLListStorageCreatePayload{
ListId: treeId,
Header: header,
Records: req.Records,
})
func newSyncService() {
}

View File

@ -7,7 +7,7 @@ import (
)
type AccountData struct { // TODO: create a convenient constructor for this
Identity []byte // TODO: this is essentially the same as sign key
Identity []byte // public key
SignKey signingkey.PrivKey
EncKey encryptionkey.PrivKey
Decoder keys.Decoder

View File

@ -65,8 +65,6 @@ func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string,
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetHeadUpdate() != nil:
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetAclList() != nil:
return r.HandleACLList(ctx, senderId, msg.GetAclList(), content.GetTreeHeader(), content.GetTreeId())
}
return nil
}