From 145332b0f772d15dc95b81cfb4eebd858528939e Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 3 Jul 2023 13:43:54 +0200 Subject: [PATCH] Add headsync acl logic --- commonspace/headsync/diffsyncer.go | 18 +++++++++++++++++- commonspace/headsync/headsync.go | 13 +++++++++++-- commonspace/object/acl/list/list.go | 4 ++-- commonspace/object/acl/syncacl/syncacl.go | 17 ++++++++++++++++- 4 files changed, 46 insertions(+), 6 deletions(-) diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index a0539681..cbae98f9 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -3,10 +3,13 @@ package headsync import ( "context" "fmt" + "time" + "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/credentialprovider" "github.com/anyproto/any-sync/commonspace/deletionstate" + "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/spacestorage" @@ -14,8 +17,8 @@ import ( "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/rpc/rpcerr" + "github.com/anyproto/any-sync/util/slice" "go.uber.org/zap" - "time" ) type DiffSyncer interface { @@ -38,6 +41,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer { log: log, syncStatus: hs.syncStatus, deletionState: hs.deletionState, + syncAcl: hs.syncAcl, } } @@ -53,6 +57,7 @@ type diffSyncer struct { credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusUpdater treeSyncer treemanager.TreeSyncer + syncAcl *syncacl.SyncAcl } func (d *diffSyncer) Init() { @@ -116,6 +121,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) cl = d.clientFactory.Client(conn) rdiff = NewRemoteDiff(d.spaceId, cl) stateCounter = d.syncStatus.StateCounter() + syncAclId = d.syncAcl.Id() ) newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) @@ -137,6 +143,16 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) totalLen := len(newIds) + len(changedIds) + len(removedIds) // not syncing ids which were removed through settings document missingIds := d.deletionState.Filter(newIds) + prevLen := len(changedIds) + changedIds = slice.DiscardFromSlice(changedIds, func(s string) bool { + return s == syncAclId + }) + // if acl head is different + if len(changedIds) < prevLen { + if syncErr := d.syncAcl.SyncWithPeer(ctx, p.Id()); syncErr != nil { + log.Warn("failed to send acl sync message to peer", zap.String("aclId", syncAclId)) + } + } existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...) d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 18ed7357..1cc0c485 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -3,12 +3,16 @@ package headsync import ( "context" + "sync/atomic" + "time" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" config2 "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" "github.com/anyproto/any-sync/commonspace/deletionstate" + "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/spacestate" @@ -21,8 +25,6 @@ import ( "github.com/anyproto/any-sync/util/slice" "go.uber.org/zap" "golang.org/x/exp/slices" - "sync/atomic" - "time" ) var log = logger.NewNamed(CName) @@ -60,6 +62,7 @@ type headSync struct { credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusService deletionState deletionstate.ObjectDeletionState + syncAcl *syncacl.SyncAcl } func New() HeadSync { @@ -71,6 +74,7 @@ var createDiffSyncer = newDiffSyncer func (h *headSync) Init(a *app.App) (err error) { shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) cfg := a.MustComponent("config").(config2.ConfigGetter) + h.syncAcl = a.MustComponent(syncacl.CName).(*syncacl.SyncAcl) h.spaceId = shared.SpaceId h.spaceIsDeleted = shared.SpaceIsDeleted h.syncPeriod = cfg.GetSpace().SyncPeriod @@ -92,6 +96,7 @@ func (h *headSync) Init(a *app.App) (err error) { return h.syncer.Sync(ctx) } h.periodicSync = periodicsync.NewPeriodicSync(h.syncPeriod, time.Minute, sync, h.log) + h.syncAcl.SetHeadUpdater(h) // TODO: move to run? h.syncer.Init() return nil @@ -177,6 +182,10 @@ func (h *headSync) fillDiff(objectIds []string) { Head: concatStrings(heads), }) } + els = append(els, ldiff.Element{ + Id: h.syncAcl.Id(), + Head: h.syncAcl.Head().Id, + }) h.diff.Set(els...) if err := h.storage.WriteSpaceHash(h.diff.Hash()); err != nil { h.log.Error("can't write space hash", zap.Error(err)) diff --git a/commonspace/object/acl/list/list.go b/commonspace/object/acl/list/list.go index 93a4a952..0d8d1d4b 100644 --- a/commonspace/object/acl/list/list.go +++ b/commonspace/object/acl/list/list.go @@ -57,7 +57,7 @@ type AclList interface { AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) - Close() (err error) + Close(ctx context.Context) (err error) } type aclList struct { @@ -293,6 +293,6 @@ func (a *aclList) IterateFrom(startId string, iterFunc IterFunc) { } } -func (a *aclList) Close() (err error) { +func (a *aclList) Close(ctx context.Context) (err error) { return nil } diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index 30a958b7..04917ab5 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -29,17 +29,30 @@ func New() *SyncAcl { return &SyncAcl{} } +type HeadUpdater interface { + UpdateHeads(id string, heads []string) +} + type SyncAcl struct { list.AclList syncClient SyncClient syncHandler synchandler.SyncHandler + headUpdater HeadUpdater isClosed bool } +func (s *SyncAcl) Run(ctx context.Context) (err error) { + return +} + func (s *SyncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) { return s.HandleRequest(ctx, senderId, request) } +func (s *SyncAcl) SetHeadUpdater(updater HeadUpdater) { + s.headUpdater = updater +} + func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { return s.HandleMessage(ctx, senderId, request) } @@ -73,6 +86,7 @@ func (s *SyncAcl) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err erro return } headUpdate := s.syncClient.CreateHeadUpdate(s, []*consensusproto.RawRecordWithId{rawRec}) + s.headUpdater.UpdateHeads(s.Id(), []string{rawRec.Id}) s.syncClient.Broadcast(headUpdate) return } @@ -86,6 +100,7 @@ func (s *SyncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (e return } headUpdate := s.syncClient.CreateHeadUpdate(s, rawRecords) + s.headUpdater.UpdateHeads(s.Id(), []string{rawRecords[len(rawRecords)-1].Id}) s.syncClient.Broadcast(headUpdate) return } @@ -97,7 +112,7 @@ func (s *SyncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) { return s.syncClient.SendUpdate(peerId, s.Id(), headUpdate) } -func (s *SyncAcl) Close() (err error) { +func (s *SyncAcl) Close(ctx context.Context) (err error) { s.Lock() defer s.Unlock() s.isClosed = true