From 62c5d8e3b9ffb6240e4ae588ee9d655e022ff404 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 5 Aug 2022 12:07:34 +0300 Subject: [PATCH] net utils wip --- cmd/client/client.go | 169 -- cmd/node/node.go | 14 +- config/config.go | 1 + config/grpc.go | 3 - config/peer.go | 14 + etc/config.1.yml | 16 + etc/config.2.yml | 16 + etc/config.yml | 14 +- etc/x509/cert.pem | 30 - etc/x509/key.pem | 52 - go.mod | 1 + go.sum | 2 + pkg/ocache/ocache.go | 49 +- pkg/ocache/ocache_test.go | 14 + service/example/example.go | 84 + service/net/dialer/dialer.go | 104 + service/net/peer/peer.go | 35 + service/net/pool/message.go | 71 + service/net/pool/peer.go | 28 + service/net/pool/pool.go | 305 +++ service/net/pool/request.go | 45 + service/net/pool/result.go | 53 + service/net/rpc/encoding.go | 18 + .../rpc/server}/drpcserver.go | 59 +- .../drpcserver => net/rpc/server}/util.go | 2 +- .../rpc/server}/util_windows.go | 2 +- service/net/rpc/stream.go | 55 + .../{sync/transport => net/secure}/context.go | 2 +- .../transport => net/secure}/listener.go | 2 +- .../transport.go => net/secure/service.go} | 44 +- syncproto/proto/sync.proto | 63 +- syncproto/sync.pb.go | 2279 ++++++++++++++++- 32 files changed, 3261 insertions(+), 385 deletions(-) delete mode 100644 cmd/client/client.go create mode 100644 config/peer.go create mode 100644 etc/config.1.yml create mode 100644 etc/config.2.yml delete mode 100644 etc/x509/cert.pem delete mode 100644 etc/x509/key.pem create mode 100644 service/example/example.go create mode 100644 service/net/dialer/dialer.go create mode 100644 service/net/peer/peer.go create mode 100644 service/net/pool/message.go create mode 100644 service/net/pool/peer.go create mode 100644 service/net/pool/pool.go create mode 100644 service/net/pool/request.go create mode 100644 service/net/pool/result.go create mode 100644 service/net/rpc/encoding.go rename service/{sync/drpcserver => net/rpc/server}/drpcserver.go (67%) rename service/{sync/drpcserver => net/rpc/server}/util.go (92%) rename service/{sync/drpcserver => net/rpc/server}/util_windows.go (97%) create mode 100644 service/net/rpc/stream.go rename service/{sync/transport => net/secure}/context.go (97%) rename service/{sync/transport => net/secure}/listener.go (98%) rename service/{sync/transport/transport.go => net/secure/service.go} (55%) diff --git a/cmd/client/client.go b/cmd/client/client.go deleted file mode 100644 index 18bb7e99..00000000 --- a/cmd/client/client.go +++ /dev/null @@ -1,169 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" - "github.com/gogo/protobuf/proto" - "github.com/libp2p/go-libp2p-core/sec" - "go.uber.org/zap" - "io" - "net" - "net/http" - _ "net/http/pprof" - "os" - "os/signal" - "storj.io/drpc" - "storj.io/drpc/drpcconn" - "syscall" - "time" -) - -var log = logger.NewNamed("client") - -var ( - flagConfigFile = flag.String("c", "etc/config.yml", "path to config file") - flagVersion = flag.Bool("v", false, "show version and exit") - flagHelp = flag.Bool("h", false, "show help and exit") -) - -func main() { - flag.Parse() - - if *flagVersion { - fmt.Println(app.VersionDescription()) - return - } - if *flagHelp { - flag.PrintDefaults() - return - } - - if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" { - go func() { - http.ListenAndServe(debug, nil) - }() - } - - // create app - ctx := context.Background() - a := new(app.App) - - // open config file - conf, err := config.NewFromFile(*flagConfigFile) - if err != nil { - log.Fatal("can't open config file", zap.Error(err)) - } - - // bootstrap components - a.Register(conf). - Register(transport.New()). - Register(&Client{}) - - // start app - if err := a.Start(ctx); err != nil { - log.Fatal("can't start app", zap.Error(err)) - } - log.Info("app started", zap.String("version", a.Version())) - - // wait exit signal - exit := make(chan os.Signal, 1) - signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) - sig := <-exit - log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig))) - - // close app - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - if err := a.Close(ctx); err != nil { - log.Fatal("close error", zap.Error(err)) - } else { - log.Info("goodbye!") - } - time.Sleep(time.Second / 3) -} - -type Client struct { - conf config.GrpcServer - tr transport.Service - sc sec.SecureConn -} - -func (c *Client) Init(ctx context.Context, a *app.App) (err error) { - c.tr = a.MustComponent(transport.CName).(transport.Service) - c.conf = a.MustComponent(config.CName).(*config.Config).GrpcServer - return nil -} - -func (c *Client) Name() (name string) { - return "testClient" -} - -func (c *Client) Run(ctx context.Context) (err error) { - tcpConn, err := net.Dial("tcp", c.conf.ListenAddrs[0]) - if err != nil { - return - } - c.sc, err = c.tr.TLSConn(ctx, tcpConn) - if err != nil { - return - } - log.Info("connected with server", zap.String("serverPeer", c.sc.RemotePeer().String()), zap.String("per", c.sc.LocalPeer().String())) - - dconn := drpcconn.New(c.sc) - stream, err := dconn.NewStream(ctx, "", enc{}) - if err != nil { - return - } - go c.handleStream(stream) - return nil -} - -func (c *Client) handleStream(stream drpc.Stream) { - var err error - defer func() { - log.Info("stream closed", zap.Error(err)) - }() - var n int64 = 100000 - for i := int64(0); i < n; i++ { - st := time.Now() - if err = stream.MsgSend(&syncproto.SyncMessage{Seq: i}, enc{}); err != nil { - if err == io.EOF { - return - } - log.Fatal("send error", zap.Error(err)) - } - log.Debug("message sent", zap.Int64("seq", i)) - msg := &syncproto.SyncMessage{} - if err := stream.MsgRecv(msg, enc{}); err != nil { - if err == io.EOF { - return - } - log.Error("msg recv error", zap.Error(err)) - } - log.Debug("message received", zap.Int64("seq", msg.Seq), zap.Duration("dur", time.Since(st))) - time.Sleep(time.Second) - } -} - -func (c *Client) Close(ctx context.Context) (err error) { - if c.sc != nil { - return c.sc.Close() - } - return -} - -type enc struct{} - -func (e enc) Marshal(msg drpc.Message) ([]byte, error) { - return msg.(proto.Marshaler).Marshal() -} - -func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { - return msg.(proto.Unmarshaler).Unmarshal(buf) -} diff --git a/cmd/node/node.go b/cmd/node/node.go index 1446220e..69c5c601 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -7,8 +7,11 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/drpcserver" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/example" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "go.uber.org/zap" "net/http" _ "net/http/pprof" @@ -82,6 +85,9 @@ func main() { } func Bootstrap(a *app.App) { - a.Register(transport.New()). - Register(drpcserver.New()) + a.Register(secure.New()). + Register(server.New()). + Register(dialer.New()). + Register(pool.NewPool()). + Register(&example.Example{}) } diff --git a/config/config.go b/config/config.go index f525e67b..4b610ca9 100644 --- a/config/config.go +++ b/config/config.go @@ -26,6 +26,7 @@ func NewFromFile(path string) (c *Config, err error) { type Config struct { Anytype Anytype `yaml:"anytype"` GrpcServer GrpcServer `yaml:"grpcServer"` + PeerList PeerList `yaml:"peerList"` } func (c *Config) Init(ctx context.Context, a *app.App) (err error) { diff --git a/config/grpc.go b/config/grpc.go index 6a4fc426..01b797fa 100644 --- a/config/grpc.go +++ b/config/grpc.go @@ -2,7 +2,4 @@ package config type GrpcServer struct { ListenAddrs []string `yaml:"listenAddrs"` - TLS bool `yaml:"tls"` - TLSCertFile string `yaml:"tlsCertFile"` - TLSKeyFile string `yaml:"tlsKeyFile"` } diff --git a/config/peer.go b/config/peer.go new file mode 100644 index 00000000..17680e2b --- /dev/null +++ b/config/peer.go @@ -0,0 +1,14 @@ +package config + +type PeerList struct { + MyId struct { + PeerId string `yaml:"peerId"` + PrivKey string `yaml:"privKey"` + } `yaml:"myId"` + Remote []PeerRemote `yaml:"remote"` +} + +type PeerRemote struct { + PeerId string `yaml:"peerId"` + Addr string `yaml:"addr"` +} diff --git a/etc/config.1.yml b/etc/config.1.yml new file mode 100644 index 00000000..093d1ef7 --- /dev/null +++ b/etc/config.1.yml @@ -0,0 +1,16 @@ +anytype: + swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec" + +grpcServer: + listenAddrs: + - "127.0.0.1:4431" + +peerList: + myId: + peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U" + privKey: "InCGjb55V9+jj2PebUExUuwrpOIBc4hmgk2dSqyk3k4DjmgrdoNVuFe7xCFaFdUVb0RJYj6A+OTp2yXASTmq2w==" + remote: + - peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C" + addr: "127.0.0.1:4430" + - peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H" + addr: "127.0.0.1:4432" diff --git a/etc/config.2.yml b/etc/config.2.yml new file mode 100644 index 00000000..701fdc5f --- /dev/null +++ b/etc/config.2.yml @@ -0,0 +1,16 @@ +anytype: + swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec" + +grpcServer: + listenAddrs: + - "127.0.0.1:4432" + +peerList: + myId: + peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H" + privKey: "jynYZBgtM4elT+6e7M5UERTJCZgUd3hDdmQjCqTpApyJ4h53V6TQan4Ru4OXqz+91rCLjpIVdphhaB0l+TvNsA==" + remote: + - peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U" + addr: "127.0.0.1:4431" + - peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C" + addr: "127.0.0.1:4430" diff --git a/etc/config.yml b/etc/config.yml index 245199a3..782b6015 100644 --- a/etc/config.yml +++ b/etc/config.yml @@ -4,7 +4,13 @@ anytype: grpcServer: listenAddrs: - "127.0.0.1:4430" - - "127.0.0.1:4431" - tls: false - tlsKeyFile: "etc/x509/key.pem" - tlsCertFile: "etc/x509/cert.pem" + +peerList: + myId: + peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C" + privKey: "3BhkWxi0Vzc2AhLtw7LhFHa9Ys4P0wHOlPCW02O6FKJvS8V2nzkJGDlM3vcSLZZ0m9tyYuCKqK4TWUKXgu/FVQ==" + remote: + - peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U" + addr: "127.0.0.1:4431" + - peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H" + addr: "127.0.0.1:4432" diff --git a/etc/x509/cert.pem b/etc/x509/cert.pem deleted file mode 100644 index 0ddaa01e..00000000 --- a/etc/x509/cert.pem +++ /dev/null @@ -1,30 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIFJTCCAw2gAwIBAgIUNiBcO8wV6YezcDQ+cLpZe/iXbGYwDQYJKoZIhvcNAQEL -BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIyMDcxMzE5NDgwNFoXDTMyMDcx -MDE5NDgwNFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEF -AAOCAg8AMIICCgKCAgEAt+6cVBVkEe9I3CRmPuAqZGnLq48DTXaRH7xz6u4ld7tU -cpDaXw+aURRYKGfYIvcfQav/i+mYUxTQDFbfN8SULis/DckTqeBEaGgUbJJZ6w73 -kkB8BuCiIjh3W9hyUHr+WbdF9wU8K1G6GmjimBJ+qlBBewQm0kzqosVwjQVWarN4 -aEhgiyjnLF9XVYQZRVqGxKzP/MssEU7YjSPPfBEsmi6pAqiDYuZ3+sVlKuDrki0d -r1XOcV/dcSJa4NRazxiWME+GJQ/x7gA2GlC5FbQyhrOs1sNWaBsaWQGl4oHB90T7 -b0GxKOTqI7oCMho+Ajt7eB4zN0fqeCw0nwFyzok3f/GKFk0vLnecFt+gCLyMyS6X -Dqf/DAlvoP41UeFI/rheOwc0UcZbU0HZ2zYrzOYR4eHMHSYKma2DHvIyOYZIV0Uw -tkyOFnoEQGSXK3TMOb7oWyrs4gl+5euPEDKqDuP28xBr4GsFbNq7/7Kd9rKwXb3L -iyy3kot2Vf5QqDFGdyOLDFFKF7MgMrUiEX8onW+fSmjooqUq7ZyLAs8W0uim4moQ -52t7CGUuglaTHYMi2rfoAS5qPXT76jiwu7H351Psca6EIY4V+dIiElvbYryhTNsV -4eDdWGJoZUyGACUhPdfO9l7Wp2+Yy/HGfPWm6mKX8VPHO4llYvwgGzm6Is/pFmUC -AwEAAaNvMG0wHQYDVR0OBBYEFKl07s6kNnGmJN/ASYQTml5UkK0AMB8GA1UdIwQY -MBaAFKl07s6kNnGmJN/ASYQTml5UkK0AMA8GA1UdEwEB/wQFMAMBAf8wGgYDVR0R -BBMwEYIJbG9jYWxob3N0hwR/AAABMA0GCSqGSIb3DQEBCwUAA4ICAQBs5JmRhddd -KuyhkSWd6T/HqAQISgP72ZUAr3gt2j34GLrhDYcvKFZwcoJFCFjG3pVmvJCORVGO -x2TYt2ntsmIyFCZlGE/TpLxbSgsykoUVBnc8ySDnTQTDJr6S7AyWQsznSD6j1/FA -a9E8ZrsyopqIn2eZy9/Asgy1qeJVO4F1kIq+19HUDR2z1rXqVSycOQEJkF84Kgvd -+nDJQ5W3EdamYuDQOhTOeEFfZy1HyM3APhR9JyFHHnZ2D3vsoys/LIWolBJPOq6B -o5JjXgLrA1e12TVXlnTqZ3986vGOyjfut7o2NPO1Se5OeGr6XFwO1nhIJ4gj8OTv -2XuBcslLXI5+6UIsXtFHXAfH7eYErkBCQGiwjYj0V8Kb4M7UZ0seqjK+gMKfvM4Z -hAPlKP2AUNYS7TNyqW3t8SA0c52ASdAezzh/OklCO5vyzxQT4wXTQt5Bub83m9uY -Jnrv6Kg5UPQMrTpo9usJ2zAyj+qkk8KubKOA7grtblmCTvyJFDwyiWZkr0nuvNTV -BsKis/DbJ2hneF+2D/B2pGKxyEP1LCIV/JDTUGX3F8ljTgSihZay/ZZnPUZpbCa0 -czodlAQk4wkGxJWSH6SSkq4dD+JnBQpufBMLa1qShfUn+N1N02yiDPq9XxJytoOR -+vHqqrPS0PcTk1x2Og0xLn6kN+MH0+jRNQ== ------END CERTIFICATE----- diff --git a/etc/x509/key.pem b/etc/x509/key.pem deleted file mode 100644 index 84d64a5e..00000000 --- a/etc/x509/key.pem +++ /dev/null @@ -1,52 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC37pxUFWQR70jc -JGY+4CpkacurjwNNdpEfvHPq7iV3u1RykNpfD5pRFFgoZ9gi9x9Bq/+L6ZhTFNAM -Vt83xJQuKz8NyROp4ERoaBRsklnrDveSQHwG4KIiOHdb2HJQev5Zt0X3BTwrUboa -aOKYEn6qUEF7BCbSTOqixXCNBVZqs3hoSGCLKOcsX1dVhBlFWobErM/8yywRTtiN -I898ESyaLqkCqINi5nf6xWUq4OuSLR2vVc5xX91xIlrg1FrPGJYwT4YlD/HuADYa -ULkVtDKGs6zWw1ZoGxpZAaXigcH3RPtvQbEo5OojugIyGj4CO3t4HjM3R+p4LDSf -AXLOiTd/8YoWTS8ud5wW36AIvIzJLpcOp/8MCW+g/jVR4Uj+uF47BzRRxltTQdnb -NivM5hHh4cwdJgqZrYMe8jI5hkhXRTC2TI4WegRAZJcrdMw5vuhbKuziCX7l648Q -MqoO4/bzEGvgawVs2rv/sp32srBdvcuLLLeSi3ZV/lCoMUZ3I4sMUUoXsyAytSIR -fyidb59KaOiipSrtnIsCzxbS6KbiahDna3sIZS6CVpMdgyLat+gBLmo9dPvqOLC7 -sffnU+xxroQhjhX50iISW9tivKFM2xXh4N1YYmhlTIYAJSE91872Xtanb5jL8cZ8 -9abqYpfxU8c7iWVi/CAbOboiz+kWZQIDAQABAoICAAcytJX6z1N/WonY99Jime4b -kM+qNV4g8317NcseHkPBBkVEg3NAbFDbe7a7F6OIqcW7ajEKx34K3Lh65tnrHMRw -x1MuCRG3F95BInl8Qb4X9BraYLAxNs8hiBRFVHXvVkhjCn8oCoqEdITHkYSThrb6 -FJHwn+dPgMg1c1nleVQMKXxlRrfzdhaPtZ6AYK/M4uTMAYi6V8Nmo0VkvyAzGNyA -0nbq6tdiPxRGiNbEfhuWneiIcl+P+Z3NkyJk1RfxNaF230BQjy55/iQKADBIAMky -O2OdOeKxmtacYKZMXy+i2LcqKjl4OeyDgUX+LmqOct7IycbSTuv8iOOnHhvUlWIg -nFYHx2i/t5mHp0kPjHUzqu40HaRKPpGxAzzSL/W2WNuSFzbZB6SMTGIxG77hGkUh -saoOSLRDL7DhEvvZ5cvhVM0l2mrufak0sUR7K/TooheQ+0A6bU+pZW1kzfwt4ICI -BlGZt6rw7dSEDMButhfMkWPCIowULrtevB9pOULfvwGcpslMNN6KiAl8mjR6OwJB -7AdWCCCC52p240OKe8N4eBoPr6T44LoBsKFpCI2Ztd8IG3LP6AGJueWFLrmPj1J+ -WePs5WX5IGfHSHJondm1BWS1nwkdSZQUaeL/POi0YbaAlgndpZAbpkfxjcLuB7Wd -5iOqiIkrGCjVyFTXBipxAoIBAQDE0amd/TrSPwcdLLkhGZswSgGatlenqw/DLswy -30Vu6NzZ2dRHKk1eX3GApgBtYmkP4n3Jadaxl8OXGCw+mPv9fBHgVrmcwt9CCTNL -uwKvGtEpvXH2LgcmFTEL8CnKoM7ZlXDAlGSPf46pOU8HG8nOqOywkIaoiiLAPZHZ -Y3FMLs7s0UEDYSM0EWHr6/po8VQLSnyN5NDcUhn6dsNgjS4p9W90lHHK24J7dqGh -ye4vWslzB5K47JitYKzpo3+dOUpu1/jo9uwzDsUw7SON7CKtKgldwrzvZGiRBhVW -j+cLHdRdCNz2gF/aI2JwgkW1HC+0X3RQs+H99g+yWNDYbyGVAoIBAQDvPPbql0kz -FzdpInbtBY7Z4V0ZRveeWDMln04qtEuifHzme0K03itACcxAhWAwAV8fCjZJdmkc -nQVJ/0i05IFSXiFAKpuhyA9TkPiwxYwlFpvKDk2lhAQ0yR7ig8wCYs1ZA6PKWwHl -Zd7FJKAjTkHey/KAagA1ya8XTBeIDiIQKriMVU9DdJ+4tEFqZLq9EUd/QHJkrxaV -jwmwcc26oE8XAFdFdZtqqvKjhpO4OjFN8C9TIX70krrzTSN2FTK1Y5IVSanzkYEB -Ovy/gedWzq9evuGRoKKNxYqYUXy/p6DiD36OMHZveGssPCkI4NIdPLVo1L6YXuMS -pLl2PswHmI2RAoIBAAiMCk+gFZPXxNlRfeCgGgsoy5UKYhgX56FUQO3coElGt4CG -Yx+MCLwWbPlnPBGD+ogED+5VOEuSCZ5gIFu/jQHfB2+0lG7oT4WwnJenUWCFS5wC -bBaCvTC8LtsT3Ny9yv3L7Y+PYiaRJYLXyETIwiTFVTH9tLtQ9F1gzxqfpOXoCnhi -Re59o2e5cYUrRD6WbE3pOCt5SlCnCBXGXoms19penC5129MxYSM3baF3AW7xBFqI -c6iwLZkp35htzzbmrALQQjDruCondAzB349kN8VJVArMUCQdOiVCHF8b9K6Y5wX7 -Qo052e/BJZ85KQnKZY/xrT1r8l5y9w+Jp9geS1kCggEBAJxp34XBI7qjkzbJsbeF -yr/o+FVucLa2M7qFTTXeaxTxDzghnptiJiTYQxJsIVdBjk9c/eFJ6a8reinHHmIm -g+a2ZEbvlJFRm7OnNPFeNyKIhZK1h06P4bAhTnAKe3eT4W4xUwUaO0MgN2XtbEWp -BKgF76bFpx2Dn1Y8CaaKlvq6863MmOYhecvpDlvhP7YddgFcwW3Si5F803jo7vj1 -lsATGPvwyIwU+E6xziLE6TdrsYVIgRimVlR8OpMZiO3PC9OfNd5pY07KojUTWY0H -1OC9K/1qaN0IKnUr0cP8dNNYDgYo6UY4FNn2+10yoC09Y94GOhak8xFdYWRN6leN -BgECggEAbS7bbv1gIB8toaAWBXxEnLtAU0Ob8e7uD2JYKZqe6NaC+ZX5NTLn+QSG -Y4SkBORGw+v1BIw7Rdk6jlEPpYWncQigQ6YbhqerL39+lmatXoSbQBcMRsjPP0gF -yuOb48ff/uXJhlnbVBJHXMfOW8LDFXL3bgMS2HpWnr5Buu7zGB4ERHg91+tuczNL -qEa85pyvY974arF/53T7Kmzdd/fx7I6RZeZpBGWKqwuZSJTMYL7V/LyKyfU8CTP1 -nz86BQIF1Cr8UXyOUI33UZMFwIXb62HRDXAfij7Ew33rj803l4cedNluXHSx/kNH -3XcrP5qofkzfgz1calZv6phIGJdHLQ== ------END PRIVATE KEY----- diff --git a/go.mod b/go.mod index 91d42059..dad41ff7 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect lukechampine.com/blake3 v1.1.6 // indirect ) diff --git a/go.sum b/go.sum index 04a8f6c5..12d550cb 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/pkg/ocache/ocache.go b/pkg/ocache/ocache.go index 9f72aa38..b0460ca6 100644 --- a/pkg/ocache/ocache.go +++ b/pkg/ocache/ocache.go @@ -10,9 +10,10 @@ import ( ) var ( - ErrClosed = errors.New("object cache closed") - ErrExists = errors.New("object exists") - ErrTimeout = errors.New("loading object timed out") + ErrClosed = errors.New("object cache closed") + ErrExists = errors.New("object exists") + ErrTimeout = errors.New("loading object timed out") + ErrNotExists = errors.New("object not exists") ) var ( @@ -20,10 +21,6 @@ var ( defaultGC = 20 * time.Second ) -type key int - -const CacheTimeout key = 0 - var log = logger.NewNamed("ocache") type LoadFunc func(ctx context.Context, id string) (value Object, err error) @@ -61,7 +58,9 @@ func New(loadFunc LoadFunc, opts ...Option) OCache { for _, o := range opts { o(c) } - go c.ticker() + if c.ttl != 0 { + go c.ticker() + } return c } @@ -99,6 +98,8 @@ type OCache interface { // Increases the object refs counter on successful // When 'loadFunc' returns a non-nil error, an object will not be stored to cache Get(ctx context.Context, id string) (value Object, err error) + // Pick returns value if it's presents in cache (will not call loadFunc) + Pick(id string) (value Object, err error) // Add adds new object to cache // Returns error when object exists Add(id string, value Object) (err error) @@ -154,28 +155,28 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { e.refCount++ c.mu.Unlock() - timeout := ctx.Value(CacheTimeout) if load { - if timeout != nil { - go c.load(ctx, id, e) - } else { - c.load(ctx, id, e) - } + go c.load(ctx, id, e) } - - if timeout != nil { - duration := timeout.(time.Duration) - select { - case <-e.load: - return e.value, e.loadErr - case <-time.After(duration): - return nil, ErrTimeout - } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.load: } - <-e.load return e.value, e.loadErr } +func (c *oCache) Pick(id string) (value Object, err error) { + c.mu.Lock() + val, ok := c.data[id] + c.mu.Unlock() + if !ok { + return nil, ErrNotExists + } + <-val.load + return val.value, val.loadErr +} + func (c *oCache) load(ctx context.Context, id string, e *entry) { defer close(e.load) value, err := c.loadFunc(ctx, id) diff --git a/pkg/ocache/ocache_test.go b/pkg/ocache/ocache_test.go index eb3a1caf..55bf2e95 100644 --- a/pkg/ocache/ocache_test.go +++ b/pkg/ocache/ocache_test.go @@ -92,6 +92,20 @@ func TestOCache_Get(t *testing.T) { _, err := c.Get(context.TODO(), "id") assert.Equal(t, ErrClosed, err) }) + t.Run("context cancel", func(t *testing.T) { + c := New(func(ctx context.Context, id string) (value Object, err error) { + time.Sleep(time.Second / 3) + return &testObject{ + name: "id", + }, nil + }) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.Get(ctx, "id") + assert.Equal(t, context.Canceled, err) + assert.NoError(t, c.Close()) + }) } func TestOCache_GC(t *testing.T) { diff --git a/service/example/example.go b/service/example/example.go new file mode 100644 index 00000000..c3513fd8 --- /dev/null +++ b/service/example/example.go @@ -0,0 +1,84 @@ +package example + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "go.uber.org/zap" + "strings" + "time" +) + +var log = logger.NewNamed("example") + +type Example struct { + pool pool.Pool + peerConf config.PeerList +} + +func (e *Example) Init(ctx context.Context, a *app.App) (err error) { + e.pool = a.MustComponent(pool.CName).(pool.Pool) + e.peerConf = a.MustComponent(config.CName).(*config.Config).PeerList + + // subscribe for sync messages + e.pool.AddHandler(syncproto.MessageType_MessageTypeSync, e.syncHandler) + return +} + +func (e *Example) Name() (name string) { + return "example" +} + +func (e *Example) Run(ctx context.Context) (err error) { + // dial manually with all peers + for _, rp := range e.peerConf.Remote { + if er := e.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil { + log.Info("can't dial to peer", zap.Error(er)) + } else { + log.Info("connected with peer", zap.String("peerId", rp.PeerId)) + } + } + go e.doRequests() + return nil +} + +func (e *Example) syncHandler(ctx context.Context, msg *pool.Message) (err error) { + data := string(msg.Data) // you need unmarshal this bytes + log.Info("msg received", zap.String("peerId", msg.Peer().Id()), zap.String("data", data)) + + if strings.HasPrefix(data, "ack:") { + if err = msg.Ack(); err != nil { + log.Error("ack error", zap.Error(err)) + } + } else if strings.HasPrefix(data, "ackErr:") { + if err = msg.AckError(42, "ack error description"); err != nil { + log.Error("ackErr error", zap.Error(err)) + } + } else if strings.HasPrefix(data, "reply:") { + if err = msg.Reply([]byte("reply for:" + strings.TrimPrefix(data, "reply:"))); err != nil { + log.Error("reply error", zap.Error(err)) + } + } + return nil +} + +func (e *Example) doRequests() { + time.Sleep(time.Second) + ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + st := time.Now() + err := e.pool.SendAndWait(ctx, e.peerConf.Remote[0].PeerId, &syncproto.Message{ + Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync}, + Data: []byte("ack: something"), + }) + log.Info("sent with ack:", zap.Error(err), zap.Duration("dur", time.Since(st))) +} + +func (e *Example) Close(ctx context.Context) (err error) { + return +} diff --git a/service/net/dialer/dialer.go b/service/net/dialer/dialer.go new file mode 100644 index 00000000..a1cb8a31 --- /dev/null +++ b/service/net/dialer/dialer.go @@ -0,0 +1,104 @@ +package dialer + +import ( + "context" + "errors" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" + "github.com/libp2p/go-libp2p-core/sec" + "go.uber.org/zap" + "net" + "storj.io/drpc" + "storj.io/drpc/drpcconn" + "sync" +) + +const CName = "net/dialer" + +var ErrArrdsNotFound = errors.New("addrs for peer not found") + +var log = logger.NewNamed(CName) + +func New() Dialer { + return &dialer{} +} + +type Dialer interface { + Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) + UpdateAddrs(addrs map[string][]string) + app.Component +} + +type dialer struct { + transport secure.Service + peerAddrs map[string][]string + + mu sync.RWMutex +} + +func (d *dialer) Init(ctx context.Context, a *app.App) (err error) { + d.transport = a.MustComponent(secure.CName).(secure.Service) + peerConf := a.MustComponent(config.CName).(*config.Config).PeerList.Remote + d.peerAddrs = map[string][]string{} + for _, rp := range peerConf { + d.peerAddrs[rp.PeerId] = []string{rp.Addr} + } + return +} + +func (d *dialer) Name() (name string) { + return CName +} + +func (d *dialer) UpdateAddrs(addrs map[string][]string) { + d.mu.Lock() + d.peerAddrs = addrs + d.mu.Unlock() +} + +func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) { + d.mu.RLock() + defer d.mu.RUnlock() + addrs, ok := d.peerAddrs[peerId] + if !ok || len(addrs) == 0 { + return nil, ErrArrdsNotFound + } + var ( + stream drpc.Stream + sc sec.SecureConn + ) + for _, addr := range addrs { + stream, sc, err = d.makeStream(ctx, addr) + if err != nil { + log.Info("can't connect to host", zap.String("addr", addr)) + } else { + err = nil + break + } + } + if err != nil { + return + } + return rpc.PeerFromStream(sc, stream, false), nil +} + +func (d *dialer) makeStream(ctx context.Context, addr string) (stream drpc.Stream, sc sec.SecureConn, err error) { + tcpConn, err := net.Dial("tcp", addr) + if err != nil { + return + } + sc, err = d.transport.TLSConn(ctx, tcpConn) + if err != nil { + return + } + log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String())) + stream, err = drpcconn.New(sc).NewStream(ctx, "", rpc.Encoding) + if err != nil { + return + } + return stream, sc, err +} diff --git a/service/net/peer/peer.go b/service/net/peer/peer.go new file mode 100644 index 00000000..331b0982 --- /dev/null +++ b/service/net/peer/peer.go @@ -0,0 +1,35 @@ +package peer + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "time" +) + +type Dir uint + +const ( + // DirInbound indicates peer created connection + DirInbound Dir = iota + // DirOutbound indicates that our host created connection + DirOutbound +) + +type Info struct { + Id string + Dir Dir + LastActiveUnix int64 +} + +func (i Info) LastActive() time.Time { + return time.Unix(i.LastActiveUnix, 0) +} + +type Peer interface { + Id() string + Info() Info + Recv() (*syncproto.Message, error) + Send(msg *syncproto.Message) (err error) + Context() context.Context + Close() error +} diff --git a/service/net/pool/message.go b/service/net/pool/message.go new file mode 100644 index 00000000..bc067a12 --- /dev/null +++ b/service/net/pool/message.go @@ -0,0 +1,71 @@ +package pool + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "gopkg.in/mgo.v2/bson" +) + +type Message struct { + *syncproto.Message + peer peer.Peer +} + +func (m *Message) Peer() peer.Peer { + return m.peer +} + +func (m *Message) Reply(data []byte) (err error) { + rep := &syncproto.Message{ + Header: &syncproto.Header{ + TraceId: m.GetHeader().TraceId, + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSync, + }, + Data: data, + } + return m.peer.Send(rep) +} + +func (m *Message) Ack() (err error) { + ack := &syncproto.System{ + Ack: &syncproto.SystemAck{}, + } + data, err := ack.Marshal() + if err != nil { + return + } + rep := &syncproto.Message{ + Header: &syncproto.Header{ + TraceId: m.GetHeader().TraceId, + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSystem, + }, + Data: data, + } + return m.peer.Send(rep) +} + +func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (err error) { + ack := &syncproto.System{ + Ack: &syncproto.SystemAck{ + Error: &syncproto.SystemError{ + Code: code, + Description: description, + }, + }, + } + data, err := ack.Marshal() + if err != nil { + return + } + rep := &syncproto.Message{ + Header: &syncproto.Header{ + TraceId: []byte(bson.NewObjectId()), + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSystem, + }, + Data: data, + } + return m.peer.Send(rep) +} diff --git a/service/net/pool/peer.go b/service/net/pool/peer.go new file mode 100644 index 00000000..6a7a96d0 --- /dev/null +++ b/service/net/pool/peer.go @@ -0,0 +1,28 @@ +package pool + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" +) + +type peerEntry struct { + peer peer.Peer + groupIds []string + ready chan struct{} +} + +func (pe *peerEntry) addGroup(groupId string) (ok bool) { + if slice.FindPos(pe.groupIds, groupId) != -1 { + return false + } + pe.groupIds = append(pe.groupIds, groupId) + return true +} + +func (pe *peerEntry) removeGroup(groupId string) (ok bool) { + if slice.FindPos(pe.groupIds, groupId) == -1 { + return false + } + pe.groupIds = slice.Remove(pe.groupIds, groupId) + return true +} diff --git a/service/net/pool/pool.go b/service/net/pool/pool.go new file mode 100644 index 00000000..21779ca0 --- /dev/null +++ b/service/net/pool/pool.go @@ -0,0 +1,305 @@ +package pool + +import ( + "context" + "errors" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" + "go.uber.org/zap" + "sync" + "sync/atomic" +) + +const CName = "sync/peerPool" + +var log = logger.NewNamed("peerPool") + +var ( + ErrPoolClosed = errors.New("peer pool is closed") + ErrPeerNotFound = errors.New("peer not found") +) + +func NewPool() Pool { + return &pool{closed: true} +} + +type Handler func(ctx context.Context, msg *Message) (err error) + +type Pool interface { + DialAndAddPeer(ctx context.Context, id string) (err error) + AddAndReadPeer(peer peer.Peer) (err error) + AddHandler(msgType syncproto.MessageType, h Handler) + AddPeerIdToGroup(peerId, groupId string) (err error) + RemovePeerIdFromGroup(peerId, groupId string) (err error) + + SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) + Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) + + app.ComponentRunnable +} + +type pool struct { + peersById map[string]*peerEntry + waiters waiters + handlers map[syncproto.MessageType][]Handler + peersIdsByGroup map[string][]string + + dialer dialer.Dialer + + closed bool + mu sync.RWMutex + wg *sync.WaitGroup +} + +func (p *pool) Init(ctx context.Context, a *app.App) (err error) { + p.peersById = map[string]*peerEntry{} + p.handlers = map[syncproto.MessageType][]Handler{} + p.peersIdsByGroup = map[string][]string{} + p.waiters = waiters{waiters: map[uint64]*waiter{}} + p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer) + p.wg = &sync.WaitGroup{} + return nil +} + +func (p *pool) Name() (name string) { + return CName +} + +func (p *pool) Run(ctx context.Context) (err error) { + p.closed = false + return nil +} + +func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) { + p.mu.Lock() + defer p.mu.Unlock() + if !p.closed { + // unable to add handler after Run + return + } + p.handlers[msgType] = append(p.handlers[msgType], h) +} + +func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return ErrPoolClosed + } + if _, ok := p.peersById[peerId]; ok { + return nil + } + peer, err := p.dialer.Dial(ctx, peerId) + if err != nil { + return + } + p.peersById[peer.Id()] = &peerEntry{ + peer: peer, + } + p.wg.Add(1) + go p.readPeerLoop(peer) + return nil +} + +func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return ErrPoolClosed + } + p.peersById[peer.Id()] = &peerEntry{ + peer: peer, + } + p.wg.Add(1) + p.mu.Unlock() + return p.readPeerLoop(peer) +} + +func (p *pool) AddPeerIdToGroup(peerId, groupId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + peer, ok := p.peersById[peerId] + if !ok { + return ErrPeerNotFound + } + if slice.FindPos(peer.groupIds, groupId) != -1 { + return nil + } + peer.addGroup(groupId) + p.peersIdsByGroup[groupId] = append(p.peersIdsByGroup[groupId], peerId) + return +} + +func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + peer, ok := p.peersById[peerId] + if !ok { + return ErrPeerNotFound + } + if slice.FindPos(peer.groupIds, groupId) == -1 { + return nil + } + peer.removeGroup(groupId) + p.peersIdsByGroup[groupId] = slice.Remove(p.peersIdsByGroup[groupId], peerId) + return +} + +func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) { + p.mu.RLock() + peer := p.peersById[peerId] + p.mu.RUnlock() + if peer == nil { + return ErrPeerNotFound + } + repId := p.waiters.NewReplyId() + msg.GetHeader().RequestId = repId + ch := make(chan Reply, 1) + p.waiters.Add(repId, &waiter{ch: ch}) + defer p.waiters.Remove(repId) + if err = peer.peer.Send(msg); err != nil { + return + } + select { + case rep := <-ch: + if rep.Error != nil { + return rep.Error + } + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) { + //TODO implement me + panic("implement me") +} + +func (p *pool) readPeerLoop(peer peer.Peer) (err error) { + defer p.wg.Done() + for { + msg, err := peer.Recv() + if err != nil { + log.Debug("peer receive error", zap.Error(err), zap.String("peerId", peer.Id())) + break + } + p.handleMessage(peer, msg) + } + if err = p.removePeer(peer.Id()); err != nil { + log.Error("remove peer error", zap.String("peerId", peer.Id())) + } + return +} + +func (p *pool) removePeer(peerId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + _, ok := p.peersById[peerId] + if !ok { + return ErrPeerNotFound + } + delete(p.peersById, peerId) + return +} + +func (p *pool) handleMessage(peer peer.Peer, msg *syncproto.Message) { + replyId := msg.GetHeader().GetReplyId() + if replyId != 0 { + if !p.waiters.Send(replyId, Reply{ + PeerInfo: peer.Info(), + Message: &Message{ + Message: msg, + peer: peer, + }, + }) { + log.Debug("received reply with unknown (or expired) replyId", zap.Uint64("replyId", replyId)) + } + return + } + handlers := p.handlers[msg.GetHeader().GetType()] + if len(handlers) == 0 { + return + } + + message := &Message{Message: msg, peer: peer} + + for _, h := range handlers { + if err := h(peer.Context(), message); err != nil { + log.Error("handle message error", zap.Error(err)) + } + } +} + +func (p *pool) Close(ctx context.Context) (err error) { + p.mu.Lock() + for _, peer := range p.peersById { + peer.peer.Close() + } + wg := p.wg + p.mu.Unlock() + if wg != nil { + wg.Wait() + } + return nil +} + +type waiter struct { + sent int + ch chan<- Reply +} + +type waiters struct { + waiters map[uint64]*waiter + replySeq uint64 + mu sync.Mutex +} + +func (w waiters) Send(replyId uint64, r Reply) (ok bool) { + w.mu.Lock() + wait := w.waiters[replyId] + if wait == nil { + w.mu.Unlock() + return false + } + wait.sent++ + var lastMessage = wait.sent == cap(wait.ch) + if lastMessage { + delete(w.waiters, replyId) + } + w.mu.Unlock() + wait.ch <- r + if lastMessage { + close(wait.ch) + } + return true +} + +func (w waiters) Add(replyId uint64, wait *waiter) { + w.mu.Lock() + w.waiters[replyId] = wait + w.mu.Unlock() +} + +func (w waiters) Remove(id uint64) error { + w.mu.Lock() + defer w.mu.Unlock() + if _, ok := w.waiters[id]; ok { + delete(w.waiters, id) + return nil + } + return fmt.Errorf("waiter not found") +} + +func (w waiters) NewReplyId() uint64 { + res := atomic.AddUint64(&w.replySeq, 1) + if res == 0 { + return w.NewReplyId() + } + return res +} diff --git a/service/net/pool/request.go b/service/net/pool/request.go new file mode 100644 index 00000000..8584f0dd --- /dev/null +++ b/service/net/pool/request.go @@ -0,0 +1,45 @@ +package pool + +import "context" + +// 1. message for one peerId with ack +// pool.SendAndWait(ctx context,.C +// 2. message for many peers without ack (or group) + +type Request struct { + groupId string + oneOf []string + all []string + tryDial bool + needReply bool + pool *pool +} + +func (r *Request) GroupId(groupId string) *Request { + r.groupId = groupId + return r +} + +func (r *Request) All(peerIds ...string) *Request { + r.all = peerIds + return r +} + +func (r *Request) OneOf(peerIds ...string) *Request { + r.oneOf = peerIds + return r +} + +func (r *Request) TryDial(is bool) *Request { + r.tryDial = is + return r +} + +func (r *Request) NeedReply(is bool) *Request { + r.needReply = is + return r +} + +func (r *Request) Exec(ctx context.Context, msg *Message) *Results { + return nil +} diff --git a/service/net/pool/result.go b/service/net/pool/result.go new file mode 100644 index 00000000..94f1ac93 --- /dev/null +++ b/service/net/pool/result.go @@ -0,0 +1,53 @@ +package pool + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" +) + +// Results of request collects replies and errors +// Must be closed after usage r.Close() +type Results struct { + ctx context.Context + cancel func() + waiterId uint64 + ch chan Reply + pool *pool +} + +// Iterate iterates over replies +// if callback will return a non-nil error then iteration stops +func (r *Results) Iterate(callback func(r Reply) (err error)) (err error) { + if r.ctx == nil || r.ch == nil { + return fmt.Errorf("results not initialized") + } + for { + select { + case <-r.ctx.Done(): + return r.ctx.Err() + case m, ok := <-r.ch: + if ok { + if err = callback(m); err != nil { + return err + } + } else { + return + } + } + } +} + +// Close cancels iteration and unregister reply handler in the pool +// Required to call to avoid memory leaks +func (r *Results) Close() (err error) { + r.cancel() + return r.pool.waiters.Remove(r.waiterId) +} + +// Reply presents the result of request executing can be error or result message +type Reply struct { + PeerInfo peer.Info + Error error + Message *Message +} diff --git a/service/net/rpc/encoding.go b/service/net/rpc/encoding.go new file mode 100644 index 00000000..eb983b9d --- /dev/null +++ b/service/net/rpc/encoding.go @@ -0,0 +1,18 @@ +package rpc + +import ( + "github.com/gogo/protobuf/proto" + "storj.io/drpc" +) + +var Encoding = enc{} + +type enc struct{} + +func (e enc) Marshal(msg drpc.Message) ([]byte, error) { + return msg.(proto.Marshaler).Marshal() +} + +func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { + return msg.(proto.Unmarshaler).Unmarshal(buf) +} diff --git a/service/sync/drpcserver/drpcserver.go b/service/net/rpc/server/drpcserver.go similarity index 67% rename from service/sync/drpcserver/drpcserver.go rename to service/net/rpc/server/drpcserver.go index e3fe7bee..dcdf2156 100644 --- a/service/sync/drpcserver/drpcserver.go +++ b/service/net/rpc/server/drpcserver.go @@ -1,15 +1,14 @@ -package drpcserver +package server import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" - "github.com/gogo/protobuf/proto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "go.uber.org/zap" - "io" "net" "storj.io/drpc" "storj.io/drpc/drpcserver" @@ -17,9 +16,9 @@ import ( "time" ) -var log = logger.NewNamed("drpcserver") +const CName = "net/drpcserver" -const CName = "drpcserver" +var log = logger.NewNamed(CName) func New() DRPCServer { return &drpcServer{} @@ -32,14 +31,16 @@ type DRPCServer interface { type drpcServer struct { config config.GrpcServer drpcServer *drpcserver.Server - transport transport.Service - listeners []transport.ContextListener + transport secure.Service + listeners []secure.ContextListener + pool pool.Pool cancel func() } func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) { s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer - s.transport = a.MustComponent(transport.CName).(transport.Service) + s.transport = a.MustComponent(secure.CName).(secure.Service) + s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -61,7 +62,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) { return } -func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) { +func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) { l := log.With(zap.String("localAddr", lis.Addr().String())) l.Info("drpc listener started") defer func() { @@ -85,7 +86,7 @@ func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) { } continue } - if _, ok := err.(transport.HandshakeError); ok { + if _, ok := err.(secure.HandshakeError); ok { l.Warn("listener handshake error", zap.Error(err)) continue } @@ -108,30 +109,14 @@ func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) { } } -func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) { +func (s *drpcServer) HandleRPC(stream drpc.Stream, _ string) (err error) { ctx := stream.Context() - sc, err := transport.CtxSecureConn(ctx) + sc, err := secure.CtxSecureConn(ctx) if err != nil { return } - l := log.With(zap.String("peer", sc.RemotePeer().String())) - l.Info("stream opened") - defer func() { - l.Info("stream closed", zap.Error(err)) - }() - for { - msg := &syncproto.SyncMessage{} - if err = stream.MsgRecv(msg, enc{}); err != nil { - if err == io.EOF { - return - } - } - //log.Debug("receive msg", zap.Int("seq", int(msg.Seq))) - if err = stream.MsgSend(msg, enc{}); err != nil { - return - } - } - return nil + log.With(zap.String("peer", sc.RemotePeer().String())).Debug("stream opened") + return s.pool.AddAndReadPeer(rpc.PeerFromStream(sc, stream, true)) } func (s *drpcServer) Close(ctx context.Context) (err error) { @@ -145,13 +130,3 @@ func (s *drpcServer) Close(ctx context.Context) (err error) { } return } - -type enc struct{} - -func (e enc) Marshal(msg drpc.Message) ([]byte, error) { - return msg.(proto.Marshaler).Marshal() -} - -func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { - return msg.(proto.Unmarshaler).Unmarshal(buf) -} diff --git a/service/sync/drpcserver/util.go b/service/net/rpc/server/util.go similarity index 92% rename from service/sync/drpcserver/util.go rename to service/net/rpc/server/util.go index d84358eb..5852288a 100644 --- a/service/sync/drpcserver/util.go +++ b/service/net/rpc/server/util.go @@ -1,6 +1,6 @@ //go:build !windows -package drpcserver +package server import ( "errors" diff --git a/service/sync/drpcserver/util_windows.go b/service/net/rpc/server/util_windows.go similarity index 97% rename from service/sync/drpcserver/util_windows.go rename to service/net/rpc/server/util_windows.go index 3acba434..efef2915 100644 --- a/service/sync/drpcserver/util_windows.go +++ b/service/net/rpc/server/util_windows.go @@ -1,6 +1,6 @@ //go:build windows -package drpcserver +package server import ( "errors" diff --git a/service/net/rpc/stream.go b/service/net/rpc/stream.go new file mode 100644 index 00000000..c05691da --- /dev/null +++ b/service/net/rpc/stream.go @@ -0,0 +1,55 @@ +package rpc + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "github.com/libp2p/go-libp2p-core/sec" + "storj.io/drpc" + "sync/atomic" + "time" +) + +func PeerFromStream(sc sec.SecureConn, stream drpc.Stream, incoming bool) peer.Peer { + dp := &drpcPeer{ + sc: sc, + Stream: stream, + } + dp.info.Id = sc.RemotePeer().String() + if incoming { + dp.info.Dir = peer.DirInbound + } else { + dp.info.Dir = peer.DirOutbound + } + return dp +} + +type drpcPeer struct { + sc sec.SecureConn + info peer.Info + drpc.Stream +} + +func (d *drpcPeer) Id() string { + return d.info.Id +} + +func (d *drpcPeer) Info() peer.Info { + return d.info +} + +func (d *drpcPeer) Recv() (msg *syncproto.Message, err error) { + msg = &syncproto.Message{} + if err = d.Stream.MsgRecv(msg, Encoding); err != nil { + return + } + atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix()) + return +} + +func (d *drpcPeer) Send(msg *syncproto.Message) (err error) { + if err = d.Stream.MsgSend(msg, Encoding); err != nil { + return + } + atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix()) + return +} diff --git a/service/sync/transport/context.go b/service/net/secure/context.go similarity index 97% rename from service/sync/transport/context.go rename to service/net/secure/context.go index f30bab39..c38da959 100644 --- a/service/sync/transport/context.go +++ b/service/net/secure/context.go @@ -1,4 +1,4 @@ -package transport +package secure import ( "context" diff --git a/service/sync/transport/listener.go b/service/net/secure/listener.go similarity index 98% rename from service/sync/transport/listener.go rename to service/net/secure/listener.go index 97e9e6f4..a0c61d56 100644 --- a/service/sync/transport/listener.go +++ b/service/net/secure/listener.go @@ -1,4 +1,4 @@ -package transport +package secure import ( "context" diff --git a/service/sync/transport/transport.go b/service/net/secure/service.go similarity index 55% rename from service/sync/transport/transport.go rename to service/net/secure/service.go index 5b7e952a..b61c2f86 100644 --- a/service/sync/transport/transport.go +++ b/service/net/secure/service.go @@ -1,11 +1,13 @@ -package transport +package secure import ( "context" - "crypto/rand" + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/sec" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "go.uber.org/zap" @@ -14,9 +16,9 @@ import ( type HandshakeError error -var log = logger.NewNamed("transport") +const CName = "net/secure" -const CName = "transport" +var log = logger.NewNamed(CName) func New() Service { return &service{} @@ -33,13 +35,39 @@ type service struct { } func (s *service) Init(ctx context.Context, a *app.App) (err error) { - var pubKey crypto.PubKey - s.key, pubKey, err = crypto.GenerateEd25519Key(rand.Reader) + peerConf := a.MustComponent(config.CName).(*config.Config).PeerList + pkb, err := crypto.ConfigDecodeKey(peerConf.MyId.PrivKey) if err != nil { return } - pubKeyRaw, _ := pubKey.Raw() - log.Info("transport keys generated", zap.Binary("pubKey", pubKeyRaw)) + if s.key, err = crypto.UnmarshalEd25519PrivateKey(pkb); err != nil { + return + } + + pid, err := peer.Decode(peerConf.MyId.PeerId) + if err != nil { + return + } + + var testData = []byte("test data") + sign, err := s.key.Sign(testData) + if err != nil { + return + } + pubKey, err := pid.ExtractPublicKey() + if err != nil { + return + } + ok, err := pubKey.Verify(testData, sign) + if err != nil { + return + } + if !ok { + return fmt.Errorf("peerId and privateKey mismatched") + } + + log.Info("secure service init", zap.String("peerId", peerConf.MyId.PeerId)) + return nil } diff --git a/syncproto/proto/sync.proto b/syncproto/proto/sync.proto index 51b2f3bb..d633afd3 100644 --- a/syncproto/proto/sync.proto +++ b/syncproto/proto/sync.proto @@ -2,6 +2,63 @@ syntax = "proto3"; package anytype; option go_package = "/syncproto"; -message SyncMessage { - int64 seq = 1; -} \ No newline at end of file +message Message { + Header header = 1; + bytes data = 2; +} + +message Header { + bytes traceId = 1; + uint64 requestId = 2; + uint64 replyId = 3; + MessageType type = 4; +} + +enum MessageType { + MessageTypeSystem = 0; + MessageTypeSubscription = 1; + MessageTypeSync = 2; +} + + +message System { + Handshake handshake = 1; + Ping ping = 2; + Ack ack = 3; + + message Handshake { + string protocolVersion = 1; + } + message Ping { + uint64 unixTime = 1; + } + message Ack { + Error error = 2; + } + message Error { + Code code = 1; + string description = 2; + + enum Code { + UNKNOWN = 0; + UNSUPPORTED_PROTOCOL_VERSION = 10; + } + } +} + +message Subscription { + SubscribeSpace subscribeSpace = 1; + UnsubscribeSpace unsubscribeSpace = 2; + + message SubscribeSpace { + string spaceId = 1; + } + message UnsubscribeSpace { + string spaceId = 1; + } +} + +message Sync { + string spaceId = 1; + +} diff --git a/syncproto/sync.pb.go b/syncproto/sync.pb.go index aa3947c8..d673a859 100644 --- a/syncproto/sync.pb.go +++ b/syncproto/sync.pb.go @@ -22,22 +22,76 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type SyncMessage struct { - Seq int64 `protobuf:"varint,1,opt,name=seq,proto3" json:"seq,omitempty"` +type MessageType int32 + +const ( + MessageType_MessageTypeSystem MessageType = 0 + MessageType_MessageTypeSubscription MessageType = 1 + MessageType_MessageTypeSync MessageType = 2 +) + +var MessageType_name = map[int32]string{ + 0: "MessageTypeSystem", + 1: "MessageTypeSubscription", + 2: "MessageTypeSync", } -func (m *SyncMessage) Reset() { *m = SyncMessage{} } -func (m *SyncMessage) String() string { return proto.CompactTextString(m) } -func (*SyncMessage) ProtoMessage() {} -func (*SyncMessage) Descriptor() ([]byte, []int) { +var MessageType_value = map[string]int32{ + "MessageTypeSystem": 0, + "MessageTypeSubscription": 1, + "MessageTypeSync": 2, +} + +func (x MessageType) String() string { + return proto.EnumName(MessageType_name, int32(x)) +} + +func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_4b28dfdd48a89166, []int{0} } -func (m *SyncMessage) XXX_Unmarshal(b []byte) error { + +type SystemErrorCode int32 + +const ( + SystemError_UNKNOWN SystemErrorCode = 0 + SystemError_UNSUPPORTED_PROTOCOL_VERSION SystemErrorCode = 10 +) + +var SystemErrorCode_name = map[int32]string{ + 0: "UNKNOWN", + 10: "UNSUPPORTED_PROTOCOL_VERSION", +} + +var SystemErrorCode_value = map[string]int32{ + "UNKNOWN": 0, + "UNSUPPORTED_PROTOCOL_VERSION": 10, +} + +func (x SystemErrorCode) String() string { + return proto.EnumName(SystemErrorCode_name, int32(x)) +} + +func (SystemErrorCode) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 3, 0} +} + +type Message struct { + Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{0} +} +func (m *Message) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *SyncMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_SyncMessage.Marshal(b, m, deterministic) + return xxx_messageInfo_Message.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -47,44 +101,588 @@ func (m *SyncMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (m *SyncMessage) XXX_Merge(src proto.Message) { - xxx_messageInfo_SyncMessage.Merge(m, src) +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) } -func (m *SyncMessage) XXX_Size() int { +func (m *Message) XXX_Size() int { return m.Size() } -func (m *SyncMessage) XXX_DiscardUnknown() { - xxx_messageInfo_SyncMessage.DiscardUnknown(m) +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) } -var xxx_messageInfo_SyncMessage proto.InternalMessageInfo +var xxx_messageInfo_Message proto.InternalMessageInfo -func (m *SyncMessage) GetSeq() int64 { +func (m *Message) GetHeader() *Header { if m != nil { - return m.Seq + return m.Header + } + return nil +} + +func (m *Message) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +type Header struct { + TraceId []byte `protobuf:"bytes,1,opt,name=traceId,proto3" json:"traceId,omitempty"` + RequestId uint64 `protobuf:"varint,2,opt,name=requestId,proto3" json:"requestId,omitempty"` + ReplyId uint64 `protobuf:"varint,3,opt,name=replyId,proto3" json:"replyId,omitempty"` + Type MessageType `protobuf:"varint,4,opt,name=type,proto3,enum=anytype.MessageType" json:"type,omitempty"` +} + +func (m *Header) Reset() { *m = Header{} } +func (m *Header) String() string { return proto.CompactTextString(m) } +func (*Header) ProtoMessage() {} +func (*Header) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{1} +} +func (m *Header) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Header.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Header) XXX_Merge(src proto.Message) { + xxx_messageInfo_Header.Merge(m, src) +} +func (m *Header) XXX_Size() int { + return m.Size() +} +func (m *Header) XXX_DiscardUnknown() { + xxx_messageInfo_Header.DiscardUnknown(m) +} + +var xxx_messageInfo_Header proto.InternalMessageInfo + +func (m *Header) GetTraceId() []byte { + if m != nil { + return m.TraceId + } + return nil +} + +func (m *Header) GetRequestId() uint64 { + if m != nil { + return m.RequestId } return 0 } +func (m *Header) GetReplyId() uint64 { + if m != nil { + return m.ReplyId + } + return 0 +} + +func (m *Header) GetType() MessageType { + if m != nil { + return m.Type + } + return MessageType_MessageTypeSystem +} + +type System struct { + Handshake *SystemHandshake `protobuf:"bytes,1,opt,name=handshake,proto3" json:"handshake,omitempty"` + Ping *SystemPing `protobuf:"bytes,2,opt,name=ping,proto3" json:"ping,omitempty"` + Ack *SystemAck `protobuf:"bytes,3,opt,name=ack,proto3" json:"ack,omitempty"` +} + +func (m *System) Reset() { *m = System{} } +func (m *System) String() string { return proto.CompactTextString(m) } +func (*System) ProtoMessage() {} +func (*System) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2} +} +func (m *System) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *System) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_System.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *System) XXX_Merge(src proto.Message) { + xxx_messageInfo_System.Merge(m, src) +} +func (m *System) XXX_Size() int { + return m.Size() +} +func (m *System) XXX_DiscardUnknown() { + xxx_messageInfo_System.DiscardUnknown(m) +} + +var xxx_messageInfo_System proto.InternalMessageInfo + +func (m *System) GetHandshake() *SystemHandshake { + if m != nil { + return m.Handshake + } + return nil +} + +func (m *System) GetPing() *SystemPing { + if m != nil { + return m.Ping + } + return nil +} + +func (m *System) GetAck() *SystemAck { + if m != nil { + return m.Ack + } + return nil +} + +type SystemHandshake struct { + ProtocolVersion string `protobuf:"bytes,1,opt,name=protocolVersion,proto3" json:"protocolVersion,omitempty"` +} + +func (m *SystemHandshake) Reset() { *m = SystemHandshake{} } +func (m *SystemHandshake) String() string { return proto.CompactTextString(m) } +func (*SystemHandshake) ProtoMessage() {} +func (*SystemHandshake) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 0} +} +func (m *SystemHandshake) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemHandshake) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemHandshake.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemHandshake) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemHandshake.Merge(m, src) +} +func (m *SystemHandshake) XXX_Size() int { + return m.Size() +} +func (m *SystemHandshake) XXX_DiscardUnknown() { + xxx_messageInfo_SystemHandshake.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemHandshake proto.InternalMessageInfo + +func (m *SystemHandshake) GetProtocolVersion() string { + if m != nil { + return m.ProtocolVersion + } + return "" +} + +type SystemPing struct { + UnixTime uint64 `protobuf:"varint,1,opt,name=unixTime,proto3" json:"unixTime,omitempty"` +} + +func (m *SystemPing) Reset() { *m = SystemPing{} } +func (m *SystemPing) String() string { return proto.CompactTextString(m) } +func (*SystemPing) ProtoMessage() {} +func (*SystemPing) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 1} +} +func (m *SystemPing) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemPing) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemPing.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemPing) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemPing.Merge(m, src) +} +func (m *SystemPing) XXX_Size() int { + return m.Size() +} +func (m *SystemPing) XXX_DiscardUnknown() { + xxx_messageInfo_SystemPing.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemPing proto.InternalMessageInfo + +func (m *SystemPing) GetUnixTime() uint64 { + if m != nil { + return m.UnixTime + } + return 0 +} + +type SystemAck struct { + Error *SystemError `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` +} + +func (m *SystemAck) Reset() { *m = SystemAck{} } +func (m *SystemAck) String() string { return proto.CompactTextString(m) } +func (*SystemAck) ProtoMessage() {} +func (*SystemAck) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 2} +} +func (m *SystemAck) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemAck) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemAck.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemAck) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemAck.Merge(m, src) +} +func (m *SystemAck) XXX_Size() int { + return m.Size() +} +func (m *SystemAck) XXX_DiscardUnknown() { + xxx_messageInfo_SystemAck.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemAck proto.InternalMessageInfo + +func (m *SystemAck) GetError() *SystemError { + if m != nil { + return m.Error + } + return nil +} + +type SystemError struct { + Code SystemErrorCode `protobuf:"varint,1,opt,name=code,proto3,enum=anytype.SystemErrorCode" json:"code,omitempty"` + Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"` +} + +func (m *SystemError) Reset() { *m = SystemError{} } +func (m *SystemError) String() string { return proto.CompactTextString(m) } +func (*SystemError) ProtoMessage() {} +func (*SystemError) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 3} +} +func (m *SystemError) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemError) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemError.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemError) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemError.Merge(m, src) +} +func (m *SystemError) XXX_Size() int { + return m.Size() +} +func (m *SystemError) XXX_DiscardUnknown() { + xxx_messageInfo_SystemError.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemError proto.InternalMessageInfo + +func (m *SystemError) GetCode() SystemErrorCode { + if m != nil { + return m.Code + } + return SystemError_UNKNOWN +} + +func (m *SystemError) GetDescription() string { + if m != nil { + return m.Description + } + return "" +} + +type Subscription struct { + SubscribeSpace *SubscriptionSubscribeSpace `protobuf:"bytes,1,opt,name=subscribeSpace,proto3" json:"subscribeSpace,omitempty"` + UnsubscribeSpace *SubscriptionUnsubscribeSpace `protobuf:"bytes,2,opt,name=unsubscribeSpace,proto3" json:"unsubscribeSpace,omitempty"` +} + +func (m *Subscription) Reset() { *m = Subscription{} } +func (m *Subscription) String() string { return proto.CompactTextString(m) } +func (*Subscription) ProtoMessage() {} +func (*Subscription) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{3} +} +func (m *Subscription) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Subscription) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Subscription.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Subscription) XXX_Merge(src proto.Message) { + xxx_messageInfo_Subscription.Merge(m, src) +} +func (m *Subscription) XXX_Size() int { + return m.Size() +} +func (m *Subscription) XXX_DiscardUnknown() { + xxx_messageInfo_Subscription.DiscardUnknown(m) +} + +var xxx_messageInfo_Subscription proto.InternalMessageInfo + +func (m *Subscription) GetSubscribeSpace() *SubscriptionSubscribeSpace { + if m != nil { + return m.SubscribeSpace + } + return nil +} + +func (m *Subscription) GetUnsubscribeSpace() *SubscriptionUnsubscribeSpace { + if m != nil { + return m.UnsubscribeSpace + } + return nil +} + +type SubscriptionSubscribeSpace struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *SubscriptionSubscribeSpace) Reset() { *m = SubscriptionSubscribeSpace{} } +func (m *SubscriptionSubscribeSpace) String() string { return proto.CompactTextString(m) } +func (*SubscriptionSubscribeSpace) ProtoMessage() {} +func (*SubscriptionSubscribeSpace) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{3, 0} +} +func (m *SubscriptionSubscribeSpace) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SubscriptionSubscribeSpace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SubscriptionSubscribeSpace.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SubscriptionSubscribeSpace) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscriptionSubscribeSpace.Merge(m, src) +} +func (m *SubscriptionSubscribeSpace) XXX_Size() int { + return m.Size() +} +func (m *SubscriptionSubscribeSpace) XXX_DiscardUnknown() { + xxx_messageInfo_SubscriptionSubscribeSpace.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscriptionSubscribeSpace proto.InternalMessageInfo + +func (m *SubscriptionSubscribeSpace) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + +type SubscriptionUnsubscribeSpace struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *SubscriptionUnsubscribeSpace) Reset() { *m = SubscriptionUnsubscribeSpace{} } +func (m *SubscriptionUnsubscribeSpace) String() string { return proto.CompactTextString(m) } +func (*SubscriptionUnsubscribeSpace) ProtoMessage() {} +func (*SubscriptionUnsubscribeSpace) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{3, 1} +} +func (m *SubscriptionUnsubscribeSpace) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SubscriptionUnsubscribeSpace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SubscriptionUnsubscribeSpace.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SubscriptionUnsubscribeSpace) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscriptionUnsubscribeSpace.Merge(m, src) +} +func (m *SubscriptionUnsubscribeSpace) XXX_Size() int { + return m.Size() +} +func (m *SubscriptionUnsubscribeSpace) XXX_DiscardUnknown() { + xxx_messageInfo_SubscriptionUnsubscribeSpace.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscriptionUnsubscribeSpace proto.InternalMessageInfo + +func (m *SubscriptionUnsubscribeSpace) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + +type Sync struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *Sync) Reset() { *m = Sync{} } +func (m *Sync) String() string { return proto.CompactTextString(m) } +func (*Sync) ProtoMessage() {} +func (*Sync) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{4} +} +func (m *Sync) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Sync) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Sync.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Sync) XXX_Merge(src proto.Message) { + xxx_messageInfo_Sync.Merge(m, src) +} +func (m *Sync) XXX_Size() int { + return m.Size() +} +func (m *Sync) XXX_DiscardUnknown() { + xxx_messageInfo_Sync.DiscardUnknown(m) +} + +var xxx_messageInfo_Sync proto.InternalMessageInfo + +func (m *Sync) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + func init() { - proto.RegisterType((*SyncMessage)(nil), "anytype.SyncMessage") + proto.RegisterEnum("anytype.MessageType", MessageType_name, MessageType_value) + proto.RegisterEnum("anytype.SystemErrorCode", SystemErrorCode_name, SystemErrorCode_value) + proto.RegisterType((*Message)(nil), "anytype.Message") + proto.RegisterType((*Header)(nil), "anytype.Header") + proto.RegisterType((*System)(nil), "anytype.System") + proto.RegisterType((*SystemHandshake)(nil), "anytype.System.Handshake") + proto.RegisterType((*SystemPing)(nil), "anytype.System.Ping") + proto.RegisterType((*SystemAck)(nil), "anytype.System.Ack") + proto.RegisterType((*SystemError)(nil), "anytype.System.Error") + proto.RegisterType((*Subscription)(nil), "anytype.Subscription") + proto.RegisterType((*SubscriptionSubscribeSpace)(nil), "anytype.Subscription.SubscribeSpace") + proto.RegisterType((*SubscriptionUnsubscribeSpace)(nil), "anytype.Subscription.UnsubscribeSpace") + proto.RegisterType((*Sync)(nil), "anytype.Sync") } func init() { proto.RegisterFile("syncproto/proto/sync.proto", fileDescriptor_4b28dfdd48a89166) } var fileDescriptor_4b28dfdd48a89166 = []byte{ - // 119 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2a, 0xae, 0xcc, 0x4b, - 0x2e, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0x87, 0x90, 0x20, 0xbe, 0x1e, 0x98, 0x29, 0xc4, 0x9e, 0x98, - 0x57, 0x59, 0x52, 0x59, 0x90, 0xaa, 0x24, 0xcf, 0xc5, 0x1d, 0x5c, 0x99, 0x97, 0xec, 0x9b, 0x5a, - 0x5c, 0x9c, 0x98, 0x9e, 0x2a, 0x24, 0xc0, 0xc5, 0x5c, 0x9c, 0x5a, 0x28, 0xc1, 0xa8, 0xc0, 0xa8, - 0xc1, 0x1c, 0x04, 0x62, 0x3a, 0xa9, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, - 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, - 0x14, 0x97, 0x3e, 0xdc, 0x82, 0x24, 0x36, 0x30, 0x65, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x7e, - 0xbc, 0x46, 0xd2, 0x74, 0x00, 0x00, 0x00, + // 577 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x94, 0x51, 0x6f, 0xd2, 0x50, + 0x14, 0xc7, 0x29, 0x74, 0x20, 0xa7, 0x04, 0xea, 0x9d, 0x8b, 0x58, 0x97, 0xa6, 0x69, 0xa6, 0x92, + 0x69, 0xba, 0x04, 0xb3, 0xf8, 0x3c, 0x27, 0x06, 0xe2, 0xa4, 0xe4, 0x16, 0x30, 0xf1, 0x65, 0x29, + 0xb7, 0x37, 0xd0, 0xb0, 0xb5, 0xb5, 0x2d, 0x89, 0x7d, 0xf7, 0xc9, 0xa7, 0x7d, 0x18, 0x3f, 0x84, + 0x8f, 0x7b, 0xf4, 0xd1, 0xc0, 0x17, 0x31, 0xbd, 0x6d, 0xa1, 0xab, 0xdb, 0x0b, 0xdc, 0xff, 0xb9, + 0xbf, 0xff, 0x39, 0xff, 0xcb, 0xbd, 0x01, 0xa4, 0x20, 0x72, 0x88, 0xe7, 0xbb, 0xa1, 0x7b, 0x92, + 0x7c, 0xc6, 0x5a, 0x63, 0x4b, 0x54, 0x33, 0x9d, 0x28, 0x8c, 0x3c, 0xaa, 0x7e, 0x84, 0xda, 0x67, + 0x1a, 0x04, 0xe6, 0x9c, 0xa2, 0x57, 0x50, 0x5d, 0x50, 0xd3, 0xa2, 0x7e, 0x9b, 0x53, 0xb8, 0x8e, + 0xd0, 0x6d, 0x69, 0x29, 0xa4, 0xf5, 0x59, 0x19, 0xa7, 0xdb, 0x08, 0x01, 0x6f, 0x99, 0xa1, 0xd9, + 0x2e, 0x2b, 0x5c, 0xa7, 0x81, 0xd9, 0x5a, 0xfd, 0xc1, 0x41, 0x35, 0xc1, 0x50, 0x1b, 0x6a, 0xa1, + 0x6f, 0x12, 0x3a, 0xb0, 0x58, 0xa3, 0x06, 0xce, 0x24, 0x3a, 0x84, 0xba, 0x4f, 0xbf, 0xad, 0x68, + 0x10, 0x0e, 0x2c, 0xe6, 0xe6, 0xf1, 0xae, 0x10, 0xfb, 0x7c, 0xea, 0x5d, 0x45, 0x03, 0xab, 0x5d, + 0x61, 0x7b, 0x99, 0x44, 0x1d, 0xe0, 0xe3, 0x1c, 0x6d, 0x5e, 0xe1, 0x3a, 0xcd, 0xee, 0x93, 0x6d, + 0xae, 0x34, 0xf9, 0x38, 0xf2, 0x28, 0x66, 0x84, 0xfa, 0xab, 0x02, 0x55, 0x23, 0x0a, 0x42, 0x7a, + 0x8d, 0xde, 0x41, 0x7d, 0x61, 0x3a, 0x56, 0xb0, 0x30, 0x97, 0x34, 0x3d, 0xd1, 0xb3, 0xad, 0x33, + 0x61, 0xb4, 0x7e, 0x06, 0xe0, 0x1d, 0x1b, 0x4f, 0xf3, 0x6c, 0x67, 0xce, 0x02, 0x0a, 0xb9, 0x69, + 0xa9, 0x67, 0x64, 0x3b, 0x73, 0xcc, 0x08, 0xf4, 0x02, 0x2a, 0x26, 0x59, 0xb2, 0xb4, 0x42, 0x77, + 0xbf, 0x08, 0x9e, 0x91, 0x25, 0x8e, 0xf7, 0xa5, 0x53, 0xa8, 0xf7, 0x73, 0xdd, 0x5b, 0xec, 0x0a, + 0x88, 0x7b, 0x35, 0xa5, 0x7e, 0x60, 0xbb, 0x0e, 0x0b, 0x57, 0xc7, 0xc5, 0xb2, 0xa4, 0x02, 0x1f, + 0xcf, 0x42, 0x12, 0x3c, 0x5a, 0x39, 0xf6, 0xf7, 0xb1, 0x7d, 0x9d, 0x9c, 0x83, 0xc7, 0x5b, 0x2d, + 0x75, 0xa1, 0x72, 0x46, 0x96, 0xe8, 0x35, 0xec, 0x51, 0xdf, 0x77, 0xfd, 0x34, 0xf3, 0x41, 0x31, + 0x4a, 0x2f, 0xde, 0xc4, 0x09, 0x23, 0xdd, 0x70, 0xb0, 0xc7, 0x0a, 0x48, 0x03, 0x9e, 0xb8, 0x56, + 0xd2, 0xb5, 0xd9, 0x95, 0xee, 0x75, 0x69, 0xe7, 0xae, 0x45, 0x31, 0xe3, 0x90, 0x02, 0x82, 0x45, + 0x03, 0xe2, 0xdb, 0x5e, 0x18, 0xe7, 0x2e, 0xb3, 0xdc, 0xf9, 0x92, 0x7a, 0x0a, 0x7c, 0xcc, 0x23, + 0x01, 0x6a, 0x93, 0xe1, 0xa7, 0xa1, 0xfe, 0x65, 0x28, 0x96, 0x90, 0x02, 0x87, 0x93, 0xa1, 0x31, + 0x19, 0x8d, 0x74, 0x3c, 0xee, 0x7d, 0xb8, 0x1c, 0x61, 0x7d, 0xac, 0x9f, 0xeb, 0x17, 0x97, 0xd3, + 0x1e, 0x36, 0x06, 0xfa, 0x50, 0x04, 0xf5, 0x67, 0x19, 0x1a, 0xc6, 0x6a, 0xb6, 0xed, 0x83, 0x2e, + 0xa0, 0x19, 0x24, 0x7a, 0x46, 0x0d, 0xcf, 0x24, 0xd9, 0x0d, 0x1e, 0xed, 0x32, 0xe6, 0xf0, 0x4c, + 0xa4, 0x2c, 0x2e, 0x78, 0x11, 0x06, 0x71, 0xe5, 0x14, 0xfa, 0x25, 0xbf, 0xd4, 0xcb, 0xfb, 0xfb, + 0x4d, 0x0a, 0x34, 0xfe, 0xcf, 0x2f, 0x1d, 0x43, 0xf3, 0xee, 0xd4, 0xf8, 0xfd, 0x06, 0xde, 0xee, + 0xdd, 0xd7, 0x71, 0x26, 0xa5, 0x37, 0x20, 0x16, 0x3b, 0x3e, 0x4c, 0xab, 0x0a, 0xf0, 0x46, 0xe4, + 0x90, 0x87, 0x89, 0xe3, 0x29, 0x08, 0xb9, 0xa7, 0x8f, 0x0e, 0xe0, 0x71, 0x4e, 0x26, 0x97, 0x27, + 0x96, 0xd0, 0x73, 0x78, 0x9a, 0x2f, 0xe7, 0xce, 0x27, 0x72, 0x68, 0x1f, 0x5a, 0x77, 0x3c, 0x0e, + 0x11, 0xcb, 0xef, 0x8f, 0x7e, 0xaf, 0x65, 0xee, 0x76, 0x2d, 0x73, 0x7f, 0xd7, 0x32, 0x77, 0xb3, + 0x91, 0x4b, 0xb7, 0x1b, 0xb9, 0xf4, 0x67, 0x23, 0x97, 0xbe, 0xc2, 0xc9, 0xf6, 0xcf, 0x64, 0x56, + 0x65, 0x5f, 0x6f, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x26, 0x2f, 0x92, 0xfd, 0x60, 0x04, 0x00, + 0x00, } -func (m *SyncMessage) Marshal() (dAtA []byte, err error) { +func (m *Message) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -94,24 +692,407 @@ func (m *SyncMessage) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *SyncMessage) MarshalTo(dAtA []byte) (int, error) { +func (m *Message) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *SyncMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if m.Seq != 0 { - i = encodeVarintSync(dAtA, i, uint64(m.Seq)) + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = encodeVarintSync(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x12 + } + if m.Header != nil { + { + size, err := m.Header.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Header) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Header) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Header) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Type != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x20 + } + if m.ReplyId != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.ReplyId)) + i-- + dAtA[i] = 0x18 + } + if m.RequestId != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.RequestId)) + i-- + dAtA[i] = 0x10 + } + if len(m.TraceId) > 0 { + i -= len(m.TraceId) + copy(dAtA[i:], m.TraceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.TraceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *System) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *System) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *System) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Ack != nil { + { + size, err := m.Ack.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.Ping != nil { + { + size, err := m.Ping.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Handshake != nil { + { + size, err := m.Handshake.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SystemHandshake) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemHandshake) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemHandshake) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ProtocolVersion) > 0 { + i -= len(m.ProtocolVersion) + copy(dAtA[i:], m.ProtocolVersion) + i = encodeVarintSync(dAtA, i, uint64(len(m.ProtocolVersion))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SystemPing) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemPing) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemPing) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.UnixTime != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.UnixTime)) i-- dAtA[i] = 0x8 } return len(dAtA) - i, nil } +func (m *SystemAck) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemAck) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemAck) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Error != nil { + { + size, err := m.Error.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} + +func (m *SystemError) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemError) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemError) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Description) > 0 { + i -= len(m.Description) + copy(dAtA[i:], m.Description) + i = encodeVarintSync(dAtA, i, uint64(len(m.Description))) + i-- + dAtA[i] = 0x12 + } + if m.Code != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.Code)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Subscription) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Subscription) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Subscription) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.UnsubscribeSpace != nil { + { + size, err := m.UnsubscribeSpace.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.SubscribeSpace != nil { + { + size, err := m.SubscribeSpace.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SubscriptionSubscribeSpace) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SubscriptionSubscribeSpace) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SubscriptionSubscribeSpace) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SubscriptionUnsubscribeSpace) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SubscriptionUnsubscribeSpace) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SubscriptionUnsubscribeSpace) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Sync) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Sync) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Sync) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintSync(dAtA []byte, offset int, v uint64) int { offset -= sovSync(v) base := offset @@ -123,14 +1104,172 @@ func encodeVarintSync(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *SyncMessage) Size() (n int) { +func (m *Message) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Seq != 0 { - n += 1 + sovSync(uint64(m.Seq)) + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovSync(uint64(l)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *Header) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TraceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + if m.RequestId != 0 { + n += 1 + sovSync(uint64(m.RequestId)) + } + if m.ReplyId != 0 { + n += 1 + sovSync(uint64(m.ReplyId)) + } + if m.Type != 0 { + n += 1 + sovSync(uint64(m.Type)) + } + return n +} + +func (m *System) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Handshake != nil { + l = m.Handshake.Size() + n += 1 + l + sovSync(uint64(l)) + } + if m.Ping != nil { + l = m.Ping.Size() + n += 1 + l + sovSync(uint64(l)) + } + if m.Ack != nil { + l = m.Ack.Size() + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SystemHandshake) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ProtocolVersion) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SystemPing) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.UnixTime != 0 { + n += 1 + sovSync(uint64(m.UnixTime)) + } + return n +} + +func (m *SystemAck) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Error != nil { + l = m.Error.Size() + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SystemError) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Code != 0 { + n += 1 + sovSync(uint64(m.Code)) + } + l = len(m.Description) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *Subscription) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SubscribeSpace != nil { + l = m.SubscribeSpace.Size() + n += 1 + l + sovSync(uint64(l)) + } + if m.UnsubscribeSpace != nil { + l = m.UnsubscribeSpace.Size() + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SubscriptionSubscribeSpace) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SubscriptionUnsubscribeSpace) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *Sync) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) } return n } @@ -141,7 +1280,7 @@ func sovSync(x uint64) (n int) { func sozSync(x uint64) (n int) { return sovSync(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *SyncMessage) Unmarshal(dAtA []byte) error { +func (m *Message) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -164,17 +1303,17 @@ func (m *SyncMessage) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: SyncMessage: wiretype end group for non-group") + return fmt.Errorf("proto: Message: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: SyncMessage: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType) + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) } - m.Seq = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowSync @@ -184,11 +1323,1067 @@ func (m *SyncMessage) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Seq |= int64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &Header{} + } + if err := m.Header.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Header) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Header: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Header: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TraceId", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TraceId = append(m.TraceId[:0], dAtA[iNdEx:postIndex]...) + if m.TraceId == nil { + m.TraceId = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestId", wireType) + } + m.RequestId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplyId", wireType) + } + m.ReplyId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ReplyId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= MessageType(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *System) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: System: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: System: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Handshake", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Handshake == nil { + m.Handshake = &SystemHandshake{} + } + if err := m.Handshake.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ping", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Ping == nil { + m.Ping = &SystemPing{} + } + if err := m.Ping.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ack", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Ack == nil { + m.Ack = &SystemAck{} + } + if err := m.Ack.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemHandshake) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Handshake: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Handshake: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ProtocolVersion", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ProtocolVersion = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemPing) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Ping: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Ping: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UnixTime", wireType) + } + m.UnixTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UnixTime |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemAck) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Ack: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Ack: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Error == nil { + m.Error = &SystemError{} + } + if err := m.Error.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemError) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Error: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Error: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= SystemErrorCode(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Description", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Description = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Subscription) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Subscription: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Subscription: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SubscribeSpace", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SubscribeSpace == nil { + m.SubscribeSpace = &SubscriptionSubscribeSpace{} + } + if err := m.SubscribeSpace.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UnsubscribeSpace", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.UnsubscribeSpace == nil { + m.UnsubscribeSpace = &SubscriptionUnsubscribeSpace{} + } + if err := m.UnsubscribeSpace.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SubscriptionSubscribeSpace) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SubscribeSpace: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SubscribeSpace: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SubscriptionUnsubscribeSpace) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UnsubscribeSpace: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UnsubscribeSpace: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Sync) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Sync: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Sync: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipSync(dAtA[iNdEx:])