diff --git a/pkg/acl/aclchanges/change.go b/pkg/acl/aclchanges/change.go index be08fca2..3d3b4376 100644 --- a/pkg/acl/aclchanges/change.go +++ b/pkg/acl/aclchanges/change.go @@ -1,11 +1,11 @@ package aclchanges import ( - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "github.com/gogo/protobuf/proto" ) type Change interface { - ProtoChange() *aclpb.ACLChange + ProtoChange() proto.Marshaler DecryptedChangeContent() []byte Signature() []byte CID() string diff --git a/pkg/acl/acltree/change.go b/pkg/acl/acltree/change.go index c768ef23..7ad71209 100644 --- a/pkg/acl/acltree/change.go +++ b/pkg/acl/acltree/change.go @@ -80,7 +80,7 @@ func NewACLChange(id string, ch *aclpb.ACLChange) *Change { } } -func (ch *Change) ProtoChange() *aclpb.ACLChange { +func (ch *Change) ProtoChange() proto.Marshaler { return ch.Content } diff --git a/pkg/acl/tree/acltree.go b/pkg/acl/tree/acltree.go index caefb1fb..c0724c97 100644 --- a/pkg/acl/tree/acltree.go +++ b/pkg/acl/tree/acltree.go @@ -111,7 +111,9 @@ func BuildACLTreeWithIdentity(t treestorage.TreeStorage, acc *account.AccountDat return nil, err } - listener.Rebuild(aclTree) + if listener != nil { + listener.Rebuild(aclTree) + } return aclTree, nil } @@ -151,7 +153,9 @@ func BuildACLTree(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder, l return nil, err } - listener.Rebuild(aclTree) + if listener != nil { + listener.Rebuild(aclTree) + } return aclTree, nil } @@ -259,13 +263,28 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha var mode Mode var changes []*Change // TODO: = addChangesBuf[:0] ... - for _, ch := range rawChanges { + var notSeenIdx []int + prevHeads := a.tree.Heads() + for idx, ch := range rawChanges { + if a.HasChange(ch.Id) { + continue + } + change, err := NewFromRawChange(ch) // TODO: think what if we will have incorrect signatures on rawChanges, how everything will work if err != nil { continue } changes = append(changes, change) + notSeenIdx = append(notSeenIdx, idx) + } + + if len(notSeenIdx) == 0 { + return AddResult{ + OldHeads: prevHeads, + Heads: prevHeads, + Summary: AddResultSummaryNothing, + }, nil } defer func() { @@ -286,6 +305,7 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha if a.updateListener == nil { return } + switch mode { case Append: a.updateListener.Update(a) @@ -298,9 +318,10 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha getAddedChanges := func() []*aclpb.RawChange { var added []*aclpb.RawChange - for _, ch := range rawChanges { - if _, exists := a.tree.attached[ch.Id]; exists { - added = append(added, ch) + for _, idx := range notSeenIdx { + rawChange := rawChanges[idx] + if _, exists := a.tree.attached[rawChange.Id]; exists { + added = append(added, rawChange) } } return added @@ -317,7 +338,6 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha } } - prevHeads := a.tree.Heads() rebuild := func() (AddResult, error) { err = a.rebuildFromStorage() if err != nil { @@ -337,7 +357,7 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha case Nothing: for _, ch := range changes { // rebuilding if the snapshot is different from the root - if ch.SnapshotId != a.tree.RootId() { + if ch.SnapshotId != a.tree.RootId() && ch.SnapshotId != "" { return rebuild() } } diff --git a/pkg/acl/tree/change.go b/pkg/acl/tree/change.go index 3ee4d05d..ac0a55c2 100644 --- a/pkg/acl/tree/change.go +++ b/pkg/acl/tree/change.go @@ -29,9 +29,8 @@ type Change struct { Sign []byte } -func (ch *Change) ProtoChange() *aclpb.ACLChange { - //TODO implement me - panic("implement me") +func (ch *Change) ProtoChange() proto.Marshaler { + return ch.Content } func (ch *Change) DecryptContents(key *symmetric.Key) error { diff --git a/pkg/acl/tree/changebuilder.go b/pkg/acl/tree/changebuilder.go index fc390f4a..98961e4b 100644 --- a/pkg/acl/tree/changebuilder.go +++ b/pkg/acl/tree/changebuilder.go @@ -101,6 +101,10 @@ func (c *aclChangeBuilder) BuildAndApply() (*Change, []byte, error) { Timestamp: int64(time.Now().Nanosecond()), Identity: c.acc.Identity, } + if c.aclState.currentReadKeyHash == 0 { + // setting IsSnapshot for initial change + aclChange.IsSnapshot = true + } marshalledData, err := proto.Marshal(c.aclData) if err != nil { diff --git a/pkg/acl/tree/doctree.go b/pkg/acl/tree/doctree.go index bd056a72..00960554 100644 --- a/pkg/acl/tree/doctree.go +++ b/pkg/acl/tree/doctree.go @@ -74,7 +74,9 @@ func BuildDocTreeWithIdentity(t treestorage.TreeStorage, acc *account.AccountDat return nil, err } - listener.Rebuild(docTree) + if listener != nil { + listener.Rebuild(docTree) + } return docTree, nil } @@ -111,7 +113,9 @@ func BuildDocTree(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder, l return nil, err } - listener.Rebuild(docTree) + if listener != nil { + listener.Rebuild(docTree) + } return docTree, nil } @@ -177,6 +181,7 @@ func (d *docTree) AddContent(ctx context.Context, aclTree ACLTree, content proto CurrentReadKeyHash: state.currentReadKeyHash, Timestamp: int64(time.Now().Nanosecond()), Identity: d.accountData.Identity, + IsSnapshot: isSnapshot, } marshalledData, err := content.Marshal() @@ -234,13 +239,28 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclTree ACLTree, rawChanges var mode Mode var changes []*Change // TODO: = addChangesBuf[:0] ... - for _, ch := range rawChanges { + var notSeenIdx []int + prevHeads := d.tree.Heads() + for idx, ch := range rawChanges { + if d.HasChange(ch.Id) { + continue + } + change, err := NewFromRawChange(ch) // TODO: think what if we will have incorrect signatures on rawChanges, how everything will work if err != nil { continue } changes = append(changes, change) + notSeenIdx = append(notSeenIdx, idx) + } + + if len(notSeenIdx) == 0 { + return AddResult{ + OldHeads: prevHeads, + Heads: prevHeads, + Summary: AddResultSummaryNothing, + }, nil } defer func() { @@ -274,15 +294,15 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclTree ACLTree, rawChanges getAddedChanges := func() []*aclpb.RawChange { var added []*aclpb.RawChange - for _, ch := range rawChanges { - if _, exists := d.tree.attached[ch.Id]; exists { - added = append(added, ch) + for _, idx := range notSeenIdx { + rawChange := rawChanges[idx] + if _, exists := d.tree.attached[rawChange.Id]; exists { + added = append(added, rawChange) } } return added } - prevHeads := d.tree.Heads() rebuild := func() (AddResult, error) { err = d.rebuildFromStorage(aclTree) if err != nil { @@ -313,7 +333,7 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclTree ACLTree, rawChanges case Nothing: for _, ch := range changes { // rebuilding if the snapshot is different from the root - if ch.SnapshotId != d.tree.RootId() { + if ch.SnapshotId != d.tree.RootId() && ch.SnapshotId != "" { return rebuild() } } diff --git a/pkg/acl/tree/treebuilder.go b/pkg/acl/tree/treebuilder.go index b243c0b4..5376bee6 100644 --- a/pkg/acl/tree/treebuilder.go +++ b/pkg/acl/tree/treebuilder.go @@ -10,6 +10,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "github.com/gogo/protobuf/proto" + "go.uber.org/zap" "time" ) @@ -52,6 +53,7 @@ func (tb *treeBuilder) Build(fromStart bool) (*Tree, error) { headsAndOrphans = append(headsAndOrphans, orphans...) headsAndOrphans = append(headsAndOrphans, heads...) + log.With(zap.Strings("heads", heads), zap.Strings("orphans", orphans)).Debug("building tree") if fromStart { if err := tb.buildTreeFromStart(headsAndOrphans); err != nil { return nil, fmt.Errorf("buildTree error: %v", err) @@ -67,7 +69,7 @@ func (tb *treeBuilder) Build(fromStart bool) (*Tree, error) { } } - tb.cache = nil + tb.cache = make(map[string]*Change) return tb.tree, nil } @@ -187,8 +189,8 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) { return nil, err } - tb.cache[id] = NewChange(id, verifiedChange) - + ch = NewChange(id, verifiedChange) + tb.cache[id] = ch return ch, nil } diff --git a/pkg/acl/tree/treestorage.go b/pkg/acl/tree/treestorage.go index 297f1977..a407a374 100644 --- a/pkg/acl/tree/treestorage.go +++ b/pkg/acl/tree/treestorage.go @@ -35,7 +35,7 @@ func CreateNewTreeStorageWithACL( Signature: change.Signature(), Id: change.CID(), } - header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_ACLTree) + header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_ACLTree, "") if err != nil { return nil, err } @@ -64,6 +64,7 @@ func CreateNewTreeStorage( CurrentReadKeyHash: state.currentReadKeyHash, Timestamp: int64(time.Now().Nanosecond()), Identity: acc.Identity, + IsSnapshot: true, } marshalledData, err := content.Marshal() @@ -84,7 +85,7 @@ func CreateNewTreeStorage( if err != nil { return nil, err } - id, err := cid.NewCIDFromBytes(fullMarshalledChange) + changeId, err := cid.NewCIDFromBytes(fullMarshalledChange) if err != nil { return nil, err } @@ -92,29 +93,30 @@ func CreateNewTreeStorage( rawChange := &aclpb.RawChange{ Payload: fullMarshalledChange, Signature: signature, - Id: id, + Id: changeId, } - header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_DocTree) + header, treeId, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_DocTree, aclTree.ID()) if err != nil { return nil, err } - thr, err := create(id, header, []*aclpb.RawChange{rawChange}) + thr, err := create(treeId, header, []*aclpb.RawChange{rawChange}) if err != nil { return nil, err } - err = thr.SetHeads([]string{id}) + err = thr.SetHeads([]string{changeId}) if err != nil { return nil, err } return thr, nil } -func createTreeHeaderAndId(change *aclpb.RawChange, treeType treepb.TreeHeaderTreeType) (*treepb.TreeHeader, string, error) { +func createTreeHeaderAndId(change *aclpb.RawChange, treeType treepb.TreeHeaderTreeType, aclTreeId string) (*treepb.TreeHeader, string, error) { header := &treepb.TreeHeader{ FirstChangeId: change.Id, Type: treeType, + AclTreeId: aclTreeId, } marshalledHeader, err := proto.Marshal(header) if err != nil { diff --git a/pkg/acl/treestorage/inmemory.go b/pkg/acl/treestorage/inmemory.go index adfa9af8..7025a701 100644 --- a/pkg/acl/treestorage/inmemory.go +++ b/pkg/acl/treestorage/inmemory.go @@ -7,7 +7,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" - "github.com/gogo/protobuf/proto" "sync" ) @@ -106,9 +105,8 @@ func (t *inMemoryTreeStorage) AddChange(change aclchanges.Change) error { defer t.Unlock() signature := change.Signature() id := change.CID() - aclChange := change.ProtoChange() - fullMarshalledChange, err := proto.Marshal(aclChange) + fullMarshalledChange, err := change.ProtoChange().Marshal() if err != nil { return err } diff --git a/service/document/service.go b/service/document/service.go index 5d6b96b6..12c3c6a1 100644 --- a/service/document/service.go +++ b/service/document/service.go @@ -118,10 +118,8 @@ func (s *service) UpdateDocumentTree(ctx context.Context, id, text string) (err return s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, - TreeId: id, SnapshotPath: snapshotPath, - TreeHeader: header, - })) + }, header, id)) } func (s *service) CreateACLTree(ctx context.Context) (id string, err error) { @@ -172,10 +170,8 @@ func (s *service) CreateACLTree(ctx context.Context) (id string, err error) { err = s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, - TreeId: id, SnapshotPath: snapshotPath, - TreeHeader: header, - })) + }, header, id)) return id, nil } @@ -227,10 +223,8 @@ func (s *service) CreateDocumentTree(ctx context.Context, aclTreeId string, text err = s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{ Heads: heads, Changes: []*aclpb.RawChange{ch}, - TreeId: id, SnapshotPath: snapshotPath, - TreeHeader: header, - })) + }, header, id)) if err != nil { return "", err } diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index 3ffbda84..4600ab96 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -93,6 +93,11 @@ func (r *requestHandler) HandleHeadUpdate( t := obj.(tree.ACLTree) t.Lock() defer t.Unlock() + + if slice.UnsortedEquals(update.Heads, t.Heads()) { + return nil + } + // TODO: check if we already have those changes result, err = t.AddRawChanges(ctx, update.Changes...) if err != nil { @@ -118,10 +123,15 @@ func (r *requestHandler) HandleHeadUpdate( docTree.Lock() defer docTree.Unlock() - return r.treeCache.Do(ctx, treeId, func(obj interface{}) error { + if slice.UnsortedEquals(update.Heads, docTree.Heads()) { + return nil + } + + return r.treeCache.Do(ctx, docTree.Header().AclTreeId, func(obj interface{}) error { aclTree := obj.(tree.ACLTree) aclTree.RLock() defer aclTree.RUnlock() + // TODO: check if we already have those changes result, err = docTree.AddRawChanges(ctx, aclTree, update.Changes...) if err != nil { @@ -164,6 +174,7 @@ func (r *requestHandler) HandleHeadUpdate( if err != nil || len(result.Added) == 0 { return err } + log.Info("res", zap.Int("result added", len(result.Added))) // otherwise sending heads update message newUpdate := &syncproto.SyncHeadUpdate{ Heads: result.Heads, @@ -194,6 +205,10 @@ func (r *requestHandler) HandleFullSyncRequest( t.Lock() defer t.Unlock() + //if slice.UnsortedEquals(request.Heads, t.Heads()) { + // return nil + //} + // TODO: check if we already have those changes // if we have non-empty request if len(request.Heads) != 0 { @@ -212,12 +227,17 @@ func (r *requestHandler) HandleFullSyncRequest( } requestDocTree := func() { + log.Info("getting doc tree from treeCache", zap.String("treeId", treeId)) err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { docTree := obj.(tree.DocTree) docTree.Lock() defer docTree.Unlock() - return r.treeCache.Do(ctx, treeId, func(obj interface{}) error { + //if slice.UnsortedEquals(request.Heads, docTree.Heads()) { + // return nil + //} + log.Info("getting tree from treeCache", zap.String("aclId", docTree.Header().AclTreeId)) + return r.treeCache.Do(ctx, docTree.Header().AclTreeId, func(obj interface{}) error { aclTree := obj.(tree.ACLTree) aclTree.RLock() defer aclTree.RUnlock() @@ -285,6 +305,11 @@ func (r *requestHandler) HandleFullSyncResponse( t := obj.(tree.ACLTree) t.Lock() defer t.Unlock() + + if slice.UnsortedEquals(response.Heads, t.Heads()) { + return nil + } + // TODO: check if we already have those changes result, err = t.AddRawChanges(ctx, response.Changes...) if err != nil { @@ -301,7 +326,11 @@ func (r *requestHandler) HandleFullSyncResponse( docTree.Lock() defer docTree.Unlock() - return r.treeCache.Do(ctx, treeId, func(obj interface{}) error { + if slice.UnsortedEquals(response.Heads, docTree.Heads()) { + return nil + } + + return r.treeCache.Do(ctx, docTree.Header().AclTreeId, func(obj interface{}) error { aclTree := obj.(tree.ACLTree) aclTree.RLock() defer aclTree.RUnlock() diff --git a/service/treecache/service.go b/service/treecache/service.go index a6d215ba..37b58f21 100644 --- a/service/treecache/service.go +++ b/service/treecache/service.go @@ -99,6 +99,7 @@ func (s *service) loadTree(ctx context.Context, id string) (ocache.Object, error default: return nil, fmt.Errorf("incorrect type") } + log.Info("got header", zap.String("header", header.String())) var docTree tree.DocTree // TODO: it is a question if we need to use ACLTree on the first tree build, because we can think that the tree is already validated err = s.Do(ctx, header.AclTreeId, func(obj interface{}) error {