Merge remote-tracking branch 'origin/grpc-experiments' into sync-prototype

# Conflicts:
#	cmd/node/node.go
This commit is contained in:
mcrakhman 2022-07-18 14:45:25 +02:00
commit 693c159360
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
17 changed files with 1003 additions and 18 deletions

View File

@ -34,6 +34,10 @@ protos-go:
$(eval PKGMAP := $$(P_ACL_CHANGES),$$(P_TREE_CHANGES))
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. $(P_SYNC_CHANGES_PATH_PB)/protos/*.proto; mv $(P_SYNC_CHANGES_PATH_PB)/protos/*.go $(P_SYNC_CHANGES_PATH_PB)
protos:
GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1 protoc --gogofaster_out=plugins=grpc:. syncproto/proto/*.proto
build:
@$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app))
go build -o bin/anytype-node -ldflags "$(FLAGS)" cmd/node/node.go

169
cmd/client/client.go Normal file
View File

@ -0,0 +1,169 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/sec"
"go.uber.org/zap"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"syscall"
"time"
)
var log = logger.NewNamed("client")
var (
flagConfigFile = flag.String("c", "etc/config.yml", "path to config file")
flagVersion = flag.Bool("v", false, "show version and exit")
flagHelp = flag.Bool("h", false, "show help and exit")
)
func main() {
flag.Parse()
if *flagVersion {
fmt.Println(app.VersionDescription())
return
}
if *flagHelp {
flag.PrintDefaults()
return
}
if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
go func() {
http.ListenAndServe(debug, nil)
}()
}
// create app
ctx := context.Background()
a := new(app.App)
// open config file
conf, err := config.NewFromFile(*flagConfigFile)
if err != nil {
log.Fatal("can't open config file", zap.Error(err))
}
// bootstrap components
a.Register(conf).
Register(transport.New()).
Register(&Client{})
// start app
if err := a.Start(ctx); err != nil {
log.Fatal("can't start app", zap.Error(err))
}
log.Info("app started", zap.String("version", a.Version()))
// wait exit signal
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-exit
log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig)))
// close app
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := a.Close(ctx); err != nil {
log.Fatal("close error", zap.Error(err))
} else {
log.Info("goodbye!")
}
time.Sleep(time.Second / 3)
}
type Client struct {
conf config.GrpcServer
tr transport.Service
sc sec.SecureConn
}
func (c *Client) Init(ctx context.Context, a *app.App) (err error) {
c.tr = a.MustComponent(transport.CName).(transport.Service)
c.conf = a.MustComponent(config.CName).(*config.Config).GrpcServer
return nil
}
func (c *Client) Name() (name string) {
return "testClient"
}
func (c *Client) Run(ctx context.Context) (err error) {
tcpConn, err := net.Dial("tcp", c.conf.ListenAddrs[0])
if err != nil {
return
}
c.sc, err = c.tr.TLSConn(ctx, tcpConn)
if err != nil {
return
}
log.Info("connected with server", zap.String("serverPeer", c.sc.RemotePeer().String()), zap.String("per", c.sc.LocalPeer().String()))
dconn := drpcconn.New(c.sc)
stream, err := dconn.NewStream(ctx, "", enc{})
if err != nil {
return
}
go c.handleStream(stream)
return nil
}
func (c *Client) handleStream(stream drpc.Stream) {
var err error
defer func() {
log.Info("stream closed", zap.Error(err))
}()
var n int64 = 100000
for i := int64(0); i < n; i++ {
st := time.Now()
if err = stream.MsgSend(&syncproto.SyncMessage{Seq: i}, enc{}); err != nil {
if err == io.EOF {
return
}
log.Fatal("send error", zap.Error(err))
}
log.Debug("message sent", zap.Int64("seq", i))
msg := &syncproto.SyncMessage{}
if err := stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
log.Error("msg recv error", zap.Error(err))
}
log.Debug("message received", zap.Int64("seq", msg.Seq), zap.Duration("dur", time.Since(st)))
time.Sleep(time.Second)
}
}
func (c *Client) Close(ctx context.Context) (err error) {
if c.sc != nil {
return c.sc.Close()
}
return
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -8,8 +8,12 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/drpcserver"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
@ -37,6 +41,12 @@ func main() {
return
}
if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
go func() {
http.ListenAndServe(debug, nil)
}()
}
// create app
ctx := context.Background()
a := new(app.App)
@ -61,7 +71,7 @@ func main() {
// start app
if err := a.Start(ctx); err != nil {
log.Error("can't start app", zap.Error(err))
log.Fatal("can't start app", zap.Error(err))
}
log.Info("app started", zap.String("version", a.Version()))
@ -79,8 +89,10 @@ func main() {
} else {
log.Info("goodbye!")
}
time.Sleep(time.Second / 3)
}
func Bootstrap(a *app.App) {
//a.Register(mycomponent.New())
a.Register(transport.New()).
Register(drpcserver.New())
}

View File

@ -2,4 +2,7 @@ package config
type GrpcServer struct {
ListenAddrs []string `yaml:"listenAddrs"`
TLS bool `yaml:"tls"`
TLSCertFile string `yaml:"tlsCertFile"`
TLSKeyFile string `yaml:"tlsKeyFile"`
}

View File

@ -3,4 +3,8 @@ anytype:
grpcServer:
listenAddrs:
- ":443"
- "127.0.0.1:4430"
- "127.0.0.1:4431"
tls: false
tlsKeyFile: "etc/x509/key.pem"
tlsCertFile: "etc/x509/cert.pem"

30
etc/x509/cert.pem Normal file
View File

@ -0,0 +1,30 @@
-----BEGIN CERTIFICATE-----
MIIFJTCCAw2gAwIBAgIUNiBcO8wV6YezcDQ+cLpZe/iXbGYwDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIyMDcxMzE5NDgwNFoXDTMyMDcx
MDE5NDgwNFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEAt+6cVBVkEe9I3CRmPuAqZGnLq48DTXaRH7xz6u4ld7tU
cpDaXw+aURRYKGfYIvcfQav/i+mYUxTQDFbfN8SULis/DckTqeBEaGgUbJJZ6w73
kkB8BuCiIjh3W9hyUHr+WbdF9wU8K1G6GmjimBJ+qlBBewQm0kzqosVwjQVWarN4
aEhgiyjnLF9XVYQZRVqGxKzP/MssEU7YjSPPfBEsmi6pAqiDYuZ3+sVlKuDrki0d
r1XOcV/dcSJa4NRazxiWME+GJQ/x7gA2GlC5FbQyhrOs1sNWaBsaWQGl4oHB90T7
b0GxKOTqI7oCMho+Ajt7eB4zN0fqeCw0nwFyzok3f/GKFk0vLnecFt+gCLyMyS6X
Dqf/DAlvoP41UeFI/rheOwc0UcZbU0HZ2zYrzOYR4eHMHSYKma2DHvIyOYZIV0Uw
tkyOFnoEQGSXK3TMOb7oWyrs4gl+5euPEDKqDuP28xBr4GsFbNq7/7Kd9rKwXb3L
iyy3kot2Vf5QqDFGdyOLDFFKF7MgMrUiEX8onW+fSmjooqUq7ZyLAs8W0uim4moQ
52t7CGUuglaTHYMi2rfoAS5qPXT76jiwu7H351Psca6EIY4V+dIiElvbYryhTNsV
4eDdWGJoZUyGACUhPdfO9l7Wp2+Yy/HGfPWm6mKX8VPHO4llYvwgGzm6Is/pFmUC
AwEAAaNvMG0wHQYDVR0OBBYEFKl07s6kNnGmJN/ASYQTml5UkK0AMB8GA1UdIwQY
MBaAFKl07s6kNnGmJN/ASYQTml5UkK0AMA8GA1UdEwEB/wQFMAMBAf8wGgYDVR0R
BBMwEYIJbG9jYWxob3N0hwR/AAABMA0GCSqGSIb3DQEBCwUAA4ICAQBs5JmRhddd
KuyhkSWd6T/HqAQISgP72ZUAr3gt2j34GLrhDYcvKFZwcoJFCFjG3pVmvJCORVGO
x2TYt2ntsmIyFCZlGE/TpLxbSgsykoUVBnc8ySDnTQTDJr6S7AyWQsznSD6j1/FA
a9E8ZrsyopqIn2eZy9/Asgy1qeJVO4F1kIq+19HUDR2z1rXqVSycOQEJkF84Kgvd
+nDJQ5W3EdamYuDQOhTOeEFfZy1HyM3APhR9JyFHHnZ2D3vsoys/LIWolBJPOq6B
o5JjXgLrA1e12TVXlnTqZ3986vGOyjfut7o2NPO1Se5OeGr6XFwO1nhIJ4gj8OTv
2XuBcslLXI5+6UIsXtFHXAfH7eYErkBCQGiwjYj0V8Kb4M7UZ0seqjK+gMKfvM4Z
hAPlKP2AUNYS7TNyqW3t8SA0c52ASdAezzh/OklCO5vyzxQT4wXTQt5Bub83m9uY
Jnrv6Kg5UPQMrTpo9usJ2zAyj+qkk8KubKOA7grtblmCTvyJFDwyiWZkr0nuvNTV
BsKis/DbJ2hneF+2D/B2pGKxyEP1LCIV/JDTUGX3F8ljTgSihZay/ZZnPUZpbCa0
czodlAQk4wkGxJWSH6SSkq4dD+JnBQpufBMLa1qShfUn+N1N02yiDPq9XxJytoOR
+vHqqrPS0PcTk1x2Og0xLn6kN+MH0+jRNQ==
-----END CERTIFICATE-----

52
etc/x509/key.pem Normal file
View File

@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC37pxUFWQR70jc
JGY+4CpkacurjwNNdpEfvHPq7iV3u1RykNpfD5pRFFgoZ9gi9x9Bq/+L6ZhTFNAM
Vt83xJQuKz8NyROp4ERoaBRsklnrDveSQHwG4KIiOHdb2HJQev5Zt0X3BTwrUboa
aOKYEn6qUEF7BCbSTOqixXCNBVZqs3hoSGCLKOcsX1dVhBlFWobErM/8yywRTtiN
I898ESyaLqkCqINi5nf6xWUq4OuSLR2vVc5xX91xIlrg1FrPGJYwT4YlD/HuADYa
ULkVtDKGs6zWw1ZoGxpZAaXigcH3RPtvQbEo5OojugIyGj4CO3t4HjM3R+p4LDSf
AXLOiTd/8YoWTS8ud5wW36AIvIzJLpcOp/8MCW+g/jVR4Uj+uF47BzRRxltTQdnb
NivM5hHh4cwdJgqZrYMe8jI5hkhXRTC2TI4WegRAZJcrdMw5vuhbKuziCX7l648Q
MqoO4/bzEGvgawVs2rv/sp32srBdvcuLLLeSi3ZV/lCoMUZ3I4sMUUoXsyAytSIR
fyidb59KaOiipSrtnIsCzxbS6KbiahDna3sIZS6CVpMdgyLat+gBLmo9dPvqOLC7
sffnU+xxroQhjhX50iISW9tivKFM2xXh4N1YYmhlTIYAJSE91872Xtanb5jL8cZ8
9abqYpfxU8c7iWVi/CAbOboiz+kWZQIDAQABAoICAAcytJX6z1N/WonY99Jime4b
kM+qNV4g8317NcseHkPBBkVEg3NAbFDbe7a7F6OIqcW7ajEKx34K3Lh65tnrHMRw
x1MuCRG3F95BInl8Qb4X9BraYLAxNs8hiBRFVHXvVkhjCn8oCoqEdITHkYSThrb6
FJHwn+dPgMg1c1nleVQMKXxlRrfzdhaPtZ6AYK/M4uTMAYi6V8Nmo0VkvyAzGNyA
0nbq6tdiPxRGiNbEfhuWneiIcl+P+Z3NkyJk1RfxNaF230BQjy55/iQKADBIAMky
O2OdOeKxmtacYKZMXy+i2LcqKjl4OeyDgUX+LmqOct7IycbSTuv8iOOnHhvUlWIg
nFYHx2i/t5mHp0kPjHUzqu40HaRKPpGxAzzSL/W2WNuSFzbZB6SMTGIxG77hGkUh
saoOSLRDL7DhEvvZ5cvhVM0l2mrufak0sUR7K/TooheQ+0A6bU+pZW1kzfwt4ICI
BlGZt6rw7dSEDMButhfMkWPCIowULrtevB9pOULfvwGcpslMNN6KiAl8mjR6OwJB
7AdWCCCC52p240OKe8N4eBoPr6T44LoBsKFpCI2Ztd8IG3LP6AGJueWFLrmPj1J+
WePs5WX5IGfHSHJondm1BWS1nwkdSZQUaeL/POi0YbaAlgndpZAbpkfxjcLuB7Wd
5iOqiIkrGCjVyFTXBipxAoIBAQDE0amd/TrSPwcdLLkhGZswSgGatlenqw/DLswy
30Vu6NzZ2dRHKk1eX3GApgBtYmkP4n3Jadaxl8OXGCw+mPv9fBHgVrmcwt9CCTNL
uwKvGtEpvXH2LgcmFTEL8CnKoM7ZlXDAlGSPf46pOU8HG8nOqOywkIaoiiLAPZHZ
Y3FMLs7s0UEDYSM0EWHr6/po8VQLSnyN5NDcUhn6dsNgjS4p9W90lHHK24J7dqGh
ye4vWslzB5K47JitYKzpo3+dOUpu1/jo9uwzDsUw7SON7CKtKgldwrzvZGiRBhVW
j+cLHdRdCNz2gF/aI2JwgkW1HC+0X3RQs+H99g+yWNDYbyGVAoIBAQDvPPbql0kz
FzdpInbtBY7Z4V0ZRveeWDMln04qtEuifHzme0K03itACcxAhWAwAV8fCjZJdmkc
nQVJ/0i05IFSXiFAKpuhyA9TkPiwxYwlFpvKDk2lhAQ0yR7ig8wCYs1ZA6PKWwHl
Zd7FJKAjTkHey/KAagA1ya8XTBeIDiIQKriMVU9DdJ+4tEFqZLq9EUd/QHJkrxaV
jwmwcc26oE8XAFdFdZtqqvKjhpO4OjFN8C9TIX70krrzTSN2FTK1Y5IVSanzkYEB
Ovy/gedWzq9evuGRoKKNxYqYUXy/p6DiD36OMHZveGssPCkI4NIdPLVo1L6YXuMS
pLl2PswHmI2RAoIBAAiMCk+gFZPXxNlRfeCgGgsoy5UKYhgX56FUQO3coElGt4CG
Yx+MCLwWbPlnPBGD+ogED+5VOEuSCZ5gIFu/jQHfB2+0lG7oT4WwnJenUWCFS5wC
bBaCvTC8LtsT3Ny9yv3L7Y+PYiaRJYLXyETIwiTFVTH9tLtQ9F1gzxqfpOXoCnhi
Re59o2e5cYUrRD6WbE3pOCt5SlCnCBXGXoms19penC5129MxYSM3baF3AW7xBFqI
c6iwLZkp35htzzbmrALQQjDruCondAzB349kN8VJVArMUCQdOiVCHF8b9K6Y5wX7
Qo052e/BJZ85KQnKZY/xrT1r8l5y9w+Jp9geS1kCggEBAJxp34XBI7qjkzbJsbeF
yr/o+FVucLa2M7qFTTXeaxTxDzghnptiJiTYQxJsIVdBjk9c/eFJ6a8reinHHmIm
g+a2ZEbvlJFRm7OnNPFeNyKIhZK1h06P4bAhTnAKe3eT4W4xUwUaO0MgN2XtbEWp
BKgF76bFpx2Dn1Y8CaaKlvq6863MmOYhecvpDlvhP7YddgFcwW3Si5F803jo7vj1
lsATGPvwyIwU+E6xziLE6TdrsYVIgRimVlR8OpMZiO3PC9OfNd5pY07KojUTWY0H
1OC9K/1qaN0IKnUr0cP8dNNYDgYo6UY4FNn2+10yoC09Y94GOhak8xFdYWRN6leN
BgECggEAbS7bbv1gIB8toaAWBXxEnLtAU0Ob8e7uD2JYKZqe6NaC+ZX5NTLn+QSG
Y4SkBORGw+v1BIw7Rdk6jlEPpYWncQigQ6YbhqerL39+lmatXoSbQBcMRsjPP0gF
yuOb48ff/uXJhlnbVBJHXMfOW8LDFXL3bgMS2HpWnr5Buu7zGB4ERHg91+tuczNL
qEa85pyvY974arF/53T7Kmzdd/fx7I6RZeZpBGWKqwuZSJTMYL7V/LyKyfU8CTP1
nz86BQIF1Cr8UXyOUI33UZMFwIXb62HRDXAfij7Ew33rj803l4cedNluXHSx/kNH
3XcrP5qofkzfgz1calZv6phIGJdHLQ==
-----END PRIVATE KEY-----

27
go.mod
View File

@ -6,32 +6,47 @@ require (
github.com/awalterschulze/gographviz v0.0.0-20190522210029-fa59802746ab
github.com/goccy/go-graphviz v0.0.9
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-cid v0.1.0
github.com/libp2p/go-libp2p v0.20.3
github.com/libp2p/go-libp2p-core v0.16.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-multihash v0.1.0
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.21.0
gopkg.in/yaml.v3 v3.0.1
storj.io/drpc v0.0.32
)
require (
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/fogleman/gg v1.3.0 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/klauspost/cpuid/v2 v2.0.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.5.0 // indirect
github.com/multiformats/go-multicodec v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/zeebo/errs v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.6 // indirect
)

57
go.sum
View File

@ -1,12 +1,22 @@
github.com/awalterschulze/gographviz v0.0.0-20190522210029-fa59802746ab h1:+cdNqtOJWjvepyhxy23G7z7vmpYCoC65AP0nqi1f53s=
github.com/awalterschulze/gographviz v0.0.0-20190522210029-fa59802746ab/go.mod h1:GEV5wmg4YquNw7v1kkyoX9etIk8yVmXj+AkDHuuETHs=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c=
github.com/btcsuite/btcd v0.22.1/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y=
github.com/btcsuite/btcd/btcec/v2 v2.1.3 h1:xM/n3yIhHAhHy04z4i43C8p4ehixJZMsnrVJkgl+MTE=
github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4rUs6kA=
github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/goccy/go-graphviz v0.0.9 h1:s/FMMJ1Joj6La3S5ApO3Jk2cwM4LpXECC2muFx3IPQQ=
@ -15,18 +25,28 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-cid v0.1.0 h1:YN33LQulcRHjfom/i25yoOZR4Telp1Hr/2RU3d0PnC0=
github.com/ipfs/go-cid v0.1.0/go.mod h1:rH5/Xv83Rfy8Rw6xG+id3DYAMUVmem1MowoKwdXmN2o=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-libp2p v0.20.3 h1:tjjDNfp7FqdI/7v1rXtB/BtELaPlAThL2uzlj18kcrw=
github.com/libp2p/go-libp2p v0.20.3/go.mod h1:I+vndVanE/p/SjFbnA+BEmmfAUEpWxrdXZeyQ1Dus5c=
github.com/libp2p/go-libp2p-core v0.16.1 h1:bWoiEBqVkpJ13hbv/f69tHODp86t6mvc4fBN4DkK73M=
github.com/libp2p/go-libp2p-core v0.16.1/go.mod h1:O3i/7y+LqUb0N+qhzXjBjjpchgptWAVMG1Voegk7b4c=
github.com/libp2p/go-openssl v0.0.7 h1:eCAzdLejcNVBzP/iZM9vqHnQm+XyCEbSSIheIPRGNsw=
github.com/libp2p/go-openssl v0.0.7/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
@ -40,11 +60,17 @@ github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4=
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
github.com/multiformats/go-multiaddr v0.5.0 h1:i/JuOoVg4szYQ4YEzDGtb2h0o8M7CG/Yq6cGlcjWZpM=
github.com/multiformats/go-multiaddr v0.5.0/go.mod h1:3KAxNkUqLTJ20AAwN4XVX4kZar+bR+gh4zgbfr3SNug=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multicodec v0.4.1 h1:BSJbf+zpghcZMZrwTYBGwy0CPcVZGWiC72Cp8bBd4R4=
github.com/multiformats/go-multicodec v0.4.1/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.15 h1:hWOPdrNqDjwHDx82vsYGSDZNyktOJJ2dzZJzFkOV1jM=
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg=
github.com/multiformats/go-multihash v0.1.0 h1:CgAgwqk3//SVEw3T+6DqI4mWMyRuDwZtOWcJT0q9+EA=
github.com/multiformats/go-multihash v0.1.0/go.mod h1:RJlXsxt6vHGaia+S8We0ErjhojtKzPP2AH4+kYM7k84=
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
@ -55,6 +81,9 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@ -63,6 +92,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
@ -79,8 +111,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg=
golang.org/x/image v0.0.0-20200119044424-58c23975cae1/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@ -100,13 +133,15 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -126,9 +161,13 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c=
lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=

View File

@ -0,0 +1,157 @@
package drpcserver
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"io"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcserver"
"strings"
"time"
)
var log = logger.NewNamed("drpcserver")
const CName = "drpcserver"
func New() DRPCServer {
return &drpcServer{}
}
type DRPCServer interface {
app.ComponentRunnable
}
type drpcServer struct {
config config.GrpcServer
drpcServer *drpcserver.Server
transport transport.Service
listeners []transport.ContextListener
cancel func()
}
func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) {
s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer
s.transport = a.MustComponent(transport.CName).(transport.Service)
return nil
}
func (s *drpcServer) Name() (name string) {
return CName
}
func (s *drpcServer) Run(ctx context.Context) (err error) {
s.drpcServer = drpcserver.New(s)
ctx, s.cancel = context.WithCancel(ctx)
for _, addr := range s.config.ListenAddrs {
tcpList, err := net.Listen("tcp", addr)
if err != nil {
return err
}
tlsList := s.transport.TLSListener(tcpList)
go s.serve(ctx, tlsList)
}
return
}
func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
l.Debug("drpc listener stopped")
}()
for {
select {
case <-ctx.Done():
return
default:
}
ctx, conn, err := lis.Accept(ctx)
if err != nil {
if isTemporary(err) {
l.Debug("listener temporary accept error", zap.Error(err))
t := time.NewTimer(500 * time.Millisecond)
select {
case <-t.C:
case <-ctx.Done():
return
}
continue
}
if _, ok := err.(transport.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}
l.Error("listener accept error", zap.Error(err))
return
}
go s.serveConn(ctx, conn)
}
}
func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) {
l := log.With(zap.String("remoteAddr", conn.RemoteAddr().String())).With(zap.String("localAddr", conn.LocalAddr().String()))
l.Debug("connection opened")
if err := s.drpcServer.ServeOne(ctx, conn); err != nil {
if err == context.Canceled || strings.Contains(err.Error(), "EOF") {
l.Debug("connection closed")
} else {
l.Warn("serve connection error", zap.Error(err))
}
}
}
func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) {
ctx := stream.Context()
sc, err := transport.CtxSecureConn(ctx)
if err != nil {
return
}
l := log.With(zap.String("peer", sc.RemotePeer().String()))
l.Info("stream opened")
defer func() {
l.Info("stream closed", zap.Error(err))
}()
for {
msg := &syncproto.SyncMessage{}
if err = stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
}
//log.Debug("receive msg", zap.Int("seq", int(msg.Seq)))
if err = stream.MsgSend(msg, enc{}); err != nil {
return
}
}
return nil
}
func (s *drpcServer) Close(ctx context.Context) (err error) {
if s.cancel != nil {
s.cancel()
}
for _, l := range s.listeners {
if e := l.Close(); e != nil {
log.Warn("close listener error", zap.Error(e))
}
}
return
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -0,0 +1,18 @@
//go:build !windows
package drpcserver
import (
"errors"
"net"
)
// isTemporary checks if an error is temporary.
func isTemporary(err error) bool {
var nErr net.Error
if errors.As(err, &nErr) {
return nErr.Temporary()
}
return false
}

View File

@ -0,0 +1,41 @@
//go:build windows
package drpcserver
import (
"errors"
"net"
"os"
"syscall"
)
const (
_WSAEMFILE syscall.Errno = 10024
_WSAENETRESET syscall.Errno = 10052
_WSAENOBUFS syscall.Errno = 10055
)
// isTemporary checks if an error is temporary.
// see related go issue for more detail: https://go-review.googlesource.com/c/go/+/208537/
func isTemporary(err error) bool {
var nErr net.Error
if !errors.As(err, &nErr) {
return false
}
if nErr.Temporary() {
return true
}
var sErr *os.SyscallError
if errors.As(err, &sErr) {
switch sErr.Err {
case _WSAENETRESET,
_WSAEMFILE,
_WSAENOBUFS:
return true
}
}
return false
}

View File

@ -0,0 +1,28 @@
package transport
import (
"context"
"errors"
"github.com/libp2p/go-libp2p-core/sec"
)
var (
ErrSecureConnNotFoundInContext = errors.New("secure connection not found in context")
)
type contextKey uint
const (
contextKeySecureConn contextKey = iota
)
func CtxSecureConn(ctx context.Context) (sec.SecureConn, error) {
if conn, ok := ctx.Value(contextKeySecureConn).(sec.SecureConn); ok {
return conn, nil
}
return nil, ErrSecureConnNotFoundInContext
}
func ctxWithSecureConn(ctx context.Context, conn sec.SecureConn) context.Context {
return context.WithValue(ctx, contextKeySecureConn, conn)
}

View File

@ -0,0 +1,50 @@
package transport
import (
"context"
"github.com/libp2p/go-libp2p-core/crypto"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"net"
)
type ContextListener interface {
// Accept works like net.Listener accept but add context
Accept(ctx context.Context) (context.Context, net.Conn, error)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
// Addr returns the listener's network address.
Addr() net.Addr
}
func newTLSListener(key crypto.PrivKey, lis net.Listener) ContextListener {
tr, _ := libp2ptls.New(key)
return &tlsListener{
tr: tr,
Listener: lis,
}
}
type tlsListener struct {
net.Listener
tr *libp2ptls.Transport
}
func (p *tlsListener) Accept(ctx context.Context) (context.Context, net.Conn, error) {
conn, err := p.Listener.Accept()
if err != nil {
return nil, nil, err
}
return p.upgradeConn(ctx, conn)
}
func (p *tlsListener) upgradeConn(ctx context.Context, conn net.Conn) (context.Context, net.Conn, error) {
secure, err := p.tr.SecureInbound(ctx, conn, "")
if err != nil {
return nil, nil, HandshakeError(err)
}
ctx = ctxWithSecureConn(ctx, secure)
return ctx, secure, nil
}

View File

@ -0,0 +1,60 @@
package transport
import (
"context"
"crypto/rand"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/sec"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"go.uber.org/zap"
"net"
)
type HandshakeError error
var log = logger.NewNamed("transport")
const CName = "transport"
func New() Service {
return &service{}
}
type Service interface {
TLSListener(lis net.Listener) ContextListener
TLSConn(ctx context.Context, conn net.Conn) (sec.SecureConn, error)
app.Component
}
type service struct {
key crypto.PrivKey
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
var pubKey crypto.PubKey
s.key, pubKey, err = crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return
}
pubKeyRaw, _ := pubKey.Raw()
log.Info("transport keys generated", zap.Binary("pubKey", pubKeyRaw))
return nil
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) TLSListener(lis net.Listener) ContextListener {
return newTLSListener(s.key, lis)
}
func (s *service) TLSConn(ctx context.Context, conn net.Conn) (sec.SecureConn, error) {
tr, err := libp2ptls.New(s.key)
if err != nil {
return nil, err
}
return tr.SecureOutbound(ctx, conn, "")
}

View File

@ -0,0 +1,7 @@
syntax = "proto3";
package anytype;
option go_package = "/syncproto";
message SyncMessage {
int64 seq = 1;
}

296
syncproto/sync.pb.go Normal file
View File

@ -0,0 +1,296 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: syncproto/proto/sync.proto
package syncproto
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type SyncMessage struct {
Seq int64 `protobuf:"varint,1,opt,name=seq,proto3" json:"seq,omitempty"`
}
func (m *SyncMessage) Reset() { *m = SyncMessage{} }
func (m *SyncMessage) String() string { return proto.CompactTextString(m) }
func (*SyncMessage) ProtoMessage() {}
func (*SyncMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{0}
}
func (m *SyncMessage) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SyncMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SyncMessage.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SyncMessage) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncMessage.Merge(m, src)
}
func (m *SyncMessage) XXX_Size() int {
return m.Size()
}
func (m *SyncMessage) XXX_DiscardUnknown() {
xxx_messageInfo_SyncMessage.DiscardUnknown(m)
}
var xxx_messageInfo_SyncMessage proto.InternalMessageInfo
func (m *SyncMessage) GetSeq() int64 {
if m != nil {
return m.Seq
}
return 0
}
func init() {
proto.RegisterType((*SyncMessage)(nil), "anytype.SyncMessage")
}
func init() { proto.RegisterFile("syncproto/proto/sync.proto", fileDescriptor_4b28dfdd48a89166) }
var fileDescriptor_4b28dfdd48a89166 = []byte{
// 119 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2a, 0xae, 0xcc, 0x4b,
0x2e, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0x87, 0x90, 0x20, 0xbe, 0x1e, 0x98, 0x29, 0xc4, 0x9e, 0x98,
0x57, 0x59, 0x52, 0x59, 0x90, 0xaa, 0x24, 0xcf, 0xc5, 0x1d, 0x5c, 0x99, 0x97, 0xec, 0x9b, 0x5a,
0x5c, 0x9c, 0x98, 0x9e, 0x2a, 0x24, 0xc0, 0xc5, 0x5c, 0x9c, 0x5a, 0x28, 0xc1, 0xa8, 0xc0, 0xa8,
0xc1, 0x1c, 0x04, 0x62, 0x3a, 0xa9, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83,
0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43,
0x14, 0x97, 0x3e, 0xdc, 0x82, 0x24, 0x36, 0x30, 0x65, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x7e,
0xbc, 0x46, 0xd2, 0x74, 0x00, 0x00, 0x00,
}
func (m *SyncMessage) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SyncMessage) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SyncMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Seq != 0 {
i = encodeVarintSync(dAtA, i, uint64(m.Seq))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintSync(dAtA []byte, offset int, v uint64) int {
offset -= sovSync(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *SyncMessage) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Seq != 0 {
n += 1 + sovSync(uint64(m.Seq))
}
return n
}
func sovSync(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozSync(x uint64) (n int) {
return sovSync(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *SyncMessage) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SyncMessage: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SyncMessage: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType)
}
m.Seq = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Seq |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipSync(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipSync(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSync
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSync
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSync
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthSync
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupSync
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthSync
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthSync = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowSync = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupSync = fmt.Errorf("proto: unexpected end of group")
)