Prepare components for testing

This commit is contained in:
mcrakhman 2022-09-30 09:28:13 +02:00 committed by Mikhail Iudin
parent a775006e2b
commit c234046f91
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
6 changed files with 234 additions and 190 deletions

View File

@ -38,9 +38,11 @@ func NewDiffService(
conf nodeconf.Configuration,
cache cache.TreeCache,
log *zap.Logger) DiffService {
diff := ldiff.New(16, 16)
l := log.With(zap.String("spaceId", spaceId))
syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, l)
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient)
syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, factory, l)
periodicSync := newPeriodicSync(syncPeriod, syncer, l)
return &diffService{

View File

@ -24,6 +24,7 @@ func newDiffSyncer(
nconf nodeconf.Configuration,
cache cache.TreeCache,
storage storage.SpaceStorage,
clientFactory spacesyncproto.ClientFactory,
log *zap.Logger) DiffSyncer {
return &diffSyncer{
diff: diff,
@ -36,12 +37,13 @@ func newDiffSyncer(
}
type diffSyncer struct {
diff ldiff.Diff
nconf nodeconf.Configuration
spaceId string
cache cache.TreeCache
storage storage.SpaceStorage
log *zap.Logger
spaceId string
diff ldiff.Diff
nconf nodeconf.Configuration
cache cache.TreeCache
storage storage.SpaceStorage
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
}
func (d *diffSyncer) Sync(ctx context.Context) error {
@ -61,7 +63,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
}
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
cl := spacesyncproto.NewDRPCSpaceClient(p)
cl := d.clientFactory.Client(p)
rdiff := remotediff.NewRemoteDiff(d.spaceId, cl)
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
err = rpcerr.Unwrap(err)

View File

@ -0,0 +1,193 @@
package commonspace
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"hash/fnv"
"math/rand"
"time"
)
func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload storage.SpaceStorageCreatePayload, err error) {
// unmarshalling signing and encryption keys
identity, err := payload.SigningKey.GetPublic().Raw()
if err != nil {
return
}
encPubKey, err := payload.EncryptionKey.GetPublic().Raw()
if err != nil {
return
}
// preparing header and space id
bytes := make([]byte, 32)
_, err = rand.Read(bytes)
if err != nil {
return
}
header := &spacesyncproto.SpaceHeader{
Identity: identity,
Timestamp: time.Now().UnixNano(),
SpaceType: payload.SpaceType,
ReplicationKey: payload.ReplicationKey,
Seed: bytes,
}
marshalled, err := header.Marshal()
if err != nil {
return
}
id, err := cid.NewCIDFromBytes(marshalled)
if err != nil {
return
}
spaceId := NewSpaceId(id, payload.ReplicationKey)
// encrypting read key
hasher := fnv.New64()
_, err = hasher.Write(payload.ReadKey)
if err != nil {
return
}
readKeyHash := hasher.Sum64()
encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(payload.ReadKey)
if err != nil {
return
}
// preparing acl
aclRoot := &aclrecordproto.ACLRoot{
Identity: identity,
EncryptionKey: encPubKey,
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
DerivationScheme: "",
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
}
rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey)
if err != nil {
return
}
// creating storage
storagePayload = storage.SpaceStorageCreatePayload{
RecWithId: rawWithId,
SpaceHeader: header,
Id: id,
}
return
}
func storagePayloadForSpaceDerive(payload SpaceDerivePayload) (storagePayload storage.SpaceStorageCreatePayload, err error) {
// unmarshalling signing and encryption keys
identity, err := payload.SigningKey.GetPublic().Raw()
if err != nil {
return
}
signPrivKey, err := payload.SigningKey.Raw()
if err != nil {
return
}
encPubKey, err := payload.EncryptionKey.GetPublic().Raw()
if err != nil {
return
}
encPrivKey, err := payload.EncryptionKey.Raw()
if err != nil {
return
}
// preparing replication key
hasher := fnv.New64()
_, err = hasher.Write(identity)
if err != nil {
return
}
repKey := hasher.Sum64()
// preparing header and space id
header := &spacesyncproto.SpaceHeader{
Identity: identity,
SpaceType: SpaceTypeDerived,
ReplicationKey: repKey,
}
marshalled, err := header.Marshal()
if err != nil {
return
}
id, err := cid.NewCIDFromBytes(marshalled)
if err != nil {
return
}
spaceId := NewSpaceId(id, repKey)
// deriving and encrypting read key
readKey, err := aclrecordproto.ACLReadKeyDerive(signPrivKey, encPrivKey)
if err != nil {
return
}
hasher = fnv.New64()
_, err = hasher.Write(readKey.Bytes())
if err != nil {
return
}
readKeyHash := hasher.Sum64()
encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(readKey.Bytes())
if err != nil {
return
}
// preparing acl
aclRoot := &aclrecordproto.ACLRoot{
Identity: identity,
EncryptionKey: encPubKey,
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
DerivationScheme: "",
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
}
rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey)
if err != nil {
return
}
// creating storage
storagePayload = storage.SpaceStorageCreatePayload{
RecWithId: rawWithId,
SpaceHeader: header,
Id: id,
}
return
}
func marshalACLRoot(aclRoot *aclrecordproto.ACLRoot, key signingkey.PrivKey) (rawWithId *aclrecordproto.RawACLRecordWithId, err error) {
marshalledRoot, err := aclRoot.Marshal()
if err != nil {
return
}
signature, err := key.Sign(marshalledRoot)
if err != nil {
return
}
raw := &aclrecordproto.RawACLRecord{
Payload: marshalledRoot,
Signature: signature,
}
marshalledRaw, err := raw.Marshal()
if err != nil {
return
}
aclHeadId, err := cid.NewCIDFromBytes(marshalledRaw)
if err != nil {
return
}
rawWithId = &aclrecordproto.RawACLRecordWithId{
Payload: marshalledRaw,
Id: aclHeadId,
}
return
}

View File

@ -6,17 +6,10 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"hash/fnv"
"math/rand"
"time"
)
const CName = "common.commonspace"
@ -57,171 +50,32 @@ func (s *service) CreateSpace(
ctx context.Context,
cache cache.TreeCache,
payload SpaceCreatePayload) (sp Space, err error) {
// unmarshalling signing and encryption keys
identity, err := payload.SigningKey.GetPublic().Raw()
storageCreate, err := storagePayloadForSpaceCreate(payload)
if err != nil {
return
}
encPubKey, err := payload.EncryptionKey.GetPublic().Raw()
if err != nil {
return
}
// preparing header and space id
bytes := make([]byte, 32)
_, err = rand.Read(bytes)
if err != nil {
return
}
header := &spacesyncproto.SpaceHeader{
Identity: identity,
Timestamp: time.Now().UnixNano(),
SpaceType: payload.SpaceType,
ReplicationKey: payload.ReplicationKey,
Seed: bytes,
}
marshalled, err := header.Marshal()
if err != nil {
return
}
id, err := cid.NewCIDFromBytes(marshalled)
if err != nil {
return
}
spaceId := NewSpaceId(id, payload.ReplicationKey)
// encrypting read key
hasher := fnv.New64()
_, err = hasher.Write(payload.ReadKey)
if err != nil {
return
}
readKeyHash := hasher.Sum64()
encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(payload.ReadKey)
if err != nil {
return
}
// preparing acl
aclRoot := &aclrecordproto.ACLRoot{
Identity: identity,
EncryptionKey: encPubKey,
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
DerivationScheme: "",
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
}
rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey)
if err != nil {
return
}
// creating storage
storageCreate := storage.SpaceStorageCreatePayload{
RecWithId: rawWithId,
SpaceHeader: header,
Id: id,
}
_, err = s.storageProvider.CreateSpaceStorage(storageCreate)
if err != nil {
return
}
return s.GetSpace(ctx, spaceId)
return s.GetSpace(ctx, storageCreate.Id)
}
func (s *service) DeriveSpace(
ctx context.Context,
cache cache.TreeCache,
payload SpaceDerivePayload) (sp Space, err error) {
// unmarshalling signing and encryption keys
identity, err := payload.SigningKey.GetPublic().Raw()
storageCreate, err := storagePayloadForSpaceDerive(payload)
if err != nil {
return
}
signPrivKey, err := payload.SigningKey.Raw()
if err != nil {
return
}
encPubKey, err := payload.EncryptionKey.GetPublic().Raw()
if err != nil {
return
}
encPrivKey, err := payload.EncryptionKey.Raw()
if err != nil {
return
}
// preparing replication key
hasher := fnv.New64()
_, err = hasher.Write(identity)
if err != nil {
return
}
repKey := hasher.Sum64()
// preparing header and space id
header := &spacesyncproto.SpaceHeader{
Identity: identity,
SpaceType: SpaceTypeDerived,
ReplicationKey: repKey,
}
marshalled, err := header.Marshal()
if err != nil {
return
}
id, err := cid.NewCIDFromBytes(marshalled)
if err != nil {
return
}
spaceId := NewSpaceId(id, repKey)
// deriving and encrypting read key
readKey, err := aclrecordproto.ACLReadKeyDerive(signPrivKey, encPrivKey)
if err != nil {
return
}
hasher = fnv.New64()
_, err = hasher.Write(readKey.Bytes())
if err != nil {
return
}
readKeyHash := hasher.Sum64()
encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(readKey.Bytes())
if err != nil {
return
}
// preparing acl
aclRoot := &aclrecordproto.ACLRoot{
Identity: identity,
EncryptionKey: encPubKey,
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
DerivationScheme: "",
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
}
rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey)
if err != nil {
return
}
// creating storage
storageCreate := storage.SpaceStorageCreatePayload{
RecWithId: rawWithId,
SpaceHeader: header,
Id: id,
}
_, err = s.storageProvider.CreateSpaceStorage(storageCreate)
if err != nil {
return
}
return s.GetSpace(ctx, spaceId)
return s.GetSpace(ctx, storageCreate.Id)
}
func (s *service) GetSpace(ctx context.Context, id string) (Space, error) {
@ -244,31 +98,3 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) {
}
return sp, nil
}
func marshalACLRoot(aclRoot *aclrecordproto.ACLRoot, key signingkey.PrivKey) (rawWithId *aclrecordproto.RawACLRecordWithId, err error) {
marshalledRoot, err := aclRoot.Marshal()
if err != nil {
return
}
signature, err := key.Sign(marshalledRoot)
if err != nil {
return
}
raw := &aclrecordproto.RawACLRecord{
Payload: marshalledRoot,
Signature: signature,
}
marshalledRaw, err := raw.Marshal()
if err != nil {
return
}
aclHeadId, err := cid.NewCIDFromBytes(marshalledRaw)
if err != nil {
return
}
rawWithId = &aclrecordproto.RawACLRecordWithId{
Payload: marshalledRaw,
Id: aclHeadId,
}
return
}

View File

@ -1,9 +1,22 @@
package spacesyncproto
import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"storj.io/drpc"
)
type SpaceStream = DRPCSpace_StreamStream
type ClientFactoryFunc func(cc drpc.Conn) DRPCSpaceClient
func (c ClientFactoryFunc) Client(cc drpc.Conn) DRPCSpaceClient {
return c(cc)
}
type ClientFactory interface {
Client(cc drpc.Conn) DRPCSpaceClient
}
func WrapHeadUpdate(update *ObjectHeadUpdate, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{

View File

@ -38,6 +38,7 @@ type syncService struct {
streamPool StreamPool
headNotifiable HeadNotifiable
configuration nodeconf.Configuration
clientFactory spacesyncproto.ClientFactory
streamLoopCtx context.Context
stopStreamLoop context.CancelFunc
@ -50,7 +51,13 @@ func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.T
return syncHandler.HandleMessage(ctx, senderId, message)
})
syncHandler = newSyncHandler(spaceId, cache, streamPool)
return newSyncService(spaceId, headNotifiable, syncHandler, streamPool, configuration)
return newSyncService(
spaceId,
headNotifiable,
syncHandler,
streamPool,
spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient),
configuration)
}
func newSyncService(
@ -58,12 +65,14 @@ func newSyncService(
headNotifiable HeadNotifiable,
syncHandler SyncHandler,
streamPool StreamPool,
clientFactory spacesyncproto.ClientFactory,
configuration nodeconf.Configuration) *syncService {
return &syncService{
syncHandler: syncHandler,
streamPool: streamPool,
headNotifiable: headNotifiable,
configuration: configuration,
clientFactory: clientFactory,
spaceId: spaceId,
streamLoopDone: make(chan struct{}),
}
@ -100,8 +109,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
if s.streamPool.HasActiveStream(peer.Id()) {
continue
}
cl := spacesyncproto.NewDRPCSpaceClient(peer)
stream, err := cl.Stream(ctx)
stream, err := s.clientFactory.Client(peer).Stream(ctx)
if err != nil {
err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)