net utils wip

This commit is contained in:
Sergey Cherepanov 2022-08-05 12:07:34 +03:00
parent 1400e57cf7
commit 62c5d8e3b9
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
32 changed files with 3261 additions and 385 deletions

View File

@ -1,169 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/sec"
"go.uber.org/zap"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"syscall"
"time"
)
var log = logger.NewNamed("client")
var (
flagConfigFile = flag.String("c", "etc/config.yml", "path to config file")
flagVersion = flag.Bool("v", false, "show version and exit")
flagHelp = flag.Bool("h", false, "show help and exit")
)
func main() {
flag.Parse()
if *flagVersion {
fmt.Println(app.VersionDescription())
return
}
if *flagHelp {
flag.PrintDefaults()
return
}
if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
go func() {
http.ListenAndServe(debug, nil)
}()
}
// create app
ctx := context.Background()
a := new(app.App)
// open config file
conf, err := config.NewFromFile(*flagConfigFile)
if err != nil {
log.Fatal("can't open config file", zap.Error(err))
}
// bootstrap components
a.Register(conf).
Register(transport.New()).
Register(&Client{})
// start app
if err := a.Start(ctx); err != nil {
log.Fatal("can't start app", zap.Error(err))
}
log.Info("app started", zap.String("version", a.Version()))
// wait exit signal
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-exit
log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig)))
// close app
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := a.Close(ctx); err != nil {
log.Fatal("close error", zap.Error(err))
} else {
log.Info("goodbye!")
}
time.Sleep(time.Second / 3)
}
type Client struct {
conf config.GrpcServer
tr transport.Service
sc sec.SecureConn
}
func (c *Client) Init(ctx context.Context, a *app.App) (err error) {
c.tr = a.MustComponent(transport.CName).(transport.Service)
c.conf = a.MustComponent(config.CName).(*config.Config).GrpcServer
return nil
}
func (c *Client) Name() (name string) {
return "testClient"
}
func (c *Client) Run(ctx context.Context) (err error) {
tcpConn, err := net.Dial("tcp", c.conf.ListenAddrs[0])
if err != nil {
return
}
c.sc, err = c.tr.TLSConn(ctx, tcpConn)
if err != nil {
return
}
log.Info("connected with server", zap.String("serverPeer", c.sc.RemotePeer().String()), zap.String("per", c.sc.LocalPeer().String()))
dconn := drpcconn.New(c.sc)
stream, err := dconn.NewStream(ctx, "", enc{})
if err != nil {
return
}
go c.handleStream(stream)
return nil
}
func (c *Client) handleStream(stream drpc.Stream) {
var err error
defer func() {
log.Info("stream closed", zap.Error(err))
}()
var n int64 = 100000
for i := int64(0); i < n; i++ {
st := time.Now()
if err = stream.MsgSend(&syncproto.SyncMessage{Seq: i}, enc{}); err != nil {
if err == io.EOF {
return
}
log.Fatal("send error", zap.Error(err))
}
log.Debug("message sent", zap.Int64("seq", i))
msg := &syncproto.SyncMessage{}
if err := stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
log.Error("msg recv error", zap.Error(err))
}
log.Debug("message received", zap.Int64("seq", msg.Seq), zap.Duration("dur", time.Since(st)))
time.Sleep(time.Second)
}
}
func (c *Client) Close(ctx context.Context) (err error) {
if c.sc != nil {
return c.sc.Close()
}
return
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -7,8 +7,11 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/drpcserver"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/example"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
@ -82,6 +85,9 @@ func main() {
}
func Bootstrap(a *app.App) {
a.Register(transport.New()).
Register(drpcserver.New())
a.Register(secure.New()).
Register(server.New()).
Register(dialer.New()).
Register(pool.NewPool()).
Register(&example.Example{})
}

View File

@ -26,6 +26,7 @@ func NewFromFile(path string) (c *Config, err error) {
type Config struct {
Anytype Anytype `yaml:"anytype"`
GrpcServer GrpcServer `yaml:"grpcServer"`
PeerList PeerList `yaml:"peerList"`
}
func (c *Config) Init(ctx context.Context, a *app.App) (err error) {

View File

@ -2,7 +2,4 @@ package config
type GrpcServer struct {
ListenAddrs []string `yaml:"listenAddrs"`
TLS bool `yaml:"tls"`
TLSCertFile string `yaml:"tlsCertFile"`
TLSKeyFile string `yaml:"tlsKeyFile"`
}

14
config/peer.go Normal file
View File

@ -0,0 +1,14 @@
package config
type PeerList struct {
MyId struct {
PeerId string `yaml:"peerId"`
PrivKey string `yaml:"privKey"`
} `yaml:"myId"`
Remote []PeerRemote `yaml:"remote"`
}
type PeerRemote struct {
PeerId string `yaml:"peerId"`
Addr string `yaml:"addr"`
}

16
etc/config.1.yml Normal file
View File

@ -0,0 +1,16 @@
anytype:
swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec"
grpcServer:
listenAddrs:
- "127.0.0.1:4431"
peerList:
myId:
peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U"
privKey: "InCGjb55V9+jj2PebUExUuwrpOIBc4hmgk2dSqyk3k4DjmgrdoNVuFe7xCFaFdUVb0RJYj6A+OTp2yXASTmq2w=="
remote:
- peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C"
addr: "127.0.0.1:4430"
- peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H"
addr: "127.0.0.1:4432"

16
etc/config.2.yml Normal file
View File

@ -0,0 +1,16 @@
anytype:
swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec"
grpcServer:
listenAddrs:
- "127.0.0.1:4432"
peerList:
myId:
peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H"
privKey: "jynYZBgtM4elT+6e7M5UERTJCZgUd3hDdmQjCqTpApyJ4h53V6TQan4Ru4OXqz+91rCLjpIVdphhaB0l+TvNsA=="
remote:
- peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U"
addr: "127.0.0.1:4431"
- peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C"
addr: "127.0.0.1:4430"

View File

@ -4,7 +4,13 @@ anytype:
grpcServer:
listenAddrs:
- "127.0.0.1:4430"
- "127.0.0.1:4431"
tls: false
tlsKeyFile: "etc/x509/key.pem"
tlsCertFile: "etc/x509/cert.pem"
peerList:
myId:
peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C"
privKey: "3BhkWxi0Vzc2AhLtw7LhFHa9Ys4P0wHOlPCW02O6FKJvS8V2nzkJGDlM3vcSLZZ0m9tyYuCKqK4TWUKXgu/FVQ=="
remote:
- peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U"
addr: "127.0.0.1:4431"
- peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H"
addr: "127.0.0.1:4432"

View File

@ -1,30 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIFJTCCAw2gAwIBAgIUNiBcO8wV6YezcDQ+cLpZe/iXbGYwDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIyMDcxMzE5NDgwNFoXDTMyMDcx
MDE5NDgwNFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEAt+6cVBVkEe9I3CRmPuAqZGnLq48DTXaRH7xz6u4ld7tU
cpDaXw+aURRYKGfYIvcfQav/i+mYUxTQDFbfN8SULis/DckTqeBEaGgUbJJZ6w73
kkB8BuCiIjh3W9hyUHr+WbdF9wU8K1G6GmjimBJ+qlBBewQm0kzqosVwjQVWarN4
aEhgiyjnLF9XVYQZRVqGxKzP/MssEU7YjSPPfBEsmi6pAqiDYuZ3+sVlKuDrki0d
r1XOcV/dcSJa4NRazxiWME+GJQ/x7gA2GlC5FbQyhrOs1sNWaBsaWQGl4oHB90T7
b0GxKOTqI7oCMho+Ajt7eB4zN0fqeCw0nwFyzok3f/GKFk0vLnecFt+gCLyMyS6X
Dqf/DAlvoP41UeFI/rheOwc0UcZbU0HZ2zYrzOYR4eHMHSYKma2DHvIyOYZIV0Uw
tkyOFnoEQGSXK3TMOb7oWyrs4gl+5euPEDKqDuP28xBr4GsFbNq7/7Kd9rKwXb3L
iyy3kot2Vf5QqDFGdyOLDFFKF7MgMrUiEX8onW+fSmjooqUq7ZyLAs8W0uim4moQ
52t7CGUuglaTHYMi2rfoAS5qPXT76jiwu7H351Psca6EIY4V+dIiElvbYryhTNsV
4eDdWGJoZUyGACUhPdfO9l7Wp2+Yy/HGfPWm6mKX8VPHO4llYvwgGzm6Is/pFmUC
AwEAAaNvMG0wHQYDVR0OBBYEFKl07s6kNnGmJN/ASYQTml5UkK0AMB8GA1UdIwQY
MBaAFKl07s6kNnGmJN/ASYQTml5UkK0AMA8GA1UdEwEB/wQFMAMBAf8wGgYDVR0R
BBMwEYIJbG9jYWxob3N0hwR/AAABMA0GCSqGSIb3DQEBCwUAA4ICAQBs5JmRhddd
KuyhkSWd6T/HqAQISgP72ZUAr3gt2j34GLrhDYcvKFZwcoJFCFjG3pVmvJCORVGO
x2TYt2ntsmIyFCZlGE/TpLxbSgsykoUVBnc8ySDnTQTDJr6S7AyWQsznSD6j1/FA
a9E8ZrsyopqIn2eZy9/Asgy1qeJVO4F1kIq+19HUDR2z1rXqVSycOQEJkF84Kgvd
+nDJQ5W3EdamYuDQOhTOeEFfZy1HyM3APhR9JyFHHnZ2D3vsoys/LIWolBJPOq6B
o5JjXgLrA1e12TVXlnTqZ3986vGOyjfut7o2NPO1Se5OeGr6XFwO1nhIJ4gj8OTv
2XuBcslLXI5+6UIsXtFHXAfH7eYErkBCQGiwjYj0V8Kb4M7UZ0seqjK+gMKfvM4Z
hAPlKP2AUNYS7TNyqW3t8SA0c52ASdAezzh/OklCO5vyzxQT4wXTQt5Bub83m9uY
Jnrv6Kg5UPQMrTpo9usJ2zAyj+qkk8KubKOA7grtblmCTvyJFDwyiWZkr0nuvNTV
BsKis/DbJ2hneF+2D/B2pGKxyEP1LCIV/JDTUGX3F8ljTgSihZay/ZZnPUZpbCa0
czodlAQk4wkGxJWSH6SSkq4dD+JnBQpufBMLa1qShfUn+N1N02yiDPq9XxJytoOR
+vHqqrPS0PcTk1x2Og0xLn6kN+MH0+jRNQ==
-----END CERTIFICATE-----

View File

@ -1,52 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC37pxUFWQR70jc
JGY+4CpkacurjwNNdpEfvHPq7iV3u1RykNpfD5pRFFgoZ9gi9x9Bq/+L6ZhTFNAM
Vt83xJQuKz8NyROp4ERoaBRsklnrDveSQHwG4KIiOHdb2HJQev5Zt0X3BTwrUboa
aOKYEn6qUEF7BCbSTOqixXCNBVZqs3hoSGCLKOcsX1dVhBlFWobErM/8yywRTtiN
I898ESyaLqkCqINi5nf6xWUq4OuSLR2vVc5xX91xIlrg1FrPGJYwT4YlD/HuADYa
ULkVtDKGs6zWw1ZoGxpZAaXigcH3RPtvQbEo5OojugIyGj4CO3t4HjM3R+p4LDSf
AXLOiTd/8YoWTS8ud5wW36AIvIzJLpcOp/8MCW+g/jVR4Uj+uF47BzRRxltTQdnb
NivM5hHh4cwdJgqZrYMe8jI5hkhXRTC2TI4WegRAZJcrdMw5vuhbKuziCX7l648Q
MqoO4/bzEGvgawVs2rv/sp32srBdvcuLLLeSi3ZV/lCoMUZ3I4sMUUoXsyAytSIR
fyidb59KaOiipSrtnIsCzxbS6KbiahDna3sIZS6CVpMdgyLat+gBLmo9dPvqOLC7
sffnU+xxroQhjhX50iISW9tivKFM2xXh4N1YYmhlTIYAJSE91872Xtanb5jL8cZ8
9abqYpfxU8c7iWVi/CAbOboiz+kWZQIDAQABAoICAAcytJX6z1N/WonY99Jime4b
kM+qNV4g8317NcseHkPBBkVEg3NAbFDbe7a7F6OIqcW7ajEKx34K3Lh65tnrHMRw
x1MuCRG3F95BInl8Qb4X9BraYLAxNs8hiBRFVHXvVkhjCn8oCoqEdITHkYSThrb6
FJHwn+dPgMg1c1nleVQMKXxlRrfzdhaPtZ6AYK/M4uTMAYi6V8Nmo0VkvyAzGNyA
0nbq6tdiPxRGiNbEfhuWneiIcl+P+Z3NkyJk1RfxNaF230BQjy55/iQKADBIAMky
O2OdOeKxmtacYKZMXy+i2LcqKjl4OeyDgUX+LmqOct7IycbSTuv8iOOnHhvUlWIg
nFYHx2i/t5mHp0kPjHUzqu40HaRKPpGxAzzSL/W2WNuSFzbZB6SMTGIxG77hGkUh
saoOSLRDL7DhEvvZ5cvhVM0l2mrufak0sUR7K/TooheQ+0A6bU+pZW1kzfwt4ICI
BlGZt6rw7dSEDMButhfMkWPCIowULrtevB9pOULfvwGcpslMNN6KiAl8mjR6OwJB
7AdWCCCC52p240OKe8N4eBoPr6T44LoBsKFpCI2Ztd8IG3LP6AGJueWFLrmPj1J+
WePs5WX5IGfHSHJondm1BWS1nwkdSZQUaeL/POi0YbaAlgndpZAbpkfxjcLuB7Wd
5iOqiIkrGCjVyFTXBipxAoIBAQDE0amd/TrSPwcdLLkhGZswSgGatlenqw/DLswy
30Vu6NzZ2dRHKk1eX3GApgBtYmkP4n3Jadaxl8OXGCw+mPv9fBHgVrmcwt9CCTNL
uwKvGtEpvXH2LgcmFTEL8CnKoM7ZlXDAlGSPf46pOU8HG8nOqOywkIaoiiLAPZHZ
Y3FMLs7s0UEDYSM0EWHr6/po8VQLSnyN5NDcUhn6dsNgjS4p9W90lHHK24J7dqGh
ye4vWslzB5K47JitYKzpo3+dOUpu1/jo9uwzDsUw7SON7CKtKgldwrzvZGiRBhVW
j+cLHdRdCNz2gF/aI2JwgkW1HC+0X3RQs+H99g+yWNDYbyGVAoIBAQDvPPbql0kz
FzdpInbtBY7Z4V0ZRveeWDMln04qtEuifHzme0K03itACcxAhWAwAV8fCjZJdmkc
nQVJ/0i05IFSXiFAKpuhyA9TkPiwxYwlFpvKDk2lhAQ0yR7ig8wCYs1ZA6PKWwHl
Zd7FJKAjTkHey/KAagA1ya8XTBeIDiIQKriMVU9DdJ+4tEFqZLq9EUd/QHJkrxaV
jwmwcc26oE8XAFdFdZtqqvKjhpO4OjFN8C9TIX70krrzTSN2FTK1Y5IVSanzkYEB
Ovy/gedWzq9evuGRoKKNxYqYUXy/p6DiD36OMHZveGssPCkI4NIdPLVo1L6YXuMS
pLl2PswHmI2RAoIBAAiMCk+gFZPXxNlRfeCgGgsoy5UKYhgX56FUQO3coElGt4CG
Yx+MCLwWbPlnPBGD+ogED+5VOEuSCZ5gIFu/jQHfB2+0lG7oT4WwnJenUWCFS5wC
bBaCvTC8LtsT3Ny9yv3L7Y+PYiaRJYLXyETIwiTFVTH9tLtQ9F1gzxqfpOXoCnhi
Re59o2e5cYUrRD6WbE3pOCt5SlCnCBXGXoms19penC5129MxYSM3baF3AW7xBFqI
c6iwLZkp35htzzbmrALQQjDruCondAzB349kN8VJVArMUCQdOiVCHF8b9K6Y5wX7
Qo052e/BJZ85KQnKZY/xrT1r8l5y9w+Jp9geS1kCggEBAJxp34XBI7qjkzbJsbeF
yr/o+FVucLa2M7qFTTXeaxTxDzghnptiJiTYQxJsIVdBjk9c/eFJ6a8reinHHmIm
g+a2ZEbvlJFRm7OnNPFeNyKIhZK1h06P4bAhTnAKe3eT4W4xUwUaO0MgN2XtbEWp
BKgF76bFpx2Dn1Y8CaaKlvq6863MmOYhecvpDlvhP7YddgFcwW3Si5F803jo7vj1
lsATGPvwyIwU+E6xziLE6TdrsYVIgRimVlR8OpMZiO3PC9OfNd5pY07KojUTWY0H
1OC9K/1qaN0IKnUr0cP8dNNYDgYo6UY4FNn2+10yoC09Y94GOhak8xFdYWRN6leN
BgECggEAbS7bbv1gIB8toaAWBXxEnLtAU0Ob8e7uD2JYKZqe6NaC+ZX5NTLn+QSG
Y4SkBORGw+v1BIw7Rdk6jlEPpYWncQigQ6YbhqerL39+lmatXoSbQBcMRsjPP0gF
yuOb48ff/uXJhlnbVBJHXMfOW8LDFXL3bgMS2HpWnr5Buu7zGB4ERHg91+tuczNL
qEa85pyvY974arF/53T7Kmzdd/fx7I6RZeZpBGWKqwuZSJTMYL7V/LyKyfU8CTP1
nz86BQIF1Cr8UXyOUI33UZMFwIXb62HRDXAfij7Ew33rj803l4cedNluXHSx/kNH
3XcrP5qofkzfgz1calZv6phIGJdHLQ==
-----END PRIVATE KEY-----

1
go.mod
View File

@ -47,6 +47,7 @@ require (
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.6 // indirect
)

2
go.sum
View File

@ -160,6 +160,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

View File

@ -13,6 +13,7 @@ var (
ErrClosed = errors.New("object cache closed")
ErrExists = errors.New("object exists")
ErrTimeout = errors.New("loading object timed out")
ErrNotExists = errors.New("object not exists")
)
var (
@ -20,10 +21,6 @@ var (
defaultGC = 20 * time.Second
)
type key int
const CacheTimeout key = 0
var log = logger.NewNamed("ocache")
type LoadFunc func(ctx context.Context, id string) (value Object, err error)
@ -61,7 +58,9 @@ func New(loadFunc LoadFunc, opts ...Option) OCache {
for _, o := range opts {
o(c)
}
if c.ttl != 0 {
go c.ticker()
}
return c
}
@ -99,6 +98,8 @@ type OCache interface {
// Increases the object refs counter on successful
// When 'loadFunc' returns a non-nil error, an object will not be stored to cache
Get(ctx context.Context, id string) (value Object, err error)
// Pick returns value if it's presents in cache (will not call loadFunc)
Pick(id string) (value Object, err error)
// Add adds new object to cache
// Returns error when object exists
Add(id string, value Object) (err error)
@ -154,26 +155,26 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
e.refCount++
c.mu.Unlock()
timeout := ctx.Value(CacheTimeout)
if load {
if timeout != nil {
go c.load(ctx, id, e)
} else {
c.load(ctx, id, e)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.load:
}
return e.value, e.loadErr
}
if timeout != nil {
duration := timeout.(time.Duration)
select {
case <-e.load:
return e.value, e.loadErr
case <-time.After(duration):
return nil, ErrTimeout
func (c *oCache) Pick(id string) (value Object, err error) {
c.mu.Lock()
val, ok := c.data[id]
c.mu.Unlock()
if !ok {
return nil, ErrNotExists
}
}
<-e.load
return e.value, e.loadErr
<-val.load
return val.value, val.loadErr
}
func (c *oCache) load(ctx context.Context, id string, e *entry) {

View File

@ -92,6 +92,20 @@ func TestOCache_Get(t *testing.T) {
_, err := c.Get(context.TODO(), "id")
assert.Equal(t, ErrClosed, err)
})
t.Run("context cancel", func(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
time.Sleep(time.Second / 3)
return &testObject{
name: "id",
}, nil
})
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := c.Get(ctx, "id")
assert.Equal(t, context.Canceled, err)
assert.NoError(t, c.Close())
})
}
func TestOCache_GC(t *testing.T) {

View File

@ -0,0 +1,84 @@
package example
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"go.uber.org/zap"
"strings"
"time"
)
var log = logger.NewNamed("example")
type Example struct {
pool pool.Pool
peerConf config.PeerList
}
func (e *Example) Init(ctx context.Context, a *app.App) (err error) {
e.pool = a.MustComponent(pool.CName).(pool.Pool)
e.peerConf = a.MustComponent(config.CName).(*config.Config).PeerList
// subscribe for sync messages
e.pool.AddHandler(syncproto.MessageType_MessageTypeSync, e.syncHandler)
return
}
func (e *Example) Name() (name string) {
return "example"
}
func (e *Example) Run(ctx context.Context) (err error) {
// dial manually with all peers
for _, rp := range e.peerConf.Remote {
if er := e.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil {
log.Info("can't dial to peer", zap.Error(er))
} else {
log.Info("connected with peer", zap.String("peerId", rp.PeerId))
}
}
go e.doRequests()
return nil
}
func (e *Example) syncHandler(ctx context.Context, msg *pool.Message) (err error) {
data := string(msg.Data) // you need unmarshal this bytes
log.Info("msg received", zap.String("peerId", msg.Peer().Id()), zap.String("data", data))
if strings.HasPrefix(data, "ack:") {
if err = msg.Ack(); err != nil {
log.Error("ack error", zap.Error(err))
}
} else if strings.HasPrefix(data, "ackErr:") {
if err = msg.AckError(42, "ack error description"); err != nil {
log.Error("ackErr error", zap.Error(err))
}
} else if strings.HasPrefix(data, "reply:") {
if err = msg.Reply([]byte("reply for:" + strings.TrimPrefix(data, "reply:"))); err != nil {
log.Error("reply error", zap.Error(err))
}
}
return nil
}
func (e *Example) doRequests() {
time.Sleep(time.Second)
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
st := time.Now()
err := e.pool.SendAndWait(ctx, e.peerConf.Remote[0].PeerId, &syncproto.Message{
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
Data: []byte("ack: something"),
})
log.Info("sent with ack:", zap.Error(err), zap.Duration("dur", time.Since(st)))
}
func (e *Example) Close(ctx context.Context) (err error) {
return
}

View File

@ -0,0 +1,104 @@
package dialer
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
"github.com/libp2p/go-libp2p-core/sec"
"go.uber.org/zap"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"sync"
)
const CName = "net/dialer"
var ErrArrdsNotFound = errors.New("addrs for peer not found")
var log = logger.NewNamed(CName)
func New() Dialer {
return &dialer{}
}
type Dialer interface {
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
UpdateAddrs(addrs map[string][]string)
app.Component
}
type dialer struct {
transport secure.Service
peerAddrs map[string][]string
mu sync.RWMutex
}
func (d *dialer) Init(ctx context.Context, a *app.App) (err error) {
d.transport = a.MustComponent(secure.CName).(secure.Service)
peerConf := a.MustComponent(config.CName).(*config.Config).PeerList.Remote
d.peerAddrs = map[string][]string{}
for _, rp := range peerConf {
d.peerAddrs[rp.PeerId] = []string{rp.Addr}
}
return
}
func (d *dialer) Name() (name string) {
return CName
}
func (d *dialer) UpdateAddrs(addrs map[string][]string) {
d.mu.Lock()
d.peerAddrs = addrs
d.mu.Unlock()
}
func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) {
d.mu.RLock()
defer d.mu.RUnlock()
addrs, ok := d.peerAddrs[peerId]
if !ok || len(addrs) == 0 {
return nil, ErrArrdsNotFound
}
var (
stream drpc.Stream
sc sec.SecureConn
)
for _, addr := range addrs {
stream, sc, err = d.makeStream(ctx, addr)
if err != nil {
log.Info("can't connect to host", zap.String("addr", addr))
} else {
err = nil
break
}
}
if err != nil {
return
}
return rpc.PeerFromStream(sc, stream, false), nil
}
func (d *dialer) makeStream(ctx context.Context, addr string) (stream drpc.Stream, sc sec.SecureConn, err error) {
tcpConn, err := net.Dial("tcp", addr)
if err != nil {
return
}
sc, err = d.transport.TLSConn(ctx, tcpConn)
if err != nil {
return
}
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String()))
stream, err = drpcconn.New(sc).NewStream(ctx, "", rpc.Encoding)
if err != nil {
return
}
return stream, sc, err
}

