Fix sync bugs and add logs

This commit is contained in:
mcrakhman 2022-08-05 18:14:35 +02:00
parent 31517db9bf
commit fbaf96eb38
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
7 changed files with 42 additions and 14 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"go.uber.org/zap"
"sync"
)
@ -430,7 +431,7 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
// but no changes after some of the snapshots
var (
isNewDocument = len(theirPath) != 0
isNewDocument = len(theirPath) == 0
ourPath = a.SnapshotPath()
// by default returning everything we have
commonSnapshot = ourPath[len(ourPath)-1] // TODO: root snapshot, probably it is better to have a specific method in treestorage
@ -462,16 +463,26 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
return ch, nil
}
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
log.With(
zap.Strings("heads", a.fullTree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", a.id)).
Debug("getting all changes from common snapshot")
_, err = a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, load)
if err != nil {
return nil, err
}
if isNewDocument {
// adding snapshot to raw changes
_, err = load(commonSnapshot)
if err != nil {
return nil, err
}
}
log.With(
zap.Int("len(changes)", len(rawChanges)),
zap.String("id", a.id)).
Debug("sending all changes after common snapshot")
return rawChanges, nil
}
@ -483,6 +494,8 @@ func (a *aclTree) DebugDump() (string, error) {
func (a *aclTree) commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
var i int
var j int
log.With(zap.Strings("our path", ourPath), zap.Strings("their path", theirPath)).
Debug("finding common snapshot for two paths")
OuterLoop:
// find starting point from the right
for i = len(ourPath) - 1; i >= 0; i-- {

View File

@ -6,7 +6,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)

View File

@ -3,14 +3,18 @@ package node
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"go.uber.org/zap"
)
const CName = "NodesService"
var log = logger.NewNamed("nodesservice")
type Node struct {
Address string
PeerId string
@ -46,7 +50,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
if err != nil {
return err
}
log.With(zap.String("node", node.PeerId)).Debug("adding peer to known nodes")
filteredNodes = append(filteredNodes, node)
}
s.nodes = filteredNodes

View File

@ -117,8 +117,6 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
snapshotPath []string
heads []string
)
log.With(zap.String("id", id), zap.String("text", text)).
Debug("creating document")
err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error {
err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin)
@ -154,6 +152,8 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
if err != nil {
return "", err
}
log.With(zap.String("id", id), zap.String("text", text)).
Debug("creating document")
err = s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
Heads: heads,

View File

@ -4,8 +4,8 @@ import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
@ -19,7 +19,7 @@ var log = logger.NewNamed("messageservice")
const CName = "MessageService"
type service struct {
nodes []config.Node
nodes []*node.Node
requestHandler requesthandler.RequestHandler
pool pool.Pool
sync.RWMutex
@ -36,7 +36,7 @@ type Service interface {
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
s.nodes = a.MustComponent(config.CName).(*config.Config).Nodes
s.nodes = a.MustComponent(node.CName).(node.Service).Nodes()
s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage)
return nil
@ -106,10 +106,6 @@ func (s *service) SendMessage(ctx context.Context, peerId string, msg *syncproto
}
func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error {
log.With(
zap.String("message", msgType(msg))).
Debug("sending message to all")
// dial manually to all peers
for _, rp := range s.nodes {
if er := s.pool.DialAndAddPeer(context.Background(), rp.PeerId); er != nil {
@ -126,6 +122,10 @@ func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncprot
// TODO: use Broadcast method here when it is ready
for _, n := range s.nodes {
log.With(
zap.String("peerId", n.PeerId),
zap.String("message", msgType(msg))).
Debug("sending message to peer")
err := s.pool.SendAndWait(ctx, n.PeerId, &syncproto.Message{
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
Data: marshalled,

View File

@ -3,6 +3,7 @@ package requesthandler
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
@ -11,6 +12,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
)
type requestHandler struct {
@ -19,6 +21,8 @@ type requestHandler struct {
messageService MessageSender
}
var log = logger.NewNamed("requesthandler")
func New() app.Component {
return &requestHandler{}
}
@ -72,6 +76,8 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
snapshotPath []string
result acltree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
Debug("received head update message")
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
@ -122,6 +128,8 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
snapshotPath []string
result acltree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
Debug("received full sync request message")
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
@ -164,6 +172,8 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
snapshotPath []string
result acltree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
Debug("received full sync response message")
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
@ -227,10 +237,12 @@ func (r *requestHandler) prepareFullSyncResponse(
// filtering our changes, so we will not send the same changes back
var final []*aclpb.RawChange
for _, ch := range ourChanges {
if _, exists := theirMap[ch.Id]; exists {
if _, exists := theirMap[ch.Id]; !exists {
final = append(final, ch)
}
}
log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)).
Debug("sending changes for tree")
return &syncproto.SyncFullResponse{
Heads: tree.Heads(),

View File

@ -69,7 +69,7 @@ func (s *service) Do(ctx context.Context, treeId string, f ACLTreeFunc) error {
func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error {
log.
With(zap.String("treeId", treeId)).
With(zap.String("treeId", treeId), zap.Int("len(changes)", len(changes))).
Debug("adding tree with changes")
_, err := s.treeProvider.CreateTreeStorage(treeId, header, changes)