Update connections on space level

This commit is contained in:
mcrakhman 2023-06-04 11:17:56 +02:00
parent 248205cddd
commit b85f545fa3
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 19 additions and 4 deletions

View File

@ -106,8 +106,14 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
ctx = logger.CtxWithFields(ctx, zap.String("peerId", p.Id())) ctx = logger.CtxWithFields(ctx, zap.String("peerId", p.Id()))
conn, err := p.AcquireDrpcConn(ctx)
if err != nil {
return
}
defer p.ReleaseDrpcConn(conn)
var ( var (
cl = d.clientFactory.Client(p) cl = d.clientFactory.Client(conn)
rdiff = NewRemoteDiff(d.spaceId, cl) rdiff = NewRemoteDiff(d.spaceId, cl)
stateCounter = d.syncStatus.StateCounter() stateCounter = d.syncStatus.StateCounter()
) )

View File

@ -28,6 +28,7 @@ import (
"github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"storj.io/drpc"
"sync/atomic" "sync/atomic"
) )
@ -222,8 +223,12 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string)
return return
} }
cl := spacesyncproto.NewDRPCSpaceSyncClient(p) var res *spacesyncproto.SpacePullResponse
res, err := cl.SpacePull(ctx, &spacesyncproto.SpacePullRequest{Id: id}) err = p.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := spacesyncproto.NewDRPCSpaceSyncClient(conn)
res, err = cl.SpacePull(ctx, &spacesyncproto.SpacePullRequest{Id: id})
return err
})
if err != nil { if err != nil {
err = rpcerr.Unwrap(err) err = rpcerr.Unwrap(err)
return return

View File

@ -177,6 +177,10 @@ func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId st
type mockPool struct { type mockPool struct {
} }
func (m *mockPool) AddPeer(ctx context.Context, p peer.Peer) (err error) {
return nil
}
func (m *mockPool) Init(a *app.App) (err error) { func (m *mockPool) Init(a *app.App) (err error) {
return nil return nil
} }

2
go.mod
View File

@ -33,7 +33,6 @@ require (
github.com/stretchr/testify v1.8.3 github.com/stretchr/testify v1.8.3
github.com/tyler-smith/go-bip39 v1.1.0 github.com/tyler-smith/go-bip39 v1.1.0
github.com/zeebo/blake3 v0.2.3 github.com/zeebo/blake3 v0.2.3
github.com/zeebo/errs v1.3.0
go.uber.org/atomic v1.11.0 go.uber.org/atomic v1.11.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/crypto v0.9.0 golang.org/x/crypto v0.9.0
@ -103,6 +102,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/zeebo/errs v1.3.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect