Merge pull request #39 from anytypeio/coordinator

This commit is contained in:
Mikhail Rakhmanov 2023-03-06 18:59:17 +01:00 committed by GitHub
commit d22d8f4f25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1810 additions and 137 deletions

View File

@ -1,11 +1,18 @@
package ocache package ocache
import "github.com/prometheus/client_golang/prometheus" import (
"github.com/prometheus/client_golang/prometheus"
"strings"
)
func WithPrometheus(reg *prometheus.Registry, namespace, subsystem string) Option { func WithPrometheus(reg *prometheus.Registry, namespace, subsystem string) Option {
if subsystem == "" { if subsystem == "" {
subsystem = "cache" subsystem = "cache"
} }
nameSplit := strings.Split(namespace, ".")
subSplit := strings.Split(subsystem, ".")
namespace = strings.Join(nameSplit, "_")
subsystem = strings.Join(subSplit, "_")
if reg == nil { if reg == nil {
return nil return nil
} }

View File

@ -0,0 +1,19 @@
package ocache
import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"strings"
"testing"
)
func TestWithPrometheus_MetricsConvertsDots(t *testing.T) {
opt := WithPrometheus(prometheus.NewRegistry(), "some.name", "some.system")
cache := New(func(ctx context.Context, id string) (value Object, err error) {
return &testObject{}, nil
}, opt).(*oCache)
_, err := cache.Get(context.Background(), "id")
require.NoError(t, err)
require.True(t, strings.Contains(cache.metrics.hit.Desc().String(), "some_name_some_system_hit"))
}

View File

@ -0,0 +1,52 @@
//go:generate mockgen -destination mock_credentialprovider/mock_credentialprovider.go github.com/anytypeio/any-sync/commonspace/credentialprovider CredentialProvider
package credentialprovider
import (
"context"
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/coordinator/coordinatorclient"
"github.com/gogo/protobuf/proto"
)
const CName = "common.commonspace.credentialprovider"
func New() app.Component {
return &credentialProvider{}
}
func NewNoOp() CredentialProvider {
return &noOpProvider{}
}
type CredentialProvider interface {
GetCredential(ctx context.Context, spaceHeader *spacesyncproto.RawSpaceHeaderWithId) ([]byte, error)
}
type noOpProvider struct {
}
func (n noOpProvider) GetCredential(ctx context.Context, spaceHeader *spacesyncproto.RawSpaceHeaderWithId) ([]byte, error) {
return nil, nil
}
type credentialProvider struct {
client coordinatorclient.CoordinatorClient
}
func (c *credentialProvider) Init(a *app.App) (err error) {
c.client = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient)
return
}
func (c *credentialProvider) Name() (name string) {
return CName
}
func (c *credentialProvider) GetCredential(ctx context.Context, spaceHeader *spacesyncproto.RawSpaceHeaderWithId) ([]byte, error) {
receipt, err := c.client.SpaceSign(ctx, spaceHeader.Id, spaceHeader.RawHeader)
if err != nil {
return nil, err
}
return proto.Marshal(receipt)
}

View File

