diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index eab5c441..e5a94a1f 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -38,9 +38,11 @@ func NewDiffService( conf nodeconf.Configuration, cache cache.TreeCache, log *zap.Logger) DiffService { + diff := ldiff.New(16, 16) l := log.With(zap.String("spaceId", spaceId)) - syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, l) + factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient) + syncer := newDiffSyncer(spaceId, diff, conf, cache, storage, factory, l) periodicSync := newPeriodicSync(syncPeriod, syncer, l) return &diffService{ diff --git a/common/commonspace/diffservice/diffsyncer.go b/common/commonspace/diffservice/diffsyncer.go index 8eec6c9b..88cd00f4 100644 --- a/common/commonspace/diffservice/diffsyncer.go +++ b/common/commonspace/diffservice/diffsyncer.go @@ -24,6 +24,7 @@ func newDiffSyncer( nconf nodeconf.Configuration, cache cache.TreeCache, storage storage.SpaceStorage, + clientFactory spacesyncproto.ClientFactory, log *zap.Logger) DiffSyncer { return &diffSyncer{ diff: diff, @@ -36,12 +37,13 @@ func newDiffSyncer( } type diffSyncer struct { - diff ldiff.Diff - nconf nodeconf.Configuration - spaceId string - cache cache.TreeCache - storage storage.SpaceStorage - log *zap.Logger + spaceId string + diff ldiff.Diff + nconf nodeconf.Configuration + cache cache.TreeCache + storage storage.SpaceStorage + clientFactory spacesyncproto.ClientFactory + log *zap.Logger } func (d *diffSyncer) Sync(ctx context.Context) error { @@ -61,7 +63,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error { } func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { - cl := spacesyncproto.NewDRPCSpaceClient(p) + cl := d.clientFactory.Client(p) rdiff := remotediff.NewRemoteDiff(d.spaceId, cl) newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) err = rpcerr.Unwrap(err) diff --git a/common/commonspace/payloads.go b/common/commonspace/payloads.go new file mode 100644 index 00000000..fae285ab --- /dev/null +++ b/common/commonspace/payloads.go @@ -0,0 +1,193 @@ +package commonspace + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey" + "hash/fnv" + "math/rand" + "time" +) + +func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload storage.SpaceStorageCreatePayload, err error) { + // unmarshalling signing and encryption keys + identity, err := payload.SigningKey.GetPublic().Raw() + if err != nil { + return + } + encPubKey, err := payload.EncryptionKey.GetPublic().Raw() + if err != nil { + return + } + + // preparing header and space id + bytes := make([]byte, 32) + _, err = rand.Read(bytes) + if err != nil { + return + } + header := &spacesyncproto.SpaceHeader{ + Identity: identity, + Timestamp: time.Now().UnixNano(), + SpaceType: payload.SpaceType, + ReplicationKey: payload.ReplicationKey, + Seed: bytes, + } + marshalled, err := header.Marshal() + if err != nil { + return + } + id, err := cid.NewCIDFromBytes(marshalled) + if err != nil { + return + } + spaceId := NewSpaceId(id, payload.ReplicationKey) + + // encrypting read key + hasher := fnv.New64() + _, err = hasher.Write(payload.ReadKey) + if err != nil { + return + } + readKeyHash := hasher.Sum64() + encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(payload.ReadKey) + if err != nil { + return + } + + // preparing acl + aclRoot := &aclrecordproto.ACLRoot{ + Identity: identity, + EncryptionKey: encPubKey, + SpaceId: spaceId, + EncryptedReadKey: encReadKey, + DerivationScheme: "", + CurrentReadKeyHash: readKeyHash, + Timestamp: time.Now().UnixNano(), + } + rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey) + if err != nil { + return + } + + // creating storage + storagePayload = storage.SpaceStorageCreatePayload{ + RecWithId: rawWithId, + SpaceHeader: header, + Id: id, + } + return +} + +func storagePayloadForSpaceDerive(payload SpaceDerivePayload) (storagePayload storage.SpaceStorageCreatePayload, err error) { + // unmarshalling signing and encryption keys + identity, err := payload.SigningKey.GetPublic().Raw() + if err != nil { + return + } + signPrivKey, err := payload.SigningKey.Raw() + if err != nil { + return + } + encPubKey, err := payload.EncryptionKey.GetPublic().Raw() + if err != nil { + return + } + encPrivKey, err := payload.EncryptionKey.Raw() + if err != nil { + return + } + + // preparing replication key + hasher := fnv.New64() + _, err = hasher.Write(identity) + if err != nil { + return + } + repKey := hasher.Sum64() + + // preparing header and space id + header := &spacesyncproto.SpaceHeader{ + Identity: identity, + SpaceType: SpaceTypeDerived, + ReplicationKey: repKey, + } + marshalled, err := header.Marshal() + if err != nil { + return + } + id, err := cid.NewCIDFromBytes(marshalled) + if err != nil { + return + } + spaceId := NewSpaceId(id, repKey) + + // deriving and encrypting read key + readKey, err := aclrecordproto.ACLReadKeyDerive(signPrivKey, encPrivKey) + if err != nil { + return + } + hasher = fnv.New64() + _, err = hasher.Write(readKey.Bytes()) + if err != nil { + return + } + readKeyHash := hasher.Sum64() + encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(readKey.Bytes()) + if err != nil { + return + } + + // preparing acl + aclRoot := &aclrecordproto.ACLRoot{ + Identity: identity, + EncryptionKey: encPubKey, + SpaceId: spaceId, + EncryptedReadKey: encReadKey, + DerivationScheme: "", + CurrentReadKeyHash: readKeyHash, + Timestamp: time.Now().UnixNano(), + } + rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey) + if err != nil { + return + } + + // creating storage + storagePayload = storage.SpaceStorageCreatePayload{ + RecWithId: rawWithId, + SpaceHeader: header, + Id: id, + } + return +} + +func marshalACLRoot(aclRoot *aclrecordproto.ACLRoot, key signingkey.PrivKey) (rawWithId *aclrecordproto.RawACLRecordWithId, err error) { + marshalledRoot, err := aclRoot.Marshal() + if err != nil { + return + } + signature, err := key.Sign(marshalledRoot) + if err != nil { + return + } + raw := &aclrecordproto.RawACLRecord{ + Payload: marshalledRoot, + Signature: signature, + } + marshalledRaw, err := raw.Marshal() + if err != nil { + return + } + aclHeadId, err := cid.NewCIDFromBytes(marshalledRaw) + if err != nil { + return + } + rawWithId = &aclrecordproto.RawACLRecordWithId{ + Payload: marshalledRaw, + Id: aclHeadId, + } + return +} diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 48f4b4cf..ef287659 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -6,17 +6,10 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid" - "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey" - "hash/fnv" - "math/rand" - "time" ) const CName = "common.commonspace" @@ -57,171 +50,32 @@ func (s *service) CreateSpace( ctx context.Context, cache cache.TreeCache, payload SpaceCreatePayload) (sp Space, err error) { - - // unmarshalling signing and encryption keys - identity, err := payload.SigningKey.GetPublic().Raw() + storageCreate, err := storagePayloadForSpaceCreate(payload) if err != nil { return } - encPubKey, err := payload.EncryptionKey.GetPublic().Raw() - if err != nil { - return - } - - // preparing header and space id - bytes := make([]byte, 32) - _, err = rand.Read(bytes) - if err != nil { - return - } - header := &spacesyncproto.SpaceHeader{ - Identity: identity, - Timestamp: time.Now().UnixNano(), - SpaceType: payload.SpaceType, - ReplicationKey: payload.ReplicationKey, - Seed: bytes, - } - marshalled, err := header.Marshal() - if err != nil { - return - } - id, err := cid.NewCIDFromBytes(marshalled) - if err != nil { - return - } - spaceId := NewSpaceId(id, payload.ReplicationKey) - - // encrypting read key - hasher := fnv.New64() - _, err = hasher.Write(payload.ReadKey) - if err != nil { - return - } - readKeyHash := hasher.Sum64() - encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(payload.ReadKey) - if err != nil { - return - } - - // preparing acl - aclRoot := &aclrecordproto.ACLRoot{ - Identity: identity, - EncryptionKey: encPubKey, - SpaceId: spaceId, - EncryptedReadKey: encReadKey, - DerivationScheme: "", - CurrentReadKeyHash: readKeyHash, - Timestamp: time.Now().UnixNano(), - } - rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey) - if err != nil { - return - } - - // creating storage - storageCreate := storage.SpaceStorageCreatePayload{ - RecWithId: rawWithId, - SpaceHeader: header, - Id: id, - } _, err = s.storageProvider.CreateSpaceStorage(storageCreate) if err != nil { return } - return s.GetSpace(ctx, spaceId) + return s.GetSpace(ctx, storageCreate.Id) } func (s *service) DeriveSpace( ctx context.Context, cache cache.TreeCache, payload SpaceDerivePayload) (sp Space, err error) { - - // unmarshalling signing and encryption keys - identity, err := payload.SigningKey.GetPublic().Raw() + storageCreate, err := storagePayloadForSpaceDerive(payload) if err != nil { return } - signPrivKey, err := payload.SigningKey.Raw() - if err != nil { - return - } - encPubKey, err := payload.EncryptionKey.GetPublic().Raw() - if err != nil { - return - } - encPrivKey, err := payload.EncryptionKey.Raw() - if err != nil { - return - } - - // preparing replication key - hasher := fnv.New64() - _, err = hasher.Write(identity) - if err != nil { - return - } - repKey := hasher.Sum64() - - // preparing header and space id - header := &spacesyncproto.SpaceHeader{ - Identity: identity, - SpaceType: SpaceTypeDerived, - ReplicationKey: repKey, - } - marshalled, err := header.Marshal() - if err != nil { - return - } - id, err := cid.NewCIDFromBytes(marshalled) - if err != nil { - return - } - spaceId := NewSpaceId(id, repKey) - - // deriving and encrypting read key - readKey, err := aclrecordproto.ACLReadKeyDerive(signPrivKey, encPrivKey) - if err != nil { - return - } - hasher = fnv.New64() - _, err = hasher.Write(readKey.Bytes()) - if err != nil { - return - } - readKeyHash := hasher.Sum64() - encReadKey, err := payload.EncryptionKey.GetPublic().Encrypt(readKey.Bytes()) - if err != nil { - return - } - - // preparing acl - aclRoot := &aclrecordproto.ACLRoot{ - Identity: identity, - EncryptionKey: encPubKey, - SpaceId: spaceId, - EncryptedReadKey: encReadKey, - DerivationScheme: "", - CurrentReadKeyHash: readKeyHash, - Timestamp: time.Now().UnixNano(), - } - rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey) - if err != nil { - return - } - - // creating storage - storageCreate := storage.SpaceStorageCreatePayload{ - RecWithId: rawWithId, - SpaceHeader: header, - Id: id, - } _, err = s.storageProvider.CreateSpaceStorage(storageCreate) if err != nil { return } - return s.GetSpace(ctx, spaceId) + return s.GetSpace(ctx, storageCreate.Id) } func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { @@ -244,31 +98,3 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { } return sp, nil } - -func marshalACLRoot(aclRoot *aclrecordproto.ACLRoot, key signingkey.PrivKey) (rawWithId *aclrecordproto.RawACLRecordWithId, err error) { - marshalledRoot, err := aclRoot.Marshal() - if err != nil { - return - } - signature, err := key.Sign(marshalledRoot) - if err != nil { - return - } - raw := &aclrecordproto.RawACLRecord{ - Payload: marshalledRoot, - Signature: signature, - } - marshalledRaw, err := raw.Marshal() - if err != nil { - return - } - aclHeadId, err := cid.NewCIDFromBytes(marshalledRaw) - if err != nil { - return - } - rawWithId = &aclrecordproto.RawACLRecordWithId{ - Payload: marshalledRaw, - Id: aclHeadId, - } - return -} diff --git a/common/commonspace/spacesyncproto/spacesync.go b/common/commonspace/spacesyncproto/spacesync.go index 2e22737a..d06102c4 100644 --- a/common/commonspace/spacesyncproto/spacesync.go +++ b/common/commonspace/spacesyncproto/spacesync.go @@ -1,9 +1,22 @@ package spacesyncproto -import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" + "storj.io/drpc" +) type SpaceStream = DRPCSpace_StreamStream +type ClientFactoryFunc func(cc drpc.Conn) DRPCSpaceClient + +func (c ClientFactoryFunc) Client(cc drpc.Conn) DRPCSpaceClient { + return c(cc) +} + +type ClientFactory interface { + Client(cc drpc.Conn) DRPCSpaceClient +} + func WrapHeadUpdate(update *ObjectHeadUpdate, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage { return &ObjectSyncMessage{ Content: &ObjectSyncContentValue{ diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 0d9bf722..64ac67a6 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -38,6 +38,7 @@ type syncService struct { streamPool StreamPool headNotifiable HeadNotifiable configuration nodeconf.Configuration + clientFactory spacesyncproto.ClientFactory streamLoopCtx context.Context stopStreamLoop context.CancelFunc @@ -50,7 +51,13 @@ func NewSyncService(spaceId string, headNotifiable HeadNotifiable, cache cache.T return syncHandler.HandleMessage(ctx, senderId, message) }) syncHandler = newSyncHandler(spaceId, cache, streamPool) - return newSyncService(spaceId, headNotifiable, syncHandler, streamPool, configuration) + return newSyncService( + spaceId, + headNotifiable, + syncHandler, + streamPool, + spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient), + configuration) } func newSyncService( @@ -58,12 +65,14 @@ func newSyncService( headNotifiable HeadNotifiable, syncHandler SyncHandler, streamPool StreamPool, + clientFactory spacesyncproto.ClientFactory, configuration nodeconf.Configuration) *syncService { return &syncService{ syncHandler: syncHandler, streamPool: streamPool, headNotifiable: headNotifiable, configuration: configuration, + clientFactory: clientFactory, spaceId: spaceId, streamLoopDone: make(chan struct{}), } @@ -100,8 +109,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { if s.streamPool.HasActiveStream(peer.Id()) { continue } - cl := spacesyncproto.NewDRPCSpaceClient(peer) - stream, err := cl.Stream(ctx) + stream, err := s.clientFactory.Client(peer).Stream(ctx) if err != nil { err = rpcerr.Unwrap(err) log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)