net utils wip

This commit is contained in:
Sergey Cherepanov 2022-08-05 12:07:34 +03:00 committed by Mikhail Iudin
parent 2588697197
commit ebfdbdd508
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
30 changed files with 3264 additions and 378 deletions

View File

@ -1,169 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/sec"
"go.uber.org/zap"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"syscall"
"time"
)
var log = logger.NewNamed("client")
var (
flagConfigFile = flag.String("c", "etc/config.yml", "path to config file")
flagVersion = flag.Bool("v", false, "show version and exit")
flagHelp = flag.Bool("h", false, "show help and exit")
)
func main() {
flag.Parse()
if *flagVersion {
fmt.Println(app.VersionDescription())
return
}
if *flagHelp {
flag.PrintDefaults()
return
}
if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
go func() {
http.ListenAndServe(debug, nil)
}()
}
// create app
ctx := context.Background()
a := new(app.App)
// open config file
conf, err := config.NewFromFile(*flagConfigFile)
if err != nil {
log.Fatal("can't open config file", zap.Error(err))
}
// bootstrap components
a.Register(conf).
Register(transport.New()).
Register(&Client{})
// start app
if err := a.Start(ctx); err != nil {
log.Fatal("can't start app", zap.Error(err))
}
log.Info("app started", zap.String("version", a.Version()))
// wait exit signal
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-exit
log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig)))
// close app
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := a.Close(ctx); err != nil {
log.Fatal("close error", zap.Error(err))
} else {
log.Info("goodbye!")
}
time.Sleep(time.Second / 3)
}
type Client struct {
conf config.GrpcServer
tr transport.Service
sc sec.SecureConn
}
func (c *Client) Init(ctx context.Context, a *app.App) (err error) {
c.tr = a.MustComponent(transport.CName).(transport.Service)
c.conf = a.MustComponent(config.CName).(*config.Config).GrpcServer
return nil
}
func (c *Client) Name() (name string) {
return "testClient"
}
func (c *Client) Run(ctx context.Context) (err error) {
tcpConn, err := net.Dial("tcp", c.conf.ListenAddrs[0])
if err != nil {
return
}
c.sc, err = c.tr.TLSConn(ctx, tcpConn)
if err != nil {
return
}
log.Info("connected with server", zap.String("serverPeer", c.sc.RemotePeer().String()), zap.String("per", c.sc.LocalPeer().String()))
dconn := drpcconn.New(c.sc)
stream, err := dconn.NewStream(ctx, "", enc{})
if err != nil {
return
}
go c.handleStream(stream)
return nil
}
func (c *Client) handleStream(stream drpc.Stream) {
var err error
defer func() {
log.Info("stream closed", zap.Error(err))
}()
var n int64 = 100000
for i := int64(0); i < n; i++ {
st := time.Now()
if err = stream.MsgSend(&syncproto.SyncMessage{Seq: i}, enc{}); err != nil {
if err == io.EOF {
return
}
log.Fatal("send error", zap.Error(err))
}
log.Debug("message sent", zap.Int64("seq", i))
msg := &syncproto.SyncMessage{}
if err := stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
log.Error("msg recv error", zap.Error(err))
}
log.Debug("message received", zap.Int64("seq", msg.Seq), zap.Duration("dur", time.Since(st)))
time.Sleep(time.Second)
}
}
func (c *Client) Close(ctx context.Context) (err error) {
if c.sc != nil {
return c.sc.Close()
}
return
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -7,15 +7,11 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/api"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/document"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/drpcserver"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/example"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
@ -89,13 +85,9 @@ func main() {
}
func Bootstrap(a *app.App) {
a.Register(account.New()).
Register(transport.New()).
Register(drpcserver.New()).
Register(node.New()).
Register(document.New()).
Register(message.New()).
Register(requesthandler.New()).
Register(treecache.New()).
Register(api.New())
a.Register(secure.New()).
Register(server.New()).
Register(dialer.New()).
Register(pool.NewPool()).
Register(&example.Example{})
}

View File

@ -26,9 +26,7 @@ func NewFromFile(path string) (c *Config, err error) {
type Config struct {
Anytype Anytype `yaml:"anytype"`
GrpcServer GrpcServer `yaml:"grpcServer"`
Account Account `yaml:"account"`
APIServer APIServer `yaml:"apiServer"`
Nodes []Node `yaml:"nodes"`
PeerList PeerList `yaml:"peerList"`
}
func (c *Config) Init(ctx context.Context, a *app.App) (err error) {

View File

@ -2,5 +2,4 @@ package config
type GrpcServer struct {
ListenAddrs []string `yaml:"listenAddrs"`
TLS bool `yaml:"tls"`
}

14
config/peer.go Normal file
View File

@ -0,0 +1,14 @@
package config
type PeerList struct {
MyId struct {
PeerId string `yaml:"peerId"`
PrivKey string `yaml:"privKey"`
} `yaml:"myId"`
Remote []PeerRemote `yaml:"remote"`
}
type PeerRemote struct {
PeerId string `yaml:"peerId"`
Addr string `yaml:"addr"`
}

16
etc/config.1.yml Normal file
View File

@ -0,0 +1,16 @@
anytype:
swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec"
grpcServer:
listenAddrs:
- "127.0.0.1:4431"
peerList:
myId:
peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U"
privKey: "InCGjb55V9+jj2PebUExUuwrpOIBc4hmgk2dSqyk3k4DjmgrdoNVuFe7xCFaFdUVb0RJYj6A+OTp2yXASTmq2w=="
remote:
- peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C"
addr: "127.0.0.1:4430"
- peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H"
addr: "127.0.0.1:4432"

16
etc/config.2.yml Normal file
View File

@ -0,0 +1,16 @@
anytype:
swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec"
grpcServer:
listenAddrs:
- "127.0.0.1:4432"
peerList:
myId:
peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H"
privKey: "jynYZBgtM4elT+6e7M5UERTJCZgUd3hDdmQjCqTpApyJ4h53V6TQan4Ru4OXqz+91rCLjpIVdphhaB0l+TvNsA=="
remote:
- peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U"
addr: "127.0.0.1:4431"
- peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C"
addr: "127.0.0.1:4430"

View File

@ -1,26 +1,16 @@
anytype:
swarmKey: /key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec
swarmKey: "/key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec"
grpcServer:
listenAddrs:
- 127.0.0.1:4430
- 127.0.0.1:4431
tls: false
account:
peerId: 12D3KooWPviVQZY23iXQU7eva7k4ojyScagRAqE8cXm6cTm6rhNs
signingKey: 3inYSp8q7WidZi7RkhLFNZPF9yMJXHpnyrTpTsE7cd2ggESfTAEpWiQNa8zCnNdzK9DpG6Mu5hDCZhfU6RMDL466H8om
encryptionKey: udNkNsbKtm7eu1Bqt6DUwLZka2Rq22vm33K5eezrMkGnXx8X13oQ8HBSHgkLmRZTC1737hgZ66s92QRNe1YC1v8JNiv6yXVuV3MujBkmrYYVbGB7DBz6REsGAMPmuqDiG5PubjVHP468MDJJYRTZN6VEJuSGZtEAyLBJe9iErRVFsMdNc6ZMbY5pxoYE6LHWbsfZejrnAXvgABTnYTN4ad6EaxRiEWHGACizr59uf3KYFb9hFwxkFMhenpTpxbkBwQp3V8MxV4eLbgsWQPMpfeGeinjmNRAnZfi6tnqSDLyy3RWM5y9W89rYKt3EdVRwcFWFHh8FgSjzhpqE77GEwjGU9ddKorUfW85jQGdd2xdFwQfLF2eTGEwSxX9Gz2fTTtkN4pNJC8MesKK2cRsLAZLb9rv5ebCMgQ3S86WjfbWRcEYLoKUWKjujiY1XT5GszTgSa2b68QpcDmqgUv7FiwcZwC5qaCsgoARZ2GGsPgW7mCBahxTJnor4dCLF9Aiz1YvFvwWzm8k9zUmY5pxPQFNSWfAAcodqEHJV6i9TSf5ERNLJCQiRTSStZZdoeqAf42sYCSdiaiVUc26Q7QcTxtxxpfCTqGc2Lgh6ofgwG1m2CfmhXTsBDwBFRc8q6TcGJ4fT4WuZXF9f4MhKHqgqppFKMWYyvtJyvypbJ735kU63UWWSMCo8BbVTRrtaq7bPHbgvHHWYso5o7PeqNuhD2YwTKd6wpbAeg8wpv3eEWB2sgotQ5JiW2itEyinLZHRpLvbtYGFWctsZisvBbCssiCsPqzG1DffZWNjt2nj1vztuapZLpz9azanCaXhFxLBBLVveEk4tvEknHtREwXtmTpQK5GqpmW9LTFqA5jj7fVtytYQFufe9Cp4HKNNodv1UCPQ1KTLwkwA6UEMwqamvE67GDDvjmE2XaMoq6ErEYX6FkQi2d9kbLJaFVGwYH3DVcbBGAJzSoeYceosuoQJBcKS19BhFqXWT1W3iKvMibadh8MYLim7ncxezuHFCyUPFnFn33kfzzTWTsFYbKDukCECRBKciptevx9zS6V2YiSgcYRWV7Rd8sNg8vAGUBQ6mLBtWNExn5gRkoBQDqN9BhfqpEbGo33JHb7GVLsN7um9VxKcTSkiDLPKTxijNevHeRWRbtbCvQ31WHaNLzihnsowVQMk8SV6CiqzqwUb4ETxiXCDHABGuCACcYVC2jgEoFuqSAwS6c2y5sfcjxWSe3hs9eLjyWmeHu75MbNcJN8EtBrLs4j2tWGtbq6BR7SYVcygUbscf75HKncC4YBkF1MWRHmos11ZKaCwLfjsTW6XUXdfhvpco6mxic8muiq5XA3G8x47stQ3eBoSXoehRAMd4FKNv9geDagAhjorfcMd7kCT5CEENMqCLpjtUsFXSGtjqpYg1aeaQsxdEszfw2LS5pvPQEnYmqMRxZsuYHbuAH4J7sQwvjn7ezWfpsJfwQT6EqDFJp4g6vcgfUeCYzim9DVDnJYWocL9Uzb9P3rvknDKZmAkFWgTVWq16GaX4xUsXVNLMkDm88q4vvxrCQyAerNzox27uF2yH1ca66pVu9GZkGHM1KJrNmyyYEm445wNgkgSx8ivhyC11SUKs3nTKNVrnc
apiServer:
port: "8080"
nodes:
- peerId: 12D3KooWPviVQZY23iXQU7eva7k4ojyScagRAqE8cXm6cTm6rhNs
address: 127.0.0.1:4430
signingKey: 3inYSp8q7WidZi7RkhLFNZPF9yMJXHpnyrTpTsE7cd2ggESfTAEpWiQNa8zCnNdzK9DpG6Mu5hDCZhfU6RMDL466H8om
encryptionKey: udNkNsbKtm7eu1Bqt6DUwLZka2Rq22vm33K5eezrMkGnXx8X13oQ8HBSHgkLmRZTC1737hgZ66s92QRNe1YC1v8JNiv6yXVuV3MujBkmrYYVbGB7DBz6REsGAMPmuqDiG5PubjVHP468MDJJYRTZN6VEJuSGZtEAyLBJe9iErRVFsMdNc6ZMbY5pxoYE6LHWbsfZejrnAXvgABTnYTN4ad6EaxRiEWHGACizr59uf3KYFb9hFwxkFMhenpTpxbkBwQp3V8MxV4eLbgsWQPMpfeGeinjmNRAnZfi6tnqSDLyy3RWM5y9W89rYKt3EdVRwcFWFHh8FgSjzhpqE77GEwjGU9ddKorUfW85jQGdd2xdFwQfLF2eTGEwSxX9Gz2fTTtkN4pNJC8MesKK2cRsLAZLb9rv5ebCMgQ3S86WjfbWRcEYLoKUWKjujiY1XT5GszTgSa2b68QpcDmqgUv7FiwcZwC5qaCsgoARZ2GGsPgW7mCBahxTJnor4dCLF9Aiz1YvFvwWzm8k9zUmY5pxPQFNSWfAAcodqEHJV6i9TSf5ERNLJCQiRTSStZZdoeqAf42sYCSdiaiVUc26Q7QcTxtxxpfCTqGc2Lgh6ofgwG1m2CfmhXTsBDwBFRc8q6TcGJ4fT4WuZXF9f4MhKHqgqppFKMWYyvtJyvypbJ735kU63UWWSMCo8BbVTRrtaq7bPHbgvHHWYso5o7PeqNuhD2YwTKd6wpbAeg8wpv3eEWB2sgotQ5JiW2itEyinLZHRpLvbtYGFWctsZisvBbCssiCsPqzG1DffZWNjt2nj1vztuapZLpz9azanCaXhFxLBBLVveEk4tvEknHtREwXtmTpQK5GqpmW9LTFqA5jj7fVtytYQFufe9Cp4HKNNodv1UCPQ1KTLwkwA6UEMwqamvE67GDDvjmE2XaMoq6ErEYX6FkQi2d9kbLJaFVGwYH3DVcbBGAJzSoeYceosuoQJBcKS19BhFqXWT1W3iKvMibadh8MYLim7ncxezuHFCyUPFnFn33kfzzTWTsFYbKDukCECRBKciptevx9zS6V2YiSgcYRWV7Rd8sNg8vAGUBQ6mLBtWNExn5gRkoBQDqN9BhfqpEbGo33JHb7GVLsN7um9VxKcTSkiDLPKTxijNevHeRWRbtbCvQ31WHaNLzihnsowVQMk8SV6CiqzqwUb4ETxiXCDHABGuCACcYVC2jgEoFuqSAwS6c2y5sfcjxWSe3hs9eLjyWmeHu75MbNcJN8EtBrLs4j2tWGtbq6BR7SYVcygUbscf75HKncC4YBkF1MWRHmos11ZKaCwLfjsTW6XUXdfhvpco6mxic8muiq5XA3G8x47stQ3eBoSXoehRAMd4FKNv9geDagAhjorfcMd7kCT5CEENMqCLpjtUsFXSGtjqpYg1aeaQsxdEszfw2LS5pvPQEnYmqMRxZsuYHbuAH4J7sQwvjn7ezWfpsJfwQT6EqDFJp4g6vcgfUeCYzim9DVDnJYWocL9Uzb9P3rvknDKZmAkFWgTVWq16GaX4xUsXVNLMkDm88q4vvxrCQyAerNzox27uF2yH1ca66pVu9GZkGHM1KJrNmyyYEm445wNgkgSx8ivhyC11SUKs3nTKNVrnc
- peerId: 12D3KooWPzziVTnuypw4MC7m4mzdoh9CM5FXGrYiTW81pfjngmwx
address: 127.0.0.1:4432
signingKey: 3i7TZgCtctChP9rA13BDuK2GmdbRL6sgk51JjetKyFt3SEqXN8M4EwcREWVfPYmayBKYPHveLzY7NsggxvKapJFdYH8F
encryptionKey: JgG4CcCbae1qEpe7mL87XWScKZWBBqSZseakgKVJfHqTUt3CiLPMKPDZtzQWLfb2ouiTbff5YTBZbGxwKqWYMAW8prEVjbcXBtkCi2Y2XkZ8v1ugidF5MC3d1ifnh5Es5CafB8kToUegtAig8vhMCuXAzLQpbiQyFLf25KcyKfVvHUomafaHyKCbYu1bmyGRC7CGxUD6jnPg2MYZZiJiq9scNRaRxbiqCe9C2VcUTFdigV65eGmZPKGgfAThcyKCdQSe1hmoXtNw5MaqbB3eQvkuS4GySap3SaEZUmGsmuVumUvHappUpN8x3UQs1TNWfYxULVx3tUMHfQ98G9No4gwP42ENMgCuMVQW9M68s73WCGucvpPUZdvQFiSJkSoKDqi2F6zdrM2LZTp8qGciunKDFZvh14eAGDbPLw9dBLVmMQLKq68yz8Nzi9BYWjzLa4fDNkoYETB8rArNPECAjVVW6PYLzueVNmqvo7RdWZGHF2YCRXr1wLG7jg4WBX4Lm5jQZS8bkWacbpj49b4nxRnjLsGusdCwRCZPY9qFPim2WU4z2KEgRS87x97TiYAThLUJwGBv5pBKRLkrAAoFWwkDSfxcRt83FiZtJ1hLWkpj6fK56SuubQUjWzNfVPCTfNqcod54PAPVESTuAPjDsmM4YeMGgUpaK9Goqbu1FuuZ9P6mQPwGKdpGuVgjJv7uQJZw3fpC9H1vJ4fa2wBrGFkahyEFo2QDb8iG9oGarpFxewJVVs12cpKJeQzU5dLFBvQJmGnHASTnBwfNBieDT7ZeFmnGuN7sLaPJ5wgdKJpY6aAooztyWfJPKTbxzYjUQMVoyW3YbTD3qg8F9grqVT1VFAp88ta1u55eokqz7h82EJNQf7GptDHxPRg4euAJ3BDvmRUfAwYzU8TD1gyiUnnHGRuvH25xgJRqauNrDLUbha33wVAHeMzWJhpLjLz4DLmtLejpfwNr5PuEbczqFAaLspf8dFjkrvmC3RrwHngKuzTkYo46WSJggWs3CwWDpzMmrATh2LRYmdH7BtNUkvu7onRzms8jWHBsFNfbCitMpfAsM3XmgtFUGTMi4LzVK3f5G8HqwGqcVEnFJTZzdoBsfNMKyFapXPnUBUHQse8jZjSYQv3dJpVuTV6HQ5R8saBdKDGMpXFyYqZK85cikcK6Nkd7ZrnbUUN2i6f3ano7hnxTbeW7kGxatUWETTwAzwGjdMhpJjieLFBTomAqqfXopEr1FeWcFmQ6kYBL2fa6XYqhZJevG6LpNXeuLcbyTiADt5CMNwt6MTmEomqtzqE7BgvqYEibuEj5K22ufvvLmX7XbGiB7wZKRPGuTR1pPDgJbzU9HGmF5fWjTVGPeNMn7ZmbQVcys6xy79notYp6MoQhBCTWyuuEuc24REEfMCjPuhNgssJQWT3ZR3xskrcEdCUyGbvQT2eWVa6atYsu5Z2m7sGXXwfH7vhdiUkVZEmhNPusij7dsMa9io63D6itYuu8Ed5aZYoUkZCfKZ39KQ16rtYpa9mfcwG4fyQy11Zy9G7nxdNJohGYeq3xuerSdwTDNdK2PREJcAoNA7Nfcsxk7333B5gXNFZKcBLQk56C1PqApLerSfo3jVuojcNAxYB688K6
- peerId: 12D3KooWL9qu6sDua8hK8sEyeUVzpei8SVf3DA4W1gVAxDMwW9Ze
address: 127.0.0.1:4434
signingKey: 3iPPhpw3jndkSCvpZ4yf5eWYydZwbFAk7gdMkXZBgyh939AicHVakwkabDpxzhtrsgJEXrP5sKam9QL7weMyyoMrKNCH
encryptionKey: JgG4CcCbae1qEpe7mL5br9LinoxiiuQqGC7y6HBbvmQ9pcZM4F4q2oGDQA71ZumaTJ1RAiUDaxi1fBVnrvv4Tqm9fau3NyijggQ246UtHKemsPFBX9Qyv4oYZJ9X3WNNc9q3BkPX9C8WHdfda9fxW4wD1QYKzd5F1J7FdhxZh3MZUdc4GEfPLwew8dTfEJp7ipAvM7TY1r6URe4VHVmsdaEL3jsEUzEqL4MRSc69WdjdKZLyVzQVcjnLegFfoH2c6476MfnoPggeVGghPjDmoqoJ2kzHwjvDQKoGyC9m4wkicS4YZkVA6N2ezKBSumJkV1mY6hpBPuCtdYLDw2zsCqMq9AXFhtMxzSCtLLRnRVntErY3Wb8ty5vEnS8aY7B612okwdsMg2rYxP55ccK4zP7PoKTKTS4TWTqGiLW2GtptTLhQ7ynJudG9YArzwwjgqXaxExrMqFCSMCfQbVNvwv3SckfpwQjpJjCfRS73SQKPY8ghqrdKnuRHyUWpJbYb9y5doCWMCuLw6u6t2LqfcwknNkjoWNCG69tohSgRStbYfmA7AZvHzPGc3gKByZecVxdmvPmtbSLXRiB9FbqtuqEerGxpisL3FhDM6SVqxRm2E2SxwJTHVcyL2LNF5tE2V7mujSa5dwzZfbRA9p3SwFeRhd42z1r36iL92yugfa2SbHYYDhdKFCJLhMc8ogqd1b8oUBbwBktKg6p6MJeZdqS2vfsLQRnV8TyP72fKUjFtaQ4o7D6vQ9VnsBMQS9FF8PEuZ5yF3XQaMjJMpeGho3c4dNFEw4vCiUaVN2WzF7XR28dyGwLASMoVFuUoErgr7TUQm4yKJG8QaHePx7A7Ku3Ys6fvrDsBVJWz5taQB7cMBqXmrAg2DHSUwqr8XfhxXnv8cnZZzPTrtib86P7w3kUYLrJYaR6jur5jTFucFSxthKEMqzBPtS3wgn2hcj31ATSNR7FNDcZmteWex5vAzGkuDoCG4c7dUCGinT3akYvukR7WugdHAYySkhf6AhNGg4wrZ7y3YVRMwRkex3PumcwGGKoTvNMNMqz2kTCNjSsDr6xRBk8njCLwDMunvDZM7VThTMKQ4CvegCtX3kV11qk46WjouqFedYSmShmxB5dMMVKui3owicS3ioVXoSqCRAcov3LJ3zL5aQ19FBBvvezeN27PkcZjvXKQwNEpnMzJeLTLdz8guWrER9Vn7XB3zgvt6xzab8JbHJ62XEEGFC5EoU68D3Wn9oXH6EqfCNbUsfW6EScc7wcsRu7aLi4ddjYrDcd4pQgct3dpAzR9Z4ZHRgQi9k4DF8bg9LUjfjaJS53DGSiSSfGBNtkpdyLp8K8Y89BZRnw21yVcKwA8JD47By6s3vfJ8PdSTs4Lsfm7mEhERuVLyQC9wpPe6ysjzUTQjx7RyTaVVA9ENQfpc1XCdHufDFST1eeiRFLwKzEFCr2bATo5r5oQ6UtBdQv6eFaGv19ju4cikXTNq2vCGw6aMupdaESYeWL5Gs8ssCjb1K2iZfGwqidfTJejEiuoiD2S6shTRv6BFz3JrhU7XsZdLkU9rvDYJs6rWq4C4ix4VWDxNHktF7iBuCASnZjgMYs9Lipt5hZEow4FrWcvUtVQ8PXQT3SC
- "127.0.0.1:4430"
peerList:
myId:
peerId: "12D3KooWHJpSEMQUZCyK8TK181LhjzntWjKfXDr7MWks9cw41R2C"
privKey: "3BhkWxi0Vzc2AhLtw7LhFHa9Ys4P0wHOlPCW02O6FKJvS8V2nzkJGDlM3vcSLZZ0m9tyYuCKqK4TWUKXgu/FVQ=="
remote:
- peerId: "12D3KooWA4FLWvrMbCtp2MbzKcC5RRN7HqxxBxPcSADFfzrGiW3U"
addr: "127.0.0.1:4431"
- peerId: "12D3KooWK6c1CPLL4Bvjim9A9SDRmehy12hYjbqX1VASHKfH7W7H"
addr: "127.0.0.1:4432"

1
go.mod
View File

@ -48,6 +48,7 @@ require (
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.6 // indirect
)

2
go.sum
View File

@ -162,6 +162,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

View File

@ -13,6 +13,7 @@ var (
ErrClosed = errors.New("object cache closed")
ErrExists = errors.New("object exists")
ErrTimeout = errors.New("loading object timed out")
ErrNotExists = errors.New("object not exists")
)
var (
@ -20,10 +21,6 @@ var (
defaultGC = 20 * time.Second
)
type key int
const CacheTimeout key = 0
var log = logger.NewNamed("ocache")
type LoadFunc func(ctx context.Context, id string) (value Object, err error)
@ -61,7 +58,9 @@ func New(loadFunc LoadFunc, opts ...Option) OCache {
for _, o := range opts {
o(c)
}
if c.ttl != 0 {
go c.ticker()
}
return c
}
@ -99,6 +98,8 @@ type OCache interface {
// Increases the object refs counter on successful
// When 'loadFunc' returns a non-nil error, an object will not be stored to cache
Get(ctx context.Context, id string) (value Object, err error)
// Pick returns value if it's presents in cache (will not call loadFunc)
Pick(id string) (value Object, err error)
// Add adds new object to cache
// Returns error when object exists
Add(id string, value Object) (err error)
@ -154,28 +155,28 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
e.refCount++
c.mu.Unlock()
timeout := ctx.Value(CacheTimeout)
if load {
if timeout != nil {
go c.load(ctx, id, e)
} else {
c.load(ctx, id, e)
}
}
if timeout != nil {
duration := timeout.(time.Duration)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.load:
return e.value, e.loadErr
case <-time.After(duration):
return nil, ErrTimeout
}
}
<-e.load
return e.value, e.loadErr
}
func (c *oCache) Pick(id string) (value Object, err error) {
c.mu.Lock()
val, ok := c.data[id]
c.mu.Unlock()
if !ok {
return nil, ErrNotExists
}
<-val.load
return val.value, val.loadErr
}
func (c *oCache) load(ctx context.Context, id string, e *entry) {
defer close(e.load)
value, err := c.loadFunc(ctx, id)

View File

@ -92,6 +92,20 @@ func TestOCache_Get(t *testing.T) {
_, err := c.Get(context.TODO(), "id")
assert.Equal(t, ErrClosed, err)
})
t.Run("context cancel", func(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
time.Sleep(time.Second / 3)
return &testObject{
name: "id",
}, nil
})
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := c.Get(ctx, "id")
assert.Equal(t, context.Canceled, err)
assert.NoError(t, c.Close())
})
}
func TestOCache_GC(t *testing.T) {

View File

@ -0,0 +1,84 @@
package example
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"go.uber.org/zap"
"strings"
"time"
)
var log = logger.NewNamed("example")
type Example struct {
pool pool.Pool
peerConf config.PeerList
}
func (e *Example) Init(ctx context.Context, a *app.App) (err error) {
e.pool = a.MustComponent(pool.CName).(pool.Pool)
e.peerConf = a.MustComponent(config.CName).(*config.Config).PeerList
// subscribe for sync messages
e.pool.AddHandler(syncproto.MessageType_MessageTypeSync, e.syncHandler)
return
}
func (e *Example) Name() (name string) {
return "example"
}
func (e *Example) Run(ctx context.Context) (err error) {
// dial manually with all peers
for _, rp := range e.peerConf.Remote {
if er := e.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil {
log.Info("can't dial to peer", zap.Error(er))
} else {
log.Info("connected with peer", zap.String("peerId", rp.PeerId))
}
}
go e.doRequests()
return nil
}
func (e *Example) syncHandler(ctx context.Context, msg *pool.Message) (err error) {
data := string(msg.Data) // you need unmarshal this bytes
log.Info("msg received", zap.String("peerId", msg.Peer().Id()), zap.String("data", data))
if strings.HasPrefix(data, "ack:") {
if err = msg.Ack(); err != nil {
log.Error("ack error", zap.Error(err))
}
} else if strings.HasPrefix(data, "ackErr:") {
if err = msg.AckError(42, "ack error description"); err != nil {
log.Error("ackErr error", zap.Error(err))
}
} else if strings.HasPrefix(data, "reply:") {
if err = msg.Reply([]byte("reply for:" + strings.TrimPrefix(data, "reply:"))); err != nil {
log.Error("reply error", zap.Error(err))
}
}
return nil
}
func (e *Example) doRequests() {
time.Sleep(time.Second)
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
st := time.Now()
err := e.pool.SendAndWait(ctx, e.peerConf.Remote[0].PeerId, &syncproto.Message{
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
Data: []byte("ack: something"),
})
log.Info("sent with ack:", zap.Error(err), zap.Duration("dur", time.Since(st)))
}
func (e *Example) Close(ctx context.Context) (err error) {
return
}

View File

@ -0,0 +1,104 @@
package dialer
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
"github.com/libp2p/go-libp2p-core/sec"
"go.uber.org/zap"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"sync"
)
const CName = "net/dialer"
var ErrArrdsNotFound = errors.New("addrs for peer not found")
var log = logger.NewNamed(CName)
func New() Dialer {
return &dialer{}
}
type Dialer interface {
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
UpdateAddrs(addrs map[string][]string)
app.Component
}
type dialer struct {
transport secure.Service
peerAddrs map[string][]string
mu sync.RWMutex
}
func (d *dialer) Init(ctx context.Context, a *app.App) (err error) {
d.transport = a.MustComponent(secure.CName).(secure.Service)
peerConf := a.MustComponent(config.CName).(*config.Config).PeerList.Remote
d.peerAddrs = map[string][]string{}
for _, rp := range peerConf {
d.peerAddrs[rp.PeerId] = []string{rp.Addr}
}
return
}
func (d *dialer) Name() (name string) {
return CName
}
func (d *dialer) UpdateAddrs(addrs map[string][]string) {
d.mu.Lock()
d.peerAddrs = addrs
d.mu.Unlock()
}
func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) {
d.mu.RLock()
defer d.mu.RUnlock()
addrs, ok := d.peerAddrs[peerId]
if !ok || len(addrs) == 0 {
return nil, ErrArrdsNotFound
}
var (
stream drpc.Stream
sc sec.SecureConn
)
for _, addr := range addrs {
stream, sc, err = d.makeStream(ctx, addr)
if err != nil {
log.Info("can't connect to host", zap.String("addr", addr))
} else {
err = nil
break
}
}
if err != nil {
return
}
return rpc.PeerFromStream(sc, stream, false), nil
}
func (d *dialer) makeStream(ctx context.Context, addr string) (stream drpc.Stream, sc sec.SecureConn, err error) {
tcpConn, err := net.Dial("tcp", addr)
if err != nil {
return
}
sc, err = d.transport.TLSConn(ctx, tcpConn)
if err != nil {
return
}
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String()))
stream, err = drpcconn.New(sc).NewStream(ctx, "", rpc.Encoding)
if err != nil {
return
}
return stream, sc, err
}

35
service/net/peer/peer.go Normal file
View File

@ -0,0 +1,35 @@
package peer
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"time"
)
type Dir uint
const (
// DirInbound indicates peer created connection
DirInbound Dir = iota
// DirOutbound indicates that our host created connection
DirOutbound
)
type Info struct {
Id string
Dir Dir
LastActiveUnix int64
}
func (i Info) LastActive() time.Time {
return time.Unix(i.LastActiveUnix, 0)
}
type Peer interface {
Id() string
Info() Info
Recv() (*syncproto.Message, error)
Send(msg *syncproto.Message) (err error)
Context() context.Context
Close() error
}

View File

@ -0,0 +1,71 @@
package pool
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"gopkg.in/mgo.v2/bson"
)
type Message struct {
*syncproto.Message
peer peer.Peer
}
func (m *Message) Peer() peer.Peer {
return m.peer
}
func (m *Message) Reply(data []byte) (err error) {
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSync,
},
Data: data,
}
return m.peer.Send(rep)
}
func (m *Message) Ack() (err error) {
ack := &syncproto.System{
Ack: &syncproto.SystemAck{},
}
data, err := ack.Marshal()
if err != nil {
return
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
},
Data: data,
}
return m.peer.Send(rep)
}
func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (err error) {
ack := &syncproto.System{
Ack: &syncproto.SystemAck{
Error: &syncproto.SystemError{
Code: code,
Description: description,
},
},
}
data, err := ack.Marshal()
if err != nil {
return
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: []byte(bson.NewObjectId()),
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
},
Data: data,
}
return m.peer.Send(rep)
}

28
service/net/pool/peer.go Normal file
View File

@ -0,0 +1,28 @@
package pool
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type peerEntry struct {
peer peer.Peer
groupIds []string
ready chan struct{}
}
func (pe *peerEntry) addGroup(groupId string) (ok bool) {
if slice.FindPos(pe.groupIds, groupId) != -1 {
return false
}
pe.groupIds = append(pe.groupIds, groupId)
return true
}
func (pe *peerEntry) removeGroup(groupId string) (ok bool) {
if slice.FindPos(pe.groupIds, groupId) == -1 {
return false
}
pe.groupIds = slice.Remove(pe.groupIds, groupId)
return true
}

305
service/net/pool/pool.go Normal file
View File

@ -0,0 +1,305 @@
package pool
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
"sync"
"sync/atomic"
)
const CName = "sync/peerPool"
var log = logger.NewNamed("peerPool")
var (
ErrPoolClosed = errors.New("peer pool is closed")
ErrPeerNotFound = errors.New("peer not found")
)
func NewPool() Pool {
return &pool{closed: true}
}
type Handler func(ctx context.Context, msg *Message) (err error)
type Pool interface {
DialAndAddPeer(ctx context.Context, id string) (err error)
AddAndReadPeer(peer peer.Peer) (err error)
AddHandler(msgType syncproto.MessageType, h Handler)
AddPeerIdToGroup(peerId, groupId string) (err error)
RemovePeerIdFromGroup(peerId, groupId string) (err error)
SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error)
Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error)
app.ComponentRunnable
}
type pool struct {
peersById map[string]*peerEntry
waiters waiters
handlers map[syncproto.MessageType][]Handler
peersIdsByGroup map[string][]string
dialer dialer.Dialer
closed bool
mu sync.RWMutex
wg *sync.WaitGroup
}
func (p *pool) Init(ctx context.Context, a *app.App) (err error) {
p.peersById = map[string]*peerEntry{}
p.handlers = map[syncproto.MessageType][]Handler{}
p.peersIdsByGroup = map[string][]string{}
p.waiters = waiters{waiters: map[uint64]*waiter{}}
p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer)
p.wg = &sync.WaitGroup{}
return nil
}
func (p *pool) Name() (name string) {
return CName
}
func (p *pool) Run(ctx context.Context) (err error) {
p.closed = false
return nil
}
func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) {
p.mu.Lock()
defer p.mu.Unlock()
if !p.closed {
// unable to add handler after Run
return
}
p.handlers[msgType] = append(p.handlers[msgType], h)
}
func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrPoolClosed
}
if _, ok := p.peersById[peerId]; ok {
return nil
}
peer, err := p.dialer.Dial(ctx, peerId)
if err != nil {
return
}
p.peersById[peer.Id()] = &peerEntry{
peer: peer,
}
p.wg.Add(1)
go p.readPeerLoop(peer)
return nil
}
func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return ErrPoolClosed
}
p.peersById[peer.Id()] = &peerEntry{
peer: peer,
}
p.wg.Add(1)
p.mu.Unlock()
return p.readPeerLoop(peer)
}
func (p *pool) AddPeerIdToGroup(peerId, groupId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
peer, ok := p.peersById[peerId]
if !ok {
return ErrPeerNotFound
}
if slice.FindPos(peer.groupIds, groupId) != -1 {
return nil
}
peer.addGroup(groupId)
p.peersIdsByGroup[groupId] = append(p.peersIdsByGroup[groupId], peerId)
return
}
func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
peer, ok := p.peersById[peerId]
if !ok {
return ErrPeerNotFound
}
if slice.FindPos(peer.groupIds, groupId) == -1 {
return nil
}
peer.removeGroup(groupId)
p.peersIdsByGroup[groupId] = slice.Remove(p.peersIdsByGroup[groupId], peerId)
return
}
func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) {
p.mu.RLock()
peer := p.peersById[peerId]
p.mu.RUnlock()
if peer == nil {
return ErrPeerNotFound
}
repId := p.waiters.NewReplyId()
msg.GetHeader().RequestId = repId
ch := make(chan Reply, 1)
p.waiters.Add(repId, &waiter{ch: ch})
defer p.waiters.Remove(repId)
if err = peer.peer.Send(msg); err != nil {
return
}
select {
case rep := <-ch:
if rep.Error != nil {
return rep.Error
}
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) {
//TODO implement me
panic("implement me")
}
func (p *pool) readPeerLoop(peer peer.Peer) (err error) {
defer p.wg.Done()
for {
msg, err := peer.Recv()
if err != nil {
log.Debug("peer receive error", zap.Error(err), zap.String("peerId", peer.Id()))
break
}
p.handleMessage(peer, msg)
}
if err = p.removePeer(peer.Id()); err != nil {
log.Error("remove peer error", zap.String("peerId", peer.Id()))
}
return
}
func (p *pool) removePeer(peerId string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
_, ok := p.peersById[peerId]
if !ok {
return ErrPeerNotFound
}
delete(p.peersById, peerId)
return
}
func (p *pool) handleMessage(peer peer.Peer, msg *syncproto.Message) {
replyId := msg.GetHeader().GetReplyId()
if replyId != 0 {
if !p.waiters.Send(replyId, Reply{
PeerInfo: peer.Info(),
Message: &Message{
Message: msg,
peer: peer,
},
}) {
log.Debug("received reply with unknown (or expired) replyId", zap.Uint64("replyId", replyId))
}
return
}
handlers := p.handlers[msg.GetHeader().GetType()]
if len(handlers) == 0 {
return
}
message := &Message{Message: msg, peer: peer}
for _, h := range handlers {
if err := h(peer.Context(), message); err != nil {
log.Error("handle message error", zap.Error(err))
}
}
}
func (p *pool) Close(ctx context.Context) (err error) {
p.mu.Lock()
for _, peer := range p.peersById {
peer.peer.Close()
}
wg := p.wg
p.mu.Unlock()
if wg != nil {
wg.Wait()
}
return nil
}
type waiter struct {
sent int
ch chan<- Reply
}
type waiters struct {
waiters map[uint64]*waiter
replySeq uint64
mu sync.Mutex
}
func (w waiters) Send(replyId uint64, r Reply) (ok bool) {
w.mu.Lock()
wait := w.waiters[replyId]
if wait == nil {
w.mu.Unlock()
return false
}
wait.sent++
var lastMessage = wait.sent == cap(wait.ch)
if lastMessage {
delete(w.waiters, replyId)
}
w.mu.Unlock()
wait.ch <- r
if lastMessage {
close(wait.ch)
}
return true
}
func (w waiters) Add(replyId uint64, wait *waiter) {
w.mu.Lock()
w.waiters[replyId] = wait
w.mu.Unlock()
}
func (w waiters) Remove(id uint64) error {
w.mu.Lock()
defer w.mu.Unlock()
if _, ok := w.waiters[id]; ok {
delete(w.waiters, id)
return nil
}
return fmt.Errorf("waiter not found")
}
func (w waiters) NewReplyId() uint64 {
res := atomic.AddUint64(&w.replySeq, 1)
if res == 0 {
return w.NewReplyId()
}
return res
}

View File

@ -0,0 +1,45 @@
package pool
import "context"
// 1. message for one peerId with ack
// pool.SendAndWait(ctx context,.C
// 2. message for many peers without ack (or group)
type Request struct {
groupId string
oneOf []string
all []string
tryDial bool
needReply bool
pool *pool
}
func (r *Request) GroupId(groupId string) *Request {
r.groupId = groupId
return r
}
func (r *Request) All(peerIds ...string) *Request {
r.all = peerIds
return r
}
func (r *Request) OneOf(peerIds ...string) *Request {
r.oneOf = peerIds
return r
}
func (r *Request) TryDial(is bool) *Request {
r.tryDial = is
return r
}
func (r *Request) NeedReply(is bool) *Request {
r.needReply = is
return r
}
func (r *Request) Exec(ctx context.Context, msg *Message) *Results {
return nil
}

View File

@ -0,0 +1,53 @@
package pool
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
)
// Results of request collects replies and errors
// Must be closed after usage r.Close()
type Results struct {
ctx context.Context
cancel func()
waiterId uint64
ch chan Reply
pool *pool
}
// Iterate iterates over replies
// if callback will return a non-nil error then iteration stops
func (r *Results) Iterate(callback func(r Reply) (err error)) (err error) {
if r.ctx == nil || r.ch == nil {
return fmt.Errorf("results not initialized")
}
for {
select {
case <-r.ctx.Done():
return r.ctx.Err()
case m, ok := <-r.ch:
if ok {
if err = callback(m); err != nil {
return err
}
} else {
return
}
}
}
}
// Close cancels iteration and unregister reply handler in the pool
// Required to call to avoid memory leaks
func (r *Results) Close() (err error) {
r.cancel()
return r.pool.waiters.Remove(r.waiterId)
}
// Reply presents the result of request executing can be error or result message
type Reply struct {
PeerInfo peer.Info
Error error
Message *Message
}

View File

@ -0,0 +1,18 @@
package rpc
import (
"github.com/gogo/protobuf/proto"
"storj.io/drpc"
)
var Encoding = enc{}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -1,27 +1,24 @@
package drpcserver
package server
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/transport"
"github.com/gogo/protobuf/proto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
"go.uber.org/zap"
"io"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcserver"
"strings"
"sync"
"time"
)
var log = logger.NewNamed("drpcserver")
const CName = "net/drpcserver"
const CName = "drpcserver"
var log = logger.NewNamed(CName)
func New() DRPCServer {
return &drpcServer{}
@ -34,17 +31,16 @@ type DRPCServer interface {
type drpcServer struct {
config config.GrpcServer
drpcServer *drpcserver.Server
transport transport.Service
listeners []transport.ContextListener
messageService message.Service
transport secure.Service
listeners []secure.ContextListener
pool pool.Pool
cancel func()
}
func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) {
s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer
s.transport = a.MustComponent(transport.CName).(transport.Service)
s.messageService = a.MustComponent(message.CName).(message.Service)
s.transport = a.MustComponent(secure.CName).(secure.Service)
s.pool = a.MustComponent(pool.CName).(pool.Pool)
return nil
}
@ -66,7 +62,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
return
}
func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) {
func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
@ -90,7 +86,7 @@ func (s *drpcServer) serve(ctx context.Context, lis transport.ContextListener) {
}
continue
}
if _, ok := err.(transport.HandshakeError); ok {
if _, ok := err.(secure.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}
@ -113,29 +109,14 @@ func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) {
}
}
func (s *drpcServer) HandleRPC(stream drpc.Stream, rpc string) (err error) {
func (s *drpcServer) HandleRPC(stream drpc.Stream, _ string) (err error) {
ctx := stream.Context()
sc, err := transport.CtxSecureConn(ctx)
sc, err := secure.CtxSecureConn(ctx)
if err != nil {
return
}
peerId := sc.RemotePeer().String()
l := log.With(zap.String("peer", peerId))
l.Info("stream opened")
defer func() {
l.Info("stream closed", zap.Error(err))
}()
ch := s.messageService.RegisterMessageSender(peerId)
defer s.messageService.UnregisterMessageSender(peerId)
wg := &sync.WaitGroup{}
wg.Add(2)
go s.sendMessages(stream, wg, ch)
go s.receiveMessages(stream, wg, peerId)
wg.Wait()
return nil
log.With(zap.String("peer", sc.RemotePeer().String())).Debug("stream opened")
return s.pool.AddAndReadPeer(rpc.PeerFromStream(sc, stream, true))
}
func (s *drpcServer) Close(ctx context.Context) (err error) {
@ -149,43 +130,3 @@ func (s *drpcServer) Close(ctx context.Context) (err error) {
}
return
}
func (s *drpcServer) sendMessages(stream drpc.Stream, wg *sync.WaitGroup, ch chan *syncpb.SyncContent) {
defer wg.Done()
for {
select {
case msg := <-ch:
if err := stream.MsgSend(msg, enc{}); err != nil {
return
}
case <-stream.Context().Done():
return
}
}
}
func (s *drpcServer) receiveMessages(stream drpc.Stream, wg *sync.WaitGroup, peerId string) {
defer wg.Done()
for {
msg := &syncpb.SyncContent{}
if err := stream.MsgRecv(msg, enc{}); err != nil {
if err == io.EOF {
return
}
}
err := s.messageService.HandleMessage(peerId, msg)
if err != nil {
log.Error("error handling message", zap.Error(err))
}
}
}
type enc struct{}
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
return msg.(proto.Marshaler).Marshal()
}
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
return msg.(proto.Unmarshaler).Unmarshal(buf)
}

View File

@ -1,6 +1,6 @@
//go:build !windows
package drpcserver
package server
import (
"errors"

View File

@ -1,6 +1,6 @@
//go:build windows
package drpcserver
package server
import (
"errors"

55
service/net/rpc/stream.go Normal file
View File

@ -0,0 +1,55 @@
package rpc
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/libp2p/go-libp2p-core/sec"
"storj.io/drpc"
"sync/atomic"
"time"
)
func PeerFromStream(sc sec.SecureConn, stream drpc.Stream, incoming bool) peer.Peer {
dp := &drpcPeer{
sc: sc,
Stream: stream,
}
dp.info.Id = sc.RemotePeer().String()
if incoming {
dp.info.Dir = peer.DirInbound
} else {
dp.info.Dir = peer.DirOutbound
}
return dp
}
type drpcPeer struct {
sc sec.SecureConn
info peer.Info
drpc.Stream
}
func (d *drpcPeer) Id() string {
return d.info.Id
}
func (d *drpcPeer) Info() peer.Info {
return d.info
}
func (d *drpcPeer) Recv() (msg *syncproto.Message, err error) {
msg = &syncproto.Message{}
if err = d.Stream.MsgRecv(msg, Encoding); err != nil {
return
}
atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix())
return
}
func (d *drpcPeer) Send(msg *syncproto.Message) (err error) {
if err = d.Stream.MsgSend(msg, Encoding); err != nil {
return
}
atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix())
return
}

View File

@ -1,4 +1,4 @@
package transport
package secure
import (
"context"

View File

@ -1,4 +1,4 @@
package transport
package secure
import (
"context"

View File

@ -1,11 +1,13 @@
package transport
package secure
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"go.uber.org/zap"
@ -14,9 +16,9 @@ import (
type HandshakeError error
var log = logger.NewNamed("transport")
const CName = "net/secure"
const CName = "transport"
var log = logger.NewNamed(CName)
func New() Service {
return &service{}
@ -33,20 +35,39 @@ type service struct {
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
acc := a.MustComponent(account.CName).(account.Service)
rawKey, err := acc.Account().SignKey.Raw()
peerConf := a.MustComponent(config.CName).(*config.Config).PeerList
pkb, err := crypto.ConfigDecodeKey(peerConf.MyId.PrivKey)
if err != nil {
return err
return
}
if s.key, err = crypto.UnmarshalEd25519PrivateKey(pkb); err != nil {
return
}
// converting into libp2p crypto structure
s.key, err = crypto.UnmarshalEd25519PrivateKey(rawKey)
pid, err := peer.Decode(peerConf.MyId.PeerId)
if err != nil {
return err
return
}
pubKeyRaw, _ := s.key.GetPublic().Raw()
log.Info("transport keys generated", zap.Binary("pubKey", pubKeyRaw))
var testData = []byte("test data")
sign, err := s.key.Sign(testData)
if err != nil {
return
}
pubKey, err := pid.ExtractPublicKey()
if err != nil {
return
}
ok, err := pubKey.Verify(testData, sign)
if err != nil {
return
}
if !ok {
return fmt.Errorf("peerId and privateKey mismatched")
}
log.Info("secure service init", zap.String("peerId", peerConf.MyId.PeerId))
return nil
}

View File

@ -2,6 +2,63 @@ syntax = "proto3";
package anytype;
option go_package = "/syncproto";
message SyncMessage {
int64 seq = 1;
message Message {
Header header = 1;
bytes data = 2;
}
message Header {
bytes traceId = 1;
uint64 requestId = 2;
uint64 replyId = 3;
MessageType type = 4;
}
enum MessageType {
MessageTypeSystem = 0;
MessageTypeSubscription = 1;
MessageTypeSync = 2;
}
message System {
Handshake handshake = 1;
Ping ping = 2;
Ack ack = 3;
message Handshake {
string protocolVersion = 1;
}
message Ping {
uint64 unixTime = 1;
}
message Ack {
Error error = 2;
}
message Error {
Code code = 1;
string description = 2;
enum Code {
UNKNOWN = 0;
UNSUPPORTED_PROTOCOL_VERSION = 10;
}
}
}
message Subscription {
SubscribeSpace subscribeSpace = 1;
UnsubscribeSpace unsubscribeSpace = 2;
message SubscribeSpace {
string spaceId = 1;
}
message UnsubscribeSpace {
string spaceId = 1;
}
}
message Sync {
string spaceId = 1;
}

File diff suppressed because it is too large Load Diff