Fix sync bugs and add logs
This commit is contained in:
parent
203719b999
commit
1c81926f35
@ -7,6 +7,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@ -430,7 +431,7 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
|
||||
// but no changes after some of the snapshots
|
||||
|
||||
var (
|
||||
isNewDocument = len(theirPath) != 0
|
||||
isNewDocument = len(theirPath) == 0
|
||||
ourPath = a.SnapshotPath()
|
||||
// by default returning everything we have
|
||||
commonSnapshot = ourPath[len(ourPath)-1] // TODO: root snapshot, probably it is better to have a specific method in treestorage
|
||||
@ -462,16 +463,26 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
|
||||
return ch, nil
|
||||
}
|
||||
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
|
||||
log.With(
|
||||
zap.Strings("heads", a.fullTree.Heads()),
|
||||
zap.String("breakpoint", commonSnapshot),
|
||||
zap.String("id", a.id)).
|
||||
Debug("getting all changes from common snapshot")
|
||||
_, err = a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, load)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if isNewDocument {
|
||||
// adding snapshot to raw changes
|
||||
_, err = load(commonSnapshot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
log.With(
|
||||
zap.Int("len(changes)", len(rawChanges)),
|
||||
zap.String("id", a.id)).
|
||||
Debug("sending all changes after common snapshot")
|
||||
|
||||
return rawChanges, nil
|
||||
}
|
||||
@ -483,6 +494,8 @@ func (a *aclTree) DebugDump() (string, error) {
|
||||
func (a *aclTree) commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
|
||||
var i int
|
||||
var j int
|
||||
log.With(zap.Strings("our path", ourPath), zap.Strings("their path", theirPath)).
|
||||
Debug("finding common snapshot for two paths")
|
||||
OuterLoop:
|
||||
// find starting point from the right
|
||||
for i = len(ourPath) - 1; i >= 0; i-- {
|
||||
|
||||
@ -4,10 +4,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/thread"
|
||||
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
|
||||
//"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/lib/logging"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
|
||||
)
|
||||
|
||||
@ -18,18 +16,18 @@ var (
|
||||
|
||||
type treeBuilder struct {
|
||||
cache map[string]*Change
|
||||
identityKeys map[string]keys.SigningPubKey
|
||||
signingPubKeyDecoder keys.SigningPubKeyDecoder
|
||||
identityKeys map[string]signingkey.PubKey
|
||||
signingPubKeyDecoder signingkey.PubKeyDecoder
|
||||
tree *Tree
|
||||
thread thread.Thread
|
||||
treeStorage treestorage.TreeStorage
|
||||
|
||||
*changeLoader
|
||||
}
|
||||
|
||||
func newTreeBuilder(t thread.Thread, decoder keys.SigningPubKeyDecoder) *treeBuilder {
|
||||
func newTreeBuilder(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder) *treeBuilder {
|
||||
return &treeBuilder{
|
||||
signingPubKeyDecoder: decoder,
|
||||
thread: t,
|
||||
treeStorage: t,
|
||||
changeLoader: newChangeLoader(
|
||||
t,
|
||||
decoder,
|
||||
@ -39,15 +37,23 @@ func newTreeBuilder(t thread.Thread, decoder keys.SigningPubKeyDecoder) *treeBui
|
||||
|
||||
func (tb *treeBuilder) Init() {
|
||||
tb.cache = make(map[string]*Change)
|
||||
tb.identityKeys = make(map[string]keys.SigningPubKey)
|
||||
tb.identityKeys = make(map[string]signingkey.PubKey)
|
||||
tb.tree = &Tree{}
|
||||
tb.changeLoader.Init(tb.cache, tb.identityKeys)
|
||||
}
|
||||
|
||||
func (tb *treeBuilder) Build(fromStart bool) (*Tree, error) {
|
||||
var headsAndOrphans []string
|
||||
headsAndOrphans = append(headsAndOrphans, tb.thread.Orphans()...)
|
||||
headsAndOrphans = append(headsAndOrphans, tb.thread.Heads()...)
|
||||
orphans, err := tb.treeStorage.Orphans()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
heads, err := tb.treeStorage.Heads()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
headsAndOrphans = append(headsAndOrphans, orphans...)
|
||||
headsAndOrphans = append(headsAndOrphans, heads...)
|
||||
|
||||
if fromStart {
|
||||
if err := tb.buildTreeFromStart(headsAndOrphans); err != nil {
|
||||
@ -109,7 +115,10 @@ func (tb *treeBuilder) dfsFromStart(heads []string) (buf []*Change, root *Change
|
||||
possibleRoots = append(possibleRoots, ch)
|
||||
}
|
||||
}
|
||||
header := tb.thread.Header()
|
||||
header, err := tb.treeStorage.Header()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
for _, r := range possibleRoots {
|
||||
if r.Id == header.FirstChangeId {
|
||||
return buf, r, nil
|
||||
@ -125,13 +134,16 @@ func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error)
|
||||
return
|
||||
}
|
||||
tb.tree.AddFast(ch)
|
||||
changes, err := tb.dfs(heads, breakpoint)
|
||||
changes, err := tb.dfs(heads, breakpoint, tb.loadChange)
|
||||
|
||||
tb.tree.AddFast(changes...)
|
||||
return
|
||||
}
|
||||
|
||||
func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, err error) {
|
||||
func (tb *treeBuilder) dfs(
|
||||
heads []string,
|
||||
breakpoint string,
|
||||
load func(string) (*Change, error)) (buf []*Change, err error) {
|
||||
stack := make([]string, len(heads), len(heads)*2)
|
||||
copy(stack, heads)
|
||||
|
||||
@ -144,7 +156,7 @@ func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, er
|
||||
continue
|
||||
}
|
||||
|
||||
ch, err := tb.loadChange(id)
|
||||
ch, err := load(id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -3,14 +3,18 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/encryptionkey"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const CName = "NodesService"
|
||||
|
||||
var log = logger.NewNamed("nodesservice")
|
||||
|
||||
type Node struct {
|
||||
Address string
|
||||
PeerId string
|
||||
@ -46,7 +50,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.With(zap.String("node", node.PeerId)).Debug("adding peer to known nodes")
|
||||
filteredNodes = append(filteredNodes, node)
|
||||
}
|
||||
s.nodes = filteredNodes
|
||||
|
||||
@ -117,8 +117,6 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
|
||||
snapshotPath []string
|
||||
heads []string
|
||||
)
|
||||
log.With(zap.String("id", id), zap.String("text", text)).
|
||||
Debug("creating document")
|
||||
|
||||
err = s.treeCache.Create(ctx, func(builder acltree.ChangeBuilder) error {
|
||||
err := builder.UserAdd(acc.Identity, acc.EncKey.GetPublic(), aclpb.ACLChange_Admin)
|
||||
@ -154,6 +152,8 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
log.With(zap.String("id", id), zap.String("text", text)).
|
||||
Debug("creating document")
|
||||
|
||||
err = s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
|
||||
Heads: heads,
|
||||
|
||||
@ -4,8 +4,8 @@ import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
@ -19,7 +19,7 @@ var log = logger.NewNamed("messageservice")
|
||||
const CName = "MessageService"
|
||||
|
||||
type service struct {
|
||||
nodes []config.Node
|
||||
nodes []*node.Node
|
||||
requestHandler requesthandler.RequestHandler
|
||||
pool pool.Pool
|
||||
sync.RWMutex
|
||||
@ -36,7 +36,7 @@ type Service interface {
|
||||
|
||||
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
|
||||
s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
|
||||
s.nodes = a.MustComponent(config.CName).(*config.Config).Nodes
|
||||
s.nodes = a.MustComponent(node.CName).(node.Service).Nodes()
|
||||
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||
s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage)
|
||||
return nil
|
||||
@ -106,10 +106,6 @@ func (s *service) SendMessage(ctx context.Context, peerId string, msg *syncproto
|
||||
}
|
||||
|
||||
func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error {
|
||||
log.With(
|
||||
zap.String("message", msgType(msg))).
|
||||
Debug("sending message to all")
|
||||
|
||||
// dial manually to all peers
|
||||
for _, rp := range s.nodes {
|
||||
if er := s.pool.DialAndAddPeer(context.Background(), rp.PeerId); er != nil {
|
||||
@ -126,6 +122,10 @@ func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncprot
|
||||
|
||||
// TODO: use Broadcast method here when it is ready
|
||||
for _, n := range s.nodes {
|
||||
log.With(
|
||||
zap.String("peerId", n.PeerId),
|
||||
zap.String("message", msgType(msg))).
|
||||
Debug("sending message to peer")
|
||||
err := s.pool.SendAndWait(ctx, n.PeerId, &syncproto.Message{
|
||||
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
|
||||
Data: marshalled,
|
||||
|
||||
@ -3,6 +3,7 @@ package requesthandler
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
|
||||
@ -11,6 +12,7 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type requestHandler struct {
|
||||
@ -19,6 +21,8 @@ type requestHandler struct {
|
||||
messageService MessageSender
|
||||
}
|
||||
|
||||
var log = logger.NewNamed("requesthandler")
|
||||
|
||||
func New() app.Component {
|
||||
return &requestHandler{}
|
||||
}
|
||||
@ -72,6 +76,8 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
||||
snapshotPath []string
|
||||
result acltree.AddResult
|
||||
)
|
||||
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
|
||||
Debug("received head update message")
|
||||
|
||||
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
|
||||
// TODO: check if we already have those changes
|
||||
@ -122,6 +128,8 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
||||
snapshotPath []string
|
||||
result acltree.AddResult
|
||||
)
|
||||
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
|
||||
Debug("received full sync request message")
|
||||
|
||||
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
|
||||
// TODO: check if we already have those changes
|
||||
@ -164,6 +172,8 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
||||
snapshotPath []string
|
||||
result acltree.AddResult
|
||||
)
|
||||
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
|
||||
Debug("received full sync response message")
|
||||
|
||||
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error {
|
||||
// TODO: check if we already have those changes
|
||||
@ -227,10 +237,12 @@ func (r *requestHandler) prepareFullSyncResponse(
|
||||
// filtering our changes, so we will not send the same changes back
|
||||
var final []*aclpb.RawChange
|
||||
for _, ch := range ourChanges {
|
||||
if _, exists := theirMap[ch.Id]; exists {
|
||||
if _, exists := theirMap[ch.Id]; !exists {
|
||||
final = append(final, ch)
|
||||
}
|
||||
}
|
||||
log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)).
|
||||
Debug("sending changes for tree")
|
||||
|
||||
return &syncproto.SyncFullResponse{
|
||||
Heads: tree.Heads(),
|
||||
|
||||
@ -69,7 +69,7 @@ func (s *service) Do(ctx context.Context, treeId string, f ACLTreeFunc) error {
|
||||
|
||||
func (s *service) Add(ctx context.Context, treeId string, header *treepb.TreeHeader, changes []*aclpb.RawChange, f ACLTreeFunc) error {
|
||||
log.
|
||||
With(zap.String("treeId", treeId)).
|
||||
With(zap.String("treeId", treeId), zap.Int("len(changes)", len(changes))).
|
||||
Debug("adding tree with changes")
|
||||
|
||||
_, err := s.treeProvider.CreateTreeStorage(treeId, header, changes)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user