@ -0,0 +1,51 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/any-sync/commonspace/credentialprovider (interfaces: CredentialProvider)
// Package mock_credentialprovider is a generated GoMock package.
package mock_credentialprovider
import (
context "context"
reflect "reflect"
spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
gomock "github.com/golang/mock/gomock"
)
// MockCredentialProvider is a mock of CredentialProvider interface.
type MockCredentialProvider struct {
ctrl *gomock.Controller
recorder *MockCredentialProviderMockRecorder
}
// MockCredentialProviderMockRecorder is the mock recorder for MockCredentialProvider.
type MockCredentialProviderMockRecorder struct {
mock *MockCredentialProvider
}
// NewMockCredentialProvider creates a new mock instance.
func NewMockCredentialProvider(ctrl *gomock.Controller) *MockCredentialProvider {
mock := &MockCredentialProvider{ctrl: ctrl}
mock.recorder = &MockCredentialProviderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockCredentialProvider) EXPECT() *MockCredentialProviderMockRecorder {
return m.recorder
}
// GetCredential mocks base method.
func (m *MockCredentialProvider) GetCredential(arg0 context.Context, arg1 *spacesyncproto.RawSpaceHeaderWithId) ([]byte, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetCredential", arg0, arg1)
ret0, _ := ret[0].([]byte)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetCredential indicates an expected call of GetCredential.
func (mr *MockCredentialProviderMockRecorder) GetCredential(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCredential", reflect.TypeOf((*MockCredentialProvider)(nil).GetCredential), arg0, arg1)
}

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/credentialprovider"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/peermanager"
@ -33,29 +34,32 @@ func newDiffSyncer(
storage spacestorage.SpaceStorage, storage spacestorage.SpaceStorage,
clientFactory spacesyncproto.ClientFactory, clientFactory spacesyncproto.ClientFactory,
syncStatus syncstatus.StatusUpdater, syncStatus syncstatus.StatusUpdater,
credentialProvider credentialprovider.CredentialProvider,
log logger.CtxLogger) DiffSyncer { log logger.CtxLogger) DiffSyncer {
return &diffSyncer{ return &diffSyncer{
diff: diff, diff: diff,
spaceId: spaceId, spaceId: spaceId,
cache: cache, cache: cache,
storage: storage, storage: storage,
peerManager: peerManager, peerManager: peerManager,
clientFactory: clientFactory, clientFactory: clientFactory,
log: log, credentialProvider: credentialProvider,
syncStatus: syncStatus, log: log,
syncStatus: syncStatus,
} }
} }
type diffSyncer struct { type diffSyncer struct {
spaceId string spaceId string
diff ldiff.Diff diff ldiff.Diff
peerManager peermanager.PeerManager peerManager peermanager.PeerManager
cache treegetter.TreeGetter cache treegetter.TreeGetter
storage spacestorage.SpaceStorage storage spacestorage.SpaceStorage
clientFactory spacesyncproto.ClientFactory clientFactory spacesyncproto.ClientFactory
log logger.CtxLogger log logger.CtxLogger
deletionState settingsstate.ObjectDeletionState deletionState settingsstate.ObjectDeletionState
syncStatus syncstatus.StatusUpdater credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusUpdater
} }
func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) { func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) {
@ -86,6 +90,7 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) {
} }
func (d *diffSyncer) Sync(ctx context.Context) error { func (d *diffSyncer) Sync(ctx context.Context) error {
// TODO: split diffsyncer into components
st := time.Now() st := time.Now()
// diffing with responsible peers according to configuration // diffing with responsible peers according to configuration
peers, err := d.peerManager.GetResponsiblePeers(ctx) peers, err := d.peerManager.GetResponsiblePeers(ctx)
@ -117,6 +122,10 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
err = rpcerr.Unwrap(err) err = rpcerr.Unwrap(err)
if err != nil && err != spacesyncproto.ErrSpaceMissing { if err != nil && err != spacesyncproto.ErrSpaceMissing {
if err == spacesyncproto.ErrSpaceIsDeleted {
d.log.Debug("got space deleted while syncing")
d.syncTrees(ctx, p.Id(), []string{d.storage.SpaceSettingsId()})
}
d.syncStatus.SetNodesOnline(p.Id(), false) d.syncStatus.SetNodesOnline(p.Id(), false)
return fmt.Errorf("diff error: %v", err) return fmt.Errorf("diff error: %v", err)
} }
@ -192,6 +201,10 @@ func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl
return return
} }
cred, err := d.credentialProvider.GetCredential(ctx, header)
if err != nil {
return
}
spacePayload := &spacesyncproto.SpacePayload{ spacePayload := &spacesyncproto.SpacePayload{
SpaceHeader: header, SpaceHeader: header,
AclPayload: root.Payload, AclPayload: root.Payload,
@ -200,7 +213,8 @@ func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl
SpaceSettingsPayloadId: spaceSettingsRoot.Id, SpaceSettingsPayloadId: spaceSettingsRoot.Id,
} }
_, err = cl.SpacePush(ctx, &spacesyncproto.SpacePushRequest{ _, err = cl.SpacePush(ctx, &spacesyncproto.SpacePushRequest{
Payload: spacePayload, Payload: spacePayload,
Credential: cred,
}) })
if err != nil { if err != nil {
return return

View File

@ -1,11 +1,13 @@
package headsync package headsync
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/app/ldiff/mock_ldiff" "github.com/anytypeio/any-sync/app/ldiff/mock_ldiff"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/credentialprovider/mock_credentialprovider"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage" "github.com/anytypeio/any-sync/commonspace/object/acl/liststorage/mock_liststorage"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
@ -30,6 +32,7 @@ type pushSpaceRequestMatcher struct {
spaceId string spaceId string
aclRootId string aclRootId string
settingsId string settingsId string
credential []byte
spaceHeader *spacesyncproto.RawSpaceHeaderWithId spaceHeader *spacesyncproto.RawSpaceHeaderWithId
} }
@ -39,7 +42,7 @@ func (p pushSpaceRequestMatcher) Matches(x interface{}) bool {
return false return false
} }
return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader && res.Payload.SpaceSettingsPayloadId == p.settingsId return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader && res.Payload.SpaceSettingsPayloadId == p.settingsId && bytes.Equal(p.credential, res.Credential)
} }
func (p pushSpaceRequestMatcher) String() string { func (p pushSpaceRequestMatcher) String() string {
@ -83,11 +86,13 @@ func newPushSpaceRequestMatcher(
spaceId string, spaceId string,
aclRootId string, aclRootId string,
settingsId string, settingsId string,
credential []byte,
spaceHeader *spacesyncproto.RawSpaceHeaderWithId) *pushSpaceRequestMatcher { spaceHeader *spacesyncproto.RawSpaceHeaderWithId) *pushSpaceRequestMatcher {
return &pushSpaceRequestMatcher{ return &pushSpaceRequestMatcher{
spaceId: spaceId, spaceId: spaceId,
aclRootId: aclRootId, aclRootId: aclRootId,
settingsId: settingsId, settingsId: settingsId,
credential: credential,
spaceHeader: spaceHeader, spaceHeader: spaceHeader,
} }
} }
@ -106,11 +111,12 @@ func TestDiffSyncer_Sync(t *testing.T) {
factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient {
return clientMock return clientMock
}) })
credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl)
delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
spaceId := "spaceId" spaceId := "spaceId"
aclRootId := "aclRootId" aclRootId := "aclRootId"
l := logger.NewNamed(spaceId) l := logger.NewNamed(spaceId)
diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), l) diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l)
delState.EXPECT().AddObserver(gomock.Any()) delState.EXPECT().AddObserver(gomock.Any())
diffSyncer.Init(delState) diffSyncer.Init(delState)
@ -172,6 +178,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
} }
spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{}
spaceSettingsId := "spaceSettingsId" spaceSettingsId := "spaceSettingsId"
credential := []byte("credential")
peerManagerMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()). GetResponsiblePeers(gomock.Any()).
@ -189,15 +196,18 @@ func TestDiffSyncer_Sync(t *testing.T) {
aclStorageMock.EXPECT(). aclStorageMock.EXPECT().
Root(). Root().
Return(aclRoot, nil) Return(aclRoot, nil)
credentialProvider.EXPECT().
GetCredential(gomock.Any(), spaceHeader).
Return(credential, nil)
clientMock.EXPECT(). clientMock.EXPECT().
SpacePush(gomock.Any(), newPushSpaceRequestMatcher(spaceId, aclRootId, settingsId, spaceHeader)). SpacePush(gomock.Any(), newPushSpaceRequestMatcher(spaceId, aclRootId, settingsId, credential, spaceHeader)).
Return(nil, nil) Return(nil, nil)
peerManagerMock.EXPECT().SendPeer(gomock.Any(), "mockId", gomock.Any()) peerManagerMock.EXPECT().SendPeer(gomock.Any(), "mockId", gomock.Any())
require.NoError(t, diffSyncer.Sync(ctx)) require.NoError(t, diffSyncer.Sync(ctx))
}) })
t.Run("diff syncer sync other error", func(t *testing.T) { t.Run("diff syncer sync unexpected", func(t *testing.T) {
peerManagerMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()). GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil) Return([]peer.Peer{mockPeer{}}, nil)
@ -207,4 +217,19 @@ func TestDiffSyncer_Sync(t *testing.T) {
require.NoError(t, diffSyncer.Sync(ctx)) require.NoError(t, diffSyncer.Sync(ctx))
}) })
t.Run("diff syncer sync space is deleted error", func(t *testing.T) {
peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil)
diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
stMock.EXPECT().SpaceSettingsId().Return("settingsId")
cacheMock.EXPECT().
GetTree(gomock.Any(), spaceId, "settingsId").
Return(nil, nil)
require.NoError(t, diffSyncer.Sync(ctx))
})
} }

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/credentialprovider"
"github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/settings/settingsstate" "github.com/anytypeio/any-sync/commonspace/settings/settingsstate"
@ -60,12 +61,13 @@ func NewHeadSync(
peerManager peermanager.PeerManager, peerManager peermanager.PeerManager,
cache treegetter.TreeGetter, cache treegetter.TreeGetter,
syncStatus syncstatus.StatusUpdater, syncStatus syncstatus.StatusUpdater,
credentialProvider credentialprovider.CredentialProvider,
log logger.CtxLogger) HeadSync { log logger.CtxLogger) HeadSync {
diff := ldiff.New(16, 16) diff := ldiff.New(16, 16)
l := log.With(zap.String("spaceId", spaceId)) l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, credentialProvider, l)
sync := func(ctx context.Context) (err error) { sync := func(ctx context.Context) (err error) {
// for clients cancelling the sync process // for clients cancelling the sync process
if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) { if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) {

View File

@ -220,7 +220,7 @@ func (c *changeBuilder) Build(payload BuilderContent) (ch *Change, rawIdChange *
} }
func (c *changeBuilder) Marshall(ch *Change) (raw *treechangeproto.RawTreeChangeWithId, err error) { func (c *changeBuilder) Marshall(ch *Change) (raw *treechangeproto.RawTreeChangeWithId, err error) {
if ch.Id == c.rootChange.Id { if c.isRoot(ch.Id) {
return c.rootChange, nil return c.rootChange, nil
} }
treeChange := &treechangeproto.TreeChange{ treeChange := &treechangeproto.TreeChange{
@ -255,7 +255,7 @@ func (c *changeBuilder) Marshall(ch *Change) (raw *treechangeproto.RawTreeChange
} }
func (c *changeBuilder) unmarshallRawChange(raw *treechangeproto.RawTreeChange, id string) (ch *Change, err error) { func (c *changeBuilder) unmarshallRawChange(raw *treechangeproto.RawTreeChange, id string) (ch *Change, err error) {
if c.rootChange.Id == id { if c.isRoot(id) {
unmarshalled := &treechangeproto.RootChange{} unmarshalled := &treechangeproto.RootChange{}
err = proto.Unmarshal(raw.Payload, unmarshalled) err = proto.Unmarshal(raw.Payload, unmarshalled)
if err != nil { if err != nil {
@ -274,3 +274,10 @@ func (c *changeBuilder) unmarshallRawChange(raw *treechangeproto.RawTreeChange,
ch = NewChange(id, unmarshalled, raw.Signature) ch = NewChange(id, unmarshalled, raw.Signature)
return return
} }
func (c *changeBuilder) isRoot(id string) bool {
if c.rootChange != nil {
return c.rootChange.Id == id
}
return false
}

View File

@ -7,6 +7,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf"
"go.uber.org/zap" "go.uber.org/zap"
@ -32,6 +33,7 @@ type objectSync struct {
messagePool MessagePool messagePool MessagePool
objectGetter syncobjectgetter.SyncObjectGetter objectGetter syncobjectgetter.SyncObjectGetter
configuration nodeconf.Configuration configuration nodeconf.Configuration
spaceStorage spacestorage.SpaceStorage
syncCtx context.Context syncCtx context.Context
cancelSync context.CancelFunc cancelSync context.CancelFunc
@ -43,13 +45,15 @@ func NewObjectSync(
spaceIsDeleted *atomic.Bool, spaceIsDeleted *atomic.Bool,
configuration nodeconf.Configuration, configuration nodeconf.Configuration,
peerManager peermanager.PeerManager, peerManager peermanager.PeerManager,
objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync { objectGetter syncobjectgetter.SyncObjectGetter,
storage spacestorage.SpaceStorage) ObjectSync {
syncCtx, cancel := context.WithCancel(context.Background()) syncCtx, cancel := context.WithCancel(context.Background())
os := newObjectSync( os := newObjectSync(
spaceId, spaceId,
spaceIsDeleted, spaceIsDeleted,
configuration, configuration,
objectGetter, objectGetter,
storage,
syncCtx, syncCtx,
cancel) cancel)
msgPool := newMessagePool(peerManager, os.handleMessage) msgPool := newMessagePool(peerManager, os.handleMessage)
@ -62,11 +66,13 @@ func newObjectSync(
spaceIsDeleted *atomic.Bool, spaceIsDeleted *atomic.Bool,
configuration nodeconf.Configuration, configuration nodeconf.Configuration,
objectGetter syncobjectgetter.SyncObjectGetter, objectGetter syncobjectgetter.SyncObjectGetter,
spaceStorage spacestorage.SpaceStorage,
syncCtx context.Context, syncCtx context.Context,
cancel context.CancelFunc, cancel context.CancelFunc,
) *objectSync { ) *objectSync {
return &objectSync{ return &objectSync{
objectGetter: objectGetter, objectGetter: objectGetter,
spaceStorage: spaceStorage,
spaceId: spaceId, spaceId: spaceId,
syncCtx: syncCtx, syncCtx: syncCtx,
cancelSync: cancel, cancelSync: cancel,
@ -95,8 +101,8 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)) log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId))
if s.spaceIsDeleted.Load() { if s.spaceIsDeleted.Load() {
log = log.With(zap.Bool("isDeleted", true)) log = log.With(zap.Bool("isDeleted", true))
// preventing sync with other clients // preventing sync with other clients if they are not just syncing the settings tree
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
return spacesyncproto.ErrSpaceIsDeleted return spacesyncproto.ErrSpaceIsDeleted
} }
} }

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/object/keychain"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
@ -242,17 +243,7 @@ func (s *settingsObject) verifyDeleteSpace(raw *treechangeproto.RawTreeChangeWit
if err != nil { if err != nil {
return return
} }
content := &spacesyncproto.SettingsData{} return verifyDeleteContent(data, "")
err = proto.Unmarshal(data, content)
if err != nil {
return
}
if len(content.GetContent()) != 1 ||
content.GetContent()[0].GetSpaceDelete() == nil ||
content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() == "" {
return fmt.Errorf("incorrect delete change payload")
}
return
} }
func (s *settingsObject) addContent(data []byte) (err error) { func (s *settingsObject) addContent(data []byte) (err error) {
@ -271,3 +262,30 @@ func (s *settingsObject) addContent(data []byte) (err error) {
s.Update(s) s.Update(s)
return return
} }
func VerifyDeleteChange(raw *treechangeproto.RawTreeChangeWithId, identity []byte, peerId string) (err error) {
changeBuilder := objecttree.NewChangeBuilder(keychain.NewKeychain(), nil)
res, err := changeBuilder.Unmarshall(raw, true)
if err != nil {
return
}
if res.Identity != string(identity) {
return fmt.Errorf("incorrect identity")
}
return verifyDeleteContent(res.Data, peerId)
}
func verifyDeleteContent(data []byte, peerId string) (err error) {
content := &spacesyncproto.SettingsData{}
err = proto.Unmarshal(data, content)
if err != nil {
return
}
if len(content.GetContent()) != 1 ||
content.GetContent()[0].GetSpaceDelete() == nil ||
(peerId == "" && content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() == "") ||
(peerId != "" && content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() != peerId) {
return fmt.Errorf("incorrect delete change payload")
}
return
}

View File

@ -5,6 +5,7 @@ import (
"github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/credentialprovider"
"github.com/anytypeio/any-sync/commonspace/headsync" "github.com/anytypeio/any-sync/commonspace/headsync"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
@ -45,6 +46,7 @@ type spaceService struct {
configurationService nodeconf.Service configurationService nodeconf.Service
storageProvider spacestorage.SpaceStorageProvider storageProvider spacestorage.SpaceStorageProvider
peermanagerProvider peermanager.PeerManagerProvider peermanagerProvider peermanager.PeerManagerProvider
credentialProvider credentialprovider.CredentialProvider
treeGetter treegetter.TreeGetter treeGetter treegetter.TreeGetter
pool pool.Pool pool pool.Pool
} }
@ -56,6 +58,12 @@ func (s *spaceService) Init(a *app.App) (err error) {
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter)
s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider) s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider)
credProvider := a.Component(credentialprovider.CName)
if credProvider != nil {
s.credentialProvider = credProvider.(credentialprovider.CredentialProvider)
} else {
s.credentialProvider = credentialprovider.NewNoOp()
}
s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool = a.MustComponent(pool.CName).(pool.Pool)
return nil return nil
} }
@ -139,8 +147,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
return nil, err return nil, err
} }
headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, log) headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, s.credentialProvider, log)
objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter) objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter, st)
sp := &space{ sp := &space{
id: id, id: id,
objectSync: objectSync, objectSync: objectSync,

View File

@ -2,21 +2,28 @@
package spacestorage package spacestorage
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/acl/liststorage" "github.com/anytypeio/any-sync/commonspace/object/acl/liststorage"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/util/cidutil"
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
"github.com/gogo/protobuf/proto"
"strings"
) )
const CName = "common.commonspace.spacestorage" const CName = "common.commonspace.spacestorage"
var ( var (
ErrSpaceStorageExists = errors.New("space storage exists") ErrSpaceStorageExists = errors.New("space storage exists")
ErrSpaceStorageMissing = errors.New("space storage missing") ErrSpaceStorageMissing = errors.New("space storage missing")
ErrIncorrectSpaceHeader = errors.New("incorrect space header")
ErrTreeStorageAlreadyDeleted = errors.New("tree storage already deleted") ErrTreeStorageAlreadyDeleted = errors.New("tree storage already deleted")
) )
@ -63,3 +70,38 @@ func ValidateSpaceStorageCreatePayload(payload SpaceStorageCreatePayload) (err e
// TODO: add proper validation // TODO: add proper validation
return nil return nil
} }
func ValidateSpaceHeader(spaceId string, header, identity []byte) (err error) {
split := strings.Split(spaceId, ".")
if len(split) != 2 {
return ErrIncorrectSpaceHeader
}
if !cidutil.VerifyCid(header, split[0]) {
err = objecttree.ErrIncorrectCid
return
}
raw := &spacesyncproto.RawSpaceHeader{}
err = proto.Unmarshal(header, raw)
if err != nil {
return
}
payload := &spacesyncproto.SpaceHeader{}
err = proto.Unmarshal(raw.SpaceHeader, payload)
if err != nil {
return
}
if identity != nil && !bytes.Equal(identity, payload.Identity) {
err = ErrIncorrectSpaceHeader
return
}
key, err := signingkey.NewSigningEd25519PubKeyFromBytes(payload.Identity)
if err != nil {
return
}
res, err := key.Verify(raw.SpaceHeader, raw.Signature)
if err != nil || !res {
err = ErrIncorrectSpaceHeader
return
}
return
}

View File

@ -66,6 +66,7 @@ message ObjectSyncMessage {
// SpacePushRequest is a request to add space on a node containing only one acl record // SpacePushRequest is a request to add space on a node containing only one acl record
message SpacePushRequest { message SpacePushRequest {
SpacePayload payload = 1; SpacePayload payload = 1;
bytes Credential = 2;
} }
// SpacePushResponse is an empty response // SpacePushResponse is an empty response

View File

@ -437,7 +437,8 @@ func (m *ObjectSyncMessage) GetObjectId() string {
// SpacePushRequest is a request to add space on a node containing only one acl record // SpacePushRequest is a request to add space on a node containing only one acl record
type SpacePushRequest struct { type SpacePushRequest struct {
Payload *SpacePayload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` Payload *SpacePayload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
Credential []byte `protobuf:"bytes,2,opt,name=Credential,proto3" json:"Credential,omitempty"`
} }
func (m *SpacePushRequest) Reset() { *m = SpacePushRequest{} } func (m *SpacePushRequest) Reset() { *m = SpacePushRequest{} }
@ -480,6 +481,13 @@ func (m *SpacePushRequest) GetPayload() *SpacePayload {
return nil return nil
} }
func (m *SpacePushRequest) GetCredential() []byte {
if m != nil {
return m.Credential
}
return nil
}
// SpacePushResponse is an empty response // SpacePushResponse is an empty response
type SpacePushResponse struct { type SpacePushResponse struct {
} }
@ -1232,21 +1240,21 @@ func init() {
} }
var fileDescriptor_80e49f1f4ac27799 = []byte{ var fileDescriptor_80e49f1f4ac27799 = []byte{
// 1019 bytes of a gzipped FileDescriptorProto // 1030 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4b, 0x6f, 0xdb, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x4b, 0x6f, 0xdb, 0x46,
0x10, 0x16, 0xe9, 0xa7, 0xc6, 0xb2, 0xcc, 0x6c, 0x9c, 0x44, 0x55, 0x0c, 0x45, 0x58, 0x14, 0x85, 0x10, 0x16, 0xe9, 0xa7, 0xc6, 0xb2, 0xcc, 0x6c, 0x9c, 0x44, 0x55, 0x0c, 0x45, 0x58, 0x14, 0x85,
0x91, 0x43, 0x1e, 0x72, 0x51, 0x20, 0x69, 0x7b, 0x48, 0x6c, 0xa5, 0x11, 0x8a, 0xd4, 0xc6, 0xaa, 0x91, 0x43, 0x1e, 0x72, 0x51, 0x20, 0x69, 0x7b, 0x48, 0x64, 0xa7, 0x11, 0x8a, 0xd4, 0xc6, 0xaa,
0x41, 0x81, 0x02, 0x39, 0xac, 0xc9, 0xb1, 0xc4, 0x96, 0x22, 0x59, 0xee, 0xaa, 0xb6, 0x8e, 0x3d, 0x41, 0x81, 0x02, 0x39, 0xac, 0xc9, 0xb1, 0xc4, 0x96, 0x22, 0x59, 0xee, 0xaa, 0xb6, 0x8e, 0x3d,
0xf5, 0xda, 0x73, 0x7b, 0xea, 0x7f, 0xe8, 0x8f, 0xe8, 0x31, 0xc7, 0x1e, 0x0b, 0xfb, 0x8f, 0x14, 0xf5, 0xda, 0x73, 0x7b, 0xea, 0x7f, 0xe8, 0x8f, 0xe8, 0x31, 0xc7, 0x1e, 0x0b, 0xfb, 0x8f, 0x14,
0xbb, 0x5c, 0x3e, 0x24, 0x51, 0x39, 0xf4, 0x22, 0x73, 0xbf, 0x99, 0xf9, 0xe6, 0xb5, 0x3b, 0x63, 0xbb, 0x5c, 0x3e, 0x24, 0x51, 0x01, 0x7a, 0x91, 0xb9, 0xdf, 0xcc, 0x7c, 0xf3, 0xda, 0x9d, 0x31,
0x78, 0xea, 0x46, 0x93, 0x49, 0x14, 0x8a, 0x98, 0xbb, 0xf8, 0x58, 0xff, 0x8a, 0x59, 0xe8, 0xc6, 0x3c, 0x75, 0xa3, 0xc9, 0x24, 0x0a, 0x45, 0xcc, 0x5d, 0x7c, 0xac, 0x7f, 0xc5, 0x2c, 0x74, 0xe3,
0x49, 0x24, 0xa3, 0xc7, 0xfa, 0x57, 0x14, 0xe8, 0x23, 0x0d, 0x90, 0x7a, 0x0e, 0xd0, 0x01, 0xec, 0x24, 0x92, 0xd1, 0x63, 0xfd, 0x2b, 0x0a, 0xf4, 0x91, 0x06, 0x48, 0x3d, 0x07, 0xe8, 0x00, 0x76,
0xbe, 0x46, 0xee, 0x0d, 0x67, 0xa1, 0xcb, 0x78, 0x38, 0x42, 0x42, 0x60, 0xfd, 0x22, 0x89, 0x26, 0x5f, 0x23, 0xf7, 0x86, 0xb3, 0xd0, 0x65, 0x3c, 0x1c, 0x21, 0x21, 0xb0, 0x7e, 0x91, 0x44, 0x93,
0x2d, 0xab, 0x6b, 0x1d, 0xae, 0x33, 0xfd, 0x4d, 0x9a, 0x60, 0xcb, 0xa8, 0x65, 0x6b, 0xc4, 0x96, 0x96, 0xd5, 0xb5, 0x0e, 0xd7, 0x99, 0xfe, 0x26, 0x4d, 0xb0, 0x65, 0xd4, 0xb2, 0x35, 0x62, 0xcb,
0x11, 0xd9, 0x87, 0x8d, 0xc0, 0x9f, 0xf8, 0xb2, 0xb5, 0xd6, 0xb5, 0x0e, 0x77, 0x59, 0x7a, 0xa0, 0x88, 0xec, 0xc3, 0x46, 0xe0, 0x4f, 0x7c, 0xd9, 0x5a, 0xeb, 0x5a, 0x87, 0xbb, 0x2c, 0x3d, 0xd0,
0x57, 0xd0, 0xcc, 0xa9, 0x50, 0x4c, 0x03, 0xa9, 0xb8, 0xc6, 0x5c, 0x8c, 0x35, 0x57, 0x83, 0xe9, 0x2b, 0x68, 0xe6, 0x54, 0x28, 0xa6, 0x81, 0x54, 0x5c, 0x63, 0x2e, 0xc6, 0x9a, 0xab, 0xc1, 0xf4,
0x6f, 0xf2, 0x05, 0x6c, 0x63, 0x80, 0x13, 0x0c, 0xa5, 0x68, 0xd9, 0xdd, 0xb5, 0xc3, 0x9d, 0x5e, 0x37, 0xf9, 0x02, 0xb6, 0x31, 0xc0, 0x09, 0x86, 0x52, 0xb4, 0xec, 0xee, 0xda, 0xe1, 0x4e, 0xaf,
0xf7, 0x51, 0x11, 0xdf, 0x3c, 0x41, 0x3f, 0x55, 0x64, 0xb9, 0x85, 0xf2, 0xec, 0x46, 0xd3, 0x30, 0xfb, 0xa8, 0x88, 0x6f, 0x9e, 0xe0, 0x24, 0x55, 0x64, 0xb9, 0x85, 0xf2, 0xec, 0x46, 0xd3, 0x30,
0xf7, 0xac, 0x0f, 0xf4, 0x73, 0xb8, 0x53, 0x69, 0xa8, 0x02, 0xf7, 0x3d, 0xed, 0xbe, 0xce, 0x6c, 0xf7, 0xac, 0x0f, 0xf4, 0x73, 0xb8, 0x53, 0x69, 0xa8, 0x02, 0xf7, 0x3d, 0xed, 0xbe, 0xce, 0x6c,
0xdf, 0xd3, 0x01, 0x21, 0xf7, 0x74, 0x2a, 0x75, 0xa6, 0xbf, 0xe9, 0x3b, 0xd8, 0x2b, 0x8c, 0x7f, 0xdf, 0xd3, 0x01, 0x21, 0xf7, 0x74, 0x2a, 0x75, 0xa6, 0xbf, 0xe9, 0x3b, 0xd8, 0x2b, 0x8c, 0x7f,
0x9a, 0xa2, 0x90, 0xa4, 0x05, 0x5b, 0x3a, 0xa4, 0x41, 0x66, 0x9b, 0x1d, 0xc9, 0x13, 0xd8, 0x4c, 0x9a, 0xa2, 0x90, 0xa4, 0x05, 0x5b, 0x3a, 0xa4, 0x41, 0x66, 0x9b, 0x1d, 0xc9, 0x13, 0xd8, 0x4c,
@ -1256,47 +1264,48 @@ var fileDescriptor_80e49f1f4ac27799 = []byte{
0x42, 0xf0, 0x11, 0x7e, 0x20, 0xd4, 0x03, 0xa8, 0x27, 0x69, 0x3e, 0x83, 0x2c, 0xe1, 0x02, 0x50, 0x42, 0xf0, 0x11, 0x7e, 0x20, 0xd4, 0x03, 0xa8, 0x27, 0x69, 0x3e, 0x83, 0x2c, 0xe1, 0x02, 0x50,
0x76, 0x09, 0xc6, 0xc1, 0x6c, 0xe0, 0xe9, 0x52, 0xd6, 0x59, 0x76, 0x54, 0x92, 0x98, 0xcf, 0x82, 0x76, 0x09, 0xc6, 0xc1, 0x6c, 0xe0, 0xe9, 0x52, 0xd6, 0x59, 0x76, 0x54, 0x92, 0x98, 0xcf, 0x82,
0x88, 0x7b, 0xad, 0x75, 0xdd, 0xb7, 0xec, 0x48, 0xda, 0xb0, 0x1d, 0xe9, 0x00, 0x06, 0x5e, 0x6b, 0x88, 0x7b, 0xad, 0x75, 0xdd, 0xb7, 0xec, 0x48, 0xda, 0xb0, 0x1d, 0xe9, 0x00, 0x06, 0x5e, 0x6b,
0x43, 0x1b, 0xe5, 0x67, 0xda, 0x07, 0x67, 0xa8, 0x1c, 0x9f, 0x4d, 0xc5, 0x38, 0x2b, 0xe3, 0xd3, 0x43, 0x1b, 0xe5, 0x67, 0x8a, 0xe0, 0x0c, 0x95, 0xe3, 0xb3, 0xa9, 0x18, 0x67, 0x65, 0x7c, 0x5a,
0x82, 0x49, 0xc5, 0xb6, 0xd3, 0xbb, 0x57, 0x4a, 0x33, 0xd5, 0x4e, 0xc5, 0xb9, 0x0b, 0x7a, 0x1b, 0x30, 0xa9, 0xd8, 0x76, 0x7a, 0xf7, 0x4a, 0x69, 0xa6, 0xda, 0xa9, 0xb8, 0x70, 0xd1, 0x01, 0xe8,
0x6e, 0x95, 0x68, 0xd2, 0x72, 0x51, 0x9a, 0x73, 0x07, 0x41, 0xc6, 0xbd, 0xd0, 0x59, 0xfa, 0x2a, 0x27, 0xe8, 0x61, 0x28, 0x7d, 0x1e, 0xe8, 0xa8, 0x1b, 0xac, 0x84, 0xd0, 0xdb, 0x70, 0xab, 0xe4,
0x37, 0x54, 0x3a, 0xa6, 0xce, 0xff, 0x23, 0x80, 0x5f, 0x6c, 0x68, 0x94, 0x25, 0xe4, 0x05, 0xec, 0x26, 0x2d, 0x27, 0xa5, 0xb9, 0xef, 0x20, 0xc8, 0x7c, 0x2f, 0x74, 0x9e, 0xbe, 0xca, 0x0d, 0x95,
0x68, 0x1b, 0xd5, 0x16, 0x4c, 0x0c, 0xcf, 0x83, 0x12, 0x0f, 0xe3, 0x97, 0xc3, 0x42, 0xe1, 0x3b, 0x8e, 0xe9, 0xc3, 0xff, 0x0f, 0x90, 0xfe, 0x62, 0x43, 0xa3, 0x2c, 0x21, 0x2f, 0x60, 0x47, 0xdb,
0x5f, 0x8e, 0x07, 0x1e, 0x2b, 0xdb, 0x90, 0x0e, 0x00, 0x77, 0x03, 0x43, 0xa8, 0x5b, 0xd1, 0x60, 0xa8, 0xb6, 0x61, 0x62, 0x78, 0x1e, 0x94, 0x78, 0x18, 0xbf, 0x1c, 0x16, 0x0a, 0xdf, 0xf9, 0x72,
0x25, 0x84, 0x50, 0x68, 0x14, 0xa7, 0xbc, 0x21, 0x73, 0x18, 0xe9, 0xc1, 0xbe, 0xa6, 0x1c, 0xa2, 0x3c, 0xf0, 0x58, 0xd9, 0x46, 0x25, 0xcd, 0xdd, 0xc0, 0x10, 0x66, 0x49, 0x17, 0x08, 0xa1, 0xd0,
0x94, 0x7e, 0x38, 0x12, 0x67, 0x73, 0x2d, 0xaa, 0x94, 0x91, 0xcf, 0xe0, 0x6e, 0x15, 0x9e, 0x77, 0x28, 0x4e, 0x79, 0xc3, 0xe6, 0x30, 0xd2, 0x83, 0x7d, 0x4d, 0x39, 0x44, 0x29, 0xfd, 0x70, 0x24,
0x6f, 0x85, 0x94, 0xfe, 0x69, 0xc1, 0x4e, 0x29, 0x25, 0xd5, 0x77, 0xdf, 0xc3, 0x50, 0xfa, 0x72, 0xce, 0xe6, 0x5a, 0x58, 0x29, 0x23, 0x9f, 0xc1, 0xdd, 0x2a, 0x3c, 0xef, 0xee, 0x0a, 0x29, 0xfd,
0x66, 0x9e, 0x72, 0x7e, 0x56, 0xb7, 0x4c, 0xfa, 0x13, 0x14, 0x92, 0x4f, 0x62, 0x9d, 0xda, 0x1a, 0xd3, 0x82, 0x9d, 0x52, 0x4a, 0xea, 0x5e, 0xf8, 0xba, 0x41, 0x72, 0x66, 0x9e, 0x7a, 0x7e, 0x56,
0x2b, 0x00, 0x25, 0xd5, 0x3e, 0xbe, 0x9d, 0xc5, 0x68, 0xd2, 0x2a, 0x00, 0xf2, 0x09, 0x34, 0xd5, 0xb7, 0x50, 0xfa, 0x13, 0x14, 0x92, 0x4f, 0x62, 0x9d, 0xda, 0x1a, 0x2b, 0x00, 0x25, 0xd5, 0x3e,
0xa5, 0xf3, 0x5d, 0x2e, 0xfd, 0x28, 0xfc, 0x1a, 0x67, 0x3a, 0x9b, 0x75, 0xb6, 0x80, 0xaa, 0x57, 0xbe, 0x9d, 0xc5, 0x68, 0xd2, 0x2a, 0x00, 0xf2, 0x09, 0x34, 0xd5, 0xa5, 0xf4, 0x5d, 0x2e, 0xfd,
0x2b, 0x10, 0xd3, 0xa8, 0x1b, 0x4c, 0x7f, 0xd3, 0x33, 0x68, 0xce, 0x17, 0x9e, 0x74, 0x97, 0x1b, 0x28, 0xfc, 0x1a, 0x67, 0x3a, 0x9b, 0x75, 0xb6, 0x80, 0xaa, 0x57, 0x2d, 0x10, 0xd3, 0xa8, 0x1b,
0xd5, 0x98, 0xef, 0x83, 0x8a, 0xc6, 0x1f, 0x85, 0x5c, 0x4e, 0x13, 0x34, 0x6d, 0x28, 0x00, 0x7a, 0x4c, 0x7f, 0xd3, 0x33, 0x68, 0xce, 0x17, 0x9e, 0x74, 0x97, 0x1b, 0xd5, 0x98, 0xef, 0x83, 0x8a,
0x02, 0xfb, 0x55, 0xad, 0xd4, 0xef, 0x88, 0x5f, 0xce, 0xb1, 0x16, 0x80, 0xb9, 0x87, 0x76, 0x7e, 0xc6, 0x1f, 0x85, 0x5c, 0x4e, 0x13, 0x34, 0x6d, 0x28, 0x00, 0x7a, 0x0c, 0xfb, 0x55, 0xad, 0xd4,
0x0f, 0x7f, 0xb7, 0x60, 0x7f, 0x58, 0x2e, 0xeb, 0x71, 0x14, 0x4a, 0x35, 0x8a, 0xbe, 0x84, 0x46, 0xef, 0x8c, 0x5f, 0xce, 0xb1, 0x16, 0x80, 0xb9, 0x87, 0x76, 0x7e, 0x0f, 0x7f, 0xb7, 0x60, 0x7f,
0xfa, 0x58, 0x4e, 0x30, 0x40, 0x89, 0x15, 0x17, 0xf2, 0xb4, 0x24, 0x7e, 0x5d, 0x63, 0x73, 0xea, 0x58, 0x2e, 0x6b, 0x3f, 0x0a, 0xa5, 0x1a, 0x55, 0x5f, 0x42, 0x23, 0x7d, 0x4c, 0xc7, 0x18, 0xa0,
0xe4, 0xb9, 0xc9, 0xce, 0x58, 0xdb, 0xda, 0xfa, 0xee, 0xe2, 0x75, 0xce, 0x8d, 0xcb, 0xca, 0x2f, 0xc4, 0x8a, 0x0b, 0x79, 0x5a, 0x12, 0xbf, 0xae, 0xb1, 0x39, 0x75, 0xf2, 0xdc, 0x64, 0x67, 0xac,
0xb7, 0x60, 0xe3, 0x67, 0x1e, 0x4c, 0x91, 0x76, 0xa0, 0x51, 0x76, 0xb2, 0xf4, 0x88, 0x8e, 0x4c, 0x6d, 0x6d, 0x7d, 0x77, 0xf1, 0x3a, 0xe7, 0xc6, 0x65, 0xe5, 0x97, 0x5b, 0xb0, 0xf1, 0x33, 0x0f,
0xdf, 0x8d, 0xf8, 0x63, 0xd8, 0xf5, 0xf4, 0x57, 0x72, 0x86, 0x98, 0xe4, 0x13, 0x66, 0x1e, 0xa4, 0xa6, 0x48, 0x3b, 0xd0, 0x28, 0x3b, 0x59, 0x7a, 0x44, 0x47, 0xa6, 0xef, 0x46, 0xfc, 0x31, 0xec,
0xef, 0xe0, 0xce, 0x5c, 0xc2, 0xc3, 0x90, 0xc7, 0x62, 0x1c, 0x49, 0x75, 0xed, 0x53, 0x4d, 0x6f, 0x7a, 0xfa, 0x2b, 0x39, 0x43, 0x4c, 0xf2, 0x09, 0x34, 0x0f, 0xd2, 0x77, 0x70, 0x67, 0x2e, 0xe1,
0xe0, 0xa5, 0x83, 0xae, 0xce, 0x4a, 0xc8, 0x32, 0xbd, 0x5d, 0x45, 0xff, 0xab, 0x05, 0x8d, 0x8c, 0x61, 0xc8, 0x63, 0x31, 0x8e, 0xa4, 0xba, 0xf6, 0xa9, 0xa6, 0x37, 0xf0, 0xd2, 0x41, 0x58, 0x67,
0xfa, 0x84, 0x4b, 0x4e, 0x9e, 0xc1, 0x96, 0x9b, 0xd6, 0xd4, 0x0c, 0xcf, 0x07, 0x8b, 0x55, 0x58, 0x25, 0x64, 0x99, 0xde, 0xae, 0xa2, 0xff, 0xd5, 0x82, 0x46, 0x46, 0x7d, 0xcc, 0x25, 0x27, 0xcf,
0x28, 0x3d, 0xcb, 0xf4, 0xd5, 0xee, 0x11, 0x26, 0x3a, 0x53, 0xc1, 0xee, 0x2a, 0xdb, 0x2c, 0x0b, 0x60, 0xcb, 0x4d, 0x6b, 0x6a, 0x86, 0xeb, 0x83, 0xc5, 0x2a, 0x2c, 0x94, 0x9e, 0x65, 0xfa, 0x6a,
0x96, 0x5b, 0xd0, 0x1f, 0xcd, 0x88, 0x19, 0x4e, 0xcf, 0x85, 0x9b, 0xf8, 0xb1, 0xba, 0x9e, 0xea, 0x37, 0x09, 0x13, 0x9d, 0xa9, 0x60, 0x77, 0x95, 0x6d, 0x96, 0x05, 0xcb, 0x2d, 0xe8, 0x8f, 0x66,
0x6d, 0x98, 0x81, 0x9b, 0xa5, 0x98, 0x9f, 0xc9, 0x73, 0xd8, 0xe4, 0xae, 0xd2, 0xd2, 0xce, 0x9a, 0xc4, 0x0c, 0xa7, 0xe7, 0xc2, 0x4d, 0xfc, 0x58, 0x5d, 0x4f, 0xf5, 0x36, 0xcc, 0x40, 0xce, 0x52,
0x3d, 0xba, 0xe4, 0xac, 0xc4, 0xf4, 0x42, 0x6b, 0x32, 0x63, 0xf1, 0xf0, 0x12, 0xb6, 0xfb, 0x49, 0xcc, 0xcf, 0xe4, 0x39, 0x6c, 0x72, 0x57, 0x69, 0x69, 0x67, 0xcd, 0x1e, 0x5d, 0x72, 0x56, 0x62,
0x72, 0x1c, 0x79, 0x28, 0x48, 0x13, 0xe0, 0x6d, 0x88, 0x57, 0x31, 0xba, 0x12, 0x3d, 0xa7, 0x46, 0x7a, 0xa1, 0x35, 0x99, 0xb1, 0x78, 0x78, 0x09, 0xdb, 0x27, 0x49, 0xd2, 0x8f, 0x3c, 0x14, 0xa4,
0x1c, 0x33, 0xa2, 0xde, 0xf8, 0x42, 0xf8, 0xe1, 0xc8, 0xb1, 0xc8, 0x9e, 0x69, 0x5c, 0xff, 0xca, 0x09, 0xf0, 0x36, 0xc4, 0xab, 0x18, 0x5d, 0x89, 0x9e, 0x53, 0x23, 0x8e, 0x19, 0x51, 0x6f, 0x7c,
0x17, 0x52, 0x38, 0x36, 0xb9, 0x0d, 0x7b, 0x1a, 0xf8, 0x26, 0x92, 0x83, 0xf0, 0x98, 0xbb, 0x63, 0x21, 0xfc, 0x70, 0xe4, 0x58, 0x64, 0xcf, 0x34, 0xee, 0xe4, 0xca, 0x17, 0x52, 0x38, 0x36, 0xb9,
0x74, 0xd6, 0x08, 0x81, 0xa6, 0x06, 0x07, 0x22, 0x6d, 0xb0, 0xe7, 0xac, 0x2b, 0xcb, 0x7e, 0x92, 0x0d, 0x7b, 0x1a, 0xf8, 0x26, 0x92, 0x83, 0xb0, 0xcf, 0xdd, 0x31, 0x3a, 0x6b, 0x84, 0x40, 0x53,
0x44, 0xc9, 0xe9, 0xc5, 0x85, 0x40, 0xe9, 0x78, 0x0f, 0x9f, 0xc1, 0xbd, 0x15, 0xb1, 0x91, 0x5d, 0x83, 0x03, 0x91, 0x36, 0xd8, 0x73, 0xd6, 0x95, 0xe5, 0x49, 0x92, 0x44, 0xc9, 0xe9, 0xc5, 0x85,
0xa8, 0x1b, 0xf4, 0x1c, 0x9d, 0x9a, 0x32, 0x7d, 0x1b, 0x8a, 0x1c, 0xb0, 0x7a, 0x7f, 0xd9, 0x50, 0x40, 0xe9, 0x78, 0x0f, 0x9f, 0xc1, 0xbd, 0x15, 0xb1, 0x91, 0x5d, 0xa8, 0x1b, 0xf4, 0x1c, 0x9d,
0x4f, 0x6d, 0x67, 0xa1, 0x4b, 0x8e, 0x61, 0x3b, 0x5b, 0x65, 0xa4, 0x5d, 0xb9, 0xdf, 0xf4, 0x24, 0x9a, 0x32, 0x7d, 0x1b, 0x8a, 0x1c, 0xb0, 0x7a, 0x7f, 0xd9, 0x50, 0x4f, 0x6d, 0x67, 0xa1, 0x4b,
0x6f, 0xdf, 0xaf, 0xde, 0x7d, 0xe9, 0x04, 0x7f, 0x65, 0x18, 0xd5, 0x3e, 0x20, 0xf7, 0x97, 0xa6, 0xfa, 0xb0, 0x9d, 0xad, 0x3a, 0xd2, 0xae, 0xdc, 0x7f, 0x7a, 0x92, 0xb7, 0xef, 0x57, 0xef, 0xc6,
0x77, 0xb1, 0x6c, 0xda, 0x07, 0xd5, 0xc2, 0x25, 0x9e, 0x20, 0xa8, 0xe2, 0xc9, 0x17, 0x4b, 0x15, 0x74, 0x82, 0xbf, 0x32, 0x8c, 0x6a, 0x1f, 0x90, 0xfb, 0x4b, 0xd3, 0xbb, 0x58, 0x46, 0xed, 0x83,
0x4f, 0x69, 0xa3, 0x30, 0x70, 0x8a, 0x1d, 0x3c, 0x94, 0x09, 0xf2, 0x09, 0x39, 0x58, 0x7a, 0xc3, 0x6a, 0xe1, 0x12, 0x4f, 0x10, 0x54, 0xf1, 0xe4, 0x8b, 0xa5, 0x8a, 0xa7, 0xb4, 0x51, 0x18, 0x38,
0xa5, 0x05, 0xdd, 0xfe, 0xa0, 0xf4, 0xd0, 0x7a, 0x62, 0xbd, 0xfc, 0xf4, 0xef, 0xeb, 0x8e, 0xf5, 0xc5, 0x8e, 0x1e, 0xca, 0x04, 0xf9, 0x84, 0x1c, 0x2c, 0xbd, 0xe1, 0xd2, 0x02, 0x6f, 0x7f, 0x50,
0xfe, 0xba, 0x63, 0xfd, 0x7b, 0xdd, 0xb1, 0x7e, 0xbb, 0xe9, 0xd4, 0xde, 0xdf, 0x74, 0x6a, 0xff, 0x7a, 0x68, 0x3d, 0xb1, 0x5e, 0x7e, 0xfa, 0xf7, 0x75, 0xc7, 0x7a, 0x7f, 0xdd, 0xb1, 0xfe, 0xbd,
0xdc, 0x74, 0x6a, 0xdf, 0xb7, 0x57, 0xff, 0x6b, 0x77, 0xbe, 0xa9, 0xff, 0x1c, 0xfd, 0x17, 0x00, 0xee, 0x58, 0xbf, 0xdd, 0x74, 0x6a, 0xef, 0x6f, 0x3a, 0xb5, 0x7f, 0x6e, 0x3a, 0xb5, 0xef, 0xdb,
0x00, 0xff, 0xff, 0xa1, 0xe4, 0x04, 0x3d, 0xff, 0x09, 0x00, 0x00, 0xab, 0xff, 0xf5, 0x3b, 0xdf, 0xd4, 0x7f, 0x8e, 0xfe, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x0b, 0x96,
0xb3, 0xaa, 0x1f, 0x0a, 0x00, 0x00,
} }
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {
@ -1582,6 +1591,13 @@ func (m *SpacePushRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.Credential) > 0 {
i -= len(m.Credential)
copy(dAtA[i:], m.Credential)
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.Credential)))
i--
dAtA[i] = 0x12
}
if m.Payload != nil { if m.Payload != nil {
{ {
size, err := m.Payload.MarshalToSizedBuffer(dAtA[:i]) size, err := m.Payload.MarshalToSizedBuffer(dAtA[:i])
@ -2276,6 +2292,10 @@ func (m *SpacePushRequest) Size() (n int) {
l = m.Payload.Size() l = m.Payload.Size()
n += 1 + l + sovSpacesync(uint64(l)) n += 1 + l + sovSpacesync(uint64(l))
} }
l = len(m.Credential)
if l > 0 {
n += 1 + l + sovSpacesync(uint64(l))
}
return n return n
} }
@ -3363,6 +3383,40 @@ func (m *SpacePushRequest) Unmarshal(dAtA []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Credential", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpacesync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSpacesync
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSpacesync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Credential = append(m.Credential[:0], dAtA[iNdEx:postIndex]...)
if m.Credential == nil {
m.Credential = []byte{}
}
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipSpacesync(dAtA[iNdEx:]) skippy, err := skipSpacesync(dAtA[iNdEx:])

View File

@ -4,8 +4,10 @@ package coordinatorclient
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/coordinator/coordinatorproto" "github.com/anytypeio/any-sync/coordinator/coordinatorproto"
"github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/pool"
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
"github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf"
) )
@ -16,7 +18,9 @@ func New() CoordinatorClient {
} }
type CoordinatorClient interface { type CoordinatorClient interface {
SpaceSign(ctx context.Context, spaceId string) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) ChangeStatus(ctx context.Context, spaceId string, deleteRaw *treechangeproto.RawTreeChangeWithId) (status *coordinatorproto.SpaceStatusPayload, err error)
StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error)
SpaceSign(ctx context.Context, spaceId string, spaceHeader []byte) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error)
FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (limit uint64, err error) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (limit uint64, err error)
app.Component app.Component
} }
@ -26,6 +30,40 @@ type coordinatorClient struct {
nodeConf nodeconf.Service nodeConf nodeconf.Service
} }
func (c *coordinatorClient) ChangeStatus(ctx context.Context, spaceId string, deleteRaw *treechangeproto.RawTreeChangeWithId) (status *coordinatorproto.SpaceStatusPayload, err error) {
cl, err := c.client(ctx)
if err != nil {
return
}
resp, err := cl.SpaceStatusChange(ctx, &coordinatorproto.SpaceStatusChangeRequest{
SpaceId: spaceId,
DeletionChangeId: deleteRaw.GetId(),
DeletionChangePayload: deleteRaw.GetRawChange(),
})
if err != nil {
err = rpcerr.Unwrap(err)
return
}
status = resp.Payload
return
}
func (c *coordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) {
cl, err := c.client(ctx)
if err != nil {
return
}
resp, err := cl.SpaceStatusCheck(ctx, &coordinatorproto.SpaceStatusCheckRequest{
SpaceId: spaceId,
})
if err != nil {
err = rpcerr.Unwrap(err)
return
}
status = resp.Payload
return
}
func (c *coordinatorClient) Init(a *app.App) (err error) { func (c *coordinatorClient) Init(a *app.App) (err error) {
c.pool = a.MustComponent(pool.CName).(pool.Service).NewPool(CName) c.pool = a.MustComponent(pool.CName).(pool.Service).NewPool(CName)
c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service) c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
@ -36,15 +74,17 @@ func (c *coordinatorClient) Name() (name string) {
return CName return CName
} }
func (c *coordinatorClient) SpaceSign(ctx context.Context, spaceId string) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) { func (c *coordinatorClient) SpaceSign(ctx context.Context, spaceId string, spaceHeader []byte) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) {
cl, err := c.client(ctx) cl, err := c.client(ctx)
if err != nil { if err != nil {
return return
} }
resp, err := cl.SpaceSign(ctx, &coordinatorproto.SpaceSignRequest{ resp, err := cl.SpaceSign(ctx, &coordinatorproto.SpaceSignRequest{
SpaceId: spaceId, SpaceId: spaceId,
Header: spaceHeader,
}) })
if err != nil { if err != nil {
err = rpcerr.Unwrap(err)
return return
} }
return resp.Receipt, nil return resp.Receipt, nil
@ -60,6 +100,7 @@ func (c *coordinatorClient) FileLimitCheck(ctx context.Context, spaceId string,
SpaceId: spaceId, SpaceId: spaceId,
}) })
if err != nil { if err != nil {
err = rpcerr.Unwrap(err)
return return
} }
return resp.Limit, nil return resp.Limit, nil

View File

@ -9,6 +9,7 @@ import (
reflect "reflect" reflect "reflect"
app "github.com/anytypeio/any-sync/app" app "github.com/anytypeio/any-sync/app"
treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
coordinatorproto "github.com/anytypeio/any-sync/coordinator/coordinatorproto" coordinatorproto "github.com/anytypeio/any-sync/coordinator/coordinatorproto"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
) )
@ -36,6 +37,20 @@ func (m *MockCoordinatorClient) EXPECT() *MockCoordinatorClientMockRecorder {
return m.recorder return m.recorder
} }
// ChangeStatus mocks base method.
func (m *MockCoordinatorClient) ChangeStatus(arg0 context.Context, arg1 string, arg2 *treechangeproto.RawTreeChangeWithId) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChangeStatus", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// ChangeStatus indicates an expected call of ChangeStatus.
func (mr *MockCoordinatorClientMockRecorder) ChangeStatus(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeStatus", reflect.TypeOf((*MockCoordinatorClient)(nil).ChangeStatus), arg0, arg1, arg2)
}
// FileLimitCheck mocks base method. // FileLimitCheck mocks base method.
func (m *MockCoordinatorClient) FileLimitCheck(arg0 context.Context, arg1 string, arg2 []byte) (uint64, error) { func (m *MockCoordinatorClient) FileLimitCheck(arg0 context.Context, arg1 string, arg2 []byte) (uint64, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -80,16 +95,31 @@ func (mr *MockCoordinatorClientMockRecorder) Name() *gomock.Call {
} }
// SpaceSign mocks base method. // SpaceSign mocks base method.
func (m *MockCoordinatorClient) SpaceSign(arg0 context.Context, arg1 string) (*coordinatorproto.SpaceReceiptWithSignature, error) { func (m *MockCoordinatorClient) SpaceSign(arg0 context.Context, arg1 string, arg2 []byte) (*coordinatorproto.SpaceReceiptWithSignature, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SpaceSign", arg0, arg1) ret := m.ctrl.Call(m, "SpaceSign", arg0, arg1, arg2)
ret0, _ := ret[0].(*coordinatorproto.SpaceReceiptWithSignature) ret0, _ := ret[0].(*coordinatorproto.SpaceReceiptWithSignature)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// SpaceSign indicates an expected call of SpaceSign. // SpaceSign indicates an expected call of SpaceSign.
func (mr *MockCoordinatorClientMockRecorder) SpaceSign(arg0, arg1 interface{}) *gomock.Call { func (mr *MockCoordinatorClientMockRecorder) SpaceSign(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SpaceSign", reflect.TypeOf((*MockCoordinatorClient)(nil).SpaceSign), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SpaceSign", reflect.TypeOf((*MockCoordinatorClient)(nil).SpaceSign), arg0, arg1, arg2)
}
// StatusCheck mocks base method.
func (m *MockCoordinatorClient) StatusCheck(arg0 context.Context, arg1 string) (*coordinatorproto.SpaceStatusCheckResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StatusCheck", arg0, arg1)
ret0, _ := ret[0].(*coordinatorproto.SpaceStatusCheckResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StatusCheck indicates an expected call of StatusCheck.
func (mr *MockCoordinatorClientMockRecorder) StatusCheck(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatusCheck", reflect.TypeOf((*MockCoordinatorClient)(nil).StatusCheck), arg0, arg1)
} }

File diff suppressed because it is too large Load Diff

View File

@ -42,6 +42,8 @@ type DRPCCoordinatorClient interface {
SpaceSign(ctx context.Context, in *SpaceSignRequest) (*SpaceSignResponse, error) SpaceSign(ctx context.Context, in *SpaceSignRequest) (*SpaceSignResponse, error)
FileLimitCheck(ctx context.Context, in *FileLimitCheckRequest) (*FileLimitCheckResponse, error) FileLimitCheck(ctx context.Context, in *FileLimitCheckRequest) (*FileLimitCheckResponse, error)
SpaceStatusCheck(ctx context.Context, in *SpaceStatusCheckRequest) (*SpaceStatusCheckResponse, error)
SpaceStatusChange(ctx context.Context, in *SpaceStatusChangeRequest) (*SpaceStatusChangeResponse, error)
} }
type drpcCoordinatorClient struct { type drpcCoordinatorClient struct {
@ -72,9 +74,29 @@ func (c *drpcCoordinatorClient) FileLimitCheck(ctx context.Context, in *FileLimi
return out, nil return out, nil
} }
func (c *drpcCoordinatorClient) SpaceStatusCheck(ctx context.Context, in *SpaceStatusCheckRequest) (*SpaceStatusCheckResponse, error) {
out := new(SpaceStatusCheckResponse)
err := c.cc.Invoke(ctx, "/coordinator.Coordinator/SpaceStatusCheck", drpcEncoding_File_coordinator_coordinatorproto_protos_coordinator_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcCoordinatorClient) SpaceStatusChange(ctx context.Context, in *SpaceStatusChangeRequest) (*SpaceStatusChangeResponse, error) {
out := new(SpaceStatusChangeResponse)
err := c.cc.Invoke(ctx, "/coordinator.Coordinator/SpaceStatusChange", drpcEncoding_File_coordinator_coordinatorproto_protos_coordinator_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCCoordinatorServer interface { type DRPCCoordinatorServer interface {
SpaceSign(context.Context, *SpaceSignRequest) (*SpaceSignResponse, error) SpaceSign(context.Context, *SpaceSignRequest) (*SpaceSignResponse, error)
FileLimitCheck(context.Context, *FileLimitCheckRequest) (*FileLimitCheckResponse, error) FileLimitCheck(context.Context, *FileLimitCheckRequest) (*FileLimitCheckResponse, error)
SpaceStatusCheck(context.Context, *SpaceStatusCheckRequest) (*SpaceStatusCheckResponse, error)
SpaceStatusChange(context.Context, *SpaceStatusChangeRequest) (*SpaceStatusChangeResponse, error)
} }
type DRPCCoordinatorUnimplementedServer struct{} type DRPCCoordinatorUnimplementedServer struct{}
@ -87,9 +109,17 @@ func (s *DRPCCoordinatorUnimplementedServer) FileLimitCheck(context.Context, *Fi
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
} }
func (s *DRPCCoordinatorUnimplementedServer) SpaceStatusCheck(context.Context, *SpaceStatusCheckRequest) (*SpaceStatusCheckResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCCoordinatorUnimplementedServer) SpaceStatusChange(context.Context, *SpaceStatusChangeRequest) (*SpaceStatusChangeResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCCoordinatorDescription struct{} type DRPCCoordinatorDescription struct{}
func (DRPCCoordinatorDescription) NumMethods() int { return 2 } func (DRPCCoordinatorDescription) NumMethods() int { return 4 }
func (DRPCCoordinatorDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { func (DRPCCoordinatorDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n { switch n {
@ -111,6 +141,24 @@ func (DRPCCoordinatorDescription) Method(n int) (string, drpc.Encoding, drpc.Rec
in1.(*FileLimitCheckRequest), in1.(*FileLimitCheckRequest),
) )
}, DRPCCoordinatorServer.FileLimitCheck, true }, DRPCCoordinatorServer.FileLimitCheck, true
case 2:
return "/coordinator.Coordinator/SpaceStatusCheck", drpcEncoding_File_coordinator_coordinatorproto_protos_coordinator_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCCoordinatorServer).
SpaceStatusCheck(
ctx,
in1.(*SpaceStatusCheckRequest),
)
}, DRPCCoordinatorServer.SpaceStatusCheck, true
case 3:
return "/coordinator.Coordinator/SpaceStatusChange", drpcEncoding_File_coordinator_coordinatorproto_protos_coordinator_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCCoordinatorServer).
SpaceStatusChange(
ctx,
in1.(*SpaceStatusChangeRequest),
)
}, DRPCCoordinatorServer.SpaceStatusChange, true
default: default:
return "", nil, nil, nil, false return "", nil, nil, nil, false
} }
@ -151,3 +199,35 @@ func (x *drpcCoordinator_FileLimitCheckStream) SendAndClose(m *FileLimitCheckRes
} }
return x.CloseSend() return x.CloseSend()
} }
type DRPCCoordinator_SpaceStatusCheckStream interface {
drpc.Stream
SendAndClose(*SpaceStatusCheckResponse) error
}
type drpcCoordinator_SpaceStatusCheckStream struct {
drpc.Stream
}
func (x *drpcCoordinator_SpaceStatusCheckStream) SendAndClose(m *SpaceStatusCheckResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_coordinator_coordinatorproto_protos_coordinator_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCCoordinator_SpaceStatusChangeStream interface {
drpc.Stream
SendAndClose(*SpaceStatusChangeResponse) error
}
type drpcCoordinator_SpaceStatusChangeStream struct {
drpc.Stream
}
func (x *drpcCoordinator_SpaceStatusChangeStream) SendAndClose(m *SpaceStatusChangeResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_coordinator_coordinatorproto_protos_coordinator_proto{}); err != nil {
return err
}
return x.CloseSend()
}

View File

@ -0,0 +1,16 @@
package coordinatorproto
import (
"errors"
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
)
var (
errGroup = rpcerr.ErrGroup(ErrorCodes_ErrorOffset)
ErrUnexpected = errGroup.Register(errors.New("unexpected error"), uint64(ErrorCodes_Unexpected))
ErrSpaceIsCreated = errGroup.Register(errors.New("space is missing"), uint64(ErrorCodes_SpaceCreated))
ErrSpaceIsDeleted = errGroup.Register(errors.New("space is deleted"), uint64(ErrorCodes_SpaceDeleted))
ErrSpaceDeletionPending = errGroup.Register(errors.New("space is set out for deletion"), uint64(ErrorCodes_SpaceDeletionPending))
ErrSpaceNotExists = errGroup.Register(errors.New("space not exists"), uint64(ErrorCodes_SpaceNotExists))
)

View File

@ -12,10 +12,38 @@ service Coordinator {
// - if a handshake identity matches a given identity // - if a handshake identity matches a given identity
// - if a requester contains in nodeconf list // - if a requester contains in nodeconf list
rpc FileLimitCheck(FileLimitCheckRequest) returns (FileLimitCheckResponse); rpc FileLimitCheck(FileLimitCheckRequest) returns (FileLimitCheckResponse);
// SpaceStatusCheck checks the status of space and tells if it is deleted or not
rpc SpaceStatusCheck(SpaceStatusCheckRequest) returns (SpaceStatusCheckResponse);
// SpaceStatusChange changes the status of space
rpc SpaceStatusChange(SpaceStatusChangeRequest) returns (SpaceStatusChangeResponse);
} }
message SpaceSignRequest { message SpaceSignRequest {
string spaceId = 1; string spaceId = 1;
bytes header = 2;
}
enum ErrorCodes {
Unexpected = 0;
SpaceDeleted = 1;
SpaceDeletionPending = 2;
SpaceCreated = 3;
SpaceNotExists = 4;
ErrorOffset = 300;
}
enum SpaceStatus {
SpaceStatusCreated = 0;
SpaceStatusPendingDeletion = 1;
SpaceStatusDeletionStarted = 2;
SpaceStatusDeleted = 3;
}
message SpaceStatusPayload {
SpaceStatus status = 1;
int64 deletionTimestamp = 2;
} }
message SpaceSignResponse { message SpaceSignResponse {
@ -53,3 +81,25 @@ message FileLimitCheckRequest {
message FileLimitCheckResponse { message FileLimitCheckResponse {
uint64 limit = 1; uint64 limit = 1;
} }
// SpaceStatusCheckRequest contains the spaceId of requested space
message SpaceStatusCheckRequest {
string spaceId = 1;
}
// SpaceStatusCheckResponse contains the current status of space
message SpaceStatusCheckResponse {
SpaceStatusPayload payload = 1;
}
// SpaceStatusChangeRequest contains the deletionChange if we want to delete space, or it is empty otherwise
message SpaceStatusChangeRequest {
string spaceId = 1;
string deletionChangeId = 2;
bytes deletionChangePayload = 3;
}
// SpaceStatusChangeResponse contains changed status of space
message SpaceStatusChangeResponse {
SpaceStatusPayload payload = 1;
}

View File

@ -157,6 +157,20 @@ func (mr *MockConfigurationMockRecorder) ConsensusPeers() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsensusPeers", reflect.TypeOf((*MockConfiguration)(nil).ConsensusPeers)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsensusPeers", reflect.TypeOf((*MockConfiguration)(nil).ConsensusPeers))
} }
// CoordinatorPeers mocks base method.
func (m *MockConfiguration) CoordinatorPeers() []string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CoordinatorPeers")
ret0, _ := ret[0].([]string)
return ret0
}
// CoordinatorPeers indicates an expected call of CoordinatorPeers.
func (mr *MockConfigurationMockRecorder) CoordinatorPeers() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CoordinatorPeers", reflect.TypeOf((*MockConfiguration)(nil).CoordinatorPeers))
}
// FilePeers mocks base method. // FilePeers mocks base method.
func (m *MockConfiguration) FilePeers() []string { func (m *MockConfiguration) FilePeers() []string {
m.ctrl.T.Helper() m.ctrl.T.Helper()