Add sync tree and remove head send from sync handler
This commit is contained in:
parent
bd0f419fab
commit
404f128310
@ -47,9 +47,8 @@ func (s *syncHandler) HandleHeadUpdate(
|
||||
treeId string) (err error) {
|
||||
|
||||
var (
|
||||
fullRequest *spacesyncproto.ObjectFullSyncRequest
|
||||
snapshotPath []string
|
||||
result tree.AddResult
|
||||
fullRequest *spacesyncproto.ObjectFullSyncRequest
|
||||
result tree.AddResult
|
||||
// in case update changes are empty then we want to sync the whole tree
|
||||
sendChangesOnFullSync = len(update.Changes) == 0
|
||||
)
|
||||
@ -76,7 +75,6 @@ func (s *syncHandler) HandleHeadUpdate(
|
||||
|
||||
// if we couldn't add all the changes
|
||||
shouldFullSync := len(update.Changes) != len(result.Added)
|
||||
snapshotPath = objTree.SnapshotPath()
|
||||
if shouldFullSync {
|
||||
fullRequest, err = s.prepareFullSyncRequest(objTree, sendChangesOnFullSync)
|
||||
if err != nil {
|
||||
@ -94,18 +92,7 @@ func (s *syncHandler) HandleHeadUpdate(
|
||||
if fullRequest != nil {
|
||||
return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId))
|
||||
}
|
||||
// if error or nothing has changed
|
||||
if err != nil || len(result.Added) == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
// otherwise sending heads update message
|
||||
newUpdate := &spacesyncproto.ObjectHeadUpdate{
|
||||
Heads: result.Heads,
|
||||
Changes: result.Added,
|
||||
SnapshotPath: snapshotPath,
|
||||
}
|
||||
return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncHandler) HandleFullSyncRequest(
|
||||
@ -148,10 +135,7 @@ func (s *syncHandler) HandleFullSyncResponse(
|
||||
header *aclpb.TreeHeader,
|
||||
treeId string) (err error) {
|
||||
|
||||
var (
|
||||
snapshotPath []string
|
||||
result tree.AddResult
|
||||
)
|
||||
var result tree.AddResult
|
||||
|
||||
res, err := s.treeCache.GetTree(ctx, treeId)
|
||||
if err != nil {
|
||||
@ -159,24 +143,21 @@ func (s *syncHandler) HandleFullSyncResponse(
|
||||
}
|
||||
|
||||
err = func() error {
|
||||
syncTree := res.Tree
|
||||
syncTree.Lock()
|
||||
objTree := res.Tree
|
||||
objTree.Lock()
|
||||
defer res.Release()
|
||||
defer syncTree.Unlock()
|
||||
defer objTree.Unlock()
|
||||
|
||||
// if we already have the heads for whatever reason
|
||||
if slice.UnsortedEquals(response.Heads, syncTree.Heads()) {
|
||||
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncTree -> syncService: HeadUpdate()
|
||||
// AddRawChanges -> syncTree.addRawChanges(); syncService.HeadUpdate()
|
||||
result, err = syncTree.AddRawChanges(ctx, response.Changes...)
|
||||
result, err = objTree.AddRawChanges(ctx, response.Changes...)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
snapshotPath = syncTree.SnapshotPath()
|
||||
return nil
|
||||
}()
|
||||
|
||||
@ -186,23 +167,9 @@ func (s *syncHandler) HandleFullSyncResponse(
|
||||
}
|
||||
// if we have a new tree
|
||||
if err == storage.ErrUnknownTreeId {
|
||||
err = s.addTree(ctx, response, header, treeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result = tree.AddResult{
|
||||
OldHeads: []string{},
|
||||
Heads: response.Heads,
|
||||
Added: response.Changes,
|
||||
}
|
||||
return s.addTree(ctx, response, header, treeId)
|
||||
}
|
||||
// sending heads update message
|
||||
newUpdate := &spacesyncproto.ObjectHeadUpdate{
|
||||
Heads: result.Heads,
|
||||
Changes: result.Added,
|
||||
SnapshotPath: snapshotPath,
|
||||
}
|
||||
return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) {
|
||||
|
||||
151
common/commonspace/synctree/synctree.go
Normal file
151
common/commonspace/synctree/synctree.go
Normal file
@ -0,0 +1,151 @@
|
||||
package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
|
||||
)
|
||||
|
||||
type SyncTree struct {
|
||||
objTree tree.ObjectTree
|
||||
syncService syncservice.SyncService
|
||||
}
|
||||
|
||||
func CreateSyncTree(
|
||||
syncService syncservice.SyncService,
|
||||
payload tree.ObjectTreeCreatePayload,
|
||||
listener tree.ObjectTreeUpdateListener,
|
||||
aclList list.ACLList,
|
||||
createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {
|
||||
t, err = tree.CreateObjectTree(payload, listener, aclList, createStorage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: use context where it is needed
|
||||
err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
|
||||
Heads: t.Heads(),
|
||||
SnapshotPath: t.SnapshotPath(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func BuildSyncTree(
|
||||
syncService syncservice.SyncService,
|
||||
treeStorage storage.TreeStorage,
|
||||
listener tree.ObjectTreeUpdateListener,
|
||||
aclList list.ACLList) (t tree.ObjectTree, err error) {
|
||||
return buildSyncTree(syncService, treeStorage, listener, aclList)
|
||||
}
|
||||
|
||||
func buildSyncTree(
|
||||
syncService syncservice.SyncService,
|
||||
treeStorage storage.TreeStorage,
|
||||
listener tree.ObjectTreeUpdateListener,
|
||||
aclList list.ACLList) (t tree.ObjectTree, err error) {
|
||||
t, err = tree.BuildObjectTree(treeStorage, listener, aclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: use context where it is needed
|
||||
err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
|
||||
Heads: t.Heads(),
|
||||
SnapshotPath: t.SnapshotPath(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SyncTree) Lock() {
|
||||
s.objTree.Lock()
|
||||
}
|
||||
|
||||
func (s *SyncTree) Unlock() {
|
||||
s.objTree.Unlock()
|
||||
}
|
||||
|
||||
func (s *SyncTree) RLock() {
|
||||
s.objTree.RLock()
|
||||
}
|
||||
|
||||
func (s *SyncTree) RUnlock() {
|
||||
s.objTree.RUnlock()
|
||||
}
|
||||
|
||||
func (s *SyncTree) ID() string {
|
||||
return s.objTree.ID()
|
||||
}
|
||||
|
||||
func (s *SyncTree) Header() *aclpb.TreeHeader {
|
||||
return s.objTree.Header()
|
||||
}
|
||||
|
||||
func (s *SyncTree) Heads() []string {
|
||||
return s.objTree.Heads()
|
||||
}
|
||||
|
||||
func (s *SyncTree) Root() *tree.Change {
|
||||
return s.objTree.Root()
|
||||
}
|
||||
|
||||
func (s *SyncTree) HasChange(id string) bool {
|
||||
return s.objTree.HasChange(id)
|
||||
}
|
||||
|
||||
func (s *SyncTree) Iterate(convert tree.ChangeConvertFunc, iterate tree.ChangeIterateFunc) error {
|
||||
return s.objTree.Iterate(convert, iterate)
|
||||
}
|
||||
|
||||
func (s *SyncTree) IterateFrom(id string, convert tree.ChangeConvertFunc, iterate tree.ChangeIterateFunc) error {
|
||||
return s.objTree.IterateFrom(id, convert, iterate)
|
||||
}
|
||||
|
||||
func (s *SyncTree) SnapshotPath() []string {
|
||||
return s.objTree.SnapshotPath()
|
||||
}
|
||||
|
||||
func (s *SyncTree) ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*aclpb.RawTreeChangeWithId, error) {
|
||||
return s.objTree.ChangesAfterCommonSnapshot(snapshotPath, heads)
|
||||
}
|
||||
|
||||
func (s *SyncTree) Storage() storage.TreeStorage {
|
||||
return s.objTree.Storage()
|
||||
}
|
||||
|
||||
func (s *SyncTree) DebugDump() (string, error) {
|
||||
return s.objTree.DebugDump()
|
||||
}
|
||||
|
||||
func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeContent) (res tree.AddResult, err error) {
|
||||
res, err = s.objTree.AddContent(ctx, content)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{
|
||||
Heads: res.Heads,
|
||||
Changes: res.Added,
|
||||
SnapshotPath: s.SnapshotPath(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeChangeWithId) (res tree.AddResult, err error) {
|
||||
res, err = s.objTree.AddRawChanges(ctx, changes...)
|
||||
if err != nil || res.Mode == tree.Nothing {
|
||||
return
|
||||
}
|
||||
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{
|
||||
Heads: res.Heads,
|
||||
Changes: res.Added,
|
||||
SnapshotPath: s.SnapshotPath(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SyncTree) Close() error {
|
||||
return s.objTree.Close()
|
||||
}
|
||||
@ -149,12 +149,6 @@ func (ot *objectTree) Storage() storage.TreeStorage {
|
||||
}
|
||||
|
||||
func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) {
|
||||
defer func() {
|
||||
if err == nil && ot.updateListener != nil {
|
||||
ot.updateListener.Update(ot)
|
||||
}
|
||||
}()
|
||||
|
||||
payload, err := ot.prepareBuilderContent(content)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user