diff --git a/cmd/client/client.go b/cmd/client/client.go deleted file mode 100644 index 18bb7e99..00000000 --- a/cmd/client/client.go +++ /dev/null @@ -1,169 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" - "github.com/gogo/protobuf/proto" - "github.com/libp2p/go-libp2p-core/sec" - "go.uber.org/zap" - "io" - "net" - "net/http" - _ "net/http/pprof" - "os" - "os/signal" - "storj.io/drpc" - "storj.io/drpc/drpcconn" - "syscall" - "time" -) - -var log = logger.NewNamed("client") - -var ( - flagConfigFile = flag.String("c", "etc/config.yml", "path to config file") - flagVersion = flag.Bool("v", false, "show version and exit") - flagHelp = flag.Bool("h", false, "show help and exit") -) - -func main() { - flag.Parse() - - if *flagVersion { - fmt.Println(app.VersionDescription()) - return - } - if *flagHelp { - flag.PrintDefaults() - return - } - - if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" { - go func() { - http.ListenAndServe(debug, nil) - }() - } - - // create app - ctx := context.Background() - a := new(app.App) - - // open config file - conf, err := config.NewFromFile(*flagConfigFile) - if err != nil { - log.Fatal("can't open config file", zap.Error(err)) - } - - // bootstrap components - a.Register(conf). - Register(transport.New()). - Register(&Client{}) - - // start app - if err := a.Start(ctx); err != nil { - log.Fatal("can't start app", zap.Error(err)) - } - log.Info("app started", zap.String("version", a.Version())) - - // wait exit signal - exit := make(chan os.Signal, 1) - signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) - sig := <-exit - log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig))) - - // close app - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - if err := a.Close(ctx); err != nil { - log.Fatal("close error", zap.Error(err)) - } else { - log.Info("goodbye!") - } - time.Sleep(time.Second / 3) -} - -type Client struct { - conf config.GrpcServer - tr transport.Service - sc sec.SecureConn -} - -func (c *Client) Init(ctx context.Context, a *app.App) (err error) { - c.tr = a.MustComponent(transport.CName).(transport.Service) - c.conf = a.MustComponent(config.CName).(*config.Config).GrpcServer - return nil -} - -func (c *Client) Name() (name string) { - return "testClient" -} - -func (c *Client) Run(ctx context.Context) (err error) { - tcpConn, err := net.Dial("tcp", c.conf.ListenAddrs[0]) - if err != nil { - return - } - c.sc, err = c.tr.TLSConn(ctx, tcpConn) - if err != nil { - return - } - log.Info("connected with server", zap.String("serverPeer", c.sc.RemotePeer().String()), zap.String("per", c.sc.LocalPeer().String())) - - dconn := drpcconn.New(c.sc) - stream, err := dconn.NewStream(ctx, "", enc{}) - if err != nil { - return - } - go c.handleStream(stream) - return nil -} - -func (c *Client) handleStream(stream drpc.Stream) { - var err error - defer func() { - log.Info("stream closed", zap.Error(err)) - }() - var n int64 = 100000 - for i := int64(0); i < n; i++ { - st := time.Now() - if err = stream.MsgSend(&syncproto.SyncMessage{Seq: i}, enc{}); err != nil { - if err == io.EOF { - return - } - log.Fatal("send error", zap.Error(err)) - } - log.Debug("message sent", zap.Int64("seq", i)) - msg := &syncproto.SyncMessage{} - if err := stream.MsgRecv(msg, enc{}); err != nil { - if err == io.EOF { - return - } - log.Error("msg recv error", zap.Error(err)) - } - log.Debug("message received", zap.Int64("seq", msg.Seq), zap.Duration("dur", time.Since(st))) - time.Sleep(time.Second) - } -} - -func (c *Client) Close(ctx context.Context) (err error) { - if c.sc != nil { - return c.sc.Close() - } - return -} - -type enc struct{} - -func (e enc) Marshal(msg drpc.Message) ([]byte, error) { - return msg.(proto.Marshaler).Marshal() -} - -func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { - return msg.(proto.Unmarshaler).Unmarshal(buf) -} diff --git a/cmd/node/node.go b/cmd/node/node.go index fddad6f7..69c5c601 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -7,15 +7,11 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/api" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/document" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/drpcserver" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/example" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "go.uber.org/zap" "net/http" _ "net/http/pprof" @@ -89,13 +85,9 @@ func main() { } func Bootstrap(a *app.App) { - a.Register(account.New()). - Register(transport.New()). - Register(drpcserver.New()). - Register(node.New()). - Register(document.New()). - Register(message.New()). - Register(requesthandler.New()). - Register(treecache.New()). - Register(api.New()) + a.Register(secure.New()). + Register(server.New()). + Register(dialer.New()). + Register(pool.NewPool()). + Register(&example.Example{}) } diff --git a/config/config.go b/config/config.go index 1766eec0..4b610ca9 100644 --- a/config/config.go +++ b/config/config.go @@ -26,9 +26,7 @@ func NewFromFile(path string) (c *Config, err error) { type Config struct { Anytype Anytype `yaml:"anytype"` GrpcServer GrpcServer `yaml:"grpcServer"` - Account Account `yaml:"account"` - APIServer APIServer `yaml:"apiServer"` - Nodes []Node `yaml:"nodes"` + PeerList PeerList `yaml:"peerList"` } func (c *Config) Init(ctx context.Context, a *app.App) (err error) { diff --git a/config/grpc.go b/config/grpc.go index 700c3567..01b797fa 100644 --- a/config/grpc.go +++ b/config/grpc.go @@ -2,5 +2,4 @@ package config type GrpcServer struct { ListenAddrs []string `yaml:"listenAddrs"` - TLS bool `yaml:"tls"` } diff --git a/config/peer.go b/config/peer.go new file mode 100644 index 00000000..17680e2b --- /dev/null +++ b/config/peer.go @@ -0,0 +1,14 @@ +package config + +type PeerList struct { + MyId struct { + PeerId string `yaml:"peerId"` + PrivKey string `yaml:"privKey"` + } `yaml:"myId"` + Remote []PeerRemote `yaml:"remote"` +} + +type PeerRemote struct { + PeerId string `yaml:"peerId"` + Addr string `yaml:"addr"` +} diff --git a/etc/config.1.yml b/etc/config.1.yml new file mode 100644 index 00000000..093d1ef7 --- /dev/null +++ b/etc/config.1.yml @@ -0,0 +1,16 @@ +anytype: + swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec" + +grpcServer: + listenAddrs: + - "127.0.0.1:4431" + +peerList: + myId: + peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U" + privKey: "InCGjb55V9+jj2PebUExUuwrpOIBc4hmgk2dSqyk3k4DjmgrdoNVuFe7xCFaFdUVb0RJYj6A+OTp2yXASTmq2w==" + remote: + - peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C" + addr: "127.0.0.1:4430" + - peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H" + addr: "127.0.0.1:4432" diff --git a/etc/config.2.yml b/etc/config.2.yml new file mode 100644 index 00000000..701fdc5f --- /dev/null +++ b/etc/config.2.yml @@ -0,0 +1,16 @@ +anytype: + swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec" + +grpcServer: + listenAddrs: + - "127.0.0.1:4432" + +peerList: + myId: + peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H" + privKey: "jynYZBgtM4elT+6e7M5UERTJCZgUd3hDdmQjCqTpApyJ4h53V6TQan4Ru4OXqz+91rCLjpIVdphhaB0l+TvNsA==" + remote: + - peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U" + addr: "127.0.0.1:4431" + - peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C" + addr: "127.0.0.1:4430" diff --git a/etc/config.yml b/etc/config.yml index f66b7fdd..782b6015 100644 --- a/etc/config.yml +++ b/etc/config.yml @@ -1,26 +1,16 @@ anytype: - swarmKey: /key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec + swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec" + grpcServer: listenAddrs: - - 127.0.0.1:4430 - - 127.0.0.1:4431 - tls: false -account: - peerId: 12D3KooWPviVQZY23iXQU7eva7k4ojyScagRAqE8cXm6cTm6rhNs - signingKey: 3inYSp8q7WidZi7RkhLFNZPF9yMJXHpnyrTpTsE7cd2ggESfTAEpWiQNa8zCnNdzK9DpG6Mu5hDCZhfU6RMDL466H8om - encryptionKey: udNkNsbKtm7eu1Bqt6DUwLZka2Rq22vm33K5eezrMkGnXx8X13oQ8HBSHgkLmRZTC1737hgZ66s92QRNe1YC1v8JNiv6yXVuV3MujBkmrYYVbGB7DBz6REsGAMPmuqDiG5PubjVHP468MDJJYRTZN6VEJuSGZtEAyLBJe9iErRVFsMdNc6ZMbY5pxoYE6LHWbsfZejrnAXvgABTnYTN4ad6EaxRiEWHGACizr59uf3KYFb9hFwxkFMhenpTpxbkBwQp3V8MxV4eLbgsWQPMpfeGeinjmNRAnZfi6tnqSDLyy3RWM5y9W89rYKt3EdVRwcFWFHh8FgSjzhpqE77GEwjGU9ddKorUfW85jQGdd2xdFwQfLF2eTGEwSxX9Gz2fTTtkN4pNJC8MesKK2cRsLAZLb9rv5ebCMgQ3S86WjfbWRcEYLoKUWKjujiY1XT5GszTgSa2b68QpcDmqgUv7FiwcZwC5qaCsgoARZ2GGsPgW7mCBahxTJnor4dCLF9Aiz1YvFvwWzm8k9zUmY5pxPQFNSWfAAcodqEHJV6i9TSf5ERNLJCQiRTSStZZdoeqAf42sYCSdiaiVUc26Q7QcTxtxxpfCTqGc2Lgh6ofgwG1m2CfmhXTsBDwBFRc8q6TcGJ4fT4WuZXF9f4MhKHqgqppFKMWYyvtJyvypbJ735kU63UWWSMCo8BbVTRrtaq7bPHbgvHHWYso5o7PeqNuhD2YwTKd6wpbAeg8wpv3eEWB2sgotQ5JiW2itEyinLZHRpLvbtYGFWctsZisvBbCssiCsPqzG1DffZWNjt2nj1vztuapZLpz9azanCaXhFxLBBLVveEk4tvEknHtREwXtmTpQK5GqpmW9LTFqA5jj7fVtytYQFufe9Cp4HKNNodv1UCPQ1KTLwkwA6UEMwqamvE67GDDvjmE2XaMoq6ErEYX6FkQi2d9kbLJaFVGwYH3DVcbBGAJzSoeYceosuoQJBcKS19BhFqXWT1W3iKvMibadh8MYLim7ncxezuHFCyUPFnFn33kfzzTWTsFYbKDukCECRBKciptevx9zS6V2YiSgcYRWV7Rd8sNg8vAGUBQ6mLBtWNExn5gRkoBQDqN9BhfqpEbGo33JHb7GVLsN7um9VxKcTSkiDLPKTxijNevHeRWRbtbCvQ31WHaNLzihnsowVQMk8SV6CiqzqwUb4ETxiXCDHABGuCACcYVC2jgEoFuqSAwS6c2y5sfcjxWSe3hs9eLjyWmeHu75MbNcJN8EtBrLs4j2tWGtbq6BR7SYVcygUbscf75HKncC4YBkF1MWRHmos11ZKaCwLfjsTW6XUXdfhvpco6mxic8muiq5XA3G8x47stQ3eBoSXoehRAMd4FKNv9geDagAhjorfcMd7kCT5CEENMqCLpjtUsFXSGtjqpYg1aeaQsxdEszfw2LS5pvPQEnYmqMRxZsuYHbuAH4J7sQwvjn7ezWfpsJfwQT6EqDFJp4g6vcgfUeCYzim9DVDnJYWocL9Uzb9P3rvknDKZmAkFWgTVWq16GaX4xUsXVNLMkDm88q4vvxrCQyAerNzox27uF2yH1ca66pVu9GZkGHM1KJrNmyyYEm445wNgkgSx8ivhyC11SUKs3nTKNVrnc -apiServer: - port: "8080" -nodes: - - peerId: 12D3KooWPviVQZY23iXQU7eva7k4ojyScagRAqE8cXm6cTm6rhNs - address: 127.0.0.1:4430 - signingKey: 3inYSp8q7WidZi7RkhLFNZPF9yMJXHpnyrTpTsE7cd2ggESfTAEpWiQNa8zCnNdzK9DpG6Mu5hDCZhfU6RMDL466H8om - encryptionKey: udNkNsbKtm7eu1Bqt6DUwLZka2Rq22vm33K5eezrMkGnXx8X13oQ8HBSHgkLmRZTC1737hgZ66s92QRNe1YC1v8JNiv6yXVuV3MujBkmrYYVbGB7DBz6REsGAMPmuqDiG5PubjVHP468MDJJYRTZN6VEJuSGZtEAyLBJe9iErRVFsMdNc6ZMbY5pxoYE6LHWbsfZejrnAXvgABTnYTN4ad6EaxRiEWHGACizr59uf3KYFb9hFwxkFMhenpTpxbkBwQp3V8MxV4eLbgsWQPMpfeGeinjmNRAnZfi6tnqSDLyy3RWM5y9W89rYKt3EdVRwcFWFHh8FgSjzhpqE77GEwjGU9ddKorUfW85jQGdd2xdFwQfLF2eTGEwSxX9Gz2fTTtkN4pNJC8MesKK2cRsLAZLb9rv5ebCMgQ3S86WjfbWRcEYLoKUWKjujiY1XT5GszTgSa2b68QpcDmqgUv7FiwcZwC5qaCsgoARZ2GGsPgW7mCBahxTJnor4dCLF9Aiz1YvFvwWzm8k9zUmY5pxPQFNSWfAAcodqEHJV6i9TSf5ERNLJCQiRTSStZZdoeqAf42sYCSdiaiVUc26Q7QcTxtxxpfCTqGc2Lgh6ofgwG1m2CfmhXTsBDwBFRc8q6TcGJ4fT4WuZXF9f4MhKHqgqppFKMWYyvtJyvypbJ735kU63UWWSMCo8BbVTRrtaq7bPHbgvHHWYso5o7PeqNuhD2YwTKd6wpbAeg8wpv3eEWB2sgotQ5JiW2itEyinLZHRpLvbtYGFWctsZisvBbCssiCsPqzG1DffZWNjt2nj1vztuapZLpz9azanCaXhFxLBBLVveEk4tvEknHtREwXtmTpQK5GqpmW9LTFqA5jj7fVtytYQFufe9Cp4HKNNodv1UCPQ1KTLwkwA6UEMwqamvE67GDDvjmE2XaMoq6ErEYX6FkQi2d9kbLJaFVGwYH3DVcbBGAJzSoeYceosuoQJBcKS19BhFqXWT1W3iKvMibadh8MYLim7ncxezuHFCyUPFnFn33kfzzTWTsFYbKDukCECRBKciptevx9zS6V2YiSgcYRWV7Rd8sNg8vAGUBQ6mLBtWNExn5gRkoBQDqN9BhfqpEbGo33JHb7GVLsN7um9VxKcTSkiDLPKTxijNevHeRWRbtbCvQ31WHaNLzihnsowVQMk8SV6CiqzqwUb4ETxiXCDHABGuCACcYVC2jgEoFuqSAwS6c2y5sfcjxWSe3hs9eLjyWmeHu75MbNcJN8EtBrLs4j2tWGtbq6BR7SYVcygUbscf75HKncC4YBkF1MWRHmos11ZKaCwLfjsTW6XUXdfhvpco6mxic8muiq5XA3G8x47stQ3eBoSXoehRAMd4FKNv9geDagAhjorfcMd7kCT5CEENMqCLpjtUsFXSGtjqpYg1aeaQsxdEszfw2LS5pvPQEnYmqMRxZsuYHbuAH4J7sQwvjn7ezWfpsJfwQT6EqDFJp4g6vcgfUeCYzim9DVDnJYWocL9Uzb9P3rvknDKZmAkFWgTVWq16GaX4xUsXVNLMkDm88q4vvxrCQyAerNzox27uF2yH1ca66pVu9GZkGHM1KJrNmyyYEm445wNgkgSx8ivhyC11SUKs3nTKNVrnc - - peerId: 12D3KooWPzziVTnuypw4MC7m4mzdoh9CM5FXGrYiTW81pfjngmwx - address: 127.0.0.1:4432 - signingKey: 3i7TZgCtctChP9rA13BDuK2GmdbRL6sgk51JjetKyFt3SEqXN8M4EwcREWVfPYmayBKYPHveLzY7NsggxvKapJFdYH8F - encryptionKey: JgG4CcCbae1qEpe7mL87XWScKZWBBqSZseakgKVJfHqTUt3CiLPMKPDZtzQWLfb2ouiTbff5YTBZbGxwKqWYMAW8prEVjbcXBtkCi2Y2XkZ8v1ugidF5MC3d1ifnh5Es5CafB8kToUegtAig8vhMCuXAzLQpbiQyFLf25KcyKfVvHUomafaHyKCbYu1bmyGRC7CGxUD6jnPg2MYZZiJiq9scNRaRxbiqCe9C2VcUTFdigV65eGmZPKGgfAThcyKCdQSe1hmoXtNw5MaqbB3eQvkuS4GySap3SaEZUmGsmuVumUvHappUpN8x3UQs1TNWfYxULVx3tUMHfQ98G9No4gwP42ENMgCuMVQW9M68s73WCGucvpPUZdvQFiSJkSoKDqi2F6zdrM2LZTp8qGciunKDFZvh14eAGDbPLw9dBLVmMQLKq68yz8Nzi9BYWjzLa4fDNkoYETB8rArNPECAjVVW6PYLzueVNmqvo7RdWZGHF2YCRXr1wLG7jg4WBX4Lm5jQZS8bkWacbpj49b4nxRnjLsGusdCwRCZPY9qFPim2WU4z2KEgRS87x97TiYAThLUJwGBv5pBKRLkrAAoFWwkDSfxcRt83FiZtJ1hLWkpj6fK56SuubQUjWzNfVPCTfNqcod54PAPVESTuAPjDsmM4YeMGgUpaK9Goqbu1FuuZ9P6mQPwGKdpGuVgjJv7uQJZw3fpC9H1vJ4fa2wBrGFkahyEFo2QDb8iG9oGarpFxewJVVs12cpKJeQzU5dLFBvQJmGnHASTnBwfNBieDT7ZeFmnGuN7sLaPJ5wgdKJpY6aAooztyWfJPKTbxzYjUQMVoyW3YbTD3qg8F9grqVT1VFAp88ta1u55eokqz7h82EJNQf7GptDHxPRg4euAJ3BDvmRUfAwYzU8TD1gyiUnnHGRuvH25xgJRqauNrDLUbha33wVAHeMzWJhpLjLz4DLmtLejpfwNr5PuEbczqFAaLspf8dFjkrvmC3RrwHngKuzTkYo46WSJggWs3CwWDpzMmrATh2LRYmdH7BtNUkvu7onRzms8jWHBsFNfbCitMpfAsM3XmgtFUGTMi4LzVK3f5G8HqwGqcVEnFJTZzdoBsfNMKyFapXPnUBUHQse8jZjSYQv3dJpVuTV6HQ5R8saBdKDGMpXFyYqZK85cikcK6Nkd7ZrnbUUN2i6f3ano7hnxTbeW7kGxatUWETTwAzwGjdMhpJjieLFBTomAqqfXopEr1FeWcFmQ6kYBL2fa6XYqhZJevG6LpNXeuLcbyTiADt5CMNwt6MTmEomqtzqE7BgvqYEibuEj5K22ufvvLmX7XbGiB7wZKRPGuTR1pPDgJbzU9HGmF5fWjTVGPeNMn7ZmbQVcys6xy79notYp6MoQhBCTWyuuEuc24REEfMCjPuhNgssJQWT3ZR3xskrcEdCUyGbvQT2eWVa6atYsu5Z2m7sGXXwfH7vhdiUkVZEmhNPusij7dsMa9io63D6itYuu8Ed5aZYoUkZCfKZ39KQ16rtYpa9mfcwG4fyQy11Zy9G7nxdNJohGYeq3xuerSdwTDNdK2PREJcAoNA7Nfcsxk7333B5gXNFZKcBLQk56C1PqApLerSfo3jVuojcNAxYB688K6 - - peerId: 12D3KooWL9qu6sDua8hK8sEyeUVzpei8SVf3DA4W1gVAxDMwW9Ze - address: 127.0.0.1:4434 - signingKey: 3iPPhpw3jndkSCvpZ4yf5eWYydZwbFAk7gdMkXZBgyh939AicHVakwkabDpxzhtrsgJEXrP5sKam9QL7weMyyoMrKNCH - encryptionKey: JgG4CcCbae1qEpe7mL5br9LinoxiiuQqGC7y6HBbvmQ9pcZM4F4q2oGDQA71ZumaTJ1RAiUDaxi1fBVnrvv4Tqm9fau3NyijggQ246UtHKemsPFBX9Qyv4oYZJ9X3WNNc9q3BkPX9C8WHdfda9fxW4wD1QYKzd5F1J7FdhxZh3MZUdc4GEfPLwew8dTfEJp7ipAvM7TY1r6URe4VHVmsdaEL3jsEUzEqL4MRSc69WdjdKZLyVzQVcjnLegFfoH2c6476MfnoPggeVGghPjDmoqoJ2kzHwjvDQKoGyC9m4wkicS4YZkVA6N2ezKBSumJkV1mY6hpBPuCtdYLDw2zsCqMq9AXFhtMxzSCtLLRnRVntErY3Wb8ty5vEnS8aY7B612okwdsMg2rYxP55ccK4zP7PoKTKTS4TWTqGiLW2GtptTLhQ7ynJudG9YArzwwjgqXaxExrMqFCSMCfQbVNvwv3SckfpwQjpJjCfRS73SQKPY8ghqrdKnuRHyUWpJbYb9y5doCWMCuLw6u6t2LqfcwknNkjoWNCG69tohSgRStbYfmA7AZvHzPGc3gKByZecVxdmvPmtbSLXRiB9FbqtuqEerGxpisL3FhDM6SVqxRm2E2SxwJTHVcyL2LNF5tE2V7mujSa5dwzZfbRA9p3SwFeRhd42z1r36iL92yugfa2SbHYYDhdKFCJLhMc8ogqd1b8oUBbwBktKg6p6MJeZdqS2vfsLQRnV8TyP72fKUjFtaQ4o7D6vQ9VnsBMQS9FF8PEuZ5yF3XQaMjJMpeGho3c4dNFEw4vCiUaVN2WzF7XR28dyGwLASMoVFuUoErgr7TUQm4yKJG8QaHePx7A7Ku3Ys6fvrDsBVJWz5taQB7cMBqXmrAg2DHSUwqr8XfhxXnv8cnZZzPTrtib86P7w3kUYLrJYaR6jur5jTFucFSxthKEMqzBPtS3wgn2hcj31ATSNR7FNDcZmteWex5vAzGkuDoCG4c7dUCGinT3akYvukR7WugdHAYySkhf6AhNGg4wrZ7y3YVRMwRkex3PumcwGGKoTvNMNMqz2kTCNjSsDr6xRBk8njCLwDMunvDZM7VThTMKQ4CvegCtX3kV11qk46WjouqFedYSmShmxB5dMMVKui3owicS3ioVXoSqCRAcov3LJ3zL5aQ19FBBvvezeN27PkcZjvXKQwNEpnMzJeLTLdz8guWrER9Vn7XB3zgvt6xzab8JbHJ62XEEGFC5EoU68D3Wn9oXH6EqfCNbUsfW6EScc7wcsRu7aLi4ddjYrDcd4pQgct3dpAzR9Z4ZHRgQi9k4DF8bg9LUjfjaJS53DGSiSSfGBNtkpdyLp8K8Y89BZRnw21yVcKwA8JD47By6s3vfJ8PdSTs4Lsfm7mEhERuVLyQC9wpPe6ysjzUTQjx7RyTaVVA9ENQfpc1XCdHufDFST1eeiRFLwKzEFCr2bATo5r5oQ6UtBdQv6eFaGv19ju4cikXTNq2vCGw6aMupdaESYeWL5Gs8ssCjb1K2iZfGwqidfTJejEiuoiD2S6shTRv6BFz3JrhU7XsZdLkU9rvDYJs6rWq4C4ix4VWDxNHktF7iBuCASnZjgMYs9Lipt5hZEow4FrWcvUtVQ8PXQT3SC + - "127.0.0.1:4430" + +peerList: + myId: + peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C" + privKey: "3BhkWxi0Vzc2AhLtw7LhFHa9Ys4P0wHOlPCW02O6FKJvS8V2nzkJGDlM3vcSLZZ0m9tyYuCKqK4TWUKXgu/FVQ==" + remote: + - peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U" + addr: "127.0.0.1:4431" + - peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H" + addr: "127.0.0.1:4432" diff --git a/go.mod b/go.mod index 28de7481..8594463c 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect lukechampine.com/blake3 v1.1.6 // indirect ) diff --git a/go.sum b/go.sum index 5cb5955d..12ac3e57 100644 --- a/go.sum +++ b/go.sum @@ -162,6 +162,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/pkg/ocache/ocache.go b/pkg/ocache/ocache.go index 9f72aa38..b0460ca6 100644 --- a/pkg/ocache/ocache.go +++ b/pkg/ocache/ocache.go @@ -10,9 +10,10 @@ import ( ) var ( - ErrClosed = errors.New("object cache closed") - ErrExists = errors.New("object exists") - ErrTimeout = errors.New("loading object timed out") + ErrClosed = errors.New("object cache closed") + ErrExists = errors.New("object exists") + ErrTimeout = errors.New("loading object timed out") + ErrNotExists = errors.New("object not exists") ) var ( @@ -20,10 +21,6 @@ var ( defaultGC = 20 * time.Second ) -type key int - -const CacheTimeout key = 0 - var log = logger.NewNamed("ocache") type LoadFunc func(ctx context.Context, id string) (value Object, err error) @@ -61,7 +58,9 @@ func New(loadFunc LoadFunc, opts ...Option) OCache { for _, o := range opts { o(c) } - go c.ticker() + if c.ttl != 0 { + go c.ticker() + } return c } @@ -99,6 +98,8 @@ type OCache interface { // Increases the object refs counter on successful // When 'loadFunc' returns a non-nil error, an object will not be stored to cache Get(ctx context.Context, id string) (value Object, err error) + // Pick returns value if it's presents in cache (will not call loadFunc) + Pick(id string) (value Object, err error) // Add adds new object to cache // Returns error when object exists Add(id string, value Object) (err error) @@ -154,28 +155,28 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) { e.refCount++ c.mu.Unlock() - timeout := ctx.Value(CacheTimeout) if load { - if timeout != nil { - go c.load(ctx, id, e) - } else { - c.load(ctx, id, e) - } + go c.load(ctx, id, e) } - - if timeout != nil { - duration := timeout.(time.Duration) - select { - case <-e.load: - return e.value, e.loadErr - case <-time.After(duration): - return nil, ErrTimeout - } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.load: } - <-e.load return e.value, e.loadErr } +func (c *oCache) Pick(id string) (value Object, err error) { + c.mu.Lock() + val, ok := c.data[id] + c.mu.Unlock() + if !ok { + return nil, ErrNotExists + } + <-val.load + return val.value, val.loadErr +} + func (c *oCache) load(ctx context.Context, id string, e *entry) { defer close(e.load) value, err := c.loadFunc(ctx, id) diff --git a/pkg/ocache/ocache_test.go b/pkg/ocache/ocache_test.go index eb3a1caf..55bf2e95 100644 --- a/pkg/ocache/ocache_test.go +++ b/pkg/ocache/ocache_test.go @@ -92,6 +92,20 @@ func TestOCache_Get(t *testing.T) { _, err := c.Get(context.TODO(), "id") assert.Equal(t, ErrClosed, err) }) + t.Run("context cancel", func(t *testing.T) { + c := New(func(ctx context.Context, id string) (value Object, err error) { + time.Sleep(time.Second / 3) + return &testObject{ + name: "id", + }, nil + }) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.Get(ctx, "id") + assert.Equal(t, context.Canceled, err) + assert.NoError(t, c.Close()) + }) } func TestOCache_GC(t *testing.T) { diff --git a/service/example/example.go b/service/example/example.go new file mode 100644 index 00000000..c3513fd8 --- /dev/null +++ b/service/example/example.go @@ -0,0 +1,84 @@ +package example + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "go.uber.org/zap" + "strings" + "time" +) + +var log = logger.NewNamed("example") + +type Example struct { + pool pool.Pool + peerConf config.PeerList +} + +func (e *Example) Init(ctx context.Context, a *app.App) (err error) { + e.pool = a.MustComponent(pool.CName).(pool.Pool) + e.peerConf = a.MustComponent(config.CName).(*config.Config).PeerList + + // subscribe for sync messages + e.pool.AddHandler(syncproto.MessageType_MessageTypeSync, e.syncHandler) + return +} + +func (e *Example) Name() (name string) { + return "example" +} + +func (e *Example) Run(ctx context.Context) (err error) { + // dial manually with all peers + for _, rp := range e.peerConf.Remote { + if er := e.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil { + log.Info("can't dial to peer", zap.Error(er)) + } else { + log.Info("connected with peer", zap.String("peerId", rp.PeerId)) + } + } + go e.doRequests() + return nil +} + +func (e *Example) syncHandler(ctx context.Context, msg *pool.Message) (err error) { + data := string(msg.Data) // you need unmarshal this bytes + log.Info("msg received", zap.String("peerId", msg.Peer().Id()), zap.String("data", data)) + + if strings.HasPrefix(data, "ack:") { + if err = msg.Ack(); err != nil { + log.Error("ack error", zap.Error(err)) + } + } else if strings.HasPrefix(data, "ackErr:") { + if err = msg.AckError(42, "ack error description"); err != nil { + log.Error("ackErr error", zap.Error(err)) + } + } else if strings.HasPrefix(data, "reply:") { + if err = msg.Reply([]byte("reply for:" + strings.TrimPrefix(data, "reply:"))); err != nil { + log.Error("reply error", zap.Error(err)) + } + } + return nil +} + +func (e *Example) doRequests() { + time.Sleep(time.Second) + ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + st := time.Now() + err := e.pool.SendAndWait(ctx, e.peerConf.Remote[0].PeerId, &syncproto.Message{ + Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync}, + Data: []byte("ack: something"), + }) + log.Info("sent with ack:", zap.Error(err), zap.Duration("dur", time.Since(st))) +} + +func (e *Example) Close(ctx context.Context) (err error) { + return +} diff --git a/service/net/dialer/dialer.go b/service/net/dialer/dialer.go new file mode 100644 index 00000000..a1cb8a31 --- /dev/null +++ b/service/net/dialer/dialer.go @@ -0,0 +1,104 @@ +package dialer + +import ( + "context" + "errors" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" + "github.com/libp2p/go-libp2p-core/sec" + "go.uber.org/zap" + "net" + "storj.io/drpc" + "storj.io/drpc/drpcconn" + "sync" +) + +const CName = "net/dialer" + +var ErrArrdsNotFound = errors.New("addrs for peer not found") + +var log = logger.NewNamed(CName) + +func New() Dialer { + return &dialer{} +} + +type Dialer interface { + Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) + UpdateAddrs(addrs map[string][]string) + app.Component +} + +type dialer struct { + transport secure.Service + peerAddrs map[string][]string + + mu sync.RWMutex +} + +func (d *dialer) Init(ctx context.Context, a *app.App) (err error) { + d.transport = a.MustComponent(secure.CName).(secure.Service) + peerConf := a.MustComponent(config.CName).(*config.Config).PeerList.Remote + d.peerAddrs = map[string][]string{} + for _, rp := range peerConf { + d.peerAddrs[rp.PeerId] = []string{rp.Addr} + } + return +} + +func (d *dialer) Name() (name string) { + return CName +} + +func (d *dialer) UpdateAddrs(addrs map[string][]string) { + d.mu.Lock() + d.peerAddrs = addrs + d.mu.Unlock() +} + +func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) { + d.mu.RLock() + defer d.mu.RUnlock() + addrs, ok := d.peerAddrs[peerId] + if !ok || len(addrs) == 0 { + return nil, ErrArrdsNotFound + } + var ( + stream drpc.Stream + sc sec.SecureConn + ) + for _, addr := range addrs { + stream, sc, err = d.makeStream(ctx, addr) + if err != nil { + log.Info("can't connect to host", zap.String("addr", addr)) + } else { + err = nil + break + } + } + if err != nil { + return + } + return rpc.PeerFromStream(sc, stream, false), nil +} + +func (d *dialer) makeStream(ctx context.Context, addr string) (stream drpc.Stream, sc sec.SecureConn, err error) { + tcpConn, err := net.Dial("tcp", addr) + if err != nil { + return + } + sc, err = d.transport.TLSConn(ctx, tcpConn) + if err != nil { + return + } + log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String())) + stream, err = drpcconn.New(sc).NewStream(ctx, "", rpc.Encoding) + if err != nil { + return + } + return stream, sc, err +} diff --git a/service/net/peer/peer.go b/service/net/peer/peer.go new file mode 100644 index 00000000..331b0982 --- /dev/null +++ b/service/net/peer/peer.go @@ -0,0 +1,35 @@ +package peer + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "time" +) + +type Dir uint + +const ( + // DirInbound indicates peer created connection + DirInbound Dir = iota + // DirOutbound indicates that our host created connection + DirOutbound +) + +type Info struct { + Id string + Dir Dir + LastActiveUnix int64 +} + +func (i Info) LastActive() time.Time { + return time.Unix(i.LastActiveUnix, 0) +} + +type Peer interface { + Id() string + Info() Info + Recv() (*syncproto.Message, error) + Send(msg *syncproto.Message) (err error) + Context() context.Context + Close() error +} diff --git a/service/net/pool/message.go b/service/net/pool/message.go new file mode 100644 index 00000000..bc067a12 --- /dev/null +++ b/service/net/pool/message.go @@ -0,0 +1,71 @@ +package pool + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "gopkg.in/mgo.v2/bson" +) + +type Message struct { + *syncproto.Message + peer peer.Peer +} + +func (m *Message) Peer() peer.Peer { + return m.peer +} + +func (m *Message) Reply(data []byte) (err error) { + rep := &syncproto.Message{ + Header: &syncproto.Header{ + TraceId: m.GetHeader().TraceId, + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSync, + }, + Data: data, + } + return m.peer.Send(rep) +} + +func (m *Message) Ack() (err error) { + ack := &syncproto.System{ + Ack: &syncproto.SystemAck{}, + } + data, err := ack.Marshal() + if err != nil { + return + } + rep := &syncproto.Message{ + Header: &syncproto.Header{ + TraceId: m.GetHeader().TraceId, + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSystem, + }, + Data: data, + } + return m.peer.Send(rep) +} + +func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (err error) { + ack := &syncproto.System{ + Ack: &syncproto.SystemAck{ + Error: &syncproto.SystemError{ + Code: code, + Description: description, + }, + }, + } + data, err := ack.Marshal() + if err != nil { + return + } + rep := &syncproto.Message{ + Header: &syncproto.Header{ + TraceId: []byte(bson.NewObjectId()), + ReplyId: m.GetHeader().RequestId, + Type: syncproto.MessageType_MessageTypeSystem, + }, + Data: data, + } + return m.peer.Send(rep) +} diff --git a/service/net/pool/peer.go b/service/net/pool/peer.go new file mode 100644 index 00000000..6a7a96d0 --- /dev/null +++ b/service/net/pool/peer.go @@ -0,0 +1,28 @@ +package pool + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" +) + +type peerEntry struct { + peer peer.Peer + groupIds []string + ready chan struct{} +} + +func (pe *peerEntry) addGroup(groupId string) (ok bool) { + if slice.FindPos(pe.groupIds, groupId) != -1 { + return false + } + pe.groupIds = append(pe.groupIds, groupId) + return true +} + +func (pe *peerEntry) removeGroup(groupId string) (ok bool) { + if slice.FindPos(pe.groupIds, groupId) == -1 { + return false + } + pe.groupIds = slice.Remove(pe.groupIds, groupId) + return true +} diff --git a/service/net/pool/pool.go b/service/net/pool/pool.go new file mode 100644 index 00000000..21779ca0 --- /dev/null +++ b/service/net/pool/pool.go @@ -0,0 +1,305 @@ +package pool + +import ( + "context" + "errors" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" + "go.uber.org/zap" + "sync" + "sync/atomic" +) + +const CName = "sync/peerPool" + +var log = logger.NewNamed("peerPool") + +var ( + ErrPoolClosed = errors.New("peer pool is closed") + ErrPeerNotFound = errors.New("peer not found") +) + +func NewPool() Pool { + return &pool{closed: true} +} + +type Handler func(ctx context.Context, msg *Message) (err error) + +type Pool interface { + DialAndAddPeer(ctx context.Context, id string) (err error) + AddAndReadPeer(peer peer.Peer) (err error) + AddHandler(msgType syncproto.MessageType, h Handler) + AddPeerIdToGroup(peerId, groupId string) (err error) + RemovePeerIdFromGroup(peerId, groupId string) (err error) + + SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) + Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) + + app.ComponentRunnable +} + +type pool struct { + peersById map[string]*peerEntry + waiters waiters + handlers map[syncproto.MessageType][]Handler + peersIdsByGroup map[string][]string + + dialer dialer.Dialer + + closed bool + mu sync.RWMutex + wg *sync.WaitGroup +} + +func (p *pool) Init(ctx context.Context, a *app.App) (err error) { + p.peersById = map[string]*peerEntry{} + p.handlers = map[syncproto.MessageType][]Handler{} + p.peersIdsByGroup = map[string][]string{} + p.waiters = waiters{waiters: map[uint64]*waiter{}} + p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer) + p.wg = &sync.WaitGroup{} + return nil +} + +func (p *pool) Name() (name string) { + return CName +} + +func (p *pool) Run(ctx context.Context) (err error) { + p.closed = false + return nil +} + +func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) { + p.mu.Lock() + defer p.mu.Unlock() + if !p.closed { + // unable to add handler after Run + return + } + p.handlers[msgType] = append(p.handlers[msgType], h) +} + +func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return ErrPoolClosed + } + if _, ok := p.peersById[peerId]; ok { + return nil + } + peer, err := p.dialer.Dial(ctx, peerId) + if err != nil { + return + } + p.peersById[peer.Id()] = &peerEntry{ + peer: peer, + } + p.wg.Add(1) + go p.readPeerLoop(peer) + return nil +} + +func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return ErrPoolClosed + } + p.peersById[peer.Id()] = &peerEntry{ + peer: peer, + } + p.wg.Add(1) + p.mu.Unlock() + return p.readPeerLoop(peer) +} + +func (p *pool) AddPeerIdToGroup(peerId, groupId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + peer, ok := p.peersById[peerId] + if !ok { + return ErrPeerNotFound + } + if slice.FindPos(peer.groupIds, groupId) != -1 { + return nil + } + peer.addGroup(groupId) + p.peersIdsByGroup[groupId] = append(p.peersIdsByGroup[groupId], peerId) + return +} + +func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + peer, ok := p.peersById[peerId] + if !ok { + return ErrPeerNotFound + } + if slice.FindPos(peer.groupIds, groupId) == -1 { + return nil + } + peer.removeGroup(groupId) + p.peersIdsByGroup[groupId] = slice.Remove(p.peersIdsByGroup[groupId], peerId) + return +} + +func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) { + p.mu.RLock() + peer := p.peersById[peerId] + p.mu.RUnlock() + if peer == nil { + return ErrPeerNotFound + } + repId := p.waiters.NewReplyId() + msg.GetHeader().RequestId = repId + ch := make(chan Reply, 1) + p.waiters.Add(repId, &waiter{ch: ch}) + defer p.waiters.Remove(repId) + if err = peer.peer.Send(msg); err != nil { + return + } + select { + case rep := <-ch: + if rep.Error != nil { + return rep.Error + } + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) { + //TODO implement me + panic("implement me") +} + +func (p *pool) readPeerLoop(peer peer.Peer) (err error) { + defer p.wg.Done() + for { + msg, err := peer.Recv() + if err != nil { + log.Debug("peer receive error", zap.Error(err), zap.String("peerId", peer.Id())) + break + } + p.handleMessage(peer, msg) + } + if err = p.removePeer(peer.Id()); err != nil { + log.Error("remove peer error", zap.String("peerId", peer.Id())) + } + return +} + +func (p *pool) removePeer(peerId string) (err error) { + p.mu.Lock() + defer p.mu.Unlock() + _, ok := p.peersById[peerId] + if !ok { + return ErrPeerNotFound + } + delete(p.peersById, peerId) + return +} + +func (p *pool) handleMessage(peer peer.Peer, msg *syncproto.Message) { + replyId := msg.GetHeader().GetReplyId() + if replyId != 0 { + if !p.waiters.Send(replyId, Reply{ + PeerInfo: peer.Info(), + Message: &Message{ + Message: msg, + peer: peer, + }, + }) { + log.Debug("received reply with unknown (or expired) replyId", zap.Uint64("replyId", replyId)) + } + return + } + handlers := p.handlers[msg.GetHeader().GetType()] + if len(handlers) == 0 { + return + } + + message := &Message{Message: msg, peer: peer} + + for _, h := range handlers { + if err := h(peer.Context(), message); err != nil { + log.Error("handle message error", zap.Error(err)) + } + } +} + +func (p *pool) Close(ctx context.Context) (err error) { + p.mu.Lock() + for _, peer := range p.peersById { + peer.peer.Close() + } + wg := p.wg + p.mu.Unlock() + if wg != nil { + wg.Wait() + } + return nil +} + +type waiter struct { + sent int + ch chan<- Reply +} + +type waiters struct { + waiters map[uint64]*waiter + replySeq uint64 + mu sync.Mutex +} + +func (w waiters) Send(replyId uint64, r Reply) (ok bool) { + w.mu.Lock() + wait := w.waiters[replyId] + if wait == nil { + w.mu.Unlock() + return false + } + wait.sent++ + var lastMessage = wait.sent == cap(wait.ch) + if lastMessage { + delete(w.waiters, replyId) + } + w.mu.Unlock() + wait.ch <- r + if lastMessage { + close(wait.ch) + } + return true +} + +func (w waiters) Add(replyId uint64, wait *waiter) { + w.mu.Lock() + w.waiters[replyId] = wait + w.mu.Unlock() +} + +func (w waiters) Remove(id uint64) error { + w.mu.Lock() + defer w.mu.Unlock() + if _, ok := w.waiters[id]; ok { + delete(w.waiters, id) + return nil + } + return fmt.Errorf("waiter not found") +} + +func (w waiters) NewReplyId() uint64 { + res := atomic.AddUint64(&w.replySeq, 1) + if res == 0 { + return w.NewReplyId() + } + return res +} diff --git a/service/net/pool/request.go b/service/net/pool/request.go new file mode 100644 index 00000000..8584f0dd --- /dev/null +++ b/service/net/pool/request.go @@ -0,0 +1,45 @@ +package pool + +import "context" + +// 1. message for one peerId with ack +// pool.SendAndWait(ctx context,.C +// 2. message for many peers without ack (or group) + +type Request struct { + groupId string + oneOf []string + all []string + tryDial bool + needReply bool + pool *pool +} + +func (r *Request) GroupId(groupId string) *Request { + r.groupId = groupId + return r +} + +func (r *Request) All(peerIds ...string) *Request { + r.all = peerIds + return r +} + +func (r *Request) OneOf(peerIds ...string) *Request { + r.oneOf = peerIds + return r +} + +func (r *Request) TryDial(is bool) *Request { + r.tryDial = is + return r +} + +func (r *Request) NeedReply(is bool) *Request { + r.needReply = is + return r +} + +func (r *Request) Exec(ctx context.Context, msg *Message) *Results { + return nil +} diff --git a/service/net/pool/result.go b/service/net/pool/result.go new file mode 100644 index 00000000..94f1ac93 --- /dev/null +++ b/service/net/pool/result.go @@ -0,0 +1,53 @@ +package pool + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" +) + +// Results of request collects replies and errors +// Must be closed after usage r.Close() +type Results struct { + ctx context.Context + cancel func() + waiterId uint64 + ch chan Reply + pool *pool +} + +// Iterate iterates over replies +// if callback will return a non-nil error then iteration stops +func (r *Results) Iterate(callback func(r Reply) (err error)) (err error) { + if r.ctx == nil || r.ch == nil { + return fmt.Errorf("results not initialized") + } + for { + select { + case <-r.ctx.Done(): + return r.ctx.Err() + case m, ok := <-r.ch: + if ok { + if err = callback(m); err != nil { + return err + } + } else { + return + } + } + } +} + +// Close cancels iteration and unregister reply handler in the pool +// Required to call to avoid memory leaks +func (r *Results) Close() (err error) { + r.cancel() + return r.pool.waiters.Remove(r.waiterId) +} + +// Reply presents the result of request executing can be error or result message +type Reply struct { + PeerInfo peer.Info + Error error + Message *Message +} diff --git a/service/net/rpc/encoding.go b/service/net/rpc/encoding.go new file mode 100644 index 00000000..eb983b9d --- /dev/null +++ b/service/net/rpc/encoding.go @@ -0,0 +1,18 @@ +package rpc + +import ( + "github.com/gogo/protobuf/proto" + "storj.io/drpc" +) + +var Encoding = enc{} + +type enc struct{} + +func (e enc) Marshal(msg drpc.Message) ([]byte, error) { + return msg.(proto.Marshaler).Marshal() +} + +func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { + return msg.(proto.Unmarshaler).Unmarshal(buf) +} diff --git a/service/sync/drpcserver/drpcserver.go b/service/net/rpc/server/drpcserver.go similarity index 55% rename from service/sync/drpcserver/drpcserver.go rename to service/net/rpc/server/drpcserver.go index 90613d1a..dcdf2156 100644 --- a/service/sync/drpcserver/drpcserver.go +++ b/service/net/rpc/server/drpcserver.go @@ -1,27 +1,24 @@ -package drpcserver +package server import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport" - "github.com/gogo/protobuf/proto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "go.uber.org/zap" - "io" "net" "storj.io/drpc" "storj.io/drpc/drpcserver" "strings" - "sync" "time" ) -var log = logger.NewNamed("drpcserver") +const CName = "net/drpcserver" -const CName = "drpcserver" +var log = logger.NewNamed(CName) func New() DRPCServer { return &drpcServer{} @@ -32,19 +29,18 @@ type DRPCServer interface { } type drpcServer struct { - config config.GrpcServer - drpcServer *drpcserver.Server - transport transport.Service - listeners []transport.ContextListener - messageService message.Service - - cancel func() + config config.GrpcServer + drpcServer *drpcserver.Server + transport secure.Service + listeners []secure.ContextListener + pool pool.Pool + cancel func() } func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) { s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer - s.transport = a.MustComponent(transport.CName).(transport.Service) - s.messageService = a.MustComponent(message.CName).(message.Service) + s.transport = a.MustComponent(secure.CName).(secure.Service) + s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -66,7 +62,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) { return } -func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) { +func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) { l := log.With(zap.String("localAddr", lis.Addr().String())) l.Info("drpc listener started") defer func() { @@ -90,7 +86,7 @@ func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) { } continue } - if _, ok := err.(transport.HandshakeError); ok { + if _, ok := err.(secure.HandshakeError); ok { l.Warn("listener handshake error", zap.Error(err)) continue } @@ -113,29 +109,14 @@ func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) { } } -func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) { +func (s *drpcServer) HandleRPC(stream drpc.Stream, _ string) (err error) { ctx := stream.Context() - sc, err := transport.CtxSecureConn(ctx) + sc, err := secure.CtxSecureConn(ctx) if err != nil { return } - peerId := sc.RemotePeer().String() - l := log.With(zap.String("peer", peerId)) - l.Info("stream opened") - defer func() { - l.Info("stream closed", zap.Error(err)) - }() - - ch := s.messageService.RegisterMessageSender(peerId) - defer s.messageService.UnregisterMessageSender(peerId) - - wg := &sync.WaitGroup{} - wg.Add(2) - go s.sendMessages(stream, wg, ch) - go s.receiveMessages(stream, wg, peerId) - wg.Wait() - - return nil + log.With(zap.String("peer", sc.RemotePeer().String())).Debug("stream opened") + return s.pool.AddAndReadPeer(rpc.PeerFromStream(sc, stream, true)) } func (s *drpcServer) Close(ctx context.Context) (err error) { @@ -149,43 +130,3 @@ func (s *drpcServer) Close(ctx context.Context) (err error) { } return } - -func (s *drpcServer) sendMessages(stream drpc.Stream, wg *sync.WaitGroup, ch chan *syncpb.SyncContent) { - defer wg.Done() - for { - select { - case msg := <-ch: - if err := stream.MsgSend(msg, enc{}); err != nil { - return - } - case <-stream.Context().Done(): - return - } - } -} - -func (s *drpcServer) receiveMessages(stream drpc.Stream, wg *sync.WaitGroup, peerId string) { - defer wg.Done() - for { - msg := &syncpb.SyncContent{} - if err := stream.MsgRecv(msg, enc{}); err != nil { - if err == io.EOF { - return - } - } - err := s.messageService.HandleMessage(peerId, msg) - if err != nil { - log.Error("error handling message", zap.Error(err)) - } - } -} - -type enc struct{} - -func (e enc) Marshal(msg drpc.Message) ([]byte, error) { - return msg.(proto.Marshaler).Marshal() -} - -func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { - return msg.(proto.Unmarshaler).Unmarshal(buf) -} diff --git a/service/sync/drpcserver/util.go b/service/net/rpc/server/util.go similarity index 92% rename from service/sync/drpcserver/util.go rename to service/net/rpc/server/util.go index d84358eb..5852288a 100644 --- a/service/sync/drpcserver/util.go +++ b/service/net/rpc/server/util.go @@ -1,6 +1,6 @@ //go:build !windows -package drpcserver +package server import ( "errors" diff --git a/service/sync/drpcserver/util_windows.go b/service/net/rpc/server/util_windows.go similarity index 97% rename from service/sync/drpcserver/util_windows.go rename to service/net/rpc/server/util_windows.go index 3acba434..efef2915 100644 --- a/service/sync/drpcserver/util_windows.go +++ b/service/net/rpc/server/util_windows.go @@ -1,6 +1,6 @@ //go:build windows -package drpcserver +package server import ( "errors" diff --git a/service/net/rpc/stream.go b/service/net/rpc/stream.go new file mode 100644 index 00000000..c05691da --- /dev/null +++ b/service/net/rpc/stream.go @@ -0,0 +1,55 @@ +package rpc + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "github.com/libp2p/go-libp2p-core/sec" + "storj.io/drpc" + "sync/atomic" + "time" +) + +func PeerFromStream(sc sec.SecureConn, stream drpc.Stream, incoming bool) peer.Peer { + dp := &drpcPeer{ + sc: sc, + Stream: stream, + } + dp.info.Id = sc.RemotePeer().String() + if incoming { + dp.info.Dir = peer.DirInbound + } else { + dp.info.Dir = peer.DirOutbound + } + return dp +} + +type drpcPeer struct { + sc sec.SecureConn + info peer.Info + drpc.Stream +} + +func (d *drpcPeer) Id() string { + return d.info.Id +} + +func (d *drpcPeer) Info() peer.Info { + return d.info +} + +func (d *drpcPeer) Recv() (msg *syncproto.Message, err error) { + msg = &syncproto.Message{} + if err = d.Stream.MsgRecv(msg, Encoding); err != nil { + return + } + atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix()) + return +} + +func (d *drpcPeer) Send(msg *syncproto.Message) (err error) { + if err = d.Stream.MsgSend(msg, Encoding); err != nil { + return + } + atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix()) + return +} diff --git a/service/sync/transport/context.go b/service/net/secure/context.go similarity index 97% rename from service/sync/transport/context.go rename to service/net/secure/context.go index f30bab39..c38da959 100644 --- a/service/sync/transport/context.go +++ b/service/net/secure/context.go @@ -1,4 +1,4 @@ -package transport +package secure import ( "context" diff --git a/service/sync/transport/listener.go b/service/net/secure/listener.go similarity index 98% rename from service/sync/transport/listener.go rename to service/net/secure/listener.go index 97e9e6f4..a0c61d56 100644 --- a/service/sync/transport/listener.go +++ b/service/net/secure/listener.go @@ -1,4 +1,4 @@ -package transport +package secure import ( "context" diff --git a/service/sync/transport/transport.go b/service/net/secure/service.go similarity index 56% rename from service/sync/transport/transport.go rename to service/net/secure/service.go index 13f77282..b61c2f86 100644 --- a/service/sync/transport/transport.go +++ b/service/net/secure/service.go @@ -1,11 +1,13 @@ -package transport +package secure import ( "context" + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/sec" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "go.uber.org/zap" @@ -14,9 +16,9 @@ import ( type HandshakeError error -var log = logger.NewNamed("transport") +const CName = "net/secure" -const CName = "transport" +var log = logger.NewNamed(CName) func New() Service { return &service{} @@ -33,20 +35,39 @@ type service struct { } func (s *service) Init(ctx context.Context, a *app.App) (err error) { - acc := a.MustComponent(account.CName).(account.Service) - rawKey, err := acc.Account().SignKey.Raw() + peerConf := a.MustComponent(config.CName).(*config.Config).PeerList + pkb, err := crypto.ConfigDecodeKey(peerConf.MyId.PrivKey) if err != nil { - return err + return + } + if s.key, err = crypto.UnmarshalEd25519PrivateKey(pkb); err != nil { + return } - // converting into libp2p crypto structure - s.key, err = crypto.UnmarshalEd25519PrivateKey(rawKey) + pid, err := peer.Decode(peerConf.MyId.PeerId) if err != nil { - return err + return } - pubKeyRaw, _ := s.key.GetPublic().Raw() - log.Info("transport keys generated", zap.Binary("pubKey", pubKeyRaw)) + var testData = []byte("test data") + sign, err := s.key.Sign(testData) + if err != nil { + return + } + pubKey, err := pid.ExtractPublicKey() + if err != nil { + return + } + ok, err := pubKey.Verify(testData, sign) + if err != nil { + return + } + if !ok { + return fmt.Errorf("peerId and privateKey mismatched") + } + + log.Info("secure service init", zap.String("peerId", peerConf.MyId.PeerId)) + return nil } diff --git a/syncproto/proto/sync.proto b/syncproto/proto/sync.proto index 51b2f3bb..d633afd3 100644 --- a/syncproto/proto/sync.proto +++ b/syncproto/proto/sync.proto @@ -2,6 +2,63 @@ syntax = "proto3"; package anytype; option go_package = "/syncproto"; -message SyncMessage { - int64 seq = 1; -} \ No newline at end of file +message Message { + Header header = 1; + bytes data = 2; +} + +message Header { + bytes traceId = 1; + uint64 requestId = 2; + uint64 replyId = 3; + MessageType type = 4; +} + +enum MessageType { + MessageTypeSystem = 0; + MessageTypeSubscription = 1; + MessageTypeSync = 2; +} + + +message System { + Handshake handshake = 1; + Ping ping = 2; + Ack ack = 3; + + message Handshake { + string protocolVersion = 1; + } + message Ping { + uint64 unixTime = 1; + } + message Ack { + Error error = 2; + } + message Error { + Code code = 1; + string description = 2; + + enum Code { + UNKNOWN = 0; + UNSUPPORTED_PROTOCOL_VERSION = 10; + } + } +} + +message Subscription { + SubscribeSpace subscribeSpace = 1; + UnsubscribeSpace unsubscribeSpace = 2; + + message SubscribeSpace { + string spaceId = 1; + } + message UnsubscribeSpace { + string spaceId = 1; + } +} + +message Sync { + string spaceId = 1; + +} diff --git a/syncproto/sync.pb.go b/syncproto/sync.pb.go index aa3947c8..d673a859 100644 --- a/syncproto/sync.pb.go +++ b/syncproto/sync.pb.go @@ -22,22 +22,76 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type SyncMessage struct { - Seq int64 `protobuf:"varint,1,opt,name=seq,proto3" json:"seq,omitempty"` +type MessageType int32 + +const ( + MessageType_MessageTypeSystem MessageType = 0 + MessageType_MessageTypeSubscription MessageType = 1 + MessageType_MessageTypeSync MessageType = 2 +) + +var MessageType_name = map[int32]string{ + 0: "MessageTypeSystem", + 1: "MessageTypeSubscription", + 2: "MessageTypeSync", } -func (m *SyncMessage) Reset() { *m = SyncMessage{} } -func (m *SyncMessage) String() string { return proto.CompactTextString(m) } -func (*SyncMessage) ProtoMessage() {} -func (*SyncMessage) Descriptor() ([]byte, []int) { +var MessageType_value = map[string]int32{ + "MessageTypeSystem": 0, + "MessageTypeSubscription": 1, + "MessageTypeSync": 2, +} + +func (x MessageType) String() string { + return proto.EnumName(MessageType_name, int32(x)) +} + +func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_4b28dfdd48a89166, []int{0} } -func (m *SyncMessage) XXX_Unmarshal(b []byte) error { + +type SystemErrorCode int32 + +const ( + SystemError_UNKNOWN SystemErrorCode = 0 + SystemError_UNSUPPORTED_PROTOCOL_VERSION SystemErrorCode = 10 +) + +var SystemErrorCode_name = map[int32]string{ + 0: "UNKNOWN", + 10: "UNSUPPORTED_PROTOCOL_VERSION", +} + +var SystemErrorCode_value = map[string]int32{ + "UNKNOWN": 0, + "UNSUPPORTED_PROTOCOL_VERSION": 10, +} + +func (x SystemErrorCode) String() string { + return proto.EnumName(SystemErrorCode_name, int32(x)) +} + +func (SystemErrorCode) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 3, 0} +} + +type Message struct { + Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{0} +} +func (m *Message) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *SyncMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_SyncMessage.Marshal(b, m, deterministic) + return xxx_messageInfo_Message.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -47,44 +101,588 @@ func (m *SyncMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (m *SyncMessage) XXX_Merge(src proto.Message) { - xxx_messageInfo_SyncMessage.Merge(m, src) +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) } -func (m *SyncMessage) XXX_Size() int { +func (m *Message) XXX_Size() int { return m.Size() } -func (m *SyncMessage) XXX_DiscardUnknown() { - xxx_messageInfo_SyncMessage.DiscardUnknown(m) +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) } -var xxx_messageInfo_SyncMessage proto.InternalMessageInfo +var xxx_messageInfo_Message proto.InternalMessageInfo -func (m *SyncMessage) GetSeq() int64 { +func (m *Message) GetHeader() *Header { if m != nil { - return m.Seq + return m.Header + } + return nil +} + +func (m *Message) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +type Header struct { + TraceId []byte `protobuf:"bytes,1,opt,name=traceId,proto3" json:"traceId,omitempty"` + RequestId uint64 `protobuf:"varint,2,opt,name=requestId,proto3" json:"requestId,omitempty"` + ReplyId uint64 `protobuf:"varint,3,opt,name=replyId,proto3" json:"replyId,omitempty"` + Type MessageType `protobuf:"varint,4,opt,name=type,proto3,enum=anytype.MessageType" json:"type,omitempty"` +} + +func (m *Header) Reset() { *m = Header{} } +func (m *Header) String() string { return proto.CompactTextString(m) } +func (*Header) ProtoMessage() {} +func (*Header) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{1} +} +func (m *Header) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Header.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Header) XXX_Merge(src proto.Message) { + xxx_messageInfo_Header.Merge(m, src) +} +func (m *Header) XXX_Size() int { + return m.Size() +} +func (m *Header) XXX_DiscardUnknown() { + xxx_messageInfo_Header.DiscardUnknown(m) +} + +var xxx_messageInfo_Header proto.InternalMessageInfo + +func (m *Header) GetTraceId() []byte { + if m != nil { + return m.TraceId + } + return nil +} + +func (m *Header) GetRequestId() uint64 { + if m != nil { + return m.RequestId } return 0 } +func (m *Header) GetReplyId() uint64 { + if m != nil { + return m.ReplyId + } + return 0 +} + +func (m *Header) GetType() MessageType { + if m != nil { + return m.Type + } + return MessageType_MessageTypeSystem +} + +type System struct { + Handshake *SystemHandshake `protobuf:"bytes,1,opt,name=handshake,proto3" json:"handshake,omitempty"` + Ping *SystemPing `protobuf:"bytes,2,opt,name=ping,proto3" json:"ping,omitempty"` + Ack *SystemAck `protobuf:"bytes,3,opt,name=ack,proto3" json:"ack,omitempty"` +} + +func (m *System) Reset() { *m = System{} } +func (m *System) String() string { return proto.CompactTextString(m) } +func (*System) ProtoMessage() {} +func (*System) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2} +} +func (m *System) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *System) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_System.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *System) XXX_Merge(src proto.Message) { + xxx_messageInfo_System.Merge(m, src) +} +func (m *System) XXX_Size() int { + return m.Size() +} +func (m *System) XXX_DiscardUnknown() { + xxx_messageInfo_System.DiscardUnknown(m) +} + +var xxx_messageInfo_System proto.InternalMessageInfo + +func (m *System) GetHandshake() *SystemHandshake { + if m != nil { + return m.Handshake + } + return nil +} + +func (m *System) GetPing() *SystemPing { + if m != nil { + return m.Ping + } + return nil +} + +func (m *System) GetAck() *SystemAck { + if m != nil { + return m.Ack + } + return nil +} + +type SystemHandshake struct { + ProtocolVersion string `protobuf:"bytes,1,opt,name=protocolVersion,proto3" json:"protocolVersion,omitempty"` +} + +func (m *SystemHandshake) Reset() { *m = SystemHandshake{} } +func (m *SystemHandshake) String() string { return proto.CompactTextString(m) } +func (*SystemHandshake) ProtoMessage() {} +func (*SystemHandshake) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 0} +} +func (m *SystemHandshake) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemHandshake) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemHandshake.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemHandshake) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemHandshake.Merge(m, src) +} +func (m *SystemHandshake) XXX_Size() int { + return m.Size() +} +func (m *SystemHandshake) XXX_DiscardUnknown() { + xxx_messageInfo_SystemHandshake.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemHandshake proto.InternalMessageInfo + +func (m *SystemHandshake) GetProtocolVersion() string { + if m != nil { + return m.ProtocolVersion + } + return "" +} + +type SystemPing struct { + UnixTime uint64 `protobuf:"varint,1,opt,name=unixTime,proto3" json:"unixTime,omitempty"` +} + +func (m *SystemPing) Reset() { *m = SystemPing{} } +func (m *SystemPing) String() string { return proto.CompactTextString(m) } +func (*SystemPing) ProtoMessage() {} +func (*SystemPing) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 1} +} +func (m *SystemPing) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemPing) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemPing.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemPing) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemPing.Merge(m, src) +} +func (m *SystemPing) XXX_Size() int { + return m.Size() +} +func (m *SystemPing) XXX_DiscardUnknown() { + xxx_messageInfo_SystemPing.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemPing proto.InternalMessageInfo + +func (m *SystemPing) GetUnixTime() uint64 { + if m != nil { + return m.UnixTime + } + return 0 +} + +type SystemAck struct { + Error *SystemError `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` +} + +func (m *SystemAck) Reset() { *m = SystemAck{} } +func (m *SystemAck) String() string { return proto.CompactTextString(m) } +func (*SystemAck) ProtoMessage() {} +func (*SystemAck) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 2} +} +func (m *SystemAck) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemAck) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemAck.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemAck) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemAck.Merge(m, src) +} +func (m *SystemAck) XXX_Size() int { + return m.Size() +} +func (m *SystemAck) XXX_DiscardUnknown() { + xxx_messageInfo_SystemAck.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemAck proto.InternalMessageInfo + +func (m *SystemAck) GetError() *SystemError { + if m != nil { + return m.Error + } + return nil +} + +type SystemError struct { + Code SystemErrorCode `protobuf:"varint,1,opt,name=code,proto3,enum=anytype.SystemErrorCode" json:"code,omitempty"` + Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"` +} + +func (m *SystemError) Reset() { *m = SystemError{} } +func (m *SystemError) String() string { return proto.CompactTextString(m) } +func (*SystemError) ProtoMessage() {} +func (*SystemError) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{2, 3} +} +func (m *SystemError) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SystemError) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SystemError.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SystemError) XXX_Merge(src proto.Message) { + xxx_messageInfo_SystemError.Merge(m, src) +} +func (m *SystemError) XXX_Size() int { + return m.Size() +} +func (m *SystemError) XXX_DiscardUnknown() { + xxx_messageInfo_SystemError.DiscardUnknown(m) +} + +var xxx_messageInfo_SystemError proto.InternalMessageInfo + +func (m *SystemError) GetCode() SystemErrorCode { + if m != nil { + return m.Code + } + return SystemError_UNKNOWN +} + +func (m *SystemError) GetDescription() string { + if m != nil { + return m.Description + } + return "" +} + +type Subscription struct { + SubscribeSpace *SubscriptionSubscribeSpace `protobuf:"bytes,1,opt,name=subscribeSpace,proto3" json:"subscribeSpace,omitempty"` + UnsubscribeSpace *SubscriptionUnsubscribeSpace `protobuf:"bytes,2,opt,name=unsubscribeSpace,proto3" json:"unsubscribeSpace,omitempty"` +} + +func (m *Subscription) Reset() { *m = Subscription{} } +func (m *Subscription) String() string { return proto.CompactTextString(m) } +func (*Subscription) ProtoMessage() {} +func (*Subscription) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{3} +} +func (m *Subscription) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Subscription) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Subscription.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Subscription) XXX_Merge(src proto.Message) { + xxx_messageInfo_Subscription.Merge(m, src) +} +func (m *Subscription) XXX_Size() int { + return m.Size() +} +func (m *Subscription) XXX_DiscardUnknown() { + xxx_messageInfo_Subscription.DiscardUnknown(m) +} + +var xxx_messageInfo_Subscription proto.InternalMessageInfo + +func (m *Subscription) GetSubscribeSpace() *SubscriptionSubscribeSpace { + if m != nil { + return m.SubscribeSpace + } + return nil +} + +func (m *Subscription) GetUnsubscribeSpace() *SubscriptionUnsubscribeSpace { + if m != nil { + return m.UnsubscribeSpace + } + return nil +} + +type SubscriptionSubscribeSpace struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *SubscriptionSubscribeSpace) Reset() { *m = SubscriptionSubscribeSpace{} } +func (m *SubscriptionSubscribeSpace) String() string { return proto.CompactTextString(m) } +func (*SubscriptionSubscribeSpace) ProtoMessage() {} +func (*SubscriptionSubscribeSpace) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{3, 0} +} +func (m *SubscriptionSubscribeSpace) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SubscriptionSubscribeSpace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SubscriptionSubscribeSpace.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SubscriptionSubscribeSpace) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscriptionSubscribeSpace.Merge(m, src) +} +func (m *SubscriptionSubscribeSpace) XXX_Size() int { + return m.Size() +} +func (m *SubscriptionSubscribeSpace) XXX_DiscardUnknown() { + xxx_messageInfo_SubscriptionSubscribeSpace.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscriptionSubscribeSpace proto.InternalMessageInfo + +func (m *SubscriptionSubscribeSpace) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + +type SubscriptionUnsubscribeSpace struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *SubscriptionUnsubscribeSpace) Reset() { *m = SubscriptionUnsubscribeSpace{} } +func (m *SubscriptionUnsubscribeSpace) String() string { return proto.CompactTextString(m) } +func (*SubscriptionUnsubscribeSpace) ProtoMessage() {} +func (*SubscriptionUnsubscribeSpace) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{3, 1} +} +func (m *SubscriptionUnsubscribeSpace) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SubscriptionUnsubscribeSpace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SubscriptionUnsubscribeSpace.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SubscriptionUnsubscribeSpace) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscriptionUnsubscribeSpace.Merge(m, src) +} +func (m *SubscriptionUnsubscribeSpace) XXX_Size() int { + return m.Size() +} +func (m *SubscriptionUnsubscribeSpace) XXX_DiscardUnknown() { + xxx_messageInfo_SubscriptionUnsubscribeSpace.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscriptionUnsubscribeSpace proto.InternalMessageInfo + +func (m *SubscriptionUnsubscribeSpace) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + +type Sync struct { + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` +} + +func (m *Sync) Reset() { *m = Sync{} } +func (m *Sync) String() string { return proto.CompactTextString(m) } +func (*Sync) ProtoMessage() {} +func (*Sync) Descriptor() ([]byte, []int) { + return fileDescriptor_4b28dfdd48a89166, []int{4} +} +func (m *Sync) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Sync) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Sync.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Sync) XXX_Merge(src proto.Message) { + xxx_messageInfo_Sync.Merge(m, src) +} +func (m *Sync) XXX_Size() int { + return m.Size() +} +func (m *Sync) XXX_DiscardUnknown() { + xxx_messageInfo_Sync.DiscardUnknown(m) +} + +var xxx_messageInfo_Sync proto.InternalMessageInfo + +func (m *Sync) GetSpaceId() string { + if m != nil { + return m.SpaceId + } + return "" +} + func init() { - proto.RegisterType((*SyncMessage)(nil), "anytype.SyncMessage") + proto.RegisterEnum("anytype.MessageType", MessageType_name, MessageType_value) + proto.RegisterEnum("anytype.SystemErrorCode", SystemErrorCode_name, SystemErrorCode_value) + proto.RegisterType((*Message)(nil), "anytype.Message") + proto.RegisterType((*Header)(nil), "anytype.Header") + proto.RegisterType((*System)(nil), "anytype.System") + proto.RegisterType((*SystemHandshake)(nil), "anytype.System.Handshake") + proto.RegisterType((*SystemPing)(nil), "anytype.System.Ping") + proto.RegisterType((*SystemAck)(nil), "anytype.System.Ack") + proto.RegisterType((*SystemError)(nil), "anytype.System.Error") + proto.RegisterType((*Subscription)(nil), "anytype.Subscription") + proto.RegisterType((*SubscriptionSubscribeSpace)(nil), "anytype.Subscription.SubscribeSpace") + proto.RegisterType((*SubscriptionUnsubscribeSpace)(nil), "anytype.Subscription.UnsubscribeSpace") + proto.RegisterType((*Sync)(nil), "anytype.Sync") } func init() { proto.RegisterFile("syncproto/proto/sync.proto", fileDescriptor_4b28dfdd48a89166) } var fileDescriptor_4b28dfdd48a89166 = []byte{ - // 119 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2a, 0xae, 0xcc, 0x4b, - 0x2e, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0x87, 0x90, 0x20, 0xbe, 0x1e, 0x98, 0x29, 0xc4, 0x9e, 0x98, - 0x57, 0x59, 0x52, 0x59, 0x90, 0xaa, 0x24, 0xcf, 0xc5, 0x1d, 0x5c, 0x99, 0x97, 0xec, 0x9b, 0x5a, - 0x5c, 0x9c, 0x98, 0x9e, 0x2a, 0x24, 0xc0, 0xc5, 0x5c, 0x9c, 0x5a, 0x28, 0xc1, 0xa8, 0xc0, 0xa8, - 0xc1, 0x1c, 0x04, 0x62, 0x3a, 0xa9, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, - 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, - 0x14, 0x97, 0x3e, 0xdc, 0x82, 0x24, 0x36, 0x30, 0x65, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x7e, - 0xbc, 0x46, 0xd2, 0x74, 0x00, 0x00, 0x00, + // 577 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x94, 0x51, 0x6f, 0xd2, 0x50, + 0x14, 0xc7, 0x29, 0x74, 0x20, 0xa7, 0x04, 0xea, 0x9d, 0x8b, 0x58, 0x97, 0xa6, 0x69, 0xa6, 0x92, + 0x69, 0xba, 0x04, 0xb3, 0xf8, 0x3c, 0x27, 0x06, 0xe2, 0xa4, 0xe4, 0x16, 0x30, 0xf1, 0x65, 0x29, + 0xb7, 0x37, 0xd0, 0xb0, 0xb5, 0xb5, 0x2d, 0x89, 0x7d, 0xf7, 0xc9, 0xa7, 0x7d, 0x18, 0x3f, 0x84, + 0x8f, 0x7b, 0xf4, 0xd1, 0xc0, 0x17, 0x31, 0xbd, 0x6d, 0xa1, 0xab, 0xdb, 0x0b, 0xdc, 0xff, 0xb9, + 0xbf, 0xff, 0x39, 0xff, 0xcb, 0xbd, 0x01, 0xa4, 0x20, 0x72, 0x88, 0xe7, 0xbb, 0xa1, 0x7b, 0x92, + 0x7c, 0xc6, 0x5a, 0x63, 0x4b, 0x54, 0x33, 0x9d, 0x28, 0x8c, 0x3c, 0xaa, 0x7e, 0x84, 0xda, 0x67, + 0x1a, 0x04, 0xe6, 0x9c, 0xa2, 0x57, 0x50, 0x5d, 0x50, 0xd3, 0xa2, 0x7e, 0x9b, 0x53, 0xb8, 0x8e, + 0xd0, 0x6d, 0x69, 0x29, 0xa4, 0xf5, 0x59, 0x19, 0xa7, 0xdb, 0x08, 0x01, 0x6f, 0x99, 0xa1, 0xd9, + 0x2e, 0x2b, 0x5c, 0xa7, 0x81, 0xd9, 0x5a, 0xfd, 0xc1, 0x41, 0x35, 0xc1, 0x50, 0x1b, 0x6a, 0xa1, + 0x6f, 0x12, 0x3a, 0xb0, 0x58, 0xa3, 0x06, 0xce, 0x24, 0x3a, 0x84, 0xba, 0x4f, 0xbf, 0xad, 0x68, + 0x10, 0x0e, 0x2c, 0xe6, 0xe6, 0xf1, 0xae, 0x10, 0xfb, 0x7c, 0xea, 0x5d, 0x45, 0x03, 0xab, 0x5d, + 0x61, 0x7b, 0x99, 0x44, 0x1d, 0xe0, 0xe3, 0x1c, 0x6d, 0x5e, 0xe1, 0x3a, 0xcd, 0xee, 0x93, 0x6d, + 0xae, 0x34, 0xf9, 0x38, 0xf2, 0x28, 0x66, 0x84, 0xfa, 0xab, 0x02, 0x55, 0x23, 0x0a, 0x42, 0x7a, + 0x8d, 0xde, 0x41, 0x7d, 0x61, 0x3a, 0x56, 0xb0, 0x30, 0x97, 0x34, 0x3d, 0xd1, 0xb3, 0xad, 0x33, + 0x61, 0xb4, 0x7e, 0x06, 0xe0, 0x1d, 0x1b, 0x4f, 0xf3, 0x6c, 0x67, 0xce, 0x02, 0x0a, 0xb9, 0x69, + 0xa9, 0x67, 0x64, 0x3b, 0x73, 0xcc, 0x08, 0xf4, 0x02, 0x2a, 0x26, 0x59, 0xb2, 0xb4, 0x42, 0x77, + 0xbf, 0x08, 0x9e, 0x91, 0x25, 0x8e, 0xf7, 0xa5, 0x53, 0xa8, 0xf7, 0x73, 0xdd, 0x5b, 0xec, 0x0a, + 0x88, 0x7b, 0x35, 0xa5, 0x7e, 0x60, 0xbb, 0x0e, 0x0b, 0x57, 0xc7, 0xc5, 0xb2, 0xa4, 0x02, 0x1f, + 0xcf, 0x42, 0x12, 0x3c, 0x5a, 0x39, 0xf6, 0xf7, 0xb1, 0x7d, 0x9d, 0x9c, 0x83, 0xc7, 0x5b, 0x2d, + 0x75, 0xa1, 0x72, 0x46, 0x96, 0xe8, 0x35, 0xec, 0x51, 0xdf, 0x77, 0xfd, 0x34, 0xf3, 0x41, 0x31, + 0x4a, 0x2f, 0xde, 0xc4, 0x09, 0x23, 0xdd, 0x70, 0xb0, 0xc7, 0x0a, 0x48, 0x03, 0x9e, 0xb8, 0x56, + 0xd2, 0xb5, 0xd9, 0x95, 0xee, 0x75, 0x69, 0xe7, 0xae, 0x45, 0x31, 0xe3, 0x90, 0x02, 0x82, 0x45, + 0x03, 0xe2, 0xdb, 0x5e, 0x18, 0xe7, 0x2e, 0xb3, 0xdc, 0xf9, 0x92, 0x7a, 0x0a, 0x7c, 0xcc, 0x23, + 0x01, 0x6a, 0x93, 0xe1, 0xa7, 0xa1, 0xfe, 0x65, 0x28, 0x96, 0x90, 0x02, 0x87, 0x93, 0xa1, 0x31, + 0x19, 0x8d, 0x74, 0x3c, 0xee, 0x7d, 0xb8, 0x1c, 0x61, 0x7d, 0xac, 0x9f, 0xeb, 0x17, 0x97, 0xd3, + 0x1e, 0x36, 0x06, 0xfa, 0x50, 0x04, 0xf5, 0x67, 0x19, 0x1a, 0xc6, 0x6a, 0xb6, 0xed, 0x83, 0x2e, + 0xa0, 0x19, 0x24, 0x7a, 0x46, 0x0d, 0xcf, 0x24, 0xd9, 0x0d, 0x1e, 0xed, 0x32, 0xe6, 0xf0, 0x4c, + 0xa4, 0x2c, 0x2e, 0x78, 0x11, 0x06, 0x71, 0xe5, 0x14, 0xfa, 0x25, 0xbf, 0xd4, 0xcb, 0xfb, 0xfb, + 0x4d, 0x0a, 0x34, 0xfe, 0xcf, 0x2f, 0x1d, 0x43, 0xf3, 0xee, 0xd4, 0xf8, 0xfd, 0x06, 0xde, 0xee, + 0xdd, 0xd7, 0x71, 0x26, 0xa5, 0x37, 0x20, 0x16, 0x3b, 0x3e, 0x4c, 0xab, 0x0a, 0xf0, 0x46, 0xe4, + 0x90, 0x87, 0x89, 0xe3, 0x29, 0x08, 0xb9, 0xa7, 0x8f, 0x0e, 0xe0, 0x71, 0x4e, 0x26, 0x97, 0x27, + 0x96, 0xd0, 0x73, 0x78, 0x9a, 0x2f, 0xe7, 0xce, 0x27, 0x72, 0x68, 0x1f, 0x5a, 0x77, 0x3c, 0x0e, + 0x11, 0xcb, 0xef, 0x8f, 0x7e, 0xaf, 0x65, 0xee, 0x76, 0x2d, 0x73, 0x7f, 0xd7, 0x32, 0x77, 0xb3, + 0x91, 0x4b, 0xb7, 0x1b, 0xb9, 0xf4, 0x67, 0x23, 0x97, 0xbe, 0xc2, 0xc9, 0xf6, 0xcf, 0x64, 0x56, + 0x65, 0x5f, 0x6f, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x26, 0x2f, 0x92, 0xfd, 0x60, 0x04, 0x00, + 0x00, } -func (m *SyncMessage) Marshal() (dAtA []byte, err error) { +func (m *Message) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -94,24 +692,407 @@ func (m *SyncMessage) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *SyncMessage) MarshalTo(dAtA []byte) (int, error) { +func (m *Message) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *SyncMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if m.Seq != 0 { - i = encodeVarintSync(dAtA, i, uint64(m.Seq)) + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = encodeVarintSync(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x12 + } + if m.Header != nil { + { + size, err := m.Header.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Header) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Header) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Header) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Type != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x20 + } + if m.ReplyId != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.ReplyId)) + i-- + dAtA[i] = 0x18 + } + if m.RequestId != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.RequestId)) + i-- + dAtA[i] = 0x10 + } + if len(m.TraceId) > 0 { + i -= len(m.TraceId) + copy(dAtA[i:], m.TraceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.TraceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *System) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *System) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *System) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Ack != nil { + { + size, err := m.Ack.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.Ping != nil { + { + size, err := m.Ping.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Handshake != nil { + { + size, err := m.Handshake.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SystemHandshake) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemHandshake) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemHandshake) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ProtocolVersion) > 0 { + i -= len(m.ProtocolVersion) + copy(dAtA[i:], m.ProtocolVersion) + i = encodeVarintSync(dAtA, i, uint64(len(m.ProtocolVersion))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SystemPing) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemPing) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemPing) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.UnixTime != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.UnixTime)) i-- dAtA[i] = 0x8 } return len(dAtA) - i, nil } +func (m *SystemAck) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemAck) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemAck) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Error != nil { + { + size, err := m.Error.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} + +func (m *SystemError) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SystemError) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SystemError) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Description) > 0 { + i -= len(m.Description) + copy(dAtA[i:], m.Description) + i = encodeVarintSync(dAtA, i, uint64(len(m.Description))) + i-- + dAtA[i] = 0x12 + } + if m.Code != 0 { + i = encodeVarintSync(dAtA, i, uint64(m.Code)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Subscription) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Subscription) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Subscription) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.UnsubscribeSpace != nil { + { + size, err := m.UnsubscribeSpace.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.SubscribeSpace != nil { + { + size, err := m.SubscribeSpace.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSync(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SubscriptionSubscribeSpace) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SubscriptionSubscribeSpace) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SubscriptionSubscribeSpace) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SubscriptionUnsubscribeSpace) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SubscriptionUnsubscribeSpace) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SubscriptionUnsubscribeSpace) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Sync) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Sync) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Sync) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SpaceId) > 0 { + i -= len(m.SpaceId) + copy(dAtA[i:], m.SpaceId) + i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintSync(dAtA []byte, offset int, v uint64) int { offset -= sovSync(v) base := offset @@ -123,14 +1104,172 @@ func encodeVarintSync(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *SyncMessage) Size() (n int) { +func (m *Message) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Seq != 0 { - n += 1 + sovSync(uint64(m.Seq)) + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovSync(uint64(l)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *Header) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TraceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + if m.RequestId != 0 { + n += 1 + sovSync(uint64(m.RequestId)) + } + if m.ReplyId != 0 { + n += 1 + sovSync(uint64(m.ReplyId)) + } + if m.Type != 0 { + n += 1 + sovSync(uint64(m.Type)) + } + return n +} + +func (m *System) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Handshake != nil { + l = m.Handshake.Size() + n += 1 + l + sovSync(uint64(l)) + } + if m.Ping != nil { + l = m.Ping.Size() + n += 1 + l + sovSync(uint64(l)) + } + if m.Ack != nil { + l = m.Ack.Size() + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SystemHandshake) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ProtocolVersion) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SystemPing) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.UnixTime != 0 { + n += 1 + sovSync(uint64(m.UnixTime)) + } + return n +} + +func (m *SystemAck) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Error != nil { + l = m.Error.Size() + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SystemError) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Code != 0 { + n += 1 + sovSync(uint64(m.Code)) + } + l = len(m.Description) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *Subscription) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SubscribeSpace != nil { + l = m.SubscribeSpace.Size() + n += 1 + l + sovSync(uint64(l)) + } + if m.UnsubscribeSpace != nil { + l = m.UnsubscribeSpace.Size() + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SubscriptionSubscribeSpace) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *SubscriptionUnsubscribeSpace) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) + } + return n +} + +func (m *Sync) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SpaceId) + if l > 0 { + n += 1 + l + sovSync(uint64(l)) } return n } @@ -141,7 +1280,7 @@ func sovSync(x uint64) (n int) { func sozSync(x uint64) (n int) { return sovSync(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *SyncMessage) Unmarshal(dAtA []byte) error { +func (m *Message) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -164,17 +1303,17 @@ func (m *SyncMessage) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: SyncMessage: wiretype end group for non-group") + return fmt.Errorf("proto: Message: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: SyncMessage: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType) + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) } - m.Seq = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowSync @@ -184,11 +1323,1067 @@ func (m *SyncMessage) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Seq |= int64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &Header{} + } + if err := m.Header.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Header) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Header: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Header: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TraceId", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TraceId = append(m.TraceId[:0], dAtA[iNdEx:postIndex]...) + if m.TraceId == nil { + m.TraceId = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestId", wireType) + } + m.RequestId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplyId", wireType) + } + m.ReplyId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ReplyId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= MessageType(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *System) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: System: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: System: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Handshake", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Handshake == nil { + m.Handshake = &SystemHandshake{} + } + if err := m.Handshake.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ping", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Ping == nil { + m.Ping = &SystemPing{} + } + if err := m.Ping.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ack", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Ack == nil { + m.Ack = &SystemAck{} + } + if err := m.Ack.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemHandshake) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Handshake: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Handshake: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ProtocolVersion", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ProtocolVersion = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemPing) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Ping: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Ping: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UnixTime", wireType) + } + m.UnixTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UnixTime |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemAck) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Ack: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Ack: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Error == nil { + m.Error = &SystemError{} + } + if err := m.Error.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SystemError) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Error: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Error: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= SystemErrorCode(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Description", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Description = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Subscription) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Subscription: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Subscription: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SubscribeSpace", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SubscribeSpace == nil { + m.SubscribeSpace = &SubscriptionSubscribeSpace{} + } + if err := m.SubscribeSpace.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UnsubscribeSpace", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.UnsubscribeSpace == nil { + m.UnsubscribeSpace = &SubscriptionUnsubscribeSpace{} + } + if err := m.UnsubscribeSpace.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SubscriptionSubscribeSpace) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SubscribeSpace: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SubscribeSpace: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SubscriptionUnsubscribeSpace) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UnsubscribeSpace: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UnsubscribeSpace: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Sync) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Sync: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Sync: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipSync(dAtA[iNdEx:])