Merge pull request #47 from anyproto/acl-sync-protocol

Acl sync protocol
This commit is contained in:
Mikhail Rakhmanov 2023-07-13 11:23:49 +02:00 committed by GitHub
commit b1c198df1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
72 changed files with 3461 additions and 344 deletions

View File

@ -3,7 +3,7 @@ package mock_accountservice
import (
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/golang/mock/gomock"
"go.uber.org/mock/gomock"
)
func NewAccountServiceWithAccount(ctrl *gomock.Controller, acc *accountdata.AccountKeys) *MockService {

View File

@ -9,7 +9,7 @@ import (
app "github.com/anyproto/any-sync/app"
accountdata "github.com/anyproto/any-sync/commonspace/object/accountdata"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockService is a mock of Service interface.

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
ldiff "github.com/anyproto/any-sync/app/ldiff"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockDiff is a mock of Diff interface.

View File

@ -10,7 +10,7 @@ import (
app "github.com/anyproto/any-sync/app"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockCredentialProvider is a mock of CredentialProvider interface.

View File

@ -3,8 +3,8 @@ package deletionstate
import (
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"sort"
"testing"
)

View File

@ -9,7 +9,7 @@ import (
app "github.com/anyproto/any-sync/app"
deletionstate "github.com/anyproto/any-sync/commonspace/deletionstate"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockObjectDeletionState is a mock of ObjectDeletionState interface.

View File

@ -3,10 +3,13 @@ package headsync
import (
"context"
"fmt"
"time"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestorage"
@ -14,8 +17,8 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/util/slice"
"go.uber.org/zap"
"time"
)
type DiffSyncer interface {
@ -38,6 +41,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
log: log,
syncStatus: hs.syncStatus,
deletionState: hs.deletionState,
syncAcl: hs.syncAcl,
}
}
@ -53,6 +57,7 @@ type diffSyncer struct {
credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusUpdater
treeSyncer treemanager.TreeSyncer
syncAcl syncacl.SyncAcl
}
func (d *diffSyncer) Init() {
@ -116,6 +121,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
cl = d.clientFactory.Client(conn)
rdiff = NewRemoteDiff(d.spaceId, cl)
stateCounter = d.syncStatus.StateCounter()
syncAclId = d.syncAcl.Id()
)
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
@ -138,17 +144,29 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
// not syncing ids which were removed through settings document
missingIds := d.deletionState.Filter(newIds)
existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...)
d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter)
prevExistingLen := len(existingIds)
existingIds = slice.DiscardFromSlice(existingIds, func(s string) bool {
return s == syncAclId
})
// if we removed acl head from the list
if len(existingIds) < prevExistingLen {
if syncErr := d.syncAcl.SyncWithPeer(ctx, p.Id()); syncErr != nil {
log.Warn("failed to send acl sync message to peer", zap.String("aclId", syncAclId))
}
}
// treeSyncer should not get acl id, that's why we filter existing ids before
err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds)
if err != nil {
return err
}
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
d.log.Info("sync done:",
zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)),
zap.Int("removedIds", len(removedIds)),
zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)),
zap.Int("already deleted ids", totalLen-prevExistingLen-len(missingIds)),
zap.String("peerId", p.Id()),
)
return

View File

@ -14,8 +14,8 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"storj.io/drpc"
)
@ -109,6 +109,7 @@ func TestDiffSyncer(t *testing.T) {
fx.initDiffSyncer(t)
defer fx.stop()
mPeer := mockPeer{}
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mPeer}, nil)
@ -122,6 +123,26 @@ func TestDiffSyncer(t *testing.T) {
require.NoError(t, fx.diffSyncer.Sync(ctx))
})
t.Run("diff syncer sync, acl changed", func(t *testing.T) {
fx := newHeadSyncFixture(t)
fx.initDiffSyncer(t)
defer fx.stop()
mPeer := mockPeer{}
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mPeer}, nil)
fx.diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))).
Return([]string{"new"}, []string{"changed"}, nil, nil)
fx.deletionStateMock.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1)
fx.deletionStateMock.EXPECT().Filter([]string{"changed"}).Return([]string{"changed", "aclId"}).Times(1)
fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1)
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
fx.aclMock.EXPECT().SyncWithPeer(gomock.Any(), mPeer.Id()).Return(nil)
require.NoError(t, fx.diffSyncer.Sync(ctx))
})
t.Run("diff syncer sync conf error", func(t *testing.T) {
fx := newHeadSyncFixture(t)
fx.initDiffSyncer(t)
@ -139,6 +160,7 @@ func TestDiffSyncer(t *testing.T) {
fx.initDiffSyncer(t)
defer fx.stop()
deletedId := "id"
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.deletionStateMock.EXPECT().Exists(deletedId).Return(true)
// this should not result in any mock being called
@ -152,6 +174,7 @@ func TestDiffSyncer(t *testing.T) {
newId := "newId"
newHeads := []string{"h1", "h2"}
hash := "hash"
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.diffMock.EXPECT().Set(ldiff.Element{
Id: newId,
Head: concatStrings(newHeads),
@ -166,6 +189,7 @@ func TestDiffSyncer(t *testing.T) {
fx := newHeadSyncFixture(t)
fx.initDiffSyncer(t)
defer fx.stop()
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
aclStorageMock := mock_liststorage.NewMockListStorage(fx.ctrl)
settingsStorage := mock_treestorage.NewMockTreeStorage(fx.ctrl)
settingsId := "settingsId"
@ -211,6 +235,7 @@ func TestDiffSyncer(t *testing.T) {
fx := newHeadSyncFixture(t)
fx.initDiffSyncer(t)
defer fx.stop()
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil)
@ -226,6 +251,7 @@ func TestDiffSyncer(t *testing.T) {
fx.initDiffSyncer(t)
defer fx.stop()
mPeer := mockPeer{}
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mPeer}, nil)

View File

@ -3,12 +3,16 @@ package headsync
import (
"context"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger"
config2 "github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestate"
@ -21,8 +25,6 @@ import (
"github.com/anyproto/any-sync/util/slice"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"sync/atomic"
"time"
)
var log = logger.NewNamed(CName)
@ -60,6 +62,7 @@ type headSync struct {
credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusService
deletionState deletionstate.ObjectDeletionState
syncAcl syncacl.SyncAcl
}
func New() HeadSync {
@ -71,6 +74,7 @@ var createDiffSyncer = newDiffSyncer
func (h *headSync) Init(a *app.App) (err error) {
shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
cfg := a.MustComponent("config").(config2.ConfigGetter)
h.syncAcl = a.MustComponent(syncacl.CName).(syncacl.SyncAcl)
h.spaceId = shared.SpaceId
h.spaceIsDeleted = shared.SpaceIsDeleted
h.syncPeriod = cfg.GetSpace().SyncPeriod
@ -92,6 +96,7 @@ func (h *headSync) Init(a *app.App) (err error) {
return h.syncer.Sync(ctx)
}
h.periodicSync = periodicsync.NewPeriodicSync(h.syncPeriod, time.Minute, sync, h.log)
h.syncAcl.SetHeadUpdater(h)
// TODO: move to run?
h.syncer.Init()
return nil
@ -177,6 +182,10 @@ func (h *headSync) fillDiff(objectIds []string) {
Head: concatStrings(heads),
})
}
els = append(els, ldiff.Element{
Id: h.syncAcl.Id(),
Head: h.syncAcl.Head().Id,
})
h.diff.Set(els...)
if err := h.storage.WriteSpaceHash(h.diff.Hash()); err != nil {
h.log.Error("can't write space hash", zap.Error(err))

View File

@ -11,6 +11,9 @@ import (
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate"
"github.com/anyproto/any-sync/commonspace/headsync/mock_headsync"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl/mock_syncacl"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage/mock_treestorage"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
@ -23,8 +26,8 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"sync/atomic"
"testing"
)
@ -60,6 +63,7 @@ type headSyncFixture struct {
treeSyncerMock *mock_treemanager.MockTreeSyncer
diffMock *mock_ldiff.MockDiff
clientMock *mock_spacesyncproto.MockDRPCSpaceSyncClient
aclMock *mock_syncacl.MockSyncAcl
headSync *headSync
diffSyncer *diffSyncer
}
@ -87,9 +91,13 @@ func newHeadSyncFixture(t *testing.T) *headSyncFixture {
treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl)
diffMock := mock_ldiff.NewMockDiff(ctrl)
clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
aclMock := mock_syncacl.NewMockSyncAcl(ctrl)
aclMock.EXPECT().Name().AnyTimes().Return(syncacl.CName)
aclMock.EXPECT().SetHeadUpdater(gomock.Any()).AnyTimes()
hs := &headSync{}
a := &app.App{}
a.Register(spaceState).
Register(aclMock).
Register(mockConfig{}).
Register(configurationMock).
Register(storageMock).
@ -115,6 +123,7 @@ func newHeadSyncFixture(t *testing.T) *headSyncFixture {
treeSyncerMock: treeSyncerMock,
diffMock: diffMock,
clientMock: clientMock,
aclMock: aclMock,
}
}
@ -144,6 +153,8 @@ func TestHeadSync(t *testing.T) {
treeMock := mock_treestorage.NewMockTreeStorage(fx.ctrl)
fx.storageMock.EXPECT().StoredIds().Return(ids, nil)
fx.storageMock.EXPECT().TreeStorage(ids[0]).Return(treeMock, nil)
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
fx.aclMock.EXPECT().Head().AnyTimes().Return(&list.AclRecord{Id: "headId"})
treeMock.EXPECT().Heads().Return([]string{"h1", "h2"}, nil)
fx.diffMock.EXPECT().Set(ldiff.Element{
Id: "id1",

View File

@ -8,7 +8,7 @@ import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockDiffSyncer is a mock of DiffSyncer interface.

View File

@ -10,6 +10,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/util/cidutil"
"github.com/anyproto/any-sync/util/crypto"
)
@ -44,7 +45,10 @@ type AclList interface {
Records() []*AclRecord
AclState() *AclState
IsAfter(first string, second string) (bool, error)
HasHead(head string) bool
Head() *AclRecord
RecordsAfter(ctx context.Context, id string) (records []*consensusproto.RawRecordWithId, err error)
Get(id string) (*AclRecord, error)
GetIndex(idx int) (*AclRecord, error)
Iterate(iterFunc IterFunc)
@ -55,8 +59,9 @@ type AclList interface {
ValidateRawRecord(record *consensusproto.RawRecord) (err error)
AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error)
AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error)
Close() (err error)
Close(ctx context.Context) (err error)
}
type aclList struct {
@ -195,6 +200,16 @@ func (a *aclList) ValidateRawRecord(rawRec *consensusproto.RawRecord) (err error
return a.aclState.Validator().ValidateAclRecordContents(record)
}
func (a *aclList) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) {
for _, rec := range rawRecords {
err = a.AddRawRecord(rec)
if err != nil && err != ErrRecordAlreadyExists {
return
}
}
return
}
func (a *aclList) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error) {
if _, ok := a.indexes[rawRec.Id]; ok {
return ErrRecordAlreadyExists
@ -246,6 +261,11 @@ func (a *aclList) Head() *AclRecord {
return a.records[len(a.records)-1]
}
func (a *aclList) HasHead(head string) bool {
_, exists := a.indexes[head]
return exists
}
func (a *aclList) Get(id string) (*AclRecord, error) {
recIdx, ok := a.indexes[id]
if !ok {
@ -270,6 +290,21 @@ func (a *aclList) Iterate(iterFunc IterFunc) {
}
}
func (a *aclList) RecordsAfter(ctx context.Context, id string) (records []*consensusproto.RawRecordWithId, err error) {
recIdx, ok := a.indexes[id]
if !ok {
return nil, ErrNoSuchRecord
}
for i := recIdx + 1; i < len(a.records); i++ {
rawRec, err := a.storage.GetRawRecord(ctx, a.records[i].Id)
if err != nil {
return nil, err
}
records = append(records, rawRec)
}
return
}
func (a *aclList) IterateFrom(startId string, iterFunc IterFunc) {
recIdx, ok := a.indexes[startId]
if !ok {
@ -282,6 +317,21 @@ func (a *aclList) IterateFrom(startId string, iterFunc IterFunc) {
}
}
func (a *aclList) Close() (err error) {
func (a *aclList) Close(ctx context.Context) (err error) {
return nil
}
func WrapAclRecord(rawRec *consensusproto.RawRecord) *consensusproto.RawRecordWithId {
payload, err := rawRec.Marshal()
if err != nil {
panic(err)
}
id, err := cidutil.NewCidFromBytes(payload)
if err != nil {
panic(err)
}
return &consensusproto.RawRecordWithId{
Payload: payload,
Id: id,
}
}

View File

@ -7,26 +7,10 @@ import (
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/util/cidutil"
"github.com/anyproto/any-sync/util/crypto"
"github.com/stretchr/testify/require"
)
func wrapRecord(rawRec *consensusproto.RawRecord) *consensusproto.RawRecordWithId {
payload, err := rawRec.Marshal()
if err != nil {
panic(err)
}
id, err := cidutil.NewCidFromBytes(payload)
if err != nil {
panic(err)
}
return &consensusproto.RawRecordWithId{
Payload: payload,
Id: id,
}
}
type aclFixture struct {
ownerKeys *accountdata.AccountKeys
accountKeys *accountdata.AccountKeys
@ -71,7 +55,7 @@ func (fx *aclFixture) inviteAccount(t *testing.T, perms AclPermissions) {
// building invite
inv, err := ownerAcl.RecordBuilder().BuildInvite()
require.NoError(t, err)
inviteRec := wrapRecord(inv.InviteRec)
inviteRec := WrapAclRecord(inv.InviteRec)
fx.addRec(t, inviteRec)
// building request join
@ -80,7 +64,7 @@ func (fx *aclFixture) inviteAccount(t *testing.T, perms AclPermissions) {
InviteKey: inv.InviteKey,
})
require.NoError(t, err)
requestJoinRec := wrapRecord(requestJoin)
requestJoinRec := WrapAclRecord(requestJoin)
fx.addRec(t, requestJoinRec)
// building request accept
@ -92,7 +76,7 @@ func (fx *aclFixture) inviteAccount(t *testing.T, perms AclPermissions) {
// validate
err = ownerAcl.ValidateRawRecord(requestAccept)
require.NoError(t, err)
requestAcceptRec := wrapRecord(requestAccept)
requestAcceptRec := WrapAclRecord(requestAccept)
fx.addRec(t, requestAcceptRec)
// checking acl state
@ -132,13 +116,13 @@ func TestAclList_InviteRevoke(t *testing.T) {
// building invite
inv, err := fx.ownerAcl.RecordBuilder().BuildInvite()
require.NoError(t, err)
inviteRec := wrapRecord(inv.InviteRec)
inviteRec := WrapAclRecord(inv.InviteRec)
fx.addRec(t, inviteRec)
// building invite revoke
inviteRevoke, err := fx.ownerAcl.RecordBuilder().BuildInviteRevoke(ownerState.lastRecordId)
require.NoError(t, err)
inviteRevokeRec := wrapRecord(inviteRevoke)
inviteRevokeRec := WrapAclRecord(inviteRevoke)
fx.addRec(t, inviteRevokeRec)
// checking acl state
@ -159,7 +143,7 @@ func TestAclList_RequestDecline(t *testing.T) {
// building invite
inv, err := ownerAcl.RecordBuilder().BuildInvite()
require.NoError(t, err)
inviteRec := wrapRecord(inv.InviteRec)
inviteRec := WrapAclRecord(inv.InviteRec)
fx.addRec(t, inviteRec)
// building request join
@ -168,13 +152,13 @@ func TestAclList_RequestDecline(t *testing.T) {
InviteKey: inv.InviteKey,
})
require.NoError(t, err)
requestJoinRec := wrapRecord(requestJoin)
requestJoinRec := WrapAclRecord(requestJoin)
fx.addRec(t, requestJoinRec)
// building request decline
requestDecline, err := ownerAcl.RecordBuilder().BuildRequestDecline(ownerState.lastRecordId)
require.NoError(t, err)
requestDeclineRec := wrapRecord(requestDecline)
requestDeclineRec := WrapAclRecord(requestDecline)
fx.addRec(t, requestDeclineRec)
// checking acl state
@ -198,7 +182,7 @@ func TestAclList_Remove(t *testing.T) {
ReadKey: newReadKey,
})
require.NoError(t, err)
removeRec := wrapRecord(remove)
removeRec := WrapAclRecord(remove)
fx.addRec(t, removeRec)
// checking acl state
@ -225,7 +209,7 @@ func TestAclList_ReadKeyChange(t *testing.T) {
newReadKey := crypto.NewAES()
readKeyChange, err := fx.ownerAcl.RecordBuilder().BuildReadKeyChange(newReadKey)
require.NoError(t, err)
readKeyRec := wrapRecord(readKeyChange)
readKeyRec := WrapAclRecord(readKeyChange)
fx.addRec(t, readKeyRec)
// checking acl state
@ -255,7 +239,7 @@ func TestAclList_PermissionChange(t *testing.T) {
Permissions: AclPermissions(aclrecordproto.AclUserPermissions_Writer),
})
require.NoError(t, err)
permissionChangeRec := wrapRecord(permissionChange)
permissionChangeRec := WrapAclRecord(permissionChange)
fx.addRec(t, permissionChangeRec)
// checking acl state
@ -279,7 +263,7 @@ func TestAclList_RequestRemove(t *testing.T) {
removeRequest, err := fx.accountAcl.RecordBuilder().BuildRequestRemove()
require.NoError(t, err)
removeRequestRec := wrapRecord(removeRequest)
removeRequestRec := WrapAclRecord(removeRequest)
fx.addRec(t, removeRequestRec)
recs := fx.accountAcl.AclState().RemoveRecords()
@ -292,7 +276,7 @@ func TestAclList_RequestRemove(t *testing.T) {
ReadKey: newReadKey,
})
require.NoError(t, err)
removeRec := wrapRecord(remove)
removeRec := WrapAclRecord(remove)
fx.addRec(t, removeRec)
// checking acl state

View File

@ -5,12 +5,13 @@
package mock_list
import (
context "context"
reflect "reflect"
list "github.com/anyproto/any-sync/commonspace/object/acl/list"
consensusproto "github.com/anyproto/any-sync/consensus/consensusproto"
crypto "github.com/anyproto/any-sync/util/crypto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockAclList is a mock of AclList interface.
@ -64,18 +65,32 @@ func (mr *MockAclListMockRecorder) AddRawRecord(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawRecord", reflect.TypeOf((*MockAclList)(nil).AddRawRecord), arg0)
}
// Close mocks base method.
func (m *MockAclList) Close() error {
// AddRawRecords mocks base method.
func (m *MockAclList) AddRawRecords(arg0 []*consensusproto.RawRecordWithId) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret := m.ctrl.Call(m, "AddRawRecords", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// AddRawRecords indicates an expected call of AddRawRecords.
func (mr *MockAclListMockRecorder) AddRawRecords(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawRecords", reflect.TypeOf((*MockAclList)(nil).AddRawRecords), arg0)
}
// Close mocks base method.
func (m *MockAclList) Close(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockAclListMockRecorder) Close() *gomock.Call {
func (mr *MockAclListMockRecorder) Close(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAclList)(nil).Close))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAclList)(nil).Close), arg0)
}
// Get mocks base method.
@ -108,6 +123,20 @@ func (mr *MockAclListMockRecorder) GetIndex(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIndex", reflect.TypeOf((*MockAclList)(nil).GetIndex), arg0)
}
// HasHead mocks base method.
func (m *MockAclList) HasHead(arg0 string) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasHead", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// HasHead indicates an expected call of HasHead.
func (mr *MockAclListMockRecorder) HasHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasHead", reflect.TypeOf((*MockAclList)(nil).HasHead), arg0)
}
// Head mocks base method.
func (m *MockAclList) Head() *list.AclRecord {
m.ctrl.T.Helper()
@ -253,6 +282,21 @@ func (mr *MockAclListMockRecorder) Records() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Records", reflect.TypeOf((*MockAclList)(nil).Records))
}
// RecordsAfter mocks base method.
func (m *MockAclList) RecordsAfter(arg0 context.Context, arg1 string) ([]*consensusproto.RawRecordWithId, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RecordsAfter", arg0, arg1)
ret0, _ := ret[0].([]*consensusproto.RawRecordWithId)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RecordsAfter indicates an expected call of RecordsAfter.
func (mr *MockAclListMockRecorder) RecordsAfter(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordsAfter", reflect.TypeOf((*MockAclList)(nil).RecordsAfter), arg0, arg1)
}
// Root mocks base method.
func (m *MockAclList) Root() *consensusproto.RawRecordWithId {
m.ctrl.T.Helper()

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
consensusproto "github.com/anyproto/any-sync/consensus/consensusproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockListStorage is a mock of ListStorage interface.

View File

@ -0,0 +1,120 @@
//go:generate mockgen -destination mock_syncacl/mock_syncacl.go github.com/anyproto/any-sync/commonspace/object/acl/syncacl SyncAcl,SyncClient,RequestFactory,AclSyncProtocol
package syncacl
import (
"context"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/consensus/consensusproto"
"go.uber.org/zap"
)
type AclSyncProtocol interface {
HeadUpdate(ctx context.Context, senderId string, update *consensusproto.LogHeadUpdate) (request *consensusproto.LogSyncMessage, err error)
FullSyncRequest(ctx context.Context, senderId string, request *consensusproto.LogFullSyncRequest) (response *consensusproto.LogSyncMessage, err error)
FullSyncResponse(ctx context.Context, senderId string, response *consensusproto.LogFullSyncResponse) (err error)
}
type aclSyncProtocol struct {
log logger.CtxLogger
spaceId string
aclList list.AclList
reqFactory RequestFactory
}
func (a *aclSyncProtocol) HeadUpdate(ctx context.Context, senderId string, update *consensusproto.LogHeadUpdate) (request *consensusproto.LogSyncMessage, err error) {
isEmptyUpdate := len(update.Records) == 0
log := a.log.With(
zap.String("senderId", senderId),
zap.String("update head", update.Head),
zap.Int("len(update records)", len(update.Records)))
log.DebugCtx(ctx, "received acl head update message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "acl head update finished with error", zap.Error(err))
} else if request != nil {
cnt := request.Content.GetFullSyncRequest()
log.DebugCtx(ctx, "returning acl full sync request", zap.String("request head", cnt.Head))
} else {
if !isEmptyUpdate {
log.DebugCtx(ctx, "acl head update finished correctly")
}
}
}()
if isEmptyUpdate {
headEquals := a.aclList.Head().Id == update.Head
log.DebugCtx(ctx, "is empty acl head update", zap.Bool("headEquals", headEquals))
if headEquals {
return
}
return a.reqFactory.CreateFullSyncRequest(a.aclList, update.Head)
}
if a.aclList.HasHead(update.Head) {
return
}
err = a.aclList.AddRawRecords(update.Records)
if err == list.ErrIncorrectRecordSequence {
return a.reqFactory.CreateFullSyncRequest(a.aclList, update.Head)
}
return
}
func (a *aclSyncProtocol) FullSyncRequest(ctx context.Context, senderId string, request *consensusproto.LogFullSyncRequest) (response *consensusproto.LogSyncMessage, err error) {
log := a.log.With(
zap.String("senderId", senderId),
zap.String("request head", request.Head),
zap.Int("len(request records)", len(request.Records)))
log.DebugCtx(ctx, "received acl full sync request message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "acl full sync request finished with error", zap.Error(err))
} else if response != nil {
cnt := response.Content.GetFullSyncResponse()
log.DebugCtx(ctx, "acl full sync response sent", zap.String("response head", cnt.Head), zap.Int("len(response records)", len(cnt.Records)))
}
}()
if !a.aclList.HasHead(request.Head) {
if len(request.Records) > 0 {
// in this case we can try to add some records
err = a.aclList.AddRawRecords(request.Records)
if err != nil {
return
}
} else {
// here it is impossible for us to do anything, we can't return records after head as defined in request, because we don't have it
return nil, list.ErrIncorrectRecordSequence
}
}
return a.reqFactory.CreateFullSyncResponse(a.aclList, request.Head)
}
func (a *aclSyncProtocol) FullSyncResponse(ctx context.Context, senderId string, response *consensusproto.LogFullSyncResponse) (err error) {
log := a.log.With(
zap.String("senderId", senderId),
zap.String("response head", response.Head),
zap.Int("len(response records)", len(response.Records)))
log.DebugCtx(ctx, "received acl full sync response message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "acl full sync response failed", zap.Error(err))
} else {
log.DebugCtx(ctx, "acl full sync response succeeded")
}
}()
if a.aclList.HasHead(response.Head) {
return
}
return a.aclList.AddRawRecords(response.Records)
}
func newAclSyncProtocol(spaceId string, aclList list.AclList, reqFactory RequestFactory) *aclSyncProtocol {
return &aclSyncProtocol{
log: log.With(zap.String("spaceId", spaceId), zap.String("aclId", aclList.Id())),
spaceId: spaceId,
aclList: aclList,
reqFactory: reqFactory,
}
}

View File

@ -0,0 +1,213 @@
package syncacl
import (
"context"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/list/mock_list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl/mock_syncacl"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)
type aclSyncProtocolFixture struct {
log logger.CtxLogger
spaceId string
senderId string
aclId string
aclMock *mock_list.MockAclList
reqFactory *mock_syncacl.MockRequestFactory
ctrl *gomock.Controller
syncProtocol AclSyncProtocol
}
func newSyncProtocolFixture(t *testing.T) *aclSyncProtocolFixture {
ctrl := gomock.NewController(t)
aclList := mock_list.NewMockAclList(ctrl)
spaceId := "spaceId"
reqFactory := mock_syncacl.NewMockRequestFactory(ctrl)
aclList.EXPECT().Id().Return("aclId")
syncProtocol := newAclSyncProtocol(spaceId, aclList, reqFactory)
return &aclSyncProtocolFixture{
log: log,
spaceId: spaceId,
senderId: "senderId",
aclId: "aclId",
aclMock: aclList,
reqFactory: reqFactory,
ctrl: ctrl,
syncProtocol: syncProtocol,
}
}
func (fx *aclSyncProtocolFixture) stop() {
fx.ctrl.Finish()
}
func TestHeadUpdate(t *testing.T) {
ctx := context.Background()
fullRequest := &consensusproto.LogSyncMessage{
Content: &consensusproto.LogSyncContentValue{
Value: &consensusproto.LogSyncContentValue_FullSyncRequest{
FullSyncRequest: &consensusproto.LogFullSyncRequest{},
},
},
}
t.Run("head update non empty all heads added", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(false)
fx.aclMock.EXPECT().AddRawRecords(headUpdate.Records).Return(nil)
req, err := fx.syncProtocol.HeadUpdate(ctx, fx.senderId, headUpdate)
require.Nil(t, req)
require.NoError(t, err)
})
t.Run("head update results in full request", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(false)
fx.aclMock.EXPECT().AddRawRecords(headUpdate.Records).Return(list.ErrIncorrectRecordSequence)
fx.reqFactory.EXPECT().CreateFullSyncRequest(fx.aclMock, headUpdate.Head).Return(fullRequest, nil)
req, err := fx.syncProtocol.HeadUpdate(ctx, fx.senderId, headUpdate)
require.Equal(t, fullRequest, req)
require.NoError(t, err)
})
t.Run("head update old heads", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(true)
req, err := fx.syncProtocol.HeadUpdate(ctx, fx.senderId, headUpdate)
require.Nil(t, req)
require.NoError(t, err)
})
t.Run("head update empty equals", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().Head().Return(&list.AclRecord{Id: "h1"})
req, err := fx.syncProtocol.HeadUpdate(ctx, fx.senderId, headUpdate)
require.Nil(t, req)
require.NoError(t, err)
})
t.Run("head update empty results in full request", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().Head().Return(&list.AclRecord{Id: "h2"})
fx.reqFactory.EXPECT().CreateFullSyncRequest(fx.aclMock, headUpdate.Head).Return(fullRequest, nil)
req, err := fx.syncProtocol.HeadUpdate(ctx, fx.senderId, headUpdate)
require.Equal(t, fullRequest, req)
require.NoError(t, err)
})
}
func TestFullSyncRequest(t *testing.T) {
ctx := context.Background()
fullResponse := &consensusproto.LogSyncMessage{
Content: &consensusproto.LogSyncContentValue{
Value: &consensusproto.LogSyncContentValue_FullSyncResponse{
FullSyncResponse: &consensusproto.LogFullSyncResponse{},
},
},
}
t.Run("full sync request non empty all heads added", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullRequest := &consensusproto.LogFullSyncRequest{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(false)
fx.aclMock.EXPECT().AddRawRecords(fullRequest.Records).Return(nil)
fx.reqFactory.EXPECT().CreateFullSyncResponse(fx.aclMock, fullRequest.Head).Return(fullResponse, nil)
resp, err := fx.syncProtocol.FullSyncRequest(ctx, fx.senderId, fullRequest)
require.Equal(t, fullResponse, resp)
require.NoError(t, err)
})
t.Run("full sync request non empty head exists", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullRequest := &consensusproto.LogFullSyncRequest{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(true)
fx.reqFactory.EXPECT().CreateFullSyncResponse(fx.aclMock, fullRequest.Head).Return(fullResponse, nil)
resp, err := fx.syncProtocol.FullSyncRequest(ctx, fx.senderId, fullRequest)
require.Equal(t, fullResponse, resp)
require.NoError(t, err)
})
t.Run("full sync request empty head not exists", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
fullRequest := &consensusproto.LogFullSyncRequest{
Head: "h1",
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(false)
resp, err := fx.syncProtocol.FullSyncRequest(ctx, fx.senderId, fullRequest)
require.Nil(t, resp)
require.Error(t, list.ErrIncorrectRecordSequence, err)
})
}
func TestFullSyncResponse(t *testing.T) {
ctx := context.Background()
t.Run("full sync response no heads", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullResponse := &consensusproto.LogFullSyncResponse{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(false)
fx.aclMock.EXPECT().AddRawRecords(fullResponse.Records).Return(nil)
err := fx.syncProtocol.FullSyncResponse(ctx, fx.senderId, fullResponse)
require.NoError(t, err)
})
t.Run("full sync response has heads", func(t *testing.T) {
fx := newSyncProtocolFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullResponse := &consensusproto.LogFullSyncResponse{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.aclMock.EXPECT().HasHead("h1").Return(true)
err := fx.syncProtocol.FullSyncResponse(ctx, fx.senderId, fullResponse)
require.NoError(t, err)
})
}

View File

@ -0,0 +1,5 @@
package headupdater
type HeadUpdater interface {
UpdateHeads(id string, heads []string)
}

View File

@ -0,0 +1,694 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anyproto/any-sync/commonspace/object/acl/syncacl (interfaces: SyncAcl,SyncClient,RequestFactory,AclSyncProtocol)
// Package mock_syncacl is a generated GoMock package.
package mock_syncacl
import (
context "context"
reflect "reflect"
app "github.com/anyproto/any-sync/app"
list "github.com/anyproto/any-sync/commonspace/object/acl/list"
headupdater "github.com/anyproto/any-sync/commonspace/object/acl/syncacl/headupdater"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
consensusproto "github.com/anyproto/any-sync/consensus/consensusproto"
crypto "github.com/anyproto/any-sync/util/crypto"
gomock "go.uber.org/mock/gomock"
)
// MockSyncAcl is a mock of SyncAcl interface.
type MockSyncAcl struct {
ctrl *gomock.Controller
recorder *MockSyncAclMockRecorder
}
// MockSyncAclMockRecorder is the mock recorder for MockSyncAcl.
type MockSyncAclMockRecorder struct {
mock *MockSyncAcl
}
// NewMockSyncAcl creates a new mock instance.
func NewMockSyncAcl(ctrl *gomock.Controller) *MockSyncAcl {
mock := &MockSyncAcl{ctrl: ctrl}
mock.recorder = &MockSyncAclMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockSyncAcl) EXPECT() *MockSyncAclMockRecorder {
return m.recorder
}
// AclState mocks base method.
func (m *MockSyncAcl) AclState() *list.AclState {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AclState")
ret0, _ := ret[0].(*list.AclState)
return ret0
}
// AclState indicates an expected call of AclState.
func (mr *MockSyncAclMockRecorder) AclState() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AclState", reflect.TypeOf((*MockSyncAcl)(nil).AclState))
}
// AddRawRecord mocks base method.
func (m *MockSyncAcl) AddRawRecord(arg0 *consensusproto.RawRecordWithId) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRawRecord", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// AddRawRecord indicates an expected call of AddRawRecord.
func (mr *MockSyncAclMockRecorder) AddRawRecord(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawRecord", reflect.TypeOf((*MockSyncAcl)(nil).AddRawRecord), arg0)
}
// AddRawRecords mocks base method.
func (m *MockSyncAcl) AddRawRecords(arg0 []*consensusproto.RawRecordWithId) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRawRecords", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// AddRawRecords indicates an expected call of AddRawRecords.
func (mr *MockSyncAclMockRecorder) AddRawRecords(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawRecords", reflect.TypeOf((*MockSyncAcl)(nil).AddRawRecords), arg0)
}
// Close mocks base method.
func (m *MockSyncAcl) Close(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockSyncAclMockRecorder) Close(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSyncAcl)(nil).Close), arg0)
}
// Get mocks base method.
func (m *MockSyncAcl) Get(arg0 string) (*list.AclRecord, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", arg0)
ret0, _ := ret[0].(*list.AclRecord)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Get indicates an expected call of Get.
func (mr *MockSyncAclMockRecorder) Get(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockSyncAcl)(nil).Get), arg0)
}
// GetIndex mocks base method.
func (m *MockSyncAcl) GetIndex(arg0 int) (*list.AclRecord, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetIndex", arg0)
ret0, _ := ret[0].(*list.AclRecord)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetIndex indicates an expected call of GetIndex.
func (mr *MockSyncAclMockRecorder) GetIndex(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIndex", reflect.TypeOf((*MockSyncAcl)(nil).GetIndex), arg0)
}
// HandleMessage mocks base method.
func (m *MockSyncAcl) HandleMessage(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HandleMessage", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// HandleMessage indicates an expected call of HandleMessage.
func (mr *MockSyncAclMockRecorder) HandleMessage(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockSyncAcl)(nil).HandleMessage), arg0, arg1, arg2)
}
// HandleRequest mocks base method.
func (m *MockSyncAcl) HandleRequest(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HandleRequest", arg0, arg1, arg2)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HandleRequest indicates an expected call of HandleRequest.
func (mr *MockSyncAclMockRecorder) HandleRequest(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleRequest", reflect.TypeOf((*MockSyncAcl)(nil).HandleRequest), arg0, arg1, arg2)
}
// HasHead mocks base method.
func (m *MockSyncAcl) HasHead(arg0 string) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasHead", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// HasHead indicates an expected call of HasHead.
func (mr *MockSyncAclMockRecorder) HasHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasHead", reflect.TypeOf((*MockSyncAcl)(nil).HasHead), arg0)
}
// Head mocks base method.
func (m *MockSyncAcl) Head() *list.AclRecord {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Head")
ret0, _ := ret[0].(*list.AclRecord)
return ret0
}
// Head indicates an expected call of Head.
func (mr *MockSyncAclMockRecorder) Head() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Head", reflect.TypeOf((*MockSyncAcl)(nil).Head))
}
// Id mocks base method.
func (m *MockSyncAcl) Id() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Id")
ret0, _ := ret[0].(string)
return ret0
}
// Id indicates an expected call of Id.
func (mr *MockSyncAclMockRecorder) Id() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockSyncAcl)(nil).Id))
}
// Init mocks base method.
func (m *MockSyncAcl) Init(arg0 *app.App) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Init", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Init indicates an expected call of Init.
func (mr *MockSyncAclMockRecorder) Init(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockSyncAcl)(nil).Init), arg0)
}
// IsAfter mocks base method.
func (m *MockSyncAcl) IsAfter(arg0, arg1 string) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsAfter", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// IsAfter indicates an expected call of IsAfter.
func (mr *MockSyncAclMockRecorder) IsAfter(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAfter", reflect.TypeOf((*MockSyncAcl)(nil).IsAfter), arg0, arg1)
}
// Iterate mocks base method.
func (m *MockSyncAcl) Iterate(arg0 func(*list.AclRecord) bool) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Iterate", arg0)
}
// Iterate indicates an expected call of Iterate.
func (mr *MockSyncAclMockRecorder) Iterate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Iterate", reflect.TypeOf((*MockSyncAcl)(nil).Iterate), arg0)
}
// IterateFrom mocks base method.
func (m *MockSyncAcl) IterateFrom(arg0 string, arg1 func(*list.AclRecord) bool) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "IterateFrom", arg0, arg1)
}
// IterateFrom indicates an expected call of IterateFrom.
func (mr *MockSyncAclMockRecorder) IterateFrom(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IterateFrom", reflect.TypeOf((*MockSyncAcl)(nil).IterateFrom), arg0, arg1)
}
// KeyStorage mocks base method.
func (m *MockSyncAcl) KeyStorage() crypto.KeyStorage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "KeyStorage")
ret0, _ := ret[0].(crypto.KeyStorage)
return ret0
}
// KeyStorage indicates an expected call of KeyStorage.
func (mr *MockSyncAclMockRecorder) KeyStorage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeyStorage", reflect.TypeOf((*MockSyncAcl)(nil).KeyStorage))
}
// Lock mocks base method.
func (m *MockSyncAcl) Lock() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Lock")
}
// Lock indicates an expected call of Lock.
func (mr *MockSyncAclMockRecorder) Lock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncAcl)(nil).Lock))
}
// Name mocks base method.
func (m *MockSyncAcl) Name() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Name")
ret0, _ := ret[0].(string)
return ret0
}
// Name indicates an expected call of Name.
func (mr *MockSyncAclMockRecorder) Name() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockSyncAcl)(nil).Name))
}
// RLock mocks base method.
func (m *MockSyncAcl) RLock() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "RLock")
}
// RLock indicates an expected call of RLock.
func (mr *MockSyncAclMockRecorder) RLock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RLock", reflect.TypeOf((*MockSyncAcl)(nil).RLock))
}
// RUnlock mocks base method.
func (m *MockSyncAcl) RUnlock() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "RUnlock")
}
// RUnlock indicates an expected call of RUnlock.
func (mr *MockSyncAclMockRecorder) RUnlock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RUnlock", reflect.TypeOf((*MockSyncAcl)(nil).RUnlock))
}
// RecordBuilder mocks base method.
func (m *MockSyncAcl) RecordBuilder() list.AclRecordBuilder {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RecordBuilder")
ret0, _ := ret[0].(list.AclRecordBuilder)
return ret0
}
// RecordBuilder indicates an expected call of RecordBuilder.
func (mr *MockSyncAclMockRecorder) RecordBuilder() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordBuilder", reflect.TypeOf((*MockSyncAcl)(nil).RecordBuilder))
}
// Records mocks base method.
func (m *MockSyncAcl) Records() []*list.AclRecord {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Records")
ret0, _ := ret[0].([]*list.AclRecord)
return ret0
}
// Records indicates an expected call of Records.
func (mr *MockSyncAclMockRecorder) Records() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Records", reflect.TypeOf((*MockSyncAcl)(nil).Records))
}
// RecordsAfter mocks base method.
func (m *MockSyncAcl) RecordsAfter(arg0 context.Context, arg1 string) ([]*consensusproto.RawRecordWithId, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RecordsAfter", arg0, arg1)
ret0, _ := ret[0].([]*consensusproto.RawRecordWithId)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RecordsAfter indicates an expected call of RecordsAfter.
func (mr *MockSyncAclMockRecorder) RecordsAfter(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordsAfter", reflect.TypeOf((*MockSyncAcl)(nil).RecordsAfter), arg0, arg1)
}
// Root mocks base method.
func (m *MockSyncAcl) Root() *consensusproto.RawRecordWithId {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Root")
ret0, _ := ret[0].(*consensusproto.RawRecordWithId)
return ret0
}
// Root indicates an expected call of Root.
func (mr *MockSyncAclMockRecorder) Root() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockSyncAcl)(nil).Root))
}
// Run mocks base method.
func (m *MockSyncAcl) Run(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Run", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Run indicates an expected call of Run.
func (mr *MockSyncAclMockRecorder) Run(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncAcl)(nil).Run), arg0)
}
// SetHeadUpdater mocks base method.
func (m *MockSyncAcl) SetHeadUpdater(arg0 headupdater.HeadUpdater) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetHeadUpdater", arg0)
}
// SetHeadUpdater indicates an expected call of SetHeadUpdater.
func (mr *MockSyncAclMockRecorder) SetHeadUpdater(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeadUpdater", reflect.TypeOf((*MockSyncAcl)(nil).SetHeadUpdater), arg0)
}
// SyncWithPeer mocks base method.
func (m *MockSyncAcl) SyncWithPeer(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncWithPeer", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SyncWithPeer indicates an expected call of SyncWithPeer.
func (mr *MockSyncAclMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncAcl)(nil).SyncWithPeer), arg0, arg1)
}
// Unlock mocks base method.
func (m *MockSyncAcl) Unlock() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Unlock")
}
// Unlock indicates an expected call of Unlock.
func (mr *MockSyncAclMockRecorder) Unlock() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockSyncAcl)(nil).Unlock))
}
// ValidateRawRecord mocks base method.
func (m *MockSyncAcl) ValidateRawRecord(arg0 *consensusproto.RawRecord) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ValidateRawRecord", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// ValidateRawRecord indicates an expected call of ValidateRawRecord.
func (mr *MockSyncAclMockRecorder) ValidateRawRecord(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateRawRecord", reflect.TypeOf((*MockSyncAcl)(nil).ValidateRawRecord), arg0)
}
// MockSyncClient is a mock of SyncClient interface.
type MockSyncClient struct {
ctrl *gomock.Controller
recorder *MockSyncClientMockRecorder
}
// MockSyncClientMockRecorder is the mock recorder for MockSyncClient.
type MockSyncClientMockRecorder struct {
mock *MockSyncClient
}
// NewMockSyncClient creates a new mock instance.
func NewMockSyncClient(ctrl *gomock.Controller) *MockSyncClient {
mock := &MockSyncClient{ctrl: ctrl}
mock.recorder = &MockSyncClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
return m.recorder
}
// Broadcast mocks base method.
func (m *MockSyncClient) Broadcast(arg0 *consensusproto.LogSyncMessage) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Broadcast", arg0)
}
// Broadcast indicates an expected call of Broadcast.
func (mr *MockSyncClientMockRecorder) Broadcast(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0)
}
// CreateFullSyncRequest mocks base method.
func (m *MockSyncClient) CreateFullSyncRequest(arg0 list.AclList, arg1 string) (*consensusproto.LogSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncRequest", arg0, arg1)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncRequest indicates an expected call of CreateFullSyncRequest.
func (mr *MockSyncClientMockRecorder) CreateFullSyncRequest(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncRequest), arg0, arg1)
}
// CreateFullSyncResponse mocks base method.
func (m *MockSyncClient) CreateFullSyncResponse(arg0 list.AclList, arg1 string) (*consensusproto.LogSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncResponse", arg0, arg1)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncResponse indicates an expected call of CreateFullSyncResponse.
func (mr *MockSyncClientMockRecorder) CreateFullSyncResponse(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncResponse", reflect.TypeOf((*MockSyncClient)(nil).CreateFullSyncResponse), arg0, arg1)
}
// CreateHeadUpdate mocks base method.
func (m *MockSyncClient) CreateHeadUpdate(arg0 list.AclList, arg1 []*consensusproto.RawRecordWithId) *consensusproto.LogSyncMessage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateHeadUpdate", arg0, arg1)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
return ret0
}
// CreateHeadUpdate indicates an expected call of CreateHeadUpdate.
func (mr *MockSyncClientMockRecorder) CreateHeadUpdate(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateHeadUpdate", reflect.TypeOf((*MockSyncClient)(nil).CreateHeadUpdate), arg0, arg1)
}
// QueueRequest mocks base method.
func (m *MockSyncClient) QueueRequest(arg0 string, arg1 *consensusproto.LogSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "QueueRequest", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// QueueRequest indicates an expected call of QueueRequest.
func (mr *MockSyncClientMockRecorder) QueueRequest(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueRequest", reflect.TypeOf((*MockSyncClient)(nil).QueueRequest), arg0, arg1)
}
// SendRequest mocks base method.
func (m *MockSyncClient) SendRequest(arg0 context.Context, arg1 string, arg2 *consensusproto.LogSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendRequest", arg0, arg1, arg2)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendRequest indicates an expected call of SendRequest.
func (mr *MockSyncClientMockRecorder) SendRequest(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRequest", reflect.TypeOf((*MockSyncClient)(nil).SendRequest), arg0, arg1, arg2)
}
// SendUpdate mocks base method.
func (m *MockSyncClient) SendUpdate(arg0 string, arg1 *consensusproto.LogSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendUpdate", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SendUpdate indicates an expected call of SendUpdate.
func (mr *MockSyncClientMockRecorder) SendUpdate(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendUpdate", reflect.TypeOf((*MockSyncClient)(nil).SendUpdate), arg0, arg1)
}
// MockRequestFactory is a mock of RequestFactory interface.
type MockRequestFactory struct {
ctrl *gomock.Controller
recorder *MockRequestFactoryMockRecorder
}
// MockRequestFactoryMockRecorder is the mock recorder for MockRequestFactory.
type MockRequestFactoryMockRecorder struct {
mock *MockRequestFactory
}
// NewMockRequestFactory creates a new mock instance.
func NewMockRequestFactory(ctrl *gomock.Controller) *MockRequestFactory {
mock := &MockRequestFactory{ctrl: ctrl}
mock.recorder = &MockRequestFactoryMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockRequestFactory) EXPECT() *MockRequestFactoryMockRecorder {
return m.recorder
}
// CreateFullSyncRequest mocks base method.
func (m *MockRequestFactory) CreateFullSyncRequest(arg0 list.AclList, arg1 string) (*consensusproto.LogSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncRequest", arg0, arg1)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncRequest indicates an expected call of CreateFullSyncRequest.
func (mr *MockRequestFactoryMockRecorder) CreateFullSyncRequest(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncRequest", reflect.TypeOf((*MockRequestFactory)(nil).CreateFullSyncRequest), arg0, arg1)
}
// CreateFullSyncResponse mocks base method.
func (m *MockRequestFactory) CreateFullSyncResponse(arg0 list.AclList, arg1 string) (*consensusproto.LogSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateFullSyncResponse", arg0, arg1)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateFullSyncResponse indicates an expected call of CreateFullSyncResponse.
func (mr *MockRequestFactoryMockRecorder) CreateFullSyncResponse(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFullSyncResponse", reflect.TypeOf((*MockRequestFactory)(nil).CreateFullSyncResponse), arg0, arg1)
}
// CreateHeadUpdate mocks base method.
func (m *MockRequestFactory) CreateHeadUpdate(arg0 list.AclList, arg1 []*consensusproto.RawRecordWithId) *consensusproto.LogSyncMessage {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateHeadUpdate", arg0, arg1)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
return ret0
}
// CreateHeadUpdate indicates an expected call of CreateHeadUpdate.
func (mr *MockRequestFactoryMockRecorder) CreateHeadUpdate(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateHeadUpdate", reflect.TypeOf((*MockRequestFactory)(nil).CreateHeadUpdate), arg0, arg1)
}
// MockAclSyncProtocol is a mock of AclSyncProtocol interface.
type MockAclSyncProtocol struct {
ctrl *gomock.Controller
recorder *MockAclSyncProtocolMockRecorder
}
// MockAclSyncProtocolMockRecorder is the mock recorder for MockAclSyncProtocol.
type MockAclSyncProtocolMockRecorder struct {
mock *MockAclSyncProtocol
}
// NewMockAclSyncProtocol creates a new mock instance.
func NewMockAclSyncProtocol(ctrl *gomock.Controller) *MockAclSyncProtocol {
mock := &MockAclSyncProtocol{ctrl: ctrl}
mock.recorder = &MockAclSyncProtocolMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockAclSyncProtocol) EXPECT() *MockAclSyncProtocolMockRecorder {
return m.recorder
}
// FullSyncRequest mocks base method.
func (m *MockAclSyncProtocol) FullSyncRequest(arg0 context.Context, arg1 string, arg2 *consensusproto.LogFullSyncRequest) (*consensusproto.LogSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FullSyncRequest", arg0, arg1, arg2)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FullSyncRequest indicates an expected call of FullSyncRequest.
func (mr *MockAclSyncProtocolMockRecorder) FullSyncRequest(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FullSyncRequest", reflect.TypeOf((*MockAclSyncProtocol)(nil).FullSyncRequest), arg0, arg1, arg2)
}
// FullSyncResponse mocks base method.
func (m *MockAclSyncProtocol) FullSyncResponse(arg0 context.Context, arg1 string, arg2 *consensusproto.LogFullSyncResponse) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FullSyncResponse", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// FullSyncResponse indicates an expected call of FullSyncResponse.
func (mr *MockAclSyncProtocolMockRecorder) FullSyncResponse(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FullSyncResponse", reflect.TypeOf((*MockAclSyncProtocol)(nil).FullSyncResponse), arg0, arg1, arg2)
}
// HeadUpdate mocks base method.
func (m *MockAclSyncProtocol) HeadUpdate(arg0 context.Context, arg1 string, arg2 *consensusproto.LogHeadUpdate) (*consensusproto.LogSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HeadUpdate", arg0, arg1, arg2)
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HeadUpdate indicates an expected call of HeadUpdate.
func (mr *MockAclSyncProtocolMockRecorder) HeadUpdate(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadUpdate", reflect.TypeOf((*MockAclSyncProtocol)(nil).HeadUpdate), arg0, arg1, arg2)
}

View File

@ -0,0 +1,54 @@
package syncacl
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/consensus/consensusproto"
)
type RequestFactory interface {
CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage)
CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error)
CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error)
}
type requestFactory struct{}
func NewRequestFactory() RequestFactory {
return &requestFactory{}
}
func (r *requestFactory) CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage) {
return consensusproto.WrapHeadUpdate(&consensusproto.LogHeadUpdate{
Head: l.Head().Id,
Records: added,
}, l.Root())
}
func (r *requestFactory) CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error) {
if !l.HasHead(theirHead) {
return consensusproto.WrapFullRequest(&consensusproto.LogFullSyncRequest{
Head: l.Head().Id,
}, l.Root()), nil
}
records, err := l.RecordsAfter(context.Background(), theirHead)
if err != nil {
return
}
return consensusproto.WrapFullRequest(&consensusproto.LogFullSyncRequest{
Head: l.Head().Id,
Records: records,
}, l.Root()), nil
}
func (r *requestFactory) CreateFullSyncResponse(l list.AclList, theirHead string) (resp *consensusproto.LogSyncMessage, err error) {
records, err := l.RecordsAfter(context.Background(), theirHead)
if err != nil {
return
}
return consensusproto.WrapFullResponse(&consensusproto.LogFullSyncResponse{
Head: l.Head().Id,
Records: records,
}, l.Root()), nil
}

View File

@ -2,33 +2,68 @@ package syncacl
import (
"context"
"errors"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl/headupdater"
"github.com/anyproto/any-sync/commonspace/object/syncobjectgetter"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
)
const CName = "common.acl.syncacl"
func New() *SyncAcl {
return &SyncAcl{}
}
var (
log = logger.NewNamed(CName)
type SyncAcl struct {
ErrSyncAclClosed = errors.New("sync acl is closed")
)
type SyncAcl interface {
app.ComponentRunnable
list.AclList
syncobjectgetter.SyncObject
SetHeadUpdater(updater headupdater.HeadUpdater)
SyncWithPeer(ctx context.Context, peerId string) (err error)
}
func (s *SyncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
return nil, nil
func New() SyncAcl {
return &syncAcl{}
}
func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
type syncAcl struct {
list.AclList
syncClient SyncClient
syncHandler synchandler.SyncHandler
headUpdater headupdater.HeadUpdater
isClosed bool
}
func (s *SyncAcl) Init(a *app.App) (err error) {
func (s *syncAcl) Run(ctx context.Context) (err error) {
return
}
func (s *syncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
return s.syncHandler.HandleRequest(ctx, senderId, request)
}
func (s *syncAcl) SetHeadUpdater(updater headupdater.HeadUpdater) {
s.headUpdater = updater
}
func (s *syncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
return s.syncHandler.HandleMessage(ctx, senderId, request)
}
func (s *syncAcl) Init(a *app.App) (err error) {
storage := a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
aclStorage, err := storage.AclStorage()
if err != nil {
@ -36,9 +71,60 @@ func (s *SyncAcl) Init(a *app.App) (err error) {
}
acc := a.MustComponent(accountservice.CName).(accountservice.Service)
s.AclList, err = list.BuildAclListWithIdentity(acc.Account(), aclStorage, list.NoOpAcceptorVerifier{})
if err != nil {
return
}
spaceId := storage.Id()
requestManager := a.MustComponent(requestmanager.CName).(requestmanager.RequestManager)
peerManager := a.MustComponent(peermanager.CName).(peermanager.PeerManager)
syncStatus := a.MustComponent(syncstatus.CName).(syncstatus.StatusService)
s.syncClient = NewSyncClient(spaceId, requestManager, peerManager)
s.syncHandler = newSyncAclHandler(storage.Id(), s, s.syncClient, syncStatus)
return err
}
func (s *SyncAcl) Name() (name string) {
func (s *syncAcl) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error) {
if s.isClosed {
return ErrSyncAclClosed
}
err = s.AclList.AddRawRecord(rawRec)
if err != nil {
return
}
headUpdate := s.syncClient.CreateHeadUpdate(s, []*consensusproto.RawRecordWithId{rawRec})
s.headUpdater.UpdateHeads(s.Id(), []string{rawRec.Id})
s.syncClient.Broadcast(headUpdate)
return
}
func (s *syncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) {
if s.isClosed {
return ErrSyncAclClosed
}
err = s.AclList.AddRawRecords(rawRecords)
if err != nil {
return
}
headUpdate := s.syncClient.CreateHeadUpdate(s, rawRecords)
s.headUpdater.UpdateHeads(s.Id(), []string{rawRecords[len(rawRecords)-1].Id})
s.syncClient.Broadcast(headUpdate)
return
}
func (s *syncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) {
s.Lock()
defer s.Unlock()
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(peerId, headUpdate)
}
func (s *syncAcl) Close(ctx context.Context) (err error) {
s.Lock()
defer s.Unlock()
s.isClosed = true
return
}
func (s *syncAcl) Name() (name string) {
return CName
}

View File

@ -2,15 +2,81 @@ package syncacl
import (
"context"
"errors"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/gogo/protobuf/proto"
)
var (
ErrMessageIsRequest = errors.New("message is request")
ErrMessageIsNotRequest = errors.New("message is not request")
)
type syncAclHandler struct {
acl list.AclList
aclList list.AclList
syncClient SyncClient
syncProtocol AclSyncProtocol
syncStatus syncstatus.StatusUpdater
spaceId string
}
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
func newSyncAclHandler(spaceId string, aclList list.AclList, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
return &syncAclHandler{
aclList: aclList,
syncClient: syncClient,
syncProtocol: newAclSyncProtocol(spaceId, aclList, syncClient),
syncStatus: syncStatus,
spaceId: spaceId,
}
}
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &consensusproto.LogSyncMessage{}
err = proto.Unmarshal(message.Payload, unmarshalled)
if err != nil {
return
}
content := unmarshalled.GetContent()
head := consensusproto.GetHead(unmarshalled)
s.syncStatus.HeadsReceive(senderId, s.aclList.Id(), []string{head})
s.aclList.Lock()
defer s.aclList.Unlock()
switch {
case content.GetHeadUpdate() != nil:
var syncReq *consensusproto.LogSyncMessage
syncReq, err = s.syncProtocol.HeadUpdate(ctx, senderId, content.GetHeadUpdate())
if err != nil || syncReq == nil {
return
}
return s.syncClient.QueueRequest(senderId, syncReq)
case content.GetFullSyncRequest() != nil:
return ErrMessageIsRequest
case content.GetFullSyncResponse() != nil:
return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
}
return
}
func (s *syncAclHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
unmarshalled := &consensusproto.LogSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest()
if fullSyncRequest == nil {
return nil, ErrMessageIsNotRequest
}
s.aclList.Lock()
defer s.aclList.Unlock()
aclResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest)
if err != nil {
return
}
return spacesyncproto.MarshallSyncMessage(aclResp, s.spaceId, s.aclList.Id())
}

View File

@ -0,0 +1,233 @@
package syncacl
import (
"context"
"fmt"
"github.com/anyproto/any-sync/commonspace/object/acl/list/mock_list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl/mock_syncacl"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"sync"
"testing"
)
type testAclMock struct {
*mock_list.MockAclList
m sync.RWMutex
}
func newTestAclMock(mockAcl *mock_list.MockAclList) *testAclMock {
return &testAclMock{
MockAclList: mockAcl,
}
}
func (t *testAclMock) Lock() {
t.m.Lock()
}
func (t *testAclMock) RLock() {
t.m.RLock()
}
func (t *testAclMock) Unlock() {
t.m.Unlock()
}
func (t *testAclMock) RUnlock() {
t.m.RUnlock()
}
func (t *testAclMock) TryLock() bool {
return t.m.TryLock()
}
func (t *testAclMock) TryRLock() bool {
return t.m.TryRLock()
}
type syncHandlerFixture struct {
ctrl *gomock.Controller
syncClientMock *mock_syncacl.MockSyncClient
aclMock *testAclMock
syncProtocolMock *mock_syncacl.MockAclSyncProtocol
spaceId string
senderId string
aclId string
syncHandler *syncAclHandler
}
func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture {
ctrl := gomock.NewController(t)
aclMock := newTestAclMock(mock_list.NewMockAclList(ctrl))
syncClientMock := mock_syncacl.NewMockSyncClient(ctrl)
syncProtocolMock := mock_syncacl.NewMockAclSyncProtocol(ctrl)
spaceId := "spaceId"
syncHandler := &syncAclHandler{
aclList: aclMock,
syncClient: syncClientMock,
syncProtocol: syncProtocolMock,
syncStatus: syncstatus.NewNoOpSyncStatus(),
spaceId: spaceId,
}
return &syncHandlerFixture{
ctrl: ctrl,
syncClientMock: syncClientMock,
aclMock: aclMock,
syncProtocolMock: syncProtocolMock,
spaceId: spaceId,
senderId: "senderId",
aclId: "aclId",
syncHandler: syncHandler,
}
}
func (fx *syncHandlerFixture) stop() {
fx.ctrl.Finish()
}
func TestSyncAclHandler_HandleMessage(t *testing.T) {
ctx := context.Background()
t.Run("handle head update, request returned", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
syncReq := &consensusproto.LogSyncMessage{}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(syncReq, nil)
fx.syncClientMock.EXPECT().QueueRequest(fx.senderId, syncReq).Return(nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
})
t.Run("handle head update, no request", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(nil, nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
})
t.Run("handle head update, returned error", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
expectedErr := fmt.Errorf("some error")
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(nil, expectedErr)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.Error(t, expectedErr, err)
})
t.Run("handle full sync request is forbidden", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullRequest := &consensusproto.LogFullSyncRequest{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.Error(t, ErrMessageIsRequest, err)
})
t.Run("handle full sync response, no error", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullResponse := &consensusproto.LogFullSyncResponse{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapFullResponse(fullResponse, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.syncProtocolMock.EXPECT().FullSyncResponse(ctx, fx.senderId, gomock.Any()).Return(nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
})
}
func TestSyncAclHandler_HandleRequest(t *testing.T) {
ctx := context.Background()
t.Run("handle full sync request, no error", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
fullRequest := &consensusproto.LogFullSyncRequest{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
fullResp := &consensusproto.LogSyncMessage{
Content: &consensusproto.LogSyncContentValue{
Value: &consensusproto.LogSyncContentValue_FullSyncResponse{
FullSyncResponse: &consensusproto.LogFullSyncResponse{
Head: "returnedHead",
},
},
},
}
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
fx.syncProtocolMock.EXPECT().FullSyncRequest(ctx, fx.senderId, gomock.Any()).Return(fullResp, nil)
res, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
unmarshalled := &consensusproto.LogSyncMessage{}
err = proto.Unmarshal(res.Payload, unmarshalled)
if err != nil {
return
}
require.Equal(t, "returnedHead", consensusproto.GetHead(unmarshalled))
})
t.Run("handle other message returns error", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
chWithId := &consensusproto.RawRecordWithId{}
headUpdate := &consensusproto.LogHeadUpdate{
Head: "h1",
Records: []*consensusproto.RawRecordWithId{chWithId},
}
logMessage := consensusproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(logMessage, fx.spaceId, fx.aclId)
fx.aclMock.EXPECT().Id().AnyTimes().Return(fx.aclId)
_, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.Error(t, ErrMessageIsNotRequest, err)
})
}

View File

@ -0,0 +1,70 @@
package syncacl
import (
"context"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/consensus/consensusproto"
"go.uber.org/zap"
)
type SyncClient interface {
RequestFactory
Broadcast(msg *consensusproto.LogSyncMessage)
SendUpdate(peerId string, msg *consensusproto.LogSyncMessage) (err error)
QueueRequest(peerId string, msg *consensusproto.LogSyncMessage) (err error)
SendRequest(ctx context.Context, peerId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
}
type syncClient struct {
RequestFactory
spaceId string
requestManager requestmanager.RequestManager
peerManager peermanager.PeerManager
}
func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient {
return &syncClient{
RequestFactory: &requestFactory{},
spaceId: spaceId,
requestManager: requestManager,
peerManager: peerManager,
}
}
func (s *syncClient) Broadcast(msg *consensusproto.LogSyncMessage) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
err = s.peerManager.Broadcast(context.Background(), objMsg)
if err != nil {
log.Debug("broadcast error", zap.Error(err))
}
}
func (s *syncClient) SendUpdate(peerId string, msg *consensusproto.LogSyncMessage) (err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
return s.peerManager.SendPeer(context.Background(), peerId, objMsg)
}
func (s *syncClient) SendRequest(ctx context.Context, peerId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
return s.requestManager.SendRequest(ctx, peerId, objMsg)
}
func (s *syncClient) QueueRequest(peerId string, msg *consensusproto.LogSyncMessage) (err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
return s.requestManager.QueueRequest(peerId, objMsg)
}

View File

@ -13,7 +13,7 @@ import (
objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
treestorage "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockObjectTree is a mock of ObjectTree interface.

View File

@ -15,7 +15,7 @@ import (
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
treestorage "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockSyncTree is a mock of SyncTree interface.

View File

@ -2,6 +2,7 @@ package synctree
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager"
@ -32,8 +33,9 @@ func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager,
peerManager: peerManager,
}
}
func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.RootChange.Id)
if err != nil {
return
}
@ -44,7 +46,7 @@ func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
}
func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "")
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil {
return
}
@ -52,7 +54,7 @@ func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.Tr
}
func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "")
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil {
return
}
@ -60,23 +62,9 @@ func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, m
}
func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "")
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil {
return
}
return s.requestManager.QueueRequest(peerId, objMsg)
}
func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal()
if err != nil {
return
}
objMsg = &spacesyncproto.ObjectSyncMessage{
ReplyId: replyId,
Payload: payload,
ObjectId: objectId,
SpaceId: spaceId,
}
return
}

View File

@ -11,8 +11,8 @@ import (
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)

View File

@ -81,7 +81,7 @@ func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fu
if err != nil {
return
}
response, err = MarshallTreeMessage(treeResp, s.spaceId, s.objTree.Id(), "")
response, err = spacesyncproto.MarshallSyncMessage(treeResp, s.spaceId, s.objTree.Id())
return
}

View File

@ -8,9 +8,10 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
type testObjTreeMock struct {
@ -103,7 +104,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
syncReq := &treechangeproto.TreeSyncMessage{}
fx.syncHandler.heads = []string{"h2"}
@ -127,7 +128,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h1"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h1"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -136,7 +137,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
require.NoError(t, err)
})
t.Run("handle head update message, empty sync request returned", func(t *testing.T) {
t.Run("handle head update message, no sync request returned", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
@ -145,7 +146,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -167,7 +168,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -186,7 +187,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -209,7 +210,7 @@ func TestSyncTreeHandler_HandleRequest(t *testing.T) {
chWithId := &treechangeproto.RawTreeChangeWithId{}
fullRequest := &treechangeproto.TreeFullSyncRequest{}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
syncResp := &treechangeproto.TreeSyncMessage{}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -230,7 +231,7 @@ func TestSyncTreeHandler_HandleRequest(t *testing.T) {
headUpdate := &treechangeproto.TreeHeadUpdate{}
headUpdateMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
for _, msg := range []*treechangeproto.TreeSyncMessage{responseMsg, headUpdateMsg} {
objectMsg, _ := MarshallTreeMessage(msg, "spaceId", treeId, "")
objectMsg, _ := spacesyncproto.MarshallSyncMessage(msg, "spaceId", treeId)
_, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.Equal(t, err, ErrMessageIsNotRequest)

View File

@ -12,8 +12,8 @@ import (
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/gogo/protobuf/proto"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
type treeRemoteGetterFixture struct {

View File

@ -60,7 +60,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
// isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
log.DebugCtx(ctx, "is empty update", zap.Bool("headEquals", headEquals))
if headEquals {
return
}
@ -70,7 +70,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
return
}
if t.alreadyHasHeads(objTree, update.Heads) {
if t.hasHeads(objTree, update.Heads) {
return
}
@ -82,7 +82,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
return
}
if t.alreadyHasHeads(objTree, update.Heads) {
if t.hasHeads(objTree, update.Heads) {
return
}
@ -109,7 +109,7 @@ func (t *treeSyncProtocol) FullSyncRequest(ctx context.Context, senderId string,
}
}()
if len(request.Changes) != 0 && !t.alreadyHasHeads(objTree, request.Heads) {
if len(request.Changes) != 0 && !t.hasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: request.Heads,
RawChanges: request.Changes,
@ -137,7 +137,7 @@ func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string
log.DebugCtx(ctx, "full sync response succeeded")
}
}()
if t.alreadyHasHeads(objTree, response.Heads) {
if t.hasHeads(objTree, response.Heads) {
return
}
@ -148,6 +148,6 @@ func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string
return
}
func (t *treeSyncProtocol) alreadyHasHeads(ot objecttree.ObjectTree, heads []string) bool {
func (t *treeSyncProtocol) hasHeads(ot objecttree.ObjectTree, heads []string) bool {
return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...)
}

View File

@ -8,8 +8,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)

View File

@ -8,7 +8,7 @@ import (
reflect "reflect"
objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockUpdateListener is a mock of UpdateListener interface.

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockTreeStorage is a mock of TreeStorage interface.

View File

@ -11,7 +11,7 @@ import (
app "github.com/anyproto/any-sync/app"
objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
treemanager "github.com/anyproto/any-sync/commonspace/object/treemanager"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockTreeManager is a mock of TreeManager interface.

View File

@ -41,7 +41,7 @@ func (o *objectManager) Init(a *app.App) (err error) {
o.spaceId = state.SpaceId
o.spaceIsClosed = state.SpaceIsClosed
settingsObject := a.MustComponent(settings.CName).(settings.Settings).SettingsObject()
acl := a.MustComponent(syncacl.CName).(*syncacl.SyncAcl)
acl := a.MustComponent(syncacl.CName).(syncacl.SyncAcl)
o.AddObject(settingsObject)
o.AddObject(acl)
return nil

View File

@ -12,7 +12,7 @@ import (
app "github.com/anyproto/any-sync/app"
objectsync "github.com/anyproto/any-sync/commonspace/objectsync"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockObjectSync is a mock of ObjectSync interface.

View File

@ -12,7 +12,7 @@ import (
updatelistener "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
treestorage "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
objecttreebuilder "github.com/anyproto/any-sync/commonspace/objecttreebuilder"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockTreeBuilder is a mock of TreeBuilder interface.

View File

@ -82,7 +82,7 @@ func (t *treeBuilder) Init(a *app.App) (err error) {
t.isClosed = state.SpaceIsClosed
t.treesUsed = state.TreesUsed
t.builder = state.TreeBuilderFunc
t.aclList = a.MustComponent(syncacl.CName).(*syncacl.SyncAcl)
t.aclList = a.MustComponent(syncacl.CName).(syncacl.SyncAcl)
t.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync)

View File

@ -11,7 +11,7 @@ import (
app "github.com/anyproto/any-sync/app"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
peer "github.com/anyproto/any-sync/net/peer"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockPeerManager is a mock of PeerManager interface.

View File

@ -12,8 +12,8 @@ import (
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/anyproto/any-sync/net/pool/mock_pool"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
)

View File

@ -6,7 +6,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
"github.com/golang/mock/gomock"
"go.uber.org/mock/gomock"
"testing"
)

View File

@ -6,8 +6,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
"github.com/anyproto/any-sync/commonspace/settings/mock_settings"
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
settingsstate "github.com/anyproto/any-sync/commonspace/settings/settingsstate"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockDeletionManager is a mock of DeletionManager interface.

View File

@ -16,8 +16,8 @@ import (
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
"github.com/anyproto/any-sync/commonspace/settings/settingsstate/mock_settingsstate"
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"sync"
"testing"
"time"

View File

@ -9,7 +9,7 @@ import (
objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
settingsstate "github.com/anyproto/any-sync/commonspace/settings/settingsstate"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockStateBuilder is a mock of StateBuilder interface.

View File

@ -4,8 +4,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)

View File

@ -59,6 +59,7 @@ func NewSpaceId(id string, repKey uint64) string {
type Space interface {
Id() string
Init(ctx context.Context) error
Acl() list.AclList
StoredIds() []string
DebugAllHeads() []headsync.TreeHeads
@ -153,6 +154,10 @@ func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder {
return s.treeBuilder
}
func (s *space) Acl() list.AclList {
return s.aclList
}
func (s *space) Id() string {
return s.state.SpaceId
}

View File

@ -13,7 +13,7 @@ import (
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
treestorage "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockSpaceStorage is a mock of SpaceStorage interface.

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
drpc "storj.io/drpc"
)

View File

@ -2,6 +2,7 @@
package spacesyncproto
import (
"github.com/gogo/protobuf/proto"
"storj.io/drpc"
)
@ -16,3 +17,16 @@ func (c ClientFactoryFunc) Client(cc drpc.Conn) DRPCSpaceSyncClient {
type ClientFactory interface {
Client(cc drpc.Conn) DRPCSpaceSyncClient
}
func MarshallSyncMessage(message proto.Marshaler, spaceId, objectId string) (objMsg *ObjectSyncMessage, err error) {
payload, err := message.Marshal()
if err != nil {
return
}
objMsg = &ObjectSyncMessage{
Payload: payload,
ObjectId: objectId,
SpaceId: spaceId,
}
return
}

View File

@ -36,13 +36,13 @@ type Watcher interface {
type Service interface {
// AddLog adds new log to consensus servers
AddLog(ctx context.Context, clog *consensusproto.Log) (err error)
AddLog(ctx context.Context, rec *consensusproto.RawRecordWithId) (err error)
// AddRecord adds new record to consensus servers
AddRecord(ctx context.Context, logId []byte, clog *consensusproto.RawRecord) (record *consensusproto.RawRecordWithId, err error)
AddRecord(ctx context.Context, logId string, rec *consensusproto.RawRecord) (record *consensusproto.RawRecordWithId, err error)
// Watch starts watching to given logId and calls watcher when any relative event received
Watch(logId []byte, w Watcher) (err error)
Watch(logId string, w Watcher) (err error)
// UnWatch stops watching given logId and removes watcher
UnWatch(logId []byte) (err error)
UnWatch(logId string) (err error)
app.ComponentRunnable
}
@ -86,10 +86,10 @@ func (s *service) doClient(ctx context.Context, fn func(cl consensusproto.DRPCCo
return fn(consensusproto.NewDRPCConsensusClient(dc))
}
func (s *service) AddLog(ctx context.Context, clog *consensusproto.Log) (err error) {
func (s *service) AddLog(ctx context.Context, rec *consensusproto.RawRecordWithId) (err error) {
return s.doClient(ctx, func(cl consensusproto.DRPCConsensusClient) error {
if _, err = cl.LogAdd(ctx, &consensusproto.LogAddRequest{
Log: clog,
Record: rec,
}); err != nil {
return rpcerr.Unwrap(err)
}
@ -97,11 +97,11 @@ func (s *service) AddLog(ctx context.Context, clog *consensusproto.Log) (err err
})
}
func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.RawRecord) (record *consensusproto.RawRecordWithId, err error) {
func (s *service) AddRecord(ctx context.Context, logId string, rec *consensusproto.RawRecord) (record *consensusproto.RawRecordWithId, err error) {
err = s.doClient(ctx, func(cl consensusproto.DRPCConsensusClient) error {
if record, err = cl.RecordAdd(ctx, &consensusproto.RecordAddRequest{
LogId: logId,
Record: clog,
Record: rec,
}); err != nil {
return rpcerr.Unwrap(err)
}
@ -110,30 +110,30 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr
return
}
func (s *service) Watch(logId []byte, w Watcher) (err error) {
func (s *service) Watch(logId string, w Watcher) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.watchers[string(logId)]; ok {
if _, ok := s.watchers[logId]; ok {
return ErrWatcherExists
}
s.watchers[string(logId)] = w
s.watchers[logId] = w
if s.stream != nil {
if wErr := s.stream.WatchIds([][]byte{logId}); wErr != nil {
if wErr := s.stream.WatchIds([]string{logId}); wErr != nil {
log.Warn("WatchIds error", zap.Error(wErr))
}
}
return
}
func (s *service) UnWatch(logId []byte) (err error) {
func (s *service) UnWatch(logId string) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.watchers[string(logId)]; !ok {
if _, ok := s.watchers[logId]; !ok {
return ErrWatcherNotExists
}
delete(s.watchers, string(logId))
delete(s.watchers, logId)
if s.stream != nil {
if wErr := s.stream.UnwatchIds([][]byte{logId}); wErr != nil {
if wErr := s.stream.UnwatchIds([]string{logId}); wErr != nil {
log.Warn("UnWatchIds error", zap.Error(wErr))
}
}
@ -182,9 +182,9 @@ func (s *service) streamWatcher() {
// collect ids and setup stream
s.mu.Lock()
var logIds = make([][]byte, 0, len(s.watchers))
var logIds = make([]string, 0, len(s.watchers))
for id := range s.watchers {
logIds = append(logIds, []byte(id))
logIds = append(logIds, id)
}
s.stream = st
s.mu.Unlock()
@ -212,17 +212,19 @@ func (s *service) streamReader() error {
if len(events) == 0 {
return s.stream.Err()
}
s.mu.Lock()
for _, e := range events {
if w, ok := s.watchers[string(e.LogId)]; ok {
if w, ok := s.watchers[e.LogId]; ok {
if e.Error == nil {
w.AddConsensusRecords(e.Records)
} else {
w.AddConsensusError(rpcerr.Err(uint64(e.Error.Error)))
}
} else {
log.Warn("received unexpected log id", zap.Binary("logId", e.LogId))
log.Warn("received unexpected log id", zap.String("logId", e.LogId))
}
}
s.mu.Unlock()
}
}

View File

@ -12,9 +12,9 @@ import (
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest"
"github.com/anyproto/any-sync/util/cidutil"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"sync"
"testing"
"time"
@ -24,13 +24,13 @@ func TestService_Watch(t *testing.T) {
t.Run("not found error", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId = []byte{'1'}
var logId = "1"
w := &testWatcher{ready: make(chan struct{})}
require.NoError(t, fx.Watch(logId, w))
st := fx.testServer.waitStream(t)
req, err := st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId}, req.WatchIds)
assert.Equal(t, []string{logId}, req.WatchIds)
require.NoError(t, st.Send(&consensusproto.LogWatchEvent{
LogId: logId,
Error: &consensusproto.Err{
@ -44,7 +44,7 @@ func TestService_Watch(t *testing.T) {
t.Run("watcherExists error", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId = []byte{'1'}
var logId = "1"
w := &testWatcher{}
require.NoError(t, fx.Watch(logId, w))
require.Error(t, fx.Watch(logId, w))
@ -55,20 +55,20 @@ func TestService_Watch(t *testing.T) {
t.Run("watch", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId1 = []byte{'1'}
var logId1 = "1"
w := &testWatcher{}
require.NoError(t, fx.Watch(logId1, w))
st := fx.testServer.waitStream(t)
req, err := st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId1}, req.WatchIds)
assert.Equal(t, []string{logId1}, req.WatchIds)
var logId2 = []byte{'2'}
var logId2 = "2"
w = &testWatcher{}
require.NoError(t, fx.Watch(logId2, w))
req, err = st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId2}, req.WatchIds)
assert.Equal(t, []string{logId2}, req.WatchIds)
fx.testServer.releaseStream <- nil
})
@ -78,14 +78,14 @@ func TestService_UnWatch(t *testing.T) {
t.Run("no watcher", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
require.Error(t, fx.UnWatch([]byte{'1'}))
require.Error(t, fx.UnWatch("1"))
})
t.Run("success", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
w := &testWatcher{}
require.NoError(t, fx.Watch([]byte{'1'}, w))
assert.NoError(t, fx.UnWatch([]byte{'1'}))
require.NoError(t, fx.Watch("1", w))
assert.NoError(t, fx.UnWatch("1"))
})
}
@ -113,13 +113,13 @@ func TestService_Init(t *testing.T) {
func TestService_AddLog(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
assert.NoError(t, fx.AddLog(ctx, &consensusproto.Log{}))
assert.NoError(t, fx.AddLog(ctx, &consensusproto.RawRecordWithId{}))
}
func TestService_AddRecord(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
rec, err := fx.AddRecord(ctx, []byte{'1'}, &consensusproto.RawRecord{})
rec, err := fx.AddRecord(ctx, "1", &consensusproto.RawRecord{})
require.NoError(t, err)
assert.NotEmpty(t, rec)
}

View File

@ -11,7 +11,7 @@ import (
app "github.com/anyproto/any-sync/app"
consensusclient "github.com/anyproto/any-sync/consensus/consensusclient"
consensusproto "github.com/anyproto/any-sync/consensus/consensusproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockService is a mock of Service interface.
@ -38,7 +38,7 @@ func (m *MockService) EXPECT() *MockServiceMockRecorder {
}
// AddLog mocks base method.
func (m *MockService) AddLog(arg0 context.Context, arg1 *consensusproto.Log) error {
func (m *MockService) AddLog(arg0 context.Context, arg1 *consensusproto.RawRecordWithId) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddLog", arg0, arg1)
ret0, _ := ret[0].(error)
@ -52,7 +52,7 @@ func (mr *MockServiceMockRecorder) AddLog(arg0, arg1 interface{}) *gomock.Call {
}
// AddRecord mocks base method.
func (m *MockService) AddRecord(arg0 context.Context, arg1 []byte, arg2 *consensusproto.RawRecord) (*consensusproto.RawRecordWithId, error) {
func (m *MockService) AddRecord(arg0 context.Context, arg1 string, arg2 *consensusproto.RawRecord) (*consensusproto.RawRecordWithId, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRecord", arg0, arg1, arg2)
ret0, _ := ret[0].(*consensusproto.RawRecordWithId)
@ -123,7 +123,7 @@ func (mr *MockServiceMockRecorder) Run(arg0 interface{}) *gomock.Call {
}
// UnWatch mocks base method.
func (m *MockService) UnWatch(arg0 []byte) error {
func (m *MockService) UnWatch(arg0 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UnWatch", arg0)
ret0, _ := ret[0].(error)
@ -137,7 +137,7 @@ func (mr *MockServiceMockRecorder) UnWatch(arg0 interface{}) *gomock.Call {
}
// Watch mocks base method.
func (m *MockService) Watch(arg0 []byte, arg1 consensusclient.Watcher) error {
func (m *MockService) Watch(arg0 string, arg1 consensusclient.Watcher) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Watch", arg0, arg1)
ret0, _ := ret[0].(error)

View File

@ -23,13 +23,13 @@ type stream struct {
err error
}
func (s *stream) WatchIds(logIds [][]byte) (err error) {
func (s *stream) WatchIds(logIds []string) (err error) {
return s.rpcStream.Send(&consensusproto.LogWatchRequest{
WatchIds: logIds,
})
}
func (s *stream) UnwatchIds(logIds [][]byte) (err error) {
func (s *stream) UnwatchIds(logIds []string) (err error) {
return s.rpcStream.Send(&consensusproto.LogWatchRequest{
UnwatchIds: logIds,
})

View File

@ -0,0 +1,45 @@
package consensusproto
func WrapHeadUpdate(update *LogHeadUpdate, rootRecord *RawRecordWithId) *LogSyncMessage {
return &LogSyncMessage{
Content: &LogSyncContentValue{
Value: &LogSyncContentValue_HeadUpdate{HeadUpdate: update},
},
Id: rootRecord.Id,
Payload: rootRecord.Payload,
}
}
func WrapFullRequest(request *LogFullSyncRequest, rootRecord *RawRecordWithId) *LogSyncMessage {
return &LogSyncMessage{
Content: &LogSyncContentValue{
Value: &LogSyncContentValue_FullSyncRequest{FullSyncRequest: request},
},
Id: rootRecord.Id,
Payload: rootRecord.Payload,
}
}
func WrapFullResponse(response *LogFullSyncResponse, rootRecord *RawRecordWithId) *LogSyncMessage {
return &LogSyncMessage{
Content: &LogSyncContentValue{
Value: &LogSyncContentValue_FullSyncResponse{FullSyncResponse: response},
},
Id: rootRecord.Id,
Payload: rootRecord.Payload,
}
}
func GetHead(msg *LogSyncMessage) (head string) {
content := msg.GetContent()
switch {
case content.GetHeadUpdate() != nil:
return content.GetHeadUpdate().Head
case content.GetFullSyncRequest() != nil:
return content.GetFullSyncRequest().Head
case content.GetFullSyncResponse() != nil:
return content.GetFullSyncResponse().Head
default:
return ""
}
}

File diff suppressed because it is too large Load Diff

View File

@ -9,8 +9,10 @@ import (
var (
errGroup = rpcerr.ErrGroup(consensusproto.ErrCodes_ErrorOffset)
ErrUnexpected = errGroup.Register(fmt.Errorf("unexpected consensus error"), uint64(consensusproto.ErrCodes_Unexpected))
ErrConflict = errGroup.Register(fmt.Errorf("records conflict"), uint64(consensusproto.ErrCodes_RecordConflict))
ErrLogExists = errGroup.Register(fmt.Errorf("log exists"), uint64(consensusproto.ErrCodes_LogExists))
ErrLogNotFound = errGroup.Register(fmt.Errorf("log not found"), uint64(consensusproto.ErrCodes_LogNotFound))
ErrUnexpected = errGroup.Register(fmt.Errorf("unexpected consensus error"), uint64(consensusproto.ErrCodes_Unexpected))
ErrConflict = errGroup.Register(fmt.Errorf("records conflict"), uint64(consensusproto.ErrCodes_RecordConflict))
ErrLogExists = errGroup.Register(fmt.Errorf("log exists"), uint64(consensusproto.ErrCodes_LogExists))
ErrLogNotFound = errGroup.Register(fmt.Errorf("log not found"), uint64(consensusproto.ErrCodes_LogNotFound))
ErrForbidden = errGroup.Register(fmt.Errorf("forbidden"), uint64(consensusproto.ErrCodes_Forbidden))
ErrInvalidPayload = errGroup.Register(fmt.Errorf("invalid payload"), uint64(consensusproto.ErrCodes_InvalidPayload))
)

View File

@ -8,13 +8,14 @@ enum ErrCodes {
LogExists = 1;
LogNotFound = 2;
RecordConflict = 3;
Forbidden = 4;
InvalidPayload = 5;
ErrorOffset = 500;
}
message Log {
bytes id = 1;
bytes payload = 2;
string id = 1;
repeated RawRecordWithId records = 3;
}
@ -53,25 +54,60 @@ service Consensus {
message Ok {}
message LogAddRequest {
Log log = 1;
// first record in the log, consensus node not sign it
RawRecordWithId record = 1;
}
message RecordAddRequest {
bytes logId = 1;
string logId = 1;
RawRecord record = 2;
}
message LogWatchRequest {
repeated bytes watchIds = 1;
repeated bytes unwatchIds = 2;
repeated string watchIds = 1;
repeated string unwatchIds = 2;
}
message LogWatchEvent {
bytes logId = 1;
string logId = 1;
repeated RawRecordWithId records = 2;
Err error = 3;
}
message Err {
ErrCodes error = 1;
}
}
// LogSyncContentValue provides different types for log sync
message LogSyncContentValue {
oneof value {
LogHeadUpdate headUpdate = 1;
LogFullSyncRequest fullSyncRequest = 2;
LogFullSyncResponse fullSyncResponse = 3;
}
}
// LogSyncMessage is a message sent when we are syncing logs
message LogSyncMessage {
string id = 1;
bytes payload = 2;
LogSyncContentValue content = 3;
}
// LogHeadUpdate is a message sent on consensus log head update
message LogHeadUpdate {
string head = 1;
repeated RawRecordWithId records = 2;
}
// LogFullSyncRequest is a message sent when consensus log needs full sync
message LogFullSyncRequest {
string head = 1;
repeated RawRecordWithId records = 2;
}
// LogFullSyncResponse is a message sent as a response for a specific full sync
message LogFullSyncResponse {
string head = 1;
repeated RawRecordWithId records = 2;
}

View File

@ -12,7 +12,7 @@ import (
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
coordinatorclient "github.com/anyproto/any-sync/coordinator/coordinatorclient"
coordinatorproto "github.com/anyproto/any-sync/coordinator/coordinatorproto"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockCoordinatorClient is a mock of CoordinatorClient interface.

2
go.mod
View File

@ -12,7 +12,6 @@ require (
github.com/gobwas/glob v0.2.3
github.com/goccy/go-graphviz v0.1.1
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/hashicorp/yamux v0.1.1
github.com/huandu/skiplist v1.2.0
@ -34,6 +33,7 @@ require (
github.com/tyler-smith/go-bip39 v1.1.0
github.com/zeebo/blake3 v0.2.3
go.uber.org/atomic v1.11.0
go.uber.org/mock v0.2.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.11.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1

4
go.sum
View File

@ -54,7 +54,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
@ -281,6 +280,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU=
go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@ -365,7 +366,6 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
time "time"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
drpc "storj.io/drpc"
)

View File

@ -6,9 +6,9 @@ import (
"github.com/anyproto/any-sync/net/secureservice/handshake"
"github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto"
"github.com/anyproto/any-sync/net/transport/mock_transport"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"io"
"net"
_ "net/http/pprof"

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
peer "github.com/anyproto/any-sync/net/peer"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockPool is a mock of Pool interface.

View File

@ -9,9 +9,9 @@ import (
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/testnodeconf"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"net"
"testing"
)

View File

@ -10,7 +10,7 @@ import (
reflect "reflect"
transport "github.com/anyproto/any-sync/net/transport"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockTransport is a mock of Transport interface.

View File

@ -10,9 +10,9 @@ import (
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest"
"github.com/anyproto/any-sync/testutil/testnodeconf"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"io"
"net"
"sync"

View File

@ -11,7 +11,7 @@ import (
app "github.com/anyproto/any-sync/app"
nodeconf "github.com/anyproto/any-sync/nodeconf"
chash "github.com/anyproto/go-chash"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockService is a mock of Service interface.

View File

@ -7,7 +7,7 @@ package mock_periodicsync
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockPeriodicSync is a mock of PeriodicSync interface.

View File

@ -3,8 +3,8 @@ package periodicsync
import (
"context"
"github.com/anyproto/any-sync/app/logger"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
"time"
)