Move treeheader to upper level in sync
This commit is contained in:
parent
45a0e43fe4
commit
19e1b6f22b
@ -283,7 +283,9 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.updateListener != nil {
|
if a.updateListener == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
switch mode {
|
switch mode {
|
||||||
case Append:
|
case Append:
|
||||||
a.updateListener.Update(a)
|
a.updateListener.Update(a)
|
||||||
@ -292,7 +294,6 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
|
|||||||
default:
|
default:
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
getAddedChanges := func() []*aclpb.RawChange {
|
getAddedChanges := func() []*aclpb.RawChange {
|
||||||
@ -317,16 +318,7 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
|
|||||||
}
|
}
|
||||||
|
|
||||||
prevHeads := a.tree.Heads()
|
prevHeads := a.tree.Heads()
|
||||||
mode = a.tree.Add(changes...)
|
rebuild := func() (AddResult, error) {
|
||||||
switch mode {
|
|
||||||
case Nothing:
|
|
||||||
return AddResult{
|
|
||||||
OldHeads: prevHeads,
|
|
||||||
Heads: prevHeads,
|
|
||||||
Summary: AddResultSummaryNothing,
|
|
||||||
}, nil
|
|
||||||
|
|
||||||
case Rebuild:
|
|
||||||
err = a.rebuildFromStorage()
|
err = a.rebuildFromStorage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return AddResult{}, err
|
return AddResult{}, err
|
||||||
@ -338,6 +330,26 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
|
|||||||
Added: getAddedChanges(),
|
Added: getAddedChanges(),
|
||||||
Summary: AddResultSummaryRebuild,
|
Summary: AddResultSummaryRebuild,
|
||||||
}, nil
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
mode = a.tree.Add(changes...)
|
||||||
|
switch mode {
|
||||||
|
case Nothing:
|
||||||
|
for _, ch := range changes {
|
||||||
|
// rebuilding if the snapshot is different from the root
|
||||||
|
if ch.SnapshotId != a.tree.RootId() {
|
||||||
|
return rebuild()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return AddResult{
|
||||||
|
OldHeads: prevHeads,
|
||||||
|
Heads: prevHeads,
|
||||||
|
Summary: AddResultSummaryNothing,
|
||||||
|
}, nil
|
||||||
|
|
||||||
|
case Rebuild:
|
||||||
|
return rebuild()
|
||||||
default:
|
default:
|
||||||
// just rebuilding the state from start without reloading everything from tree storage
|
// just rebuilding the state from start without reloading everything from tree storage
|
||||||
// as an optimization we could've started from current heads, but I didn't implement that
|
// as an optimization we could've started from current heads, but I didn't implement that
|
||||||
|
|||||||
@ -165,7 +165,9 @@ func (d *docTree) AddContent(ctx context.Context, aclTree ACLTree, content proto
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
|
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
|
||||||
|
if d.updateListener != nil {
|
||||||
d.updateListener.Update(d)
|
d.updateListener.Update(d)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
state := aclTree.ACLState()
|
state := aclTree.ACLState()
|
||||||
change := &aclpb.Change{
|
change := &aclpb.Change{
|
||||||
@ -256,6 +258,10 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclTree ACLTree, rawChanges
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if d.updateListener == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
switch mode {
|
switch mode {
|
||||||
case Append:
|
case Append:
|
||||||
d.updateListener.Update(d)
|
d.updateListener.Update(d)
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package message
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||||
@ -85,7 +86,7 @@ func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err erro
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.sendAsync(peerId, msgType(msg), marshalled)
|
go s.sendAsync(peerId, msgInfo(msg), marshalled)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,15 +109,16 @@ func (s *service) sendAsync(peerId string, msgTypeStr string, marshalled []byte)
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func msgType(content *syncproto.Sync) string {
|
func msgInfo(content *syncproto.Sync) (syncMethod string) {
|
||||||
msg := content.GetMessage()
|
msg := content.GetMessage()
|
||||||
switch {
|
switch {
|
||||||
case msg.GetFullSyncRequest() != nil:
|
case msg.GetFullSyncRequest() != nil:
|
||||||
return "FullSyncRequest"
|
syncMethod = "FullSyncRequest"
|
||||||
case msg.GetFullSyncResponse() != nil:
|
case msg.GetFullSyncResponse() != nil:
|
||||||
return "FullSyncResponse"
|
syncMethod = "FullSyncResponse"
|
||||||
case msg.GetHeadUpdate() != nil:
|
case msg.GetHeadUpdate() != nil:
|
||||||
return "HeadUpdate"
|
syncMethod = "HeadUpdate"
|
||||||
}
|
}
|
||||||
return "UnknownMessage"
|
syncMethod = fmt.Sprintf("method: %s, treeType: %s", syncMethod, content.TreeHeader.Type.String())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -64,26 +64,32 @@ func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string,
|
|||||||
msg := content.GetMessage()
|
msg := content.GetMessage()
|
||||||
switch {
|
switch {
|
||||||
case msg.GetFullSyncRequest() != nil:
|
case msg.GetFullSyncRequest() != nil:
|
||||||
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest())
|
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId())
|
||||||
case msg.GetFullSyncResponse() != nil:
|
case msg.GetFullSyncResponse() != nil:
|
||||||
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse())
|
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
|
||||||
case msg.GetHeadUpdate() != nil:
|
case msg.GetHeadUpdate() != nil:
|
||||||
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate())
|
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncproto.SyncHeadUpdate) (err error) {
|
func (r *requestHandler) HandleHeadUpdate(
|
||||||
|
ctx context.Context,
|
||||||
|
senderId string,
|
||||||
|
update *syncproto.SyncHeadUpdate,
|
||||||
|
header *treepb.TreeHeader,
|
||||||
|
treeId string) (err error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
fullRequest *syncproto.SyncFullRequest
|
fullRequest *syncproto.SyncFullRequest
|
||||||
snapshotPath []string
|
snapshotPath []string
|
||||||
result tree.AddResult
|
result tree.AddResult
|
||||||
)
|
)
|
||||||
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
|
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
|
||||||
Debug("processing head update")
|
Debug("processing head update")
|
||||||
|
|
||||||
updateACLTree := func() {
|
updateACLTree := func() {
|
||||||
err = r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error {
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
t := obj.(tree.ACLTree)
|
t := obj.(tree.ACLTree)
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
@ -97,7 +103,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||||||
shouldFullSync := !slice.UnsortedEquals(update.Heads, t.Heads())
|
shouldFullSync := !slice.UnsortedEquals(update.Heads, t.Heads())
|
||||||
snapshotPath = t.SnapshotPath()
|
snapshotPath = t.SnapshotPath()
|
||||||
if shouldFullSync {
|
if shouldFullSync {
|
||||||
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, t)
|
fullRequest, err = r.prepareFullSyncRequest(treeId, header, update.SnapshotPath, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -107,12 +113,12 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
updateDocTree := func() {
|
updateDocTree := func() {
|
||||||
err = r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error {
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
docTree := obj.(tree.DocTree)
|
docTree := obj.(tree.DocTree)
|
||||||
docTree.Lock()
|
docTree.Lock()
|
||||||
defer docTree.Unlock()
|
defer docTree.Unlock()
|
||||||
|
|
||||||
return r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error {
|
return r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
aclTree := obj.(tree.ACLTree)
|
aclTree := obj.(tree.ACLTree)
|
||||||
aclTree.RLock()
|
aclTree.RLock()
|
||||||
defer aclTree.RUnlock()
|
defer aclTree.RUnlock()
|
||||||
@ -126,7 +132,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||||||
shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads())
|
shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads())
|
||||||
snapshotPath = docTree.SnapshotPath()
|
snapshotPath = docTree.SnapshotPath()
|
||||||
if shouldFullSync {
|
if shouldFullSync {
|
||||||
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, docTree)
|
fullRequest, err = r.prepareFullSyncRequest(treeId, header, update.SnapshotPath, docTree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -136,7 +142,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
switch update.TreeHeader.Type {
|
switch header.Type {
|
||||||
case treepb.TreeHeader_ACLTree:
|
case treepb.TreeHeader_ACLTree:
|
||||||
updateACLTree()
|
updateACLTree()
|
||||||
case treepb.TreeHeader_DocTree:
|
case treepb.TreeHeader_DocTree:
|
||||||
@ -148,14 +154,11 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||||||
// if there are no such tree
|
// if there are no such tree
|
||||||
if err == treestorage.ErrUnknownTreeId {
|
if err == treestorage.ErrUnknownTreeId {
|
||||||
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request
|
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request
|
||||||
fullRequest = &syncproto.SyncFullRequest{
|
fullRequest = &syncproto.SyncFullRequest{}
|
||||||
TreeId: update.TreeId,
|
|
||||||
TreeHeader: update.TreeHeader,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// if we have incompatible heads, or we haven't seen the tree at all
|
// if we have incompatible heads, or we haven't seen the tree at all
|
||||||
if fullRequest != nil {
|
if fullRequest != nil {
|
||||||
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest))
|
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId))
|
||||||
}
|
}
|
||||||
// if error or nothing has changed
|
// if error or nothing has changed
|
||||||
if err != nil || len(result.Added) == 0 {
|
if err != nil || len(result.Added) == 0 {
|
||||||
@ -166,23 +169,27 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||||||
Heads: result.Heads,
|
Heads: result.Heads,
|
||||||
Changes: result.Added,
|
Changes: result.Added,
|
||||||
SnapshotPath: snapshotPath,
|
SnapshotPath: snapshotPath,
|
||||||
TreeId: update.TreeId,
|
|
||||||
TreeHeader: update.TreeHeader,
|
|
||||||
}
|
}
|
||||||
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
|
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.SyncFullRequest) (err error) {
|
func (r *requestHandler) HandleFullSyncRequest(
|
||||||
|
ctx context.Context,
|
||||||
|
senderId string,
|
||||||
|
request *syncproto.SyncFullRequest,
|
||||||
|
header *treepb.TreeHeader,
|
||||||
|
treeId string) (err error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
fullResponse *syncproto.SyncFullResponse
|
fullResponse *syncproto.SyncFullResponse
|
||||||
snapshotPath []string
|
snapshotPath []string
|
||||||
result tree.AddResult
|
result tree.AddResult
|
||||||
)
|
)
|
||||||
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
|
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
|
||||||
Debug("processing full sync request")
|
Debug("processing full sync request")
|
||||||
|
|
||||||
requestACLTree := func() {
|
requestACLTree := func() {
|
||||||
err = r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error {
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
t := obj.(tree.ACLTree)
|
t := obj.(tree.ACLTree)
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
@ -196,7 +203,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshotPath = t.SnapshotPath()
|
snapshotPath = t.SnapshotPath()
|
||||||
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, t)
|
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Changes, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -205,12 +212,12 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||||||
}
|
}
|
||||||
|
|
||||||
requestDocTree := func() {
|
requestDocTree := func() {
|
||||||
err = r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error {
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
docTree := obj.(tree.DocTree)
|
docTree := obj.(tree.DocTree)
|
||||||
docTree.Lock()
|
docTree.Lock()
|
||||||
defer docTree.Unlock()
|
defer docTree.Unlock()
|
||||||
|
|
||||||
return r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error {
|
return r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
aclTree := obj.(tree.ACLTree)
|
aclTree := obj.(tree.ACLTree)
|
||||||
aclTree.RLock()
|
aclTree.RLock()
|
||||||
defer aclTree.RUnlock()
|
defer aclTree.RUnlock()
|
||||||
@ -223,7 +230,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshotPath = docTree.SnapshotPath()
|
snapshotPath = docTree.SnapshotPath()
|
||||||
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, docTree)
|
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Changes, docTree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -232,7 +239,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
switch request.TreeHeader.Type {
|
switch header.Type {
|
||||||
case treepb.TreeHeader_ACLTree:
|
case treepb.TreeHeader_ACLTree:
|
||||||
requestACLTree()
|
requestACLTree()
|
||||||
case treepb.TreeHeader_DocTree:
|
case treepb.TreeHeader_DocTree:
|
||||||
@ -244,7 +251,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse))
|
err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
|
||||||
// if error or nothing has changed
|
// if error or nothing has changed
|
||||||
if err != nil || len(result.Added) == 0 {
|
if err != nil || len(result.Added) == 0 {
|
||||||
return err
|
return err
|
||||||
@ -255,22 +262,26 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||||||
Heads: result.Heads,
|
Heads: result.Heads,
|
||||||
Changes: result.Added,
|
Changes: result.Added,
|
||||||
SnapshotPath: snapshotPath,
|
SnapshotPath: snapshotPath,
|
||||||
TreeId: request.TreeId,
|
|
||||||
TreeHeader: request.TreeHeader,
|
|
||||||
}
|
}
|
||||||
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
|
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) {
|
func (r *requestHandler) HandleFullSyncResponse(
|
||||||
|
ctx context.Context,
|
||||||
|
senderId string,
|
||||||
|
response *syncproto.SyncFullResponse,
|
||||||
|
header *treepb.TreeHeader,
|
||||||
|
treeId string) (err error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
snapshotPath []string
|
snapshotPath []string
|
||||||
result tree.AddResult
|
result tree.AddResult
|
||||||
)
|
)
|
||||||
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
|
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
|
||||||
Debug("processing full sync response")
|
Debug("processing full sync response")
|
||||||
|
|
||||||
responseACLTree := func() {
|
responseACLTree := func() {
|
||||||
err = r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error {
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
t := obj.(tree.ACLTree)
|
t := obj.(tree.ACLTree)
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
@ -285,12 +296,12 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
|||||||
}
|
}
|
||||||
|
|
||||||
responseDocTree := func() {
|
responseDocTree := func() {
|
||||||
err = r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error {
|
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
docTree := obj.(tree.DocTree)
|
docTree := obj.(tree.DocTree)
|
||||||
docTree.Lock()
|
docTree.Lock()
|
||||||
defer docTree.Unlock()
|
defer docTree.Unlock()
|
||||||
|
|
||||||
return r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error {
|
return r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
|
||||||
aclTree := obj.(tree.ACLTree)
|
aclTree := obj.(tree.ACLTree)
|
||||||
aclTree.RLock()
|
aclTree.RLock()
|
||||||
defer aclTree.RUnlock()
|
defer aclTree.RUnlock()
|
||||||
@ -305,7 +316,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
switch response.TreeHeader.Type {
|
switch header.Type {
|
||||||
case treepb.TreeHeader_ACLTree:
|
case treepb.TreeHeader_ACLTree:
|
||||||
responseACLTree()
|
responseACLTree()
|
||||||
case treepb.TreeHeader_DocTree:
|
case treepb.TreeHeader_DocTree:
|
||||||
@ -320,7 +331,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
|||||||
}
|
}
|
||||||
// if we have a new tree
|
// if we have a new tree
|
||||||
if err == treestorage.ErrUnknownTreeId {
|
if err == treestorage.ErrUnknownTreeId {
|
||||||
err = r.createTree(ctx, response)
|
err = r.createTree(ctx, response, header, treeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -335,9 +346,8 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
|||||||
Heads: result.Heads,
|
Heads: result.Heads,
|
||||||
Changes: result.Added,
|
Changes: result.Added,
|
||||||
SnapshotPath: snapshotPath,
|
SnapshotPath: snapshotPath,
|
||||||
TreeId: response.TreeId,
|
|
||||||
}
|
}
|
||||||
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
|
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, t tree.CommonTree) (*syncproto.SyncFullRequest, error) {
|
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, t tree.CommonTree) (*syncproto.SyncFullRequest, error) {
|
||||||
@ -348,9 +358,7 @@ func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.Tr
|
|||||||
return &syncproto.SyncFullRequest{
|
return &syncproto.SyncFullRequest{
|
||||||
Heads: t.Heads(),
|
Heads: t.Heads(),
|
||||||
Changes: ourChanges,
|
Changes: ourChanges,
|
||||||
TreeId: treeId,
|
|
||||||
SnapshotPath: t.SnapshotPath(),
|
SnapshotPath: t.SnapshotPath(),
|
||||||
TreeHeader: header,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -382,17 +390,20 @@ func (r *requestHandler) prepareFullSyncResponse(
|
|||||||
return &syncproto.SyncFullResponse{
|
return &syncproto.SyncFullResponse{
|
||||||
Heads: t.Heads(),
|
Heads: t.Heads(),
|
||||||
Changes: final,
|
Changes: final,
|
||||||
TreeId: treeId,
|
|
||||||
SnapshotPath: t.SnapshotPath(),
|
SnapshotPath: t.SnapshotPath(),
|
||||||
TreeHeader: t.Header(),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHandler) createTree(ctx context.Context, response *syncproto.SyncFullResponse) error {
|
func (r *requestHandler) createTree(
|
||||||
|
ctx context.Context,
|
||||||
|
response *syncproto.SyncFullResponse,
|
||||||
|
header *treepb.TreeHeader,
|
||||||
|
treeId string) error {
|
||||||
|
|
||||||
return r.treeCache.Add(
|
return r.treeCache.Add(
|
||||||
ctx,
|
ctx,
|
||||||
response.TreeId,
|
treeId,
|
||||||
response.TreeHeader,
|
header,
|
||||||
response.Changes,
|
response.Changes,
|
||||||
func(obj interface{}) error {
|
func(obj interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -6,7 +6,6 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
||||||
@ -19,7 +18,6 @@ const CName = "treecache"
|
|||||||
|
|
||||||
// TODO: add context
|
// TODO: add context
|
||||||
type TreeFunc = func(tree interface{}) error
|
type TreeFunc = func(tree interface{}) error
|
||||||
type ChangeBuildFunc = func(builder acltree.ChangeBuilder) error
|
|
||||||
|
|
||||||
var log = logger.NewNamed("treecache")
|
var log = logger.NewNamed("treecache")
|
||||||
|
|
||||||
|
|||||||
@ -1,19 +1,33 @@
|
|||||||
package syncproto
|
package syncproto
|
||||||
|
|
||||||
func WrapHeadUpdate(update *SyncHeadUpdate) *Sync {
|
import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
|
||||||
return &Sync{Message: &SyncContentValue{
|
|
||||||
|
func WrapHeadUpdate(update *SyncHeadUpdate, header *treepb.TreeHeader, treeId string) *Sync {
|
||||||
|
return &Sync{
|
||||||
|
Message: &SyncContentValue{
|
||||||
Value: &SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
|
Value: &SyncContentValueValueOfHeadUpdate{HeadUpdate: update},
|
||||||
}}
|
},
|
||||||
|
TreeHeader: header,
|
||||||
|
TreeId: treeId,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WrapFullRequest(request *SyncFullRequest) *Sync {
|
func WrapFullRequest(request *SyncFullRequest, header *treepb.TreeHeader, treeId string) *Sync {
|
||||||
return &Sync{Message: &SyncContentValue{
|
return &Sync{
|
||||||
|
Message: &SyncContentValue{
|
||||||
Value: &SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
|
Value: &SyncContentValueValueOfFullSyncRequest{FullSyncRequest: request},
|
||||||
}}
|
},
|
||||||
|
TreeHeader: header,
|
||||||
|
TreeId: treeId,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WrapFullResponse(response *SyncFullResponse) *Sync {
|
func WrapFullResponse(response *SyncFullResponse, header *treepb.TreeHeader, treeId string) *Sync {
|
||||||
return &Sync{Message: &SyncContentValue{
|
return &Sync{
|
||||||
|
Message: &SyncContentValue{
|
||||||
Value: &SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
|
Value: &SyncContentValueValueOfFullSyncResponse{FullSyncResponse: response},
|
||||||
}}
|
},
|
||||||
|
TreeHeader: header,
|
||||||
|
TreeId: treeId,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -53,6 +53,8 @@ message System {
|
|||||||
message Sync {
|
message Sync {
|
||||||
string spaceId = 1;
|
string spaceId = 1;
|
||||||
ContentValue message = 2;
|
ContentValue message = 2;
|
||||||
|
tree.TreeHeader treeHeader = 3;
|
||||||
|
string treeId = 4;
|
||||||
|
|
||||||
message ContentValue {
|
message ContentValue {
|
||||||
oneof value {
|
oneof value {
|
||||||
@ -65,9 +67,7 @@ message Sync {
|
|||||||
message HeadUpdate {
|
message HeadUpdate {
|
||||||
repeated string heads = 1;
|
repeated string heads = 1;
|
||||||
repeated acl.RawChange changes = 2;
|
repeated acl.RawChange changes = 2;
|
||||||
string treeId = 3;
|
repeated string snapshotPath = 3;
|
||||||
repeated string snapshotPath = 4;
|
|
||||||
tree.TreeHeader treeHeader = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message Full {
|
message Full {
|
||||||
@ -75,17 +75,13 @@ message Sync {
|
|||||||
message Request {
|
message Request {
|
||||||
repeated string heads = 1;
|
repeated string heads = 1;
|
||||||
repeated acl.RawChange changes = 2;
|
repeated acl.RawChange changes = 2;
|
||||||
string treeId = 3;
|
repeated string snapshotPath = 3;
|
||||||
repeated string snapshotPath = 4;
|
|
||||||
tree.TreeHeader treeHeader = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message Response {
|
message Response {
|
||||||
repeated string heads = 1;
|
repeated string heads = 1;
|
||||||
repeated acl.RawChange changes = 2;
|
repeated acl.RawChange changes = 2;
|
||||||
string treeId = 3;
|
repeated string snapshotPath = 3;
|
||||||
repeated string snapshotPath = 4;
|
|
||||||
tree.TreeHeader treeHeader = 5;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
1163
syncproto/sync.pb.go
1163
syncproto/sync.pb.go
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user