35
service/net/peer/peer.go Normal file
View File

@ -0,0 +1,35 @@
package peer
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"time"
)
type Dir uint
const (
// DirInbound indicates peer created connection
DirInbound Dir = iota
// DirOutbound indicates that our host created connection
DirOutbound
)
type Info struct {
Id string
Dir Dir
LastActiveUnix int64
}
func (i Info) LastActive() time.Time {
return time.Unix(i.LastActiveUnix, 0)
}
type Peer interface {
Id() string
Info() Info
Recv() (*syncproto.Message, error)
Send(msg *syncproto.Message) (err error)
Context() context.Context
Close() error
}

View File

@ -0,0 +1,71 @@
package pool
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"gopkg.in/mgo.v2/bson"
)
type Message struct {
*syncproto.Message
peer peer.Peer
}
func (m *Message) Peer() peer.Peer {
return m.peer
}
func (m *Message) Reply(data []byte) (err error) {
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSync,
},
Data: data,
}
return m.peer.Send(rep)
}
func (m *Message) Ack() (err error) {
ack := &syncproto.System{
Ack: &syncproto.SystemAck{},
}
data, err := ack.Marshal()
if err != nil {
return
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
},
Data: data,
}
return m.peer.Send(rep)
}
func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (err error) {
ack := &syncproto.System{
Ack: &syncproto.SystemAck{
Error: &syncproto.SystemError{
Code: code,
Description: description,
},
},
}
data, err := ack.Marshal()
if err != nil {
return
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: []byte(bson.NewObjectId()),
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
},
Data: data,
}
return m.peer.Send(rep)
}

28
service/net/pool/peer.go Normal file
View File

@ -0,0 +1,28 @@
package pool
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type peerEntry struct {
peer peer.Peer
groupIds []string
ready chan struct{}
}
func (pe *peerEntry) addGroup(groupId string) (ok bool) {
if slice.FindPos(pe.groupIds, groupId) != -1 {
return false
}
pe.groupIds = append(pe.groupIds, groupId)
return true
}
func (pe *peerEntry) removeGroup(groupId string) (ok bool) {
if slice.FindPos(pe.groupIds, groupId) == -1 {
return false
}
pe.groupIds = slice.Remove(pe.groupIds, groupId)
return true
}

305
service/net/pool/pool.go Normal file
View File

@ -0,0 +1,305 @@
package pool
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
"sync"
"sync/atomic"
)
const CName = "sync/peerPool"
var log = logger.NewNamed("peerPool")
var (
ErrPoolClosed = errors.New("peer pool is closed")
ErrPeerNotFound = errors.New("peer not found")
)
func NewPool() Pool {
return &pool{closed: true}
}
type Handler func(ctx context.Context, msg *Message) (err error)
type Pool interface {
DialAndAddPeer(ctx context.Context, id string) (err error)
AddAndReadPeer(peer peer.Peer) (err error)
AddHandler(msgType syncproto.MessageType, h Handler)
AddPeerIdToGroup(peerId, groupId string) (err error)
RemovePeerIdFromGroup(peerId, groupId string) (err error)
SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error)
Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error)
app.ComponentRunnable
}
type pool struct {
peersById map[string]*peerEntry
waiters waiters
handlers map[syncproto.MessageType][]Handler
peersIdsByGroup map[string][]string
dialer dialer.Dialer
closed bool
mu sync.RWMutex
wg *sync.WaitGroup
}
func (p *pool) Init(ctx context.Context, a *app.App) (err error) {
p.peersById = map[string]*peerEntry{}
p.handlers = map[syncproto.MessageType][]Handler{}
p.peersIdsByGroup = map[string][]string{}
p.waiters = waiters{waiters: map[uint64]*waiter{}}
p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer)
p.wg = &sync.WaitGroup{}
return nil
}
func (p *pool) Name() (name string) {
return CName
}
func (p *pool) Run(ctx context.Context) (err error) {
p.closed = false
return nil
}
func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) {
p.mu.Lock()
defer p.mu.Unlock()
if !p.closed {
// unable to add handler after Run
return
}
p.handlers[msgType] = append(p.handlers[msgType], h)
}
func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrPoolClosed
}
if _, ok := p.peersById[peerId]; ok {
return nil
}
peer, err := p.dialer.Dial(ctx, peerId)
if err != nil {
return
}
p.peersById[peer.Id()] = &peerEntry{
peer: peer,
}
p.wg.Add(1)
go p.readPeerLoop(peer)
return nil
}
func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return ErrPoolClosed
}
p.peersById[peer.Id()] = &peerEntry{
peer: peer,
}
p.wg.Add(1)
p.mu.Unlock()
return p.readPeerLoop(peer)
}
func (p *pool) AddPeerIdToGroup(peerId, groupId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
peer, ok := p.peersById[peerId]
if !ok {
return ErrPeerNotFound
}
if slice.FindPos(peer.groupIds, groupId) != -1 {
return nil
}
peer.addGroup(groupId)
p.peersIdsByGroup[groupId] = append(p.peersIdsByGroup[groupId], peerId)
return
}
func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
peer, ok := p.peersById[peerId]
if !ok {
return ErrPeerNotFound
}
if slice.FindPos(peer.groupIds, groupId) == -1 {
return nil
}
peer.removeGroup(groupId)
p.peersIdsByGroup[groupId] = slice.Remove(p.peersIdsByGroup[groupId], peerId)
return
}
func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) {
p.mu.RLock()
peer := p.peersById[peerId]
p.mu.RUnlock()
if peer == nil {
return ErrPeerNotFound
}
repId := p.waiters.NewReplyId()
msg.GetHeader().RequestId = repId
ch := make(chan Reply, 1)
p.waiters.Add(repId, &waiter{ch: ch})
defer p.waiters.Remove(repId)
if err = peer.peer.Send(msg); err != nil {
return
}
select {
case rep := <-ch:
if rep.Error != nil {
return rep.Error
}
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) {
//TODO implement me
panic("implement me")
}
func (p *pool) readPeerLoop(peer peer.Peer) (err error) {
defer p.wg.Done()
for {
msg, err := peer.Recv()
if err != nil {
log.Debug("peer receive error", zap.Error(err), zap.String("peerId", peer.Id()))
break
}
p.handleMessage(peer, msg)
}
if err = p.removePeer(peer.Id()); err != nil {
log.Error("remove peer error", zap.String("peerId", peer.Id()))
}
return
}
func (p *pool) removePeer(peerId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
_, ok := p.peersById[peerId]
if !ok {
return ErrPeerNotFound
}
delete(p.peersById, peerId)
return
}
func (p *pool) handleMessage(peer peer.Peer, msg *syncproto.Message) {
replyId := msg.GetHeader().GetReplyId()
if replyId != 0 {
if !p.waiters.Send(replyId, Reply{
PeerInfo: peer.Info(),
Message: &Message{
Message: msg,
peer: peer,
},
}) {
log.Debug("received reply with unknown (or expired) replyId", zap.Uint64("replyId", replyId))
}
return
}
handlers := p.handlers[msg.GetHeader().GetType()]
if len(handlers) == 0 {
return
}
message := &Message{Message: msg, peer: peer}
for _, h := range handlers {
if err := h(peer.Context(), message); err != nil {
log.Error("handle message error", zap.Error(err))
}
}
}
func (p *pool) Close(ctx context.Context) (err error) {
p.mu.Lock()
for _, peer := range p.peersById {
peer.peer.Close()
}
wg := p.wg
p.mu.Unlock()
if wg != nil {
wg.Wait()
}
return nil
}
type waiter struct {
sent int
ch chan<- Reply
}
type waiters struct {
waiters map[uint64]*waiter
replySeq uint64
mu sync.Mutex
}
func (w waiters) Send(replyId uint64, r Reply) (ok bool) {
w.mu.Lock()
wait := w.waiters[replyId]
if wait == nil {
w.mu.Unlock()
return false
}
wait.sent++
var lastMessage = wait.sent == cap(wait.ch)
if lastMessage {
delete(w.waiters, replyId)
}
w.mu.Unlock()
wait.ch <- r
if lastMessage {
close(wait.ch)
}
return true
}
func (w waiters) Add(replyId uint64, wait *waiter) {
w.mu.Lock()
w.waiters[replyId] = wait
w.mu.Unlock()
}
func (w waiters) Remove(id uint64) error {
w.mu.Lock()
defer w.mu.Unlock()
if _, ok := w.waiters[id]; ok {
delete(w.waiters, id)
return nil
}
return fmt.Errorf("waiter not found")
}
func (w waiters) NewReplyId() uint64 {
res := atomic.AddUint64(&w.replySeq, 1)
if res == 0 {
return w.NewReplyId()
}
return res
}

View File

@ -0,0 +1,45 @@
package pool
import "context"
// 1. message for one peerId with ack
// pool.SendAndWait(ctx context,.C
// 2. message for many peers without ack (or group)
type Request struct {
groupId string
oneOf []string
all []string
tryDial bool
needReply bool
pool *pool
}
func (r *Request) GroupId(groupId string) *Request {
r.groupId = groupId
return r
}
func (r *Request) All(peerIds ...string) *Request {
r.all = peerIds
return r
}
func (r *Request) OneOf(peerIds ...string) *Request {
r.oneOf = peerIds
return r
}
func (r *Request) TryDial(is bool) *Request {
r.tryDial = is
return r
}
func (r *Request) NeedReply(is bool) *Request {
r.needReply = is
return r
}
func (r *Request) Exec(ctx context.Context, msg *Message) *Results {
return nil
}

View File

@ -0,0 +1,53 @@
package pool
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
)
// Results of request collects replies and errors
// Must be closed after usage r.Close()
type Results struct {
ctx context.Context
cancel func()
waiterId uint64
ch chan Reply
pool *pool
}
// Iterate iterates over replies
// if callback will return a non-nil error then iteration stops
func (r *Results) Iterate(callback func(r Reply) (err error)) (err error) {
if r.ctx == nil || r.ch == nil {
return fmt.Errorf("results not initialized")
}
for {
select {
case <-r.ctx.Done():
return r.ctx.Err()
case m, ok := <-r.ch:
if ok {
if err = callback(m); err != nil {
return err
}
} else {
return
}
}
}
}
// Close cancels iteration and unregister reply handler in the pool
// Required to call to avoid memory leaks
func (r *Results) Close() (err error) {
r.cancel()
return r.pool.waiters.Remove(r.waiterId)
}
// Reply presents the result of request executing can be error or result message
type Reply struct {
PeerInfo peer.Info
Error error
Message *Message
}

View File

@ -0,0 +1,18 @@
package rpc
import (
"github.com/gogo/protobuf/proto"
"storj.io/drpc"
)
var Encoding = enc{}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -1,15 +1,14 @@
package drpcserver
package server
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
"go.uber.org/zap"
"io"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcserver"
@ -17,9 +16,9 @@ import (
"time"
)
var log = logger.NewNamed("drpcserver")
const CName = "net/drpcserver"
const CName = "drpcserver"
var log = logger.NewNamed(CName)
func New() DRPCServer {
return &drpcServer{}
@ -32,14 +31,16 @@ type DRPCServer interface {
type drpcServer struct {
config config.GrpcServer
drpcServer *drpcserver.Server
transport transport.Service
listeners []transport.ContextListener
transport secure.Service
listeners []secure.ContextListener
pool pool.Pool
cancel func()
}
func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) {
s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer
s.transport = a.MustComponent(transport.CName).(transport.Service)
s.transport = a.MustComponent(secure.CName).(secure.Service)
s.pool = a.MustComponent(pool.CName).(pool.Pool)
return nil
}
@ -61,7 +62,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
return
}
func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) {
func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
@ -85,7 +86,7 @@ func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) {
}
continue
}
if _, ok := err.(transport.HandshakeError); ok {
if _, ok := err.(secure.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}
@ -108,30 +109,14 @@ func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) {
}
}
func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) {
func (s *drpcServer) HandleRPC(stream drpc.Stream, _ string) (err error) {
ctx := stream.Context()
sc, err := transport.CtxSecureConn(ctx)
sc, err := secure.CtxSecureConn(ctx)
if err != nil {
return
}
l := log.With(zap.String("peer", sc.RemotePeer().String()))
l.Info("stream opened")
defer func() {
l.Info("stream closed", zap.Error(err))
}()
for {
msg := &syncproto.SyncMessage{}
if err = stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
}
//log.Debug("receive msg", zap.Int("seq", int(msg.Seq)))
if err = stream.MsgSend(msg, enc{}); err != nil {
return
}
}
return nil
log.With(zap.String("peer", sc.RemotePeer().String())).Debug("stream opened")
return s.pool.AddAndReadPeer(rpc.PeerFromStream(sc, stream, true))
}
func (s *drpcServer) Close(ctx context.Context) (err error) {
@ -145,13 +130,3 @@ func (s *drpcServer) Close(ctx context.Context) (err error) {
}
return
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -1,6 +1,6 @@
//go:build !windows
package drpcserver
package server
import (
"errors"

View File

@ -1,6 +1,6 @@
//go:build windows
package drpcserver
package server
import (
"errors"

55
service/net/rpc/stream.go Normal file
View File

@ -0,0 +1,55 @@
package rpc
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/libp2p/go-libp2p-core/sec"
"storj.io/drpc"
"sync/atomic"
"time"
)
func PeerFromStream(sc sec.SecureConn, stream drpc.Stream, incoming bool) peer.Peer {
dp := &drpcPeer{
sc: sc,
Stream: stream,
}
dp.info.Id = sc.RemotePeer().String()
if incoming {
dp.info.Dir = peer.DirInbound
} else {
dp.info.Dir = peer.DirOutbound
}
return dp
}
type drpcPeer struct {
sc sec.SecureConn
info peer.Info
drpc.Stream
}
func (d *drpcPeer) Id() string {
return d.info.Id
}
func (d *drpcPeer) Info() peer.Info {
return d.info
}
func (d *drpcPeer) Recv() (msg *syncproto.Message, err error) {
msg = &syncproto.Message{}
if err = d.Stream.MsgRecv(msg, Encoding); err != nil {
return
}
atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix())
return
}
func (d *drpcPeer) Send(msg *syncproto.Message) (err error) {
if err = d.Stream.MsgSend(msg, Encoding); err != nil {
return
}
atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix())
return
}

View File

@ -1,4 +1,4 @@
package transport
package secure
import (
"context"

View File

@ -1,4 +1,4 @@
package transport
package secure
import (
"context"

View File

@ -1,11 +1,13 @@
package transport
package secure
import (
"context"
"crypto/rand"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"go.uber.org/zap"
@ -14,9 +16,9 @@ import (
type HandshakeError error
var log = logger.NewNamed("transport")
const CName = "net/secure"
const CName = "transport"
var log = logger.NewNamed(CName)
func New() Service {
return &service{}
@ -33,13 +35,39 @@ type service struct {
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
var pubKey crypto.PubKey
s.key, pubKey, err = crypto.GenerateEd25519Key(rand.Reader)
peerConf := a.MustComponent(config.CName).(*config.Config).PeerList
pkb, err := crypto.ConfigDecodeKey(peerConf.MyId.PrivKey)
if err != nil {
return
}
pubKeyRaw, _ := pubKey.Raw()
log.Info("transport keys generated", zap.Binary("pubKey", pubKeyRaw))
if s.key, err = crypto.UnmarshalEd25519PrivateKey(pkb); err != nil {
return
}
pid, err := peer.Decode(peerConf.MyId.PeerId)
if err != nil {
return
}
var testData = []byte("test data")
sign, err := s.key.Sign(testData)
if err != nil {
return
}
pubKey, err := pid.ExtractPublicKey()
if err != nil {
return
}
ok, err := pubKey.Verify(testData, sign)
if err != nil {
return
}
if !ok {
return fmt.Errorf("peerId and privateKey mismatched")
}
log.Info("secure service init", zap.String("peerId", peerConf.MyId.PeerId))
return nil
}

View File

@ -2,6 +2,63 @@ syntax = "proto3";
package anytype;
option go_package = "/syncproto";
message SyncMessage {
int64 seq = 1;
message Message {
Header header = 1;
bytes data = 2;
}
message Header {
bytes traceId = 1;
uint64 requestId = 2;
uint64 replyId = 3;
MessageType type = 4;
}
enum MessageType {
MessageTypeSystem = 0;
MessageTypeSubscription = 1;
MessageTypeSync = 2;
}
message System {
Handshake handshake = 1;
Ping ping = 2;
Ack ack = 3;
message Handshake {
string protocolVersion = 1;
}
message Ping {
uint64 unixTime = 1;
}
message Ack {
Error error = 2;
}
message Error {
Code code = 1;
string description = 2;
enum Code {
UNKNOWN = 0;
UNSUPPORTED_PROTOCOL_VERSION = 10;
}
}
}
message Subscription {
SubscribeSpace subscribeSpace = 1;
UnsubscribeSpace unsubscribeSpace = 2;
message SubscribeSpace {
string spaceId = 1;
}
message UnsubscribeSpace {
string spaceId = 1;
}
}
message Sync {
string spaceId = 1;
}

File diff suppressed because it is too large Load Diff