From 6c98cfbf57eb250b033aeac1609ba7ca6b71e20c Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 18 May 2023 13:21:01 +0200 Subject: [PATCH 01/10] app.AppName + prometheus versions metric --- app/app.go | 10 +++++++--- metric/metric.go | 5 +++++ metric/version.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 metric/version.go diff --git a/app/app.go b/app/app.go index 84682f2c..a83e0157 100644 --- a/app/app.go +++ b/app/app.go @@ -15,8 +15,8 @@ import ( var ( // values of this vars will be defined while compilation - GitCommit, GitBranch, GitState, GitSummary, BuildDate string - name string + AppName, GitCommit, GitBranch, GitState, GitSummary, BuildDate string + name string ) var ( @@ -66,6 +66,10 @@ func (app *App) Name() string { return name } +func (app *App) AppName() string { + return AppName +} + // Version return app version func (app *App) Version() string { return GitSummary @@ -257,7 +261,7 @@ func (app *App) Close(ctx context.Context) error { case <-time.After(StopWarningAfter): statLogger(app.stopStat, log). With(zap.String("in_progress", currentComponentStopping)). - Warn("components close in progress") + Warn("components close in progress") } }() go func() { diff --git a/metric/metric.go b/metric/metric.go index d5319982..fedff6bd 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -32,9 +32,11 @@ type metric struct { registry *prometheus.Registry rpcLog logger.CtxLogger config Config + a *app.App } func (m *metric) Init(a *app.App) (err error) { + m.a = a m.registry = prometheus.NewRegistry() m.config = a.MustComponent("config").(configSource).GetMetric() m.rpcLog = logger.NewNamed("rpcLog") @@ -52,6 +54,9 @@ func (m *metric) Run(ctx context.Context) (err error) { if err = m.registry.Register(collectors.NewGoCollector()); err != nil { return err } + if err = m.registry.Register(newVersionsCollector(m.a)); err != nil { + return + } if m.config.Addr != "" { var errCh = make(chan error) http.Handle("/metrics", promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{})) diff --git a/metric/version.go b/metric/version.go new file mode 100644 index 00000000..614552d1 --- /dev/null +++ b/metric/version.go @@ -0,0 +1,43 @@ +package metric + +import ( + "github.com/anytypeio/any-sync/app" + "github.com/prometheus/client_golang/prometheus" + "runtime/debug" +) + +func newVersionsCollector(a *app.App) prometheus.Collector { + return &versionCollector{prometheus.MustNewConstMetric(prometheus.NewDesc( + "anysync_versions", + "Build information about the main Go module.", + nil, prometheus.Labels{ + "anysync_version": anySyncVerion(), + "app_name": a.AppName(), + "app_version": a.Version(), + }, + ), prometheus.GaugeValue, 1)} +} + +func anySyncVerion() string { + info, ok := debug.ReadBuildInfo() + if ok { + for _, mod := range info.Deps { + if mod.Path == "github.com/anytypeio/any-sync" { + return mod.Version + } + } + } + return "" +} + +type versionCollector struct { + ver prometheus.Metric +} + +func (v *versionCollector) Describe(descs chan<- *prometheus.Desc) { + descs <- v.ver.Desc() +} + +func (v *versionCollector) Collect(metrics chan<- prometheus.Metric) { + metrics <- v.ver +} From b22ddb612a0ce9a24282cae97336cbbe9ec178ee Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 18 May 2023 13:31:53 +0200 Subject: [PATCH 02/10] app.AnySyncVersion --- app/app.go | 30 +++++++++++++++++++++++++----- metric/version.go | 15 +-------------- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/app/app.go b/app/app.go index a83e0157..e9e69391 100644 --- a/app/app.go +++ b/app/app.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "os" "runtime" + "runtime/debug" "strings" "sync" "time" @@ -54,11 +55,12 @@ type ComponentStatable interface { // App is the central part of the application // It contains and manages all components type App struct { - components []Component - mu sync.RWMutex - startStat Stat - stopStat Stat - deviceState int + components []Component + mu sync.RWMutex + startStat Stat + stopStat Stat + deviceState int + anySyncVersion string } // Name returns app name @@ -315,3 +317,21 @@ func (app *App) SetDeviceState(state int) { } } } + +var onceVersion sync.Once + +func (app *App) AnySyncVersion() string { + onceVersion.Do(func() { + fmt.Println("111") + info, ok := debug.ReadBuildInfo() + if ok { + for _, mod := range info.Deps { + if mod.Path == "github.com/anytypeio/any-sync" { + app.anySyncVersion = mod.Version + break + } + } + } + }) + return app.anySyncVersion +} diff --git a/metric/version.go b/metric/version.go index 614552d1..19a93a12 100644 --- a/metric/version.go +++ b/metric/version.go @@ -3,7 +3,6 @@ package metric import ( "github.com/anytypeio/any-sync/app" "github.com/prometheus/client_golang/prometheus" - "runtime/debug" ) func newVersionsCollector(a *app.App) prometheus.Collector { @@ -11,25 +10,13 @@ func newVersionsCollector(a *app.App) prometheus.Collector { "anysync_versions", "Build information about the main Go module.", nil, prometheus.Labels{ - "anysync_version": anySyncVerion(), + "anysync_version": a.AnySyncVersion(), "app_name": a.AppName(), "app_version": a.Version(), }, ), prometheus.GaugeValue, 1)} } -func anySyncVerion() string { - info, ok := debug.ReadBuildInfo() - if ok { - for _, mod := range info.Deps { - if mod.Path == "github.com/anytypeio/any-sync" { - return mod.Version - } - } - } - return "" -} - type versionCollector struct { ver prometheus.Metric } From 190d48b430e965e50f37e1dd2a48f7425c5787da Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 18 May 2023 13:35:49 +0200 Subject: [PATCH 03/10] write app version to rpc log --- metric/log.go | 6 +++++- metric/metric.go | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/metric/log.go b/metric/log.go index 7cc34e66..edb63249 100644 --- a/metric/log.go +++ b/metric/log.go @@ -35,6 +35,10 @@ func Identity(val string) zap.Field { return zap.String("identity", val) } +func App(app string) zap.Field { + return zap.String("app", app) +} + func FileId(fileId string) zap.Field { return zap.String("fileId", fileId) } @@ -57,5 +61,5 @@ func (m *metric) RequestLog(ctx context.Context, rpc string, fields ...zap.Field if ak != nil { acc = ak.Account() } - m.rpcLog.Info("", append(fields, PeerId(peerId), Identity(acc), Method(rpc))...) + m.rpcLog.Info("", append(fields, m.appField, PeerId(peerId), Identity(acc), Method(rpc))...) } diff --git a/metric/metric.go b/metric/metric.go index fedff6bd..aeac0550 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -33,6 +33,7 @@ type metric struct { rpcLog logger.CtxLogger config Config a *app.App + appField zap.Field } func (m *metric) Init(a *app.App) (err error) { @@ -40,6 +41,7 @@ func (m *metric) Init(a *app.App) (err error) { m.registry = prometheus.NewRegistry() m.config = a.MustComponent("config").(configSource).GetMetric() m.rpcLog = logger.NewNamed("rpcLog") + m.appField = App(a.Version()) return nil } From 014d8d72dfc29796598b258cda3e1509a023fa8d Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 19 May 2023 22:18:39 +0200 Subject: [PATCH 04/10] remove debug --- app/app.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/app.go b/app/app.go index e9e69391..80d5b1fe 100644 --- a/app/app.go +++ b/app/app.go @@ -322,7 +322,6 @@ var onceVersion sync.Once func (app *App) AnySyncVersion() string { onceVersion.Do(func() { - fmt.Println("111") info, ok := debug.ReadBuildInfo() if ok { for _, mod := range info.Deps { From be29456a29ddd2e889f11e3cb32a1df761ac3f8e Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 19 May 2023 22:21:12 +0200 Subject: [PATCH 05/10] fix test --- metric/log_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metric/log_test.go b/metric/log_test.go index 50835795..e22a7112 100644 --- a/metric/log_test.go +++ b/metric/log_test.go @@ -3,10 +3,11 @@ package metric import ( "context" "github.com/anytypeio/any-sync/app/logger" + "go.uber.org/zap" "testing" ) func TestLog(t *testing.T) { - m := &metric{rpcLog: logger.NewNamed("rpcLog")} + m := &metric{rpcLog: logger.NewNamed("rpcLog"), appField: zap.String("appName", "test")} m.RequestLog(context.Background(), "rpc") } From 4a7bf3ef461a439955e696c6fe53911154c5db10 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 22 May 2023 13:51:13 +0200 Subject: [PATCH 06/10] handshake proto version --- net/secureservice/credential.go | 26 ++++-- net/secureservice/credential_test.go | 29 +++++- net/secureservice/handshake/handshake.go | 2 + net/secureservice/handshake/handshake_test.go | 26 ++++-- .../handshake/handshakeproto/handshake.pb.go | 88 ++++++++++++++----- .../handshakeproto/protos/handshake.proto | 2 + net/secureservice/secureservice.go | 13 +-- net/secureservice/secureservice_test.go | 33 ++++++- 8 files changed, 172 insertions(+), 47 deletions(-) diff --git a/net/secureservice/credential.go b/net/secureservice/credential.go index 82fde3a3..b0341226 100644 --- a/net/secureservice/credential.go +++ b/net/secureservice/credential.go @@ -9,12 +9,16 @@ import ( "go.uber.org/zap" ) -func newNoVerifyChecker() handshake.CredentialChecker { - return &noVerifyChecker{cred: &handshakeproto.Credentials{Type: handshakeproto.CredentialsType_SkipVerify}} +func newNoVerifyChecker(protoVersion uint32) handshake.CredentialChecker { + return &noVerifyChecker{ + protoVersion: protoVersion, + cred: &handshakeproto.Credentials{Type: handshakeproto.CredentialsType_SkipVerify}, + } } type noVerifyChecker struct { - cred *handshakeproto.Credentials + protoVersion uint32 + cred *handshakeproto.Credentials } func (n noVerifyChecker) MakeCredentials(sc sec.SecureConn) *handshakeproto.Credentials { @@ -22,15 +26,22 @@ func (n noVerifyChecker) MakeCredentials(sc sec.SecureConn) *handshakeproto.Cred } func (n noVerifyChecker) CheckCredential(sc sec.SecureConn, cred *handshakeproto.Credentials) (identity []byte, err error) { + if cred.Version != n.protoVersion { + return nil, handshake.ErrIncompatibleVersion + } return nil, nil } -func newPeerSignVerifier(account *accountdata.AccountKeys) handshake.CredentialChecker { - return &peerSignVerifier{account: account} +func newPeerSignVerifier(protoVersion uint32, account *accountdata.AccountKeys) handshake.CredentialChecker { + return &peerSignVerifier{ + protoVersion: protoVersion, + account: account, + } } type peerSignVerifier struct { - account *accountdata.AccountKeys + protoVersion uint32 + account *accountdata.AccountKeys } func (p *peerSignVerifier) MakeCredentials(sc sec.SecureConn) *handshakeproto.Credentials { @@ -52,6 +63,9 @@ func (p *peerSignVerifier) MakeCredentials(sc sec.SecureConn) *handshakeproto.Cr } func (p *peerSignVerifier) CheckCredential(sc sec.SecureConn, cred *handshakeproto.Credentials) (identity []byte, err error) { + if cred.Version != p.protoVersion { + return nil, handshake.ErrIncompatibleVersion + } if cred.Type != handshakeproto.CredentialsType_SignedPeerIds { return nil, handshake.ErrSkipVerifyNotAllowed } diff --git a/net/secureservice/credential_test.go b/net/secureservice/credential_test.go index 1bab8ac9..ad3853df 100644 --- a/net/secureservice/credential_test.go +++ b/net/secureservice/credential_test.go @@ -20,8 +20,8 @@ func TestPeerSignVerifier_CheckCredential(t *testing.T) { identity1, _ := a1.SignKey.GetPublic().Marshall() identity2, _ := a2.SignKey.GetPublic().Marshall() - cc1 := newPeerSignVerifier(a1) - cc2 := newPeerSignVerifier(a2) + cc1 := newPeerSignVerifier(0, a1) + cc2 := newPeerSignVerifier(0, a2) c1 := newTestSC(a2.PeerId) c2 := newTestSC(a1.PeerId) @@ -40,6 +40,31 @@ func TestPeerSignVerifier_CheckCredential(t *testing.T) { assert.EqualError(t, err, handshake.ErrInvalidCredentials.Error()) } +func TestIncompatibleVersion(t *testing.T) { + a1 := newTestAccData(t) + a2 := newTestAccData(t) + _, _ = a1.SignKey.GetPublic().Marshall() + identity2, _ := a2.SignKey.GetPublic().Marshall() + + cc1 := newPeerSignVerifier(0, a1) + cc2 := newPeerSignVerifier(1, a2) + + c1 := newTestSC(a2.PeerId) + c2 := newTestSC(a1.PeerId) + + cr1 := cc1.MakeCredentials(c1) + cr2 := cc2.MakeCredentials(c2) + id1, err := cc1.CheckCredential(c1, cr2) + assert.NoError(t, err) + assert.Equal(t, identity2, id1) + + _, err = cc2.CheckCredential(c2, cr1) + assert.EqualError(t, err, handshake.ErrIncompatibleVersion.Error()) + + _, err = cc1.CheckCredential(c1, cr1) + assert.EqualError(t, err, handshake.ErrInvalidCredentials.Error()) +} + func newTestAccData(t *testing.T) *accountdata.AccountKeys { as := accounttest.AccountTestService{} require.NoError(t, as.Init(nil)) diff --git a/net/secureservice/handshake/handshake.go b/net/secureservice/handshake/handshake.go index b1d2eabf..044311a9 100644 --- a/net/secureservice/handshake/handshake.go +++ b/net/secureservice/handshake/handshake.go @@ -34,6 +34,8 @@ var ( ErrSkipVerifyNotAllowed = handshakeError{handshakeproto.Error_SkipVerifyNotAllowed} ErrUnexpected = handshakeError{handshakeproto.Error_Unexpected} + ErrIncompatibleVersion = handshakeError{handshakeproto.Error_IncompatibleVersion} + ErrGotNotAHandshakeMessage = errors.New("go not a handshake message") ) diff --git a/net/secureservice/handshake/handshake_test.go b/net/secureservice/handshake/handshake_test.go index b766952f..8f332363 100644 --- a/net/secureservice/handshake/handshake_test.go +++ b/net/secureservice/handshake/handshake_test.go @@ -11,16 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "net" - "net/http" - _ "net/http/pprof" "testing" "time" ) -func init() { - go http.ListenAndServe(":6060", nil) -} - var noVerifyChecker = &testCredChecker{ makeCred: &handshakeproto.Credentials{Type: handshakeproto.CredentialsType_SkipVerify}, checkCred: func(sc sec.SecureConn, cred *handshakeproto.Credentials) (identity []byte, err error) { @@ -324,6 +318,26 @@ func TestIncomingHandshake(t *testing.T) { res := <-handshakeResCh require.EqualError(t, res.err, ErrInvalidCredentials.Error()) }) + t.Run("invalid cred version", func(t *testing.T) { + c1, c2 := newConnPair(t) + var handshakeResCh = make(chan handshakeRes, 1) + go func() { + identity, err := IncomingHandshake(nil, c1, &testCredChecker{makeCred: noVerifyChecker.makeCred, checkErr: ErrIncompatibleVersion}) + handshakeResCh <- handshakeRes{identity: identity, err: err} + }() + h := newHandshake() + h.conn = c2 + // write credentials + require.NoError(t, h.writeCredentials(noVerifyChecker.MakeCredentials(c2))) + // except ack with error + msg, err := h.readMsg() + require.NoError(t, err) + require.Nil(t, msg.cred) + require.Equal(t, handshakeproto.Error_IncompatibleVersion, msg.ack.Error) + + res := <-handshakeResCh + require.EqualError(t, res.err, ErrIncompatibleVersion.Error()) + }) t.Run("write cred instead ack", func(t *testing.T) { c1, c2 := newConnPair(t) var handshakeResCh = make(chan handshakeRes, 1) diff --git a/net/secureservice/handshake/handshakeproto/handshake.pb.go b/net/secureservice/handshake/handshakeproto/handshake.pb.go index d38b7e3d..3d868ef0 100644 --- a/net/secureservice/handshake/handshakeproto/handshake.pb.go +++ b/net/secureservice/handshake/handshakeproto/handshake.pb.go @@ -58,6 +58,7 @@ const ( Error_UnexpectedPayload Error = 3 Error_SkipVerifyNotAllowed Error = 4 Error_DeadlineExceeded Error = 5 + Error_IncompatibleVersion Error = 6 ) var Error_name = map[int32]string{ @@ -67,6 +68,7 @@ var Error_name = map[int32]string{ 3: "UnexpectedPayload", 4: "SkipVerifyNotAllowed", 5: "DeadlineExceeded", + 6: "IncompatibleVersion", } var Error_value = map[string]int32{ @@ -76,6 +78,7 @@ var Error_value = map[string]int32{ "UnexpectedPayload": 3, "SkipVerifyNotAllowed": 4, "DeadlineExceeded": 5, + "IncompatibleVersion": 6, } func (x Error) String() string { @@ -89,6 +92,7 @@ func (Error) EnumDescriptor() ([]byte, []int) { type Credentials struct { Type CredentialsType `protobuf:"varint,1,opt,name=type,proto3,enum=anyHandshake.CredentialsType" json:"type,omitempty"` Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` } func (m *Credentials) Reset() { *m = Credentials{} } @@ -138,6 +142,13 @@ func (m *Credentials) GetPayload() []byte { return nil } +func (m *Credentials) GetVersion() uint32 { + if m != nil { + return m.Version + } + return 0 +} + type PayloadSignedPeerIds struct { // account identity Identity []byte `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` @@ -249,30 +260,32 @@ func init() { } var fileDescriptor_60283fc75f020893 = []byte{ - // 362 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x51, 0xcd, 0x8e, 0xda, 0x30, - 0x18, 0x8c, 0x21, 0xb4, 0xe8, 0x2b, 0xa5, 0xc6, 0xa5, 0x55, 0x54, 0xa9, 0x11, 0xe2, 0x44, 0x39, - 0x40, 0xff, 0x5e, 0x80, 0x16, 0xaa, 0x72, 0x41, 0x28, 0xb4, 0x3d, 0x70, 0x73, 0xe3, 0xaf, 0x60, - 0x61, 0x39, 0x91, 0x13, 0x28, 0xb9, 0xed, 0x23, 0xec, 0x63, 0xed, 0x91, 0xe3, 0x1e, 0x57, 0xf0, - 0x22, 0x2b, 0x0c, 0x2c, 0x61, 0x4f, 0x7b, 0xb1, 0xe7, 0x1b, 0x8f, 0x67, 0xc6, 0x32, 0xf4, 0x34, - 0xa6, 0xdd, 0x04, 0xc3, 0xa5, 0xc1, 0x04, 0xcd, 0x4a, 0x86, 0xd8, 0x9d, 0x73, 0x2d, 0x92, 0x39, - 0x5f, 0xe4, 0x50, 0x6c, 0xa2, 0x34, 0xea, 0xda, 0x35, 0x39, 0xb3, 0x1d, 0x4b, 0xb0, 0x0a, 0xd7, - 0xd9, 0xcf, 0x13, 0xd7, 0x9c, 0xc2, 0x8b, 0xef, 0x06, 0x05, 0xea, 0x54, 0x72, 0x95, 0xb0, 0x4f, - 0xe0, 0xa6, 0x59, 0x8c, 0x1e, 0x69, 0x90, 0x56, 0xf5, 0xf3, 0xfb, 0x4e, 0x5e, 0xdb, 0xc9, 0x09, - 0x7f, 0x65, 0x31, 0x06, 0x56, 0xca, 0x3c, 0x78, 0x1e, 0xf3, 0x4c, 0x45, 0x5c, 0x78, 0x85, 0x06, - 0x69, 0x55, 0x82, 0xd3, 0xd8, 0xfc, 0x01, 0xf5, 0xf1, 0x01, 0x4e, 0xe4, 0x4c, 0xa3, 0x18, 0x23, - 0x9a, 0xa1, 0x48, 0xd8, 0x3b, 0x28, 0x4b, 0x6b, 0x94, 0x66, 0x36, 0xa8, 0x12, 0x3c, 0xcc, 0x8c, - 0x81, 0x9b, 0xc8, 0x99, 0x3e, 0x5a, 0x59, 0xdc, 0xfc, 0x08, 0xc5, 0x5e, 0xb8, 0x60, 0x1f, 0xa0, - 0x84, 0xc6, 0x44, 0xe6, 0x58, 0xee, 0xf5, 0x65, 0xb9, 0xc1, 0xfe, 0x28, 0x38, 0x28, 0xda, 0x5f, - 0xe1, 0xd5, 0xa3, 0xb2, 0xac, 0x0a, 0x30, 0x59, 0xc8, 0xf8, 0x0f, 0x1a, 0xf9, 0x2f, 0xa3, 0x0e, - 0xab, 0xc1, 0xcb, 0x8b, 0x56, 0x94, 0xb4, 0xaf, 0x08, 0x94, 0xac, 0x0d, 0x2b, 0x83, 0x3b, 0x5a, - 0x2a, 0x45, 0x9d, 0xfd, 0xb5, 0xdf, 0x1a, 0xd7, 0x31, 0x86, 0x29, 0x0a, 0x4a, 0xd8, 0x5b, 0x60, - 0x43, 0xbd, 0xe2, 0x4a, 0x8a, 0x5c, 0x00, 0x2d, 0xb0, 0x37, 0x50, 0x3b, 0xeb, 0x8e, 0xaf, 0xa6, - 0x45, 0xe6, 0x41, 0xfd, 0x9c, 0x3a, 0x8a, 0xd2, 0x9e, 0x52, 0xd1, 0x7f, 0x14, 0xd4, 0x65, 0x75, - 0xa0, 0x7d, 0xe4, 0x42, 0x49, 0x8d, 0x83, 0x75, 0x88, 0x28, 0x50, 0xd0, 0xd2, 0xb7, 0xfe, 0xcd, - 0xd6, 0x27, 0x9b, 0xad, 0x4f, 0xee, 0xb6, 0x3e, 0xb9, 0xde, 0xf9, 0xce, 0x66, 0xe7, 0x3b, 0xb7, - 0x3b, 0xdf, 0x99, 0xb6, 0x9f, 0xfe, 0xf3, 0x7f, 0x9f, 0xd9, 0xed, 0xcb, 0x7d, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xa0, 0x48, 0xdf, 0x7a, 0x2e, 0x02, 0x00, 0x00, + // 395 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xcd, 0x6e, 0x13, 0x31, + 0x10, 0xc7, 0xd7, 0x4d, 0x52, 0xaa, 0x21, 0x2d, 0xee, 0x34, 0xc0, 0x0a, 0x89, 0x55, 0x94, 0x53, + 0xc8, 0x21, 0xe1, 0xeb, 0x05, 0x02, 0x2d, 0x22, 0x97, 0xaa, 0xda, 0x42, 0x0f, 0xdc, 0xdc, 0xf5, + 0xd0, 0x5a, 0x31, 0xf6, 0xca, 0x76, 0x43, 0xf7, 0x2d, 0xb8, 0xf2, 0x46, 0x1c, 0x7b, 0xe4, 0x88, + 0x92, 0x17, 0x41, 0x71, 0x12, 0x92, 0x70, 0xea, 0xc5, 0x9e, 0x8f, 0x9f, 0xfd, 0xff, 0x8f, 0x65, + 0x18, 0x1a, 0x0a, 0x03, 0x4f, 0xc5, 0x8d, 0x23, 0x4f, 0x6e, 0xa2, 0x0a, 0x1a, 0x5c, 0x0b, 0x23, + 0xfd, 0xb5, 0x18, 0x6f, 0x44, 0xa5, 0xb3, 0xc1, 0x0e, 0xe2, 0xea, 0xd7, 0xd5, 0x7e, 0x2c, 0x60, + 0x53, 0x98, 0xea, 0xe3, 0xaa, 0xd6, 0x09, 0xf0, 0xf0, 0xbd, 0x23, 0x49, 0x26, 0x28, 0xa1, 0x3d, + 0xbe, 0x82, 0x7a, 0xa8, 0x4a, 0x4a, 0x59, 0x9b, 0x75, 0x0f, 0x5e, 0x3f, 0xef, 0x6f, 0xb2, 0xfd, + 0x0d, 0xf0, 0x53, 0x55, 0x52, 0x1e, 0x51, 0x4c, 0xe1, 0x41, 0x29, 0x2a, 0x6d, 0x85, 0x4c, 0x77, + 0xda, 0xac, 0xdb, 0xcc, 0x57, 0xe9, 0xbc, 0x33, 0x21, 0xe7, 0x95, 0x35, 0x69, 0xad, 0xcd, 0xba, + 0xfb, 0xf9, 0x2a, 0xed, 0x7c, 0x80, 0xd6, 0xd9, 0x02, 0x3a, 0x57, 0x57, 0x86, 0xe4, 0x19, 0x91, + 0x1b, 0x49, 0x8f, 0xcf, 0x60, 0x4f, 0x45, 0x89, 0x50, 0x45, 0x0b, 0xcd, 0xfc, 0x5f, 0x8e, 0x08, + 0x75, 0xaf, 0xae, 0xcc, 0x52, 0x24, 0xc6, 0x9d, 0x97, 0x50, 0x1b, 0x16, 0x63, 0x7c, 0x01, 0x0d, + 0x72, 0xce, 0xba, 0xa5, 0xed, 0xa3, 0x6d, 0xdb, 0x27, 0xf3, 0x56, 0xbe, 0x20, 0x7a, 0x6f, 0xe1, + 0xd1, 0x7f, 0x63, 0xe0, 0x01, 0xc0, 0xf9, 0x58, 0x95, 0x17, 0xe4, 0xd4, 0xd7, 0x8a, 0x27, 0x78, + 0x08, 0xfb, 0x5b, 0xae, 0x38, 0xeb, 0xfd, 0x64, 0xd0, 0x88, 0xd7, 0xe0, 0x1e, 0xd4, 0x4f, 0x6f, + 0xb4, 0xe6, 0xc9, 0xfc, 0xd8, 0x67, 0x43, 0xb7, 0x25, 0x15, 0x81, 0x24, 0x67, 0xf8, 0x04, 0x70, + 0x64, 0x26, 0x42, 0x2b, 0xb9, 0x21, 0xc0, 0x77, 0xf0, 0x31, 0x1c, 0xae, 0xb9, 0xe5, 0xd4, 0xbc, + 0x86, 0x29, 0xb4, 0xd6, 0xaa, 0xa7, 0x36, 0x0c, 0xb5, 0xb6, 0xdf, 0x49, 0xf2, 0x3a, 0xb6, 0x80, + 0x1f, 0x93, 0x90, 0x5a, 0x19, 0x3a, 0xb9, 0x2d, 0x88, 0x24, 0x49, 0xde, 0xc0, 0xa7, 0x70, 0x34, + 0x32, 0x85, 0xfd, 0x56, 0x8a, 0xa0, 0x2e, 0x35, 0x5d, 0x2c, 0x5e, 0x92, 0xef, 0xbe, 0x3b, 0xfe, + 0x35, 0xcd, 0xd8, 0xdd, 0x34, 0x63, 0x7f, 0xa6, 0x19, 0xfb, 0x31, 0xcb, 0x92, 0xbb, 0x59, 0x96, + 0xfc, 0x9e, 0x65, 0xc9, 0x97, 0xde, 0xfd, 0x3f, 0xcb, 0xe5, 0x6e, 0xdc, 0xde, 0xfc, 0x0d, 0x00, + 0x00, 0xff, 0xff, 0xbf, 0x78, 0x2f, 0x36, 0x61, 0x02, 0x00, 0x00, } func (m *Credentials) Marshal() (dAtA []byte, err error) { @@ -295,6 +308,11 @@ func (m *Credentials) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Version != 0 { + i = encodeVarintHandshake(dAtA, i, uint64(m.Version)) + i-- + dAtA[i] = 0x18 + } if len(m.Payload) > 0 { i -= len(m.Payload) copy(dAtA[i:], m.Payload) @@ -399,6 +417,9 @@ func (m *Credentials) Size() (n int) { if l > 0 { n += 1 + l + sovHandshake(uint64(l)) } + if m.Version != 0 { + n += 1 + sovHandshake(uint64(m.Version)) + } return n } @@ -519,6 +540,25 @@ func (m *Credentials) Unmarshal(dAtA []byte) error { m.Payload = []byte{} } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + m.Version = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHandshake + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Version |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipHandshake(dAtA[iNdEx:]) diff --git a/net/secureservice/handshake/handshakeproto/protos/handshake.proto b/net/secureservice/handshake/handshakeproto/protos/handshake.proto index 8f014248..cca5822e 100644 --- a/net/secureservice/handshake/handshakeproto/protos/handshake.proto +++ b/net/secureservice/handshake/handshakeproto/protos/handshake.proto @@ -36,6 +36,7 @@ Successful handshake scheme: message Credentials { CredentialsType type = 1; bytes payload = 2; + uint32 version = 3; } enum CredentialsType { @@ -66,4 +67,5 @@ enum Error { UnexpectedPayload = 3; SkipVerifyNotAllowed = 4; DeadlineExceeded = 5; + IncompatibleVersion = 6; } \ No newline at end of file diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index 918c1987..3d7e9045 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -44,10 +44,11 @@ type SecureService interface { } type secureService struct { - p2pTr *libp2ptls.Transport - account *accountdata.AccountKeys - key crypto.PrivKey - nodeconf nodeconf.Service + p2pTr *libp2ptls.Transport + account *accountdata.AccountKeys + key crypto.PrivKey + nodeconf nodeconf.Service + protoVersion uint32 noVerifyChecker handshake.CredentialChecker peerSignVerifier handshake.CredentialChecker @@ -64,8 +65,8 @@ func (s *secureService) Init(a *app.App) (err error) { return } - s.noVerifyChecker = newNoVerifyChecker() - s.peerSignVerifier = newPeerSignVerifier(account.Account()) + s.noVerifyChecker = newNoVerifyChecker(s.protoVersion) + s.peerSignVerifier = newPeerSignVerifier(s.protoVersion, account.Account()) s.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.Service) diff --git a/net/secureservice/secureservice_test.go b/net/secureservice/secureservice_test.go index e5d4e5d9..a70a883f 100644 --- a/net/secureservice/secureservice_test.go +++ b/net/secureservice/secureservice_test.go @@ -5,6 +5,7 @@ import ( "github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/net/peer" + "github.com/anytypeio/any-sync/net/secureservice/handshake" "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf/mock_nodeconf" "github.com/anytypeio/any-sync/testutil/testnodeconf" @@ -19,7 +20,7 @@ var ctx = context.Background() func TestHandshake(t *testing.T) { nc := testnodeconf.GenNodeConfig(2) - fxS := newFixture(t, nc, nc.GetAccountService(0)) + fxS := newFixture(t, nc, nc.GetAccountService(0), 0) defer fxS.Finish(t) sc, cc := net.Pipe() @@ -35,7 +36,7 @@ func TestHandshake(t *testing.T) { resCh <- ar }() - fxC := newFixture(t, nc, nc.GetAccountService(1)) + fxC := newFixture(t, nc, nc.GetAccountService(1), 0) defer fxC.Finish(t) secConn, err := fxC.SecureOutbound(ctx, cc) @@ -52,13 +53,39 @@ func TestHandshake(t *testing.T) { assert.Equal(t, marshalledId, accId) } -func newFixture(t *testing.T, nc *testnodeconf.Config, acc accountservice.Service) *fixture { +func TestHandshakeIncompatibleVersion(t *testing.T) { + nc := testnodeconf.GenNodeConfig(2) + fxS := newFixture(t, nc, nc.GetAccountService(0), 0) + defer fxS.Finish(t) + sc, cc := net.Pipe() + + type acceptRes struct { + ctx context.Context + conn net.Conn + err error + } + resCh := make(chan acceptRes) + go func() { + var ar acceptRes + ar.ctx, ar.conn, ar.err = fxS.SecureInbound(ctx, sc) + resCh <- ar + }() + fxC := newFixture(t, nc, nc.GetAccountService(1), 1) + defer fxC.Finish(t) + _, err := fxC.SecureOutbound(ctx, cc) + require.EqualError(t, err, handshake.ErrIncompatibleVersion.Error()) + res := <-resCh + require.EqualError(t, res.err, handshake.ErrIncompatibleVersion.Error()) +} + +func newFixture(t *testing.T, nc *testnodeconf.Config, acc accountservice.Service, protoVersion uint32) *fixture { fx := &fixture{ ctrl: gomock.NewController(t), secureService: New().(*secureService), acc: acc, a: new(app.App), } + fx.secureService.protoVersion = protoVersion fx.mockNodeConf = mock_nodeconf.NewMockService(fx.ctrl) fx.mockNodeConf.EXPECT().Init(gomock.Any()) fx.mockNodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes() From 0aabebb1102fb730e86625ad33ae2ad60d7a1f4d Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 22 May 2023 15:31:55 +0200 Subject: [PATCH 07/10] make credentials with version, fix test --- net/secureservice/credential.go | 9 ++++----- net/secureservice/credential_test.go | 6 ++---- net/secureservice/secureservice.go | 1 - 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/net/secureservice/credential.go b/net/secureservice/credential.go index b0341226..b754b450 100644 --- a/net/secureservice/credential.go +++ b/net/secureservice/credential.go @@ -11,14 +11,12 @@ import ( func newNoVerifyChecker(protoVersion uint32) handshake.CredentialChecker { return &noVerifyChecker{ - protoVersion: protoVersion, - cred: &handshakeproto.Credentials{Type: handshakeproto.CredentialsType_SkipVerify}, + cred: &handshakeproto.Credentials{Type: handshakeproto.CredentialsType_SkipVerify, Version: protoVersion}, } } type noVerifyChecker struct { - protoVersion uint32 - cred *handshakeproto.Credentials + cred *handshakeproto.Credentials } func (n noVerifyChecker) MakeCredentials(sc sec.SecureConn) *handshakeproto.Credentials { @@ -26,7 +24,7 @@ func (n noVerifyChecker) MakeCredentials(sc sec.SecureConn) *handshakeproto.Cred } func (n noVerifyChecker) CheckCredential(sc sec.SecureConn, cred *handshakeproto.Credentials) (identity []byte, err error) { - if cred.Version != n.protoVersion { + if cred.Version != n.cred.Version { return nil, handshake.ErrIncompatibleVersion } return nil, nil @@ -59,6 +57,7 @@ func (p *peerSignVerifier) MakeCredentials(sc sec.SecureConn) *handshakeproto.Cr return &handshakeproto.Credentials{ Type: handshakeproto.CredentialsType_SignedPeerIds, Payload: payload, + Version: p.protoVersion, } } diff --git a/net/secureservice/credential_test.go b/net/secureservice/credential_test.go index ad3853df..c3ae8338 100644 --- a/net/secureservice/credential_test.go +++ b/net/secureservice/credential_test.go @@ -44,7 +44,6 @@ func TestIncompatibleVersion(t *testing.T) { a1 := newTestAccData(t) a2 := newTestAccData(t) _, _ = a1.SignKey.GetPublic().Marshall() - identity2, _ := a2.SignKey.GetPublic().Marshall() cc1 := newPeerSignVerifier(0, a1) cc2 := newPeerSignVerifier(1, a2) @@ -54,9 +53,8 @@ func TestIncompatibleVersion(t *testing.T) { cr1 := cc1.MakeCredentials(c1) cr2 := cc2.MakeCredentials(c2) - id1, err := cc1.CheckCredential(c1, cr2) - assert.NoError(t, err) - assert.Equal(t, identity2, id1) + _, err := cc1.CheckCredential(c1, cr2) + assert.EqualError(t, err, handshake.ErrIncompatibleVersion.Error()) _, err = cc2.CheckCredential(c2, cr1) assert.EqualError(t, err, handshake.ErrIncompatibleVersion.Error()) diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index 3d7e9045..2167b20c 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -64,7 +64,6 @@ func (s *secureService) Init(a *app.App) (err error) { if s.key, err = crypto.UnmarshalEd25519PrivateKey(peerKey); err != nil { return } - s.noVerifyChecker = newNoVerifyChecker(s.protoVersion) s.peerSignVerifier = newPeerSignVerifier(s.protoVersion, account.Account()) From c17166b383b09d53c95749456b3020354d297af5 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 22 May 2023 17:57:00 +0200 Subject: [PATCH 08/10] propagate handshake error + NetworkCompatibilityStatus method --- net/config.go | 6 + net/dialer/dialer.go | 4 + net/pool/pool.go | 21 ++- net/pool/pool_test.go | 14 +- net/secureservice/handshake/handshake.go | 24 ++-- net/streampool/streampool.go | 4 +- nodeconf/mock_nodeconf/mock_nodeconf.go | 14 ++ nodeconf/nodeconf.go | 1 - nodeconf/service.go | 43 +++++- nodeconf/service_test.go | 175 +++++++++++++++++++++++ testutil/accounttest/accountservice.go | 9 -- testutil/testnodeconf/testnodeconf.go | 5 +- 12 files changed, 286 insertions(+), 34 deletions(-) create mode 100644 nodeconf/service_test.go diff --git a/net/config.go b/net/config.go index b0cdf564..333261dd 100644 --- a/net/config.go +++ b/net/config.go @@ -1,5 +1,11 @@ package net +import "errors" + +var ( + ErrUnableToConnect = errors.New("unable to connect") +) + type ConfigGetter interface { GetNet() Config } diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index d862e78f..1992e6e5 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -9,6 +9,7 @@ import ( net2 "github.com/anytypeio/any-sync/net" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/secureservice" + "github.com/anytypeio/any-sync/net/secureservice/handshake" "github.com/anytypeio/any-sync/net/timeoutconn" "github.com/anytypeio/any-sync/nodeconf" "github.com/libp2p/go-libp2p/core/sec" @@ -120,6 +121,9 @@ func (d *dialer) handshake(ctx context.Context, addr, peerId string) (conn drpc. timeoutConn := timeoutconn.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds)) sc, err = d.transport.SecureOutbound(ctx, timeoutConn) if err != nil { + if he, ok := err.(handshake.HandshakeError); ok { + return nil, nil, he + } return nil, nil, fmt.Errorf("tls handshaeke error: %v; since start: %v", err, time.Since(st)) } if peerId != sc.RemotePeer().String() { diff --git a/net/pool/pool.go b/net/pool/pool.go index b7e391c6..81698e5a 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -2,18 +2,15 @@ package pool import ( "context" - "errors" "github.com/anytypeio/any-sync/app/ocache" + "github.com/anytypeio/any-sync/net" "github.com/anytypeio/any-sync/net/dialer" "github.com/anytypeio/any-sync/net/peer" + "github.com/anytypeio/any-sync/net/secureservice/handshake" "go.uber.org/zap" "math/rand" ) -var ( - ErrUnableToConnect = errors.New("unable to connect") -) - // Pool creates and caches outgoing connection type Pool interface { // Get lookups to peer in existing connections or creates and cache new one @@ -76,14 +73,19 @@ func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error peerIds[i], peerIds[j] = peerIds[j], peerIds[i] }) // connecting + var lastErr error for _, peerId := range peerIds { if v, err := p.cache.Get(ctx, peerId); err == nil { return v.(peer.Peer), nil } else { log.Debug("unable to connect", zap.String("peerId", peerId), zap.Error(err)) + lastErr = err } } - return nil, ErrUnableToConnect + if _, ok := lastErr.(handshake.HandshakeError); !ok { + lastErr = net.ErrUnableToConnect + } + return nil, lastErr } func (p *pool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { @@ -92,14 +94,19 @@ func (p *pool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, erro peerIds[i], peerIds[j] = peerIds[j], peerIds[i] }) // connecting + var lastErr error for _, peerId := range peerIds { if v, err := p.dialer.Dial(ctx, peerId); err == nil { return v.(peer.Peer), nil } else { log.Debug("unable to connect", zap.String("peerId", peerId), zap.Error(err)) + lastErr = err } } - return nil, ErrUnableToConnect + if _, ok := lastErr.(handshake.HandshakeError); !ok { + lastErr = net.ErrUnableToConnect + } + return nil, lastErr } func (p *pool) Close(ctx context.Context) (err error) { diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index 262a59f6..40d33cfa 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/net" "github.com/anytypeio/any-sync/net/dialer" "github.com/anytypeio/any-sync/net/peer" + "github.com/anytypeio/any-sync/net/secureservice/handshake" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "storj.io/drpc" @@ -116,7 +118,17 @@ func TestPool_GetOneOf(t *testing.T) { return nil, fmt.Errorf("persistent error") } p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"}) - assert.Equal(t, ErrUnableToConnect, err) + assert.Equal(t, net.ErrUnableToConnect, err) + assert.Nil(t, p) + }) + t.Run("handshake error", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish() + fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) { + return nil, handshake.ErrIncompatibleVersion + } + p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"}) + assert.Equal(t, handshake.ErrIncompatibleVersion, err) assert.Nil(t, p) }) } diff --git a/net/secureservice/handshake/handshake.go b/net/secureservice/handshake/handshake.go index 044311a9..1faef9e9 100644 --- a/net/secureservice/handshake/handshake.go +++ b/net/secureservice/handshake/handshake.go @@ -18,23 +18,23 @@ const ( msgTypeAck = byte(2) ) -type handshakeError struct { +type HandshakeError struct { e handshakeproto.Error } -func (he handshakeError) Error() string { +func (he HandshakeError) Error() string { return he.e.String() } var ( - ErrUnexpectedPayload = handshakeError{handshakeproto.Error_UnexpectedPayload} - ErrDeadlineExceeded = handshakeError{handshakeproto.Error_DeadlineExceeded} - ErrInvalidCredentials = handshakeError{handshakeproto.Error_InvalidCredentials} + ErrUnexpectedPayload = HandshakeError{handshakeproto.Error_UnexpectedPayload} + ErrDeadlineExceeded = HandshakeError{handshakeproto.Error_DeadlineExceeded} + ErrInvalidCredentials = HandshakeError{handshakeproto.Error_InvalidCredentials} ErrPeerDeclinedCredentials = errors.New("remote peer declined the credentials") - ErrSkipVerifyNotAllowed = handshakeError{handshakeproto.Error_SkipVerifyNotAllowed} - ErrUnexpected = handshakeError{handshakeproto.Error_Unexpected} + ErrSkipVerifyNotAllowed = HandshakeError{handshakeproto.Error_SkipVerifyNotAllowed} + ErrUnexpected = HandshakeError{handshakeproto.Error_Unexpected} - ErrIncompatibleVersion = handshakeError{handshakeproto.Error_IncompatibleVersion} + ErrIncompatibleVersion = HandshakeError{handshakeproto.Error_IncompatibleVersion} ErrGotNotAHandshakeMessage = errors.New("go not a handshake message") ) @@ -89,7 +89,7 @@ func outgoingHandshake(h *handshake, sc sec.SecureConn, cc CredentialChecker) (i if msg.ack.Error == handshakeproto.Error_InvalidCredentials { return nil, ErrPeerDeclinedCredentials } - return nil, handshakeError{e: msg.ack.Error} + return nil, HandshakeError{e: msg.ack.Error} } if identity, err = cc.CheckCredential(sc, msg.cred); err != nil { @@ -116,7 +116,7 @@ func outgoingHandshake(h *handshake, sc sec.SecureConn, cc CredentialChecker) (i return identity, nil } else { _ = h.conn.Close() - return nil, handshakeError{e: msg.ack.Error} + return nil, HandshakeError{e: msg.ack.Error} } } @@ -175,7 +175,7 @@ func incomingHandshake(h *handshake, sc sec.SecureConn, cc CredentialChecker) (i if msg.ack.Error == handshakeproto.Error_InvalidCredentials { return nil, ErrPeerDeclinedCredentials } - return nil, handshakeError{e: msg.ack.Error} + return nil, HandshakeError{e: msg.ack.Error} } if err = h.writeAck(handshakeproto.Error_Null); err != nil { h.tryWriteErrAndClose(err) @@ -212,7 +212,7 @@ func (h *handshake) tryWriteErrAndClose(err error) { return } var ackErr handshakeproto.Error - if he, ok := err.(handshakeError); ok { + if he, ok := err.(HandshakeError); ok { ackErr = he.e } else { ackErr = handshakeproto.Error_Unexpected diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 442a5097..42e3a0cb 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -2,8 +2,8 @@ package streampool import ( "fmt" + "github.com/anytypeio/any-sync/net" "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/util/multiqueue" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -172,7 +172,7 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ... } } if len(streamsByPeer) == 0 { - return pool.ErrUnableToConnect + return net.ErrUnableToConnect } return } diff --git a/nodeconf/mock_nodeconf/mock_nodeconf.go b/nodeconf/mock_nodeconf/mock_nodeconf.go index b422c8a9..dc261709 100644 --- a/nodeconf/mock_nodeconf/mock_nodeconf.go +++ b/nodeconf/mock_nodeconf/mock_nodeconf.go @@ -177,6 +177,20 @@ func (mr *MockServiceMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name)) } +// NetworkCompatibilityStatus mocks base method. +func (m *MockService) NetworkCompatibilityStatus() nodeconf.NetworkCompatibilityStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NetworkCompatibilityStatus") + ret0, _ := ret[0].(nodeconf.NetworkCompatibilityStatus) + return ret0 +} + +// NetworkCompatibilityStatus indicates an expected call of NetworkCompatibilityStatus. +func (mr *MockServiceMockRecorder) NetworkCompatibilityStatus() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetworkCompatibilityStatus", reflect.TypeOf((*MockService)(nil).NetworkCompatibilityStatus)) +} + // NodeIds mocks base method. func (m *MockService) NodeIds(arg0 string) []string { m.ctrl.T.Helper() diff --git a/nodeconf/nodeconf.go b/nodeconf/nodeconf.go index ac665b9f..c49893d5 100644 --- a/nodeconf/nodeconf.go +++ b/nodeconf/nodeconf.go @@ -1,4 +1,3 @@ -//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/any-sync/nodeconf Service package nodeconf import ( diff --git a/nodeconf/service.go b/nodeconf/service.go index d19d0874..be8b5dc6 100644 --- a/nodeconf/service.go +++ b/nodeconf/service.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/any-sync/nodeconf Service package nodeconf import ( @@ -5,6 +6,8 @@ import ( commonaccount "github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" + "github.com/anytypeio/any-sync/net" + "github.com/anytypeio/any-sync/net/secureservice/handshake" "github.com/anytypeio/any-sync/util/periodicsync" "github.com/anytypeio/go-chash" "go.uber.org/zap" @@ -20,12 +23,22 @@ const ( var log = logger.NewNamed(CName) +type NetworkCompatibilityStatus int + +const ( + NetworkCompatibilityStatusUnknown NetworkCompatibilityStatus = iota + NetworkCompatibilityStatusOk + NetworkCompatibilityStatusError + NetworkCompatibilityStatusIncompatible +) + func New() Service { return new(service) } type Service interface { NodeConf + NetworkCompatibilityStatus() NetworkCompatibilityStatus app.ComponentRunnable } @@ -37,6 +50,8 @@ type service struct { last NodeConf mu sync.RWMutex sync periodicsync.PeriodicSync + + compatibilityStatus NetworkCompatibilityStatus } func (s *service) Init(a *app.App) (err error) { @@ -75,10 +90,19 @@ func (s *service) Run(_ context.Context) (err error) { return } +func (s *service) NetworkCompatibilityStatus() NetworkCompatibilityStatus { + s.mu.RLock() + defer s.mu.RUnlock() + return s.compatibilityStatus +} + func (s *service) updateConfiguration(ctx context.Context) (err error) { last, err := s.source.GetLast(ctx, s.Configuration().Id) if err != nil { + s.setCompatibilityStatusByErr(err) return + } else { + s.setCompatibilityStatusByErr(nil) } if err = s.store.SaveLast(ctx, last); err != nil { return @@ -139,6 +163,21 @@ func (s *service) setLastConfiguration(c Configuration) (err error) { return } +func (s *service) setCompatibilityStatusByErr(err error) { + s.mu.Lock() + defer s.mu.Unlock() + switch err { + case nil: + s.compatibilityStatus = NetworkCompatibilityStatusOk + case handshake.ErrIncompatibleVersion: + s.compatibilityStatus = NetworkCompatibilityStatusIncompatible + case net.ErrUnableToConnect: + s.compatibilityStatus = NetworkCompatibilityStatusUnknown + default: + s.compatibilityStatus = NetworkCompatibilityStatusError + } +} + func (s *service) Id() string { s.mu.RLock() defer s.mu.RUnlock() @@ -206,6 +245,8 @@ func (s *service) NodeTypes(nodeId string) []NodeType { } func (s *service) Close(ctx context.Context) (err error) { - s.sync.Close() + if s.sync != nil { + s.sync.Close() + } return } diff --git a/nodeconf/service_test.go b/nodeconf/service_test.go new file mode 100644 index 00000000..5ed8154c --- /dev/null +++ b/nodeconf/service_test.go @@ -0,0 +1,175 @@ +package nodeconf + +import ( + "context" + "errors" + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/net" + "github.com/anytypeio/any-sync/net/secureservice/handshake" + "github.com/anytypeio/any-sync/testutil/accounttest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sync" + "testing" + "time" +) + +var ctx = context.Background() + +func TestService_NetworkCompatibilityStatus(t *testing.T) { + t.Run("unknown", func(t *testing.T) { + fx := newFixture(t) + defer fx.finish(t) + fx.testSource.call = func() (c Configuration, e error) { + e = net.ErrUnableToConnect + return + } + fx.run(t) + time.Sleep(time.Millisecond * 10) + assert.Equal(t, NetworkCompatibilityStatusUnknown, fx.NetworkCompatibilityStatus()) + }) + t.Run("incompatible", func(t *testing.T) { + fx := newFixture(t) + defer fx.finish(t) + fx.testSource.err = handshake.ErrIncompatibleVersion + fx.run(t) + time.Sleep(time.Millisecond * 10) + assert.Equal(t, NetworkCompatibilityStatusIncompatible, fx.NetworkCompatibilityStatus()) + }) + t.Run("error", func(t *testing.T) { + fx := newFixture(t) + defer fx.finish(t) + fx.testSource.call = func() (c Configuration, e error) { + e = errors.New("some error") + return + } + fx.run(t) + time.Sleep(time.Millisecond * 10) + assert.Equal(t, NetworkCompatibilityStatusError, fx.NetworkCompatibilityStatus()) + }) + t.Run("ok", func(t *testing.T) { + fx := newFixture(t) + defer fx.finish(t) + fx.run(t) + time.Sleep(time.Millisecond * 10) + assert.Equal(t, NetworkCompatibilityStatusOk, fx.NetworkCompatibilityStatus()) + }) +} + +func newFixture(t *testing.T) *fixture { + fx := &fixture{ + Service: New(), + a: new(app.App), + testStore: &testStore{}, + testSource: &testSource{}, + testConf: newTestConf(), + } + fx.a.Register(fx.testConf).Register(&accounttest.AccountTestService{}).Register(fx.Service).Register(fx.testSource).Register(fx.testStore) + return fx +} + +type fixture struct { + Service + a *app.App + testStore *testStore + testSource *testSource + testConf *testConf +} + +func (fx *fixture) run(t *testing.T) { + require.NoError(t, fx.a.Start(ctx)) +} + +func (fx *fixture) finish(t *testing.T) { + require.NoError(t, fx.a.Close(ctx)) +} + +type testSource struct { + conf Configuration + err error + call func() (Configuration, error) +} + +func (t *testSource) Init(a *app.App) error { return nil } +func (t *testSource) Name() string { return CNameSource } + +func (t *testSource) GetLast(ctx context.Context, currentId string) (c Configuration, err error) { + if t.call != nil { + return t.call() + } + return t.conf, t.err +} + +type testStore struct { + conf *Configuration + mu sync.Mutex +} + +func (t *testStore) Init(a *app.App) error { return nil } +func (t *testStore) Name() string { return CNameStore } + +func (t *testStore) GetLast(ctx context.Context, netId string) (c Configuration, err error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.conf != nil { + return *t.conf, nil + } else { + err = ErrConfigurationNotFound + } + return +} + +func (t *testStore) SaveLast(ctx context.Context, c Configuration) (err error) { + t.mu.Lock() + defer t.mu.Unlock() + t.conf = &c + return +} + +type testConf struct { + Configuration +} + +func (t *testConf) Init(a *app.App) error { return nil } +func (t *testConf) Name() string { return "config" } + +func (t *testConf) GetNodeConf() Configuration { + return t.Configuration +} + +func newTestConf() *testConf { + return &testConf{ + Configuration{ + Id: "test", + NetworkId: "testNetwork", + Nodes: []Node{ + { + PeerId: "12D3KooWKLCajM89S8unbt3tgGbRLgmiWnFZT3adn9A5pQciBSLa", + Addresses: []string{"127.0.0.1:4830"}, + Types: []NodeType{NodeTypeCoordinator}, + }, + { + PeerId: "12D3KooWKnXTtbveMDUFfeSqR5dt9a4JW66tZQXG7C7PdDh3vqGu", + Addresses: []string{"127.0.0.1:4730"}, + Types: []NodeType{NodeTypeTree}, + }, + { + PeerId: "12D3KooWKgVN2kW8xw5Uvm2sLUnkeUNQYAvcWvF58maTzev7FjPi", + Addresses: []string{"127.0.0.1:4731"}, + Types: []NodeType{NodeTypeTree}, + }, + { + PeerId: "12D3KooWCUPYuMnQhu9yREJgQyjcz8zWY83rZGmDLwb9YR6QkbZX", + Addresses: []string{"127.0.0.1:4732"}, + Types: []NodeType{NodeTypeTree}, + }, + { + PeerId: "12D3KooWQxiZ5a7vcy4DTJa8Gy1eVUmwb5ojN4SrJC9Rjxzigw6C", + Addresses: []string{"127.0.0.1:4733"}, + Types: []NodeType{NodeTypeFile}, + }, + }, + CreationTime: time.Now(), + }, + } +} diff --git a/testutil/accounttest/accountservice.go b/testutil/accounttest/accountservice.go index eaa2a139..610f640b 100644 --- a/testutil/accounttest/accountservice.go +++ b/testutil/accounttest/accountservice.go @@ -4,7 +4,6 @@ import ( accountService "github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/commonspace/object/accountdata" - "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/crypto" ) @@ -46,11 +45,3 @@ func (s *AccountTestService) Name() (name string) { func (s *AccountTestService) Account() *accountdata.AccountKeys { return s.acc } - -func (s *AccountTestService) NodeConf(addrs []string) nodeconf.Node { - return nodeconf.Node{ - PeerId: s.acc.PeerId, - Addresses: addrs, - Types: []nodeconf.NodeType{nodeconf.NodeTypeTree}, - } -} diff --git a/testutil/testnodeconf/testnodeconf.go b/testutil/testnodeconf/testnodeconf.go index ca1c4d06..a0188369 100644 --- a/testutil/testnodeconf/testnodeconf.go +++ b/testutil/testnodeconf/testnodeconf.go @@ -17,7 +17,10 @@ func GenNodeConfig(num int) (conf *Config) { if err := ac.Init(nil); err != nil { panic(err) } - conf.nodes.Nodes = append(conf.nodes.Nodes, ac.NodeConf(nil)) + conf.nodes.Nodes = append(conf.nodes.Nodes, nodeconf.Node{ + PeerId: ac.Account().PeerId, + Types: []nodeconf.NodeType{nodeconf.NodeTypeTree}, + }) conf.configs = append(conf.configs, ac) } return conf From cfca99cf1934eed263cb0c102b08c1db92b1e0fa Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 22 May 2023 18:00:30 +0200 Subject: [PATCH 09/10] fix HandshakeError --- net/secureservice/handshake/handshake.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/net/secureservice/handshake/handshake.go b/net/secureservice/handshake/handshake.go index 1faef9e9..271de569 100644 --- a/net/secureservice/handshake/handshake.go +++ b/net/secureservice/handshake/handshake.go @@ -19,22 +19,26 @@ const ( ) type HandshakeError struct { - e handshakeproto.Error + err error + e handshakeproto.Error } func (he HandshakeError) Error() string { + if he.err != nil { + return he.err.Error() + } return he.e.String() } var ( - ErrUnexpectedPayload = HandshakeError{handshakeproto.Error_UnexpectedPayload} - ErrDeadlineExceeded = HandshakeError{handshakeproto.Error_DeadlineExceeded} - ErrInvalidCredentials = HandshakeError{handshakeproto.Error_InvalidCredentials} - ErrPeerDeclinedCredentials = errors.New("remote peer declined the credentials") - ErrSkipVerifyNotAllowed = HandshakeError{handshakeproto.Error_SkipVerifyNotAllowed} - ErrUnexpected = HandshakeError{handshakeproto.Error_Unexpected} + ErrUnexpectedPayload = HandshakeError{e: handshakeproto.Error_UnexpectedPayload} + ErrDeadlineExceeded = HandshakeError{e: handshakeproto.Error_DeadlineExceeded} + ErrInvalidCredentials = HandshakeError{e: handshakeproto.Error_InvalidCredentials} + ErrPeerDeclinedCredentials = HandshakeError{err: errors.New("remote peer declined the credentials")} + ErrSkipVerifyNotAllowed = HandshakeError{e: handshakeproto.Error_SkipVerifyNotAllowed} + ErrUnexpected = HandshakeError{e: handshakeproto.Error_Unexpected} - ErrIncompatibleVersion = HandshakeError{handshakeproto.Error_IncompatibleVersion} + ErrIncompatibleVersion = HandshakeError{e: handshakeproto.Error_IncompatibleVersion} ErrGotNotAHandshakeMessage = errors.New("go not a handshake message") ) From 7a778a5c9ad1a24d016e1d63254a7cf2dbb3107e Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 22 May 2023 19:17:20 +0200 Subject: [PATCH 10/10] refactor handshake err --- net/secureservice/handshake/handshake.go | 8 +++--- net/secureservice/handshake/handshake_test.go | 2 +- net/secureservice/secureservice.go | 27 ++++--------------- net/secureservice/secureservice_test.go | 4 +-- 4 files changed, 12 insertions(+), 29 deletions(-) diff --git a/net/secureservice/handshake/handshake.go b/net/secureservice/handshake/handshake.go index 271de569..24cc84a8 100644 --- a/net/secureservice/handshake/handshake.go +++ b/net/secureservice/handshake/handshake.go @@ -19,13 +19,13 @@ const ( ) type HandshakeError struct { - err error + Err error e handshakeproto.Error } func (he HandshakeError) Error() string { - if he.err != nil { - return he.err.Error() + if he.Err != nil { + return he.Err.Error() } return he.e.String() } @@ -34,7 +34,7 @@ var ( ErrUnexpectedPayload = HandshakeError{e: handshakeproto.Error_UnexpectedPayload} ErrDeadlineExceeded = HandshakeError{e: handshakeproto.Error_DeadlineExceeded} ErrInvalidCredentials = HandshakeError{e: handshakeproto.Error_InvalidCredentials} - ErrPeerDeclinedCredentials = HandshakeError{err: errors.New("remote peer declined the credentials")} + ErrPeerDeclinedCredentials = HandshakeError{Err: errors.New("remote peer declined the credentials")} ErrSkipVerifyNotAllowed = HandshakeError{e: handshakeproto.Error_SkipVerifyNotAllowed} ErrUnexpected = HandshakeError{e: handshakeproto.Error_Unexpected} diff --git a/net/secureservice/handshake/handshake_test.go b/net/secureservice/handshake/handshake_test.go index 8f332363..d548e9a0 100644 --- a/net/secureservice/handshake/handshake_test.go +++ b/net/secureservice/handshake/handshake_test.go @@ -336,7 +336,7 @@ func TestIncomingHandshake(t *testing.T) { require.Equal(t, handshakeproto.Error_IncompatibleVersion, msg.ack.Error) res := <-handshakeResCh - require.EqualError(t, res.err, ErrIncompatibleVersion.Error()) + assert.Equal(t, res.err, ErrIncompatibleVersion) }) t.Run("write cred instead ack", func(t *testing.T) { c1, c2 := newConnPair(t) diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index 2167b20c..a98e332b 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -16,19 +16,6 @@ import ( "net" ) -type HandshakeError struct { - remoteAddr string - err error -} - -func (he HandshakeError) RemoteAddr() string { - return he.remoteAddr -} - -func (he HandshakeError) Error() string { - return he.err.Error() -} - const CName = "common.net.secure" var log = logger.NewNamed(CName) @@ -91,18 +78,14 @@ func (s *secureService) Name() (name string) { func (s *secureService) SecureInbound(ctx context.Context, conn net.Conn) (cctx context.Context, sc sec.SecureConn, err error) { sc, err = s.p2pTr.SecureInbound(ctx, conn, "") if err != nil { - return nil, nil, HandshakeError{ - remoteAddr: conn.RemoteAddr().String(), - err: err, + return nil, nil, handshake.HandshakeError{ + Err: err, } } identity, err := handshake.IncomingHandshake(ctx, sc, s.inboundChecker) if err != nil { - return nil, nil, HandshakeError{ - remoteAddr: conn.RemoteAddr().String(), - err: err, - } + return nil, nil, err } cctx = context.Background() cctx = peer.CtxWithPeerId(cctx, sc.RemotePeer().String()) @@ -113,7 +96,7 @@ func (s *secureService) SecureInbound(ctx context.Context, conn net.Conn) (cctx func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (sec.SecureConn, error) { sc, err := s.p2pTr.SecureOutbound(ctx, conn, "") if err != nil { - return nil, HandshakeError{err: err, remoteAddr: conn.RemoteAddr().String()} + return nil, handshake.HandshakeError{Err: err} } peerId := sc.RemotePeer().String() confTypes := s.nodeconf.NodeTypes(peerId) @@ -126,7 +109,7 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (sec. // ignore identity for outgoing connection because we don't need it at this moment _, err = handshake.OutgoingHandshake(ctx, sc, checker) if err != nil { - return nil, HandshakeError{err: err, remoteAddr: conn.RemoteAddr().String()} + return nil, err } return sc, nil } diff --git a/net/secureservice/secureservice_test.go b/net/secureservice/secureservice_test.go index a70a883f..c60effab 100644 --- a/net/secureservice/secureservice_test.go +++ b/net/secureservice/secureservice_test.go @@ -73,9 +73,9 @@ func TestHandshakeIncompatibleVersion(t *testing.T) { fxC := newFixture(t, nc, nc.GetAccountService(1), 1) defer fxC.Finish(t) _, err := fxC.SecureOutbound(ctx, cc) - require.EqualError(t, err, handshake.ErrIncompatibleVersion.Error()) + require.Equal(t, handshake.ErrIncompatibleVersion, err) res := <-resCh - require.EqualError(t, res.err, handshake.ErrIncompatibleVersion.Error()) + require.Equal(t, handshake.ErrIncompatibleVersion, res.err) } func newFixture(t *testing.T, nc *testnodeconf.Config, acc accountservice.Service, protoVersion uint32) *fixture {