Add sync protocol interfaces
This commit is contained in:
parent
b10d72a092
commit
51ac955f1c
@ -55,6 +55,7 @@ type AclList interface {
|
|||||||
|
|
||||||
ValidateRawRecord(record *consensusproto.RawRecord) (err error)
|
ValidateRawRecord(record *consensusproto.RawRecord) (err error)
|
||||||
AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error)
|
AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error)
|
||||||
|
AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error)
|
||||||
|
|
||||||
Close() (err error)
|
Close() (err error)
|
||||||
}
|
}
|
||||||
@ -195,6 +196,16 @@ func (a *aclList) ValidateRawRecord(rawRec *consensusproto.RawRecord) (err error
|
|||||||
return a.aclState.Validator().ValidateAclRecordContents(record)
|
return a.aclState.Validator().ValidateAclRecordContents(record)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *aclList) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) {
|
||||||
|
for _, rec := range rawRecords {
|
||||||
|
err = a.AddRawRecord(rec)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (a *aclList) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error) {
|
func (a *aclList) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error) {
|
||||||
if _, ok := a.indexes[rawRec.Id]; ok {
|
if _, ok := a.indexes[rawRec.Id]; ok {
|
||||||
return ErrRecordAlreadyExists
|
return ErrRecordAlreadyExists
|
||||||
|
|||||||
44
commonspace/object/acl/syncacl/aclsyncprotocol.go
Normal file
44
commonspace/object/acl/syncacl/aclsyncprotocol.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package syncacl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||||
|
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AclSyncProtocol interface {
|
||||||
|
HeadUpdate(ctx context.Context, senderId string, update *consensusproto.LogHeadUpdate) (request *consensusproto.LogSyncMessage, err error)
|
||||||
|
FullSyncRequest(ctx context.Context, senderId string, request *consensusproto.LogFullSyncRequest) (response *consensusproto.LogSyncMessage, err error)
|
||||||
|
FullSyncResponse(ctx context.Context, senderId string, response *consensusproto.LogFullSyncResponse) (err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type aclSyncProtocol struct {
|
||||||
|
log logger.CtxLogger
|
||||||
|
spaceId string
|
||||||
|
aclList list.AclList
|
||||||
|
reqFactory RequestFactory
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *aclSyncProtocol) HeadUpdate(ctx context.Context, senderId string, update *consensusproto.LogHeadUpdate) (request *consensusproto.LogSyncMessage, err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *aclSyncProtocol) FullSyncRequest(ctx context.Context, senderId string, request *consensusproto.LogFullSyncRequest) (response *consensusproto.LogSyncMessage, err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *aclSyncProtocol) FullSyncResponse(ctx context.Context, senderId string, response *consensusproto.LogFullSyncResponse) (err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAclSyncProtocol(spaceId string, aclList list.AclList, reqFactory RequestFactory) *aclSyncProtocol {
|
||||||
|
return &aclSyncProtocol{
|
||||||
|
log: log.With(zap.String("spaceId", spaceId), zap.String("aclId", aclList.Id())),
|
||||||
|
spaceId: spaceId,
|
||||||
|
aclList: aclList,
|
||||||
|
reqFactory: reqFactory,
|
||||||
|
}
|
||||||
|
}
|
||||||
30
commonspace/object/acl/syncacl/requestfactory.go
Normal file
30
commonspace/object/acl/syncacl/requestfactory.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package syncacl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||||
|
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RequestFactory interface {
|
||||||
|
CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage)
|
||||||
|
CreateFullSyncRequest(theirHead string) (req *consensusproto.LogSyncMessage, err error)
|
||||||
|
CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRequestFactory() RequestFactory {
|
||||||
|
return &requestFactory{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type requestFactory struct{}
|
||||||
|
|
||||||
|
func (r *requestFactory) CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *requestFactory) CreateFullSyncRequest(theirHead string) (req *consensusproto.LogSyncMessage, err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *requestFactory) CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
@ -2,30 +2,46 @@ package syncacl
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/accountservice"
|
"github.com/anyproto/any-sync/accountservice"
|
||||||
"github.com/anyproto/any-sync/app"
|
"github.com/anyproto/any-sync/app"
|
||||||
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/requestmanager"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||||
|
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||||
)
|
)
|
||||||
|
|
||||||
const CName = "common.acl.syncacl"
|
const CName = "common.acl.syncacl"
|
||||||
|
|
||||||
|
var (
|
||||||
|
log = logger.NewNamed(CName)
|
||||||
|
|
||||||
|
ErrSyncAclClosed = errors.New("sync acl is closed")
|
||||||
|
)
|
||||||
|
|
||||||
func New() *SyncAcl {
|
func New() *SyncAcl {
|
||||||
return &SyncAcl{}
|
return &SyncAcl{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncAcl struct {
|
type SyncAcl struct {
|
||||||
list.AclList
|
list.AclList
|
||||||
|
syncClient SyncClient
|
||||||
|
syncHandler synchandler.SyncHandler
|
||||||
|
isClosed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
|
func (s *SyncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
|
||||||
return nil, nil
|
return s.HandleRequest(ctx, senderId, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
return nil
|
return s.HandleMessage(ctx, senderId, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncAcl) Init(a *app.App) (err error) {
|
func (s *SyncAcl) Init(a *app.App) (err error) {
|
||||||
@ -36,9 +52,58 @@ func (s *SyncAcl) Init(a *app.App) (err error) {
|
|||||||
}
|
}
|
||||||
acc := a.MustComponent(accountservice.CName).(accountservice.Service)
|
acc := a.MustComponent(accountservice.CName).(accountservice.Service)
|
||||||
s.AclList, err = list.BuildAclListWithIdentity(acc.Account(), aclStorage, list.NoOpAcceptorVerifier{})
|
s.AclList, err = list.BuildAclListWithIdentity(acc.Account(), aclStorage, list.NoOpAcceptorVerifier{})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
spaceId := storage.Id()
|
||||||
|
requestManager := a.MustComponent(requestmanager.CName).(requestmanager.RequestManager)
|
||||||
|
peerManager := a.MustComponent(peermanager.CName).(peermanager.PeerManager)
|
||||||
|
syncStatus := a.MustComponent(syncstatus.CName).(syncstatus.StatusService)
|
||||||
|
s.syncClient = NewSyncClient(spaceId, requestManager, peerManager)
|
||||||
|
s.syncHandler = newSyncAclHandler(storage.Id(), s, s.syncClient, syncStatus)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SyncAcl) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err error) {
|
||||||
|
if s.isClosed {
|
||||||
|
return ErrSyncAclClosed
|
||||||
|
}
|
||||||
|
err = s.AclList.AddRawRecord(rawRec)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
headUpdate := s.syncClient.CreateHeadUpdate(s, []*consensusproto.RawRecordWithId{rawRec})
|
||||||
|
s.syncClient.Broadcast(headUpdate)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) {
|
||||||
|
if s.isClosed {
|
||||||
|
return ErrSyncAclClosed
|
||||||
|
}
|
||||||
|
err = s.AclList.AddRawRecords(rawRecords)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
headUpdate := s.syncClient.CreateHeadUpdate(s, rawRecords)
|
||||||
|
s.syncClient.Broadcast(headUpdate)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
||||||
|
return s.syncClient.SendUpdate(peerId, s.Id(), headUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyncAcl) Close() (err error) {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
s.isClosed = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SyncAcl) Name() (name string) {
|
func (s *SyncAcl) Name() (name string) {
|
||||||
return CName
|
return CName
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,13 +4,33 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type syncAclHandler struct {
|
type syncAclHandler struct {
|
||||||
acl list.AclList
|
aclList list.AclList
|
||||||
|
syncClient SyncClient
|
||||||
|
syncProtocol AclSyncProtocol
|
||||||
|
syncStatus syncstatus.StatusUpdater
|
||||||
|
spaceId string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncAclHandler(spaceId string, aclList list.AclList, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
|
||||||
|
return &syncAclHandler{
|
||||||
|
aclList: aclList,
|
||||||
|
syncClient: syncClient,
|
||||||
|
syncProtocol: newAclSyncProtocol(spaceId, aclList, syncClient),
|
||||||
|
syncStatus: syncStatus,
|
||||||
|
spaceId: spaceId,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
return nil
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncAclHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
49
commonspace/object/acl/syncacl/syncclient.go
Normal file
49
commonspace/object/acl/syncacl/syncclient.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package syncacl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/requestmanager"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
|
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncClient interface {
|
||||||
|
RequestFactory
|
||||||
|
Broadcast(msg *consensusproto.LogSyncMessage)
|
||||||
|
SendUpdate(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error)
|
||||||
|
QueueRequest(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error)
|
||||||
|
SendRequest(ctx context.Context, peerId, objectId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type syncClient struct {
|
||||||
|
RequestFactory
|
||||||
|
spaceId string
|
||||||
|
requestManager requestmanager.RequestManager
|
||||||
|
peerManager peermanager.PeerManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncClient) Broadcast(msg *consensusproto.LogSyncMessage) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncClient) SendUpdate(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncClient) QueueRequest(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient {
|
||||||
|
return &syncClient{
|
||||||
|
RequestFactory: &requestFactory{},
|
||||||
|
spaceId: spaceId,
|
||||||
|
requestManager: requestManager,
|
||||||
|
peerManager: peerManager,
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,6 +2,7 @@ package synctree
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
"github.com/anyproto/any-sync/commonspace/peermanager"
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
||||||
"github.com/anyproto/any-sync/commonspace/requestmanager"
|
"github.com/anyproto/any-sync/commonspace/requestmanager"
|
||||||
@ -32,6 +33,7 @@ func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager,
|
|||||||
peerManager: peerManager,
|
peerManager: peerManager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
|
func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
|
||||||
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
|
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@ -75,3 +75,36 @@ message LogWatchEvent {
|
|||||||
message Err {
|
message Err {
|
||||||
ErrCodes error = 1;
|
ErrCodes error = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LogSyncContentValue provides different types for log sync
|
||||||
|
message LogSyncContentValue {
|
||||||
|
oneof value {
|
||||||
|
LogHeadUpdate headUpdate = 1;
|
||||||
|
LogFullSyncRequest fullSyncRequest = 2;
|
||||||
|
LogFullSyncResponse fullSyncResponse = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogSyncMessage is a message sent when we are syncing logs
|
||||||
|
message LogSyncMessage {
|
||||||
|
string id = 1;
|
||||||
|
string payload = 2;
|
||||||
|
LogSyncContentValue content = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogHeadUpdate is a message sent on consensus log head update
|
||||||
|
message LogHeadUpdate {
|
||||||
|
string head = 1;
|
||||||
|
repeated RawRecordWithId records = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogFullSyncRequest is a message sent when consensus log needs full sync
|
||||||
|
message LogFullSyncRequest {
|
||||||
|
string head = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogFullSyncResponse is a message sent as a response for a specific full sync
|
||||||
|
message LogFullSyncResponse {
|
||||||
|
string head = 1;
|
||||||
|
repeated RawRecordWithId records = 2;
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user