Start using raw changes + wip request handler
This commit is contained in:
parent
e7469c848c
commit
5b821c0d2a
@ -19,10 +19,16 @@ const (
|
|||||||
type AddResult struct {
|
type AddResult struct {
|
||||||
OldHeads []string
|
OldHeads []string
|
||||||
Heads []string
|
Heads []string
|
||||||
|
Added []*aclpb.RawChange
|
||||||
// TODO: add summary for changes
|
// TODO: add summary for changes
|
||||||
Summary AddResultSummary
|
Summary AddResultSummary
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type HeadWithPathToRoot struct {
|
||||||
|
Id string
|
||||||
|
Path []string
|
||||||
|
}
|
||||||
|
|
||||||
type TreeUpdateListener interface {
|
type TreeUpdateListener interface {
|
||||||
Update(tree ACLTree)
|
Update(tree ACLTree)
|
||||||
Rebuild(tree ACLTree)
|
Rebuild(tree ACLTree)
|
||||||
@ -37,13 +43,13 @@ func (n NoOpListener) Rebuild(tree ACLTree) {}
|
|||||||
type ACLTree interface {
|
type ACLTree interface {
|
||||||
ACLState() *ACLState
|
ACLState() *ACLState
|
||||||
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error)
|
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error)
|
||||||
AddChanges(ctx context.Context, changes ...*Change) (AddResult, error)
|
|
||||||
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
|
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
|
||||||
Heads() []string
|
Heads() []string
|
||||||
Root() *Change
|
Root() *Change
|
||||||
Iterate(func(change *Change) bool)
|
Iterate(func(change *Change) bool)
|
||||||
IterateFrom(string, func(change *Change) bool)
|
IterateFrom(string, func(change *Change) bool)
|
||||||
HasChange(string) bool
|
HasChange(string) bool
|
||||||
|
HeadsPathToRoot() []HeadWithPathToRoot
|
||||||
|
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
@ -247,25 +253,21 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) {
|
func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) {
|
||||||
var aclChanges []*Change
|
a.Lock()
|
||||||
|
// TODO: make proper error handling, because there are a lot of corner cases where this will break
|
||||||
|
var err error
|
||||||
|
var mode Mode
|
||||||
|
|
||||||
|
var changes []*Change
|
||||||
for _, ch := range rawChanges {
|
for _, ch := range rawChanges {
|
||||||
change, err := NewFromRawChange(ch)
|
change, err := NewFromRawChange(ch)
|
||||||
// TODO: think what if we will have incorrect signatures on rawChanges, how everything will work
|
// TODO: think what if we will have incorrect signatures on rawChanges, how everything will work
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
aclChanges = append(aclChanges, change)
|
changes = append(changes, change)
|
||||||
}
|
}
|
||||||
|
|
||||||
return a.AddChanges(ctx, aclChanges...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult, error) {
|
|
||||||
a.Lock()
|
|
||||||
// TODO: make proper error handling, because there are a lot of corner cases where this will break
|
|
||||||
var err error
|
|
||||||
var mode Mode
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -292,6 +294,16 @@ func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
getAddedChanges := func() []*aclpb.RawChange {
|
||||||
|
var added []*aclpb.RawChange
|
||||||
|
for _, ch := range rawChanges {
|
||||||
|
if _, exists := a.fullTree.attached[ch.Id]; exists {
|
||||||
|
added = append(added, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return added
|
||||||
|
}
|
||||||
|
|
||||||
for _, ch := range changes {
|
for _, ch := range changes {
|
||||||
err = a.treeStorage.AddChange(ch)
|
err = a.treeStorage.AddChange(ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -322,6 +334,7 @@ func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult
|
|||||||
return AddResult{
|
return AddResult{
|
||||||
OldHeads: prevHeads,
|
OldHeads: prevHeads,
|
||||||
Heads: a.fullTree.Heads(),
|
Heads: a.fullTree.Heads(),
|
||||||
|
Added: getAddedChanges(),
|
||||||
Summary: AddResultSummaryRebuild,
|
Summary: AddResultSummaryRebuild,
|
||||||
}, nil
|
}, nil
|
||||||
default:
|
default:
|
||||||
@ -335,6 +348,7 @@ func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult
|
|||||||
return AddResult{
|
return AddResult{
|
||||||
OldHeads: prevHeads,
|
OldHeads: prevHeads,
|
||||||
Heads: a.fullTree.Heads(),
|
Heads: a.fullTree.Heads(),
|
||||||
|
Added: getAddedChanges(),
|
||||||
Summary: AddResultSummaryAppend,
|
Summary: AddResultSummaryAppend,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -376,3 +390,28 @@ func (a *aclTree) Root() *Change {
|
|||||||
func (a *aclTree) Close() error {
|
func (a *aclTree) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *aclTree) HeadsPathToRoot() []HeadWithPathToRoot {
|
||||||
|
a.RLock()
|
||||||
|
defer a.RUnlock()
|
||||||
|
var headsWithPath []HeadWithPathToRoot
|
||||||
|
for _, h := range a.fullTree.Heads() {
|
||||||
|
headWithPath := HeadWithPathToRoot{
|
||||||
|
Id: h,
|
||||||
|
}
|
||||||
|
var path []string
|
||||||
|
// TODO: think that the user may have not all of the snapshots locally
|
||||||
|
currentSnapshotId := a.fullTree.attached[h].SnapshotId
|
||||||
|
for currentSnapshotId != "" {
|
||||||
|
sn, err := a.treeBuilder.loadChange(currentSnapshotId)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
path = append(path, currentSnapshotId)
|
||||||
|
currentSnapshotId = sn.SnapshotId
|
||||||
|
}
|
||||||
|
headWithPath.Path = path
|
||||||
|
headsWithPath = append(headsWithPath, headWithPath)
|
||||||
|
}
|
||||||
|
return headsWithPath
|
||||||
|
}
|
||||||
|
|||||||
@ -85,16 +85,7 @@ func TestACLTree_UserJoinUpdate_Append(t *testing.T) {
|
|||||||
t.Fatalf("should Build acl ACLState without err: %v", err)
|
t.Fatalf("should Build acl ACLState without err: %v", err)
|
||||||
}
|
}
|
||||||
rawChanges := thr.GetUpdates("append")
|
rawChanges := thr.GetUpdates("append")
|
||||||
var changes []*Change
|
res, err := tree.AddRawChanges(context.Background(), rawChanges...)
|
||||||
for _, ch := range rawChanges {
|
|
||||||
newCh, err := NewFromRawChange(ch)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("should be able to create change from raw: %v", err)
|
|
||||||
}
|
|
||||||
changes = append(changes, newCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := tree.AddChanges(context.Background(), changes...)
|
|
||||||
assert.Equal(t, res.Summary, AddResultSummaryAppend)
|
assert.Equal(t, res.Summary, AddResultSummaryAppend)
|
||||||
|
|
||||||
aclState := tree.ACLState()
|
aclState := tree.ACLState()
|
||||||
@ -135,16 +126,7 @@ func TestACLTree_UserJoinUpdate_Rebuild(t *testing.T) {
|
|||||||
t.Fatalf("should Build acl ACLState without err: %v", err)
|
t.Fatalf("should Build acl ACLState without err: %v", err)
|
||||||
}
|
}
|
||||||
rawChanges := thr.GetUpdates("rebuild")
|
rawChanges := thr.GetUpdates("rebuild")
|
||||||
var changes []*Change
|
res, err := tree.AddRawChanges(context.Background(), rawChanges...)
|
||||||
for _, ch := range rawChanges {
|
|
||||||
newCh, err := NewFromRawChange(ch)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("should be able to create change from raw: %v", err)
|
|
||||||
}
|
|
||||||
changes = append(changes, newCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := tree.AddChanges(context.Background(), changes...)
|
|
||||||
assert.Equal(t, res.Summary, AddResultSummaryRebuild)
|
assert.Equal(t, res.Summary, AddResultSummaryRebuild)
|
||||||
|
|
||||||
aclState := tree.ACLState()
|
aclState := tree.ACLState()
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package sync
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
|
||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
||||||
@ -13,19 +14,44 @@ type requestHander struct {
|
|||||||
client SyncClient
|
client SyncClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) error {
|
func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) {
|
||||||
err := r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
|
var fullRequest *syncpb.SyncFullRequest
|
||||||
_, err := tree.AddRawChanges(ctx, update.Changes...)
|
var addedChanges []*aclpb.RawChange
|
||||||
|
var headsWithPath []acltree.HeadWithPathToRoot
|
||||||
|
defer func() {
|
||||||
|
if err != nil || fullRequest != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newUpdate := syncpb.NewHeadsUpdate(update.TreeId, headsWithPath, addedChanges)
|
||||||
|
err = r.client.NotifyHeadsChanged(newUpdate)
|
||||||
|
}()
|
||||||
|
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
|
||||||
|
// TODO: check if we already have those changes
|
||||||
|
res, err := tree.AddRawChanges(ctx, update.Changes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
addedChanges = res.Added
|
||||||
shouldFullSync := !r.compareHeads(update.Heads, tree.Heads())
|
shouldFullSync := !r.compareHeads(update.Heads, tree.Heads())
|
||||||
|
if shouldFullSync {
|
||||||
|
fullRequest, err = r.prepareFullSyncRequest(tree)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
headsWithPath = tree.HeadsPathToRoot()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if fullRequest != nil {
|
||||||
|
return r.client.RequestFullSync(senderId, fullRequest)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *requestHander) HandleFullSync(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,5 +65,5 @@ func (r *requestHander) compareHeads(syncHeads []*syncpb.SyncHead, heads []strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestHander) prepareFullSyncRequest(tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
|
func (r *requestHander) prepareFullSyncRequest(tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|||||||
22
service/sync/syncpb/pbutils.go
Normal file
22
service/sync/syncpb/pbutils.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package syncpb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||||
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewHeadsUpdate(treeId string, headsWithPath []acltree.HeadWithPathToRoot, changes []*aclpb.RawChange) *SyncHeadUpdate {
|
||||||
|
var heads []*SyncHead
|
||||||
|
for _, headWithPath := range headsWithPath {
|
||||||
|
syncHead := &SyncHead{
|
||||||
|
Id: headWithPath.Id,
|
||||||
|
SnapshotPath: headWithPath.Path,
|
||||||
|
}
|
||||||
|
heads = append(heads, syncHead)
|
||||||
|
}
|
||||||
|
return &SyncHeadUpdate{
|
||||||
|
Heads: heads,
|
||||||
|
Changes: changes,
|
||||||
|
TreeId: treeId,
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user