From 54bff9012e2e259e76a3ba066f62179406d37a4e Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 8 Sep 2022 12:05:30 +0300 Subject: [PATCH] space service periodic sync --- cmd/node/node.go | 22 ++++---- config/config.go | 2 +- config/space.go | 4 +- etc/config.yml | 5 +- ...K2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1.yml | 25 ++++++++++ ...zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw.yml | 25 ++++++++++ pkg/ocache/ocache.go | 4 +- service/net/pool/pool.go | 3 +- service/net/rpc/server/drpcserver.go | 5 +- service/space/remotediff/remotediff.go | 6 +-- service/space/rpc.go | 18 +++++++ service/space/service.go | 16 +++--- service/space/space.go | 50 +++++++++++-------- 13 files changed, 129 insertions(+), 56 deletions(-) create mode 100755 etc/configs/12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1.yml create mode 100755 etc/configs/12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw.yml create mode 100644 service/space/rpc.go diff --git a/cmd/node/node.go b/cmd/node/node.go index d6ded226..c73a164c 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -8,17 +8,13 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/api" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/node" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/document" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space" "go.uber.org/zap" "net/http" _ "net/http/pprof" @@ -94,14 +90,16 @@ func main() { func Bootstrap(a *app.App) { a.Register(account.New()). Register(node.New()). + Register(configuration.New()). Register(secure.New()). - Register(server.New()). Register(dialer.New()). Register(pool.New()). - Register(configuration.New()). - Register(document.New()). - Register(message.New()). - Register(requesthandler.New()). - Register(treecache.New()). - Register(api.New()) + Register(space.New()). + Register(server.New()) + + //Register(document.New()). + //Register(message.New()). + //Register(requesthandler.New()). + //Register(treecache.New()). + //Register(api.New()) } diff --git a/config/config.go b/config/config.go index 4c1e8aa9..4f5ed612 100644 --- a/config/config.go +++ b/config/config.go @@ -33,7 +33,7 @@ type Config struct { } func (c *Config) Init(ctx context.Context, a *app.App) (err error) { - logger.NewNamed("config").Info(fmt.Sprint(*c)) + logger.NewNamed("config").Info(fmt.Sprint(c.Space)) return } diff --git a/config/space.go b/config/space.go index 4de7cde0..5b603715 100644 --- a/config/space.go +++ b/config/space.go @@ -1,6 +1,6 @@ package config type Space struct { - GCTTL int `json:"gcTTL"` - SyncPeriod int `json:"syncPeriod"` + GCTTL int `yaml:"gcTTL"` + SyncPeriod int `yaml:"syncPeriod"` } diff --git a/etc/config.yml b/etc/config.yml index c9b15df0..09b13d54 100644 --- a/etc/config.yml +++ b/etc/config.yml @@ -10,7 +10,10 @@ account: signingKey: 3id6ddLcoNoe9rDgGM88ET8T6TnvHm5GFqFdN6kBzn7Q8d6VUGgjeT59CNWFiaofdeRnHBvX2A5ZacMXvfwaYEFuCbug encryptionKey: JgG4CcCbae1qEpe7mKpBzsHjZhXUmDSNVNX2B1gxFZsJyMX4V6kBQUott9zRWyeXaW1ZmpzuxDXnwSQpAnNurhXyGa9iQaAPqzY9A9VWBPD33Yy1eW7TRuVemzToh8jJQKQKnZNbF8ucTWV9qahusKzyvN8uyhrqoW2tAPfA9S3E3ognCuqbLSW6yjE2rBKayvyS1BVwzjSd6FZK4DDyjfU3pbEVjut3wytGEAn9af6sNMmyCnf2MX5vLovWs9rU8av61wD4z7HTsXyGFx4K75N4Go249Hpe9SKAT6HxhRc3yvj63krPLiQV5yMuH2UeMUXBDekUQyNmBEdn9wrur7mLqB67Bc6tcc2PP8XApBCdWJHvHjN4FktSpaG5vbCqoZbLD1oCbk36q2x9s6XM8pydVqD1J9P3nTbfgMb5pJCTFjNtgKeuKv6wjfJeA9jF1VhcJQisfsahgv9MvZ9M8FJpZTq1zKUhYDCRnZxUkraoMS5yNNVdDzaUckKEDthqik7BMWCWT79vq7uVgMwEvGwGi76gtoMg1159bbPMLZ4bdPVfhH2S9QjPrzQfwZSrzB2YeVPjWpaXDeLDity5H8n1NK2oniAQR6gE71n81neSptsuhV6o6QpQ89AU8y57XmEsou4VEryn8vUxBHhULLxrLNUouxyWamCeFiDjk5cSN6koQsf9BYKSNTPFTrwjTKForDokMhcPdMtFktKwjv7u9UEGcY4MKvNzZZkc77gHiP8bqVtdNNoLpTFUC5SZ9i7bKdHvK12HpSy7yzzPeMXJ9UwhLxkok1g81ngTbN1yxRhvYXyHZFtguCR9kvGojDjka91MTBtk551qDw9eCn2xZT9U8jqzBCjdpvSg3mRWKMPnYAGB7m7u1ye165wyGFvzcHAx3vtXjxAqLUeKYZCjv2m6V9D2Y4qH1TQNddWqH14T1JVMis971UCH9Ddpj6a3387oUnufD1P6HZN2ieJCvptrmbGVvxJYYSvmVf1dkwbtqurDRNWD7TJ7gf6iqSP549C9bxP4GpLt3ygjHmMtcuUzstBuztvunJUnQhfnJxqU6LjRdsFzm53wGWgXNxab7ZvQcPyLwsevn1b98FGPnVpS5iY4LjmqW4ugrC6HgrbsjrXiKzR1yZKhLQkCbLzPoaHb8iB5iBnCr7d4yf5CtfpFRqgoqMFdK5LNZYmDX4HzUKN6A7wC3gGiSRFTLcgGZeSMkB5Pa61CZBU7WCQgFxykycE9HRA7PiQa496GWDCV15teToCpFRsAa6jDmR1MGXPeLRqQgve49VXnQN5FL7c1VuEv5SWjeTuCnMB47DJKBaP7eKJNKgLwETALzSCMF3nRiRgeb15kfoS4BbrJ5yupjrvwmbmvNg1AYFFS5sYNWft7K8v87wQvBakRtGP71Kp8NX77XFtu6xdB7sR6jpfC6qJPyB9akWNXgCrWy9kE4ih42gwAZdUugNZ9YtEsgRM3pwb6qJhkAPyEJtrxrja859PCAgqPSQiPQN33PaMkgQ6HJknu8CrjKRiXAycZ16KLUkHV64TNhEjPTcX1a7rqpD131AYMWX8d7CCdc9Ys7RUb6BwguuNSh8rJK3x4AkMDSUsaE8ynKvpC7RXZpJ9Nxfhd apiServer: - port: "8084" + port: "8080" +space: + gcTTL: 60 + syncPeriod: 10 nodes: - peerId: 12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1 address: 127.0.0.1:4430 diff --git a/etc/configs/12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1.yml b/etc/configs/12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1.yml new file mode 100755 index 00000000..dcea6558 --- /dev/null +++ b/etc/configs/12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1.yml @@ -0,0 +1,25 @@ +anytype: + swarmKey: /key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec +grpcServer: + listenAddrs: + - 127.0.0.1:4430 + - 127.0.0.1:4431 + tls: false +account: + peerId: 12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1 + signingKey: 3id6ddLcoNoe9rDgGM88ET8T6TnvHm5GFqFdN6kBzn7Q8d6VUGgjeT59CNWFiaofdeRnHBvX2A5ZacMXvfwaYEFuCbug + encryptionKey: JgG4CcCbae1qEpe7mKpBzsHjZhXUmDSNVNX2B1gxFZsJyMX4V6kBQUott9zRWyeXaW1ZmpzuxDXnwSQpAnNurhXyGa9iQaAPqzY9A9VWBPD33Yy1eW7TRuVemzToh8jJQKQKnZNbF8ucTWV9qahusKzyvN8uyhrqoW2tAPfA9S3E3ognCuqbLSW6yjE2rBKayvyS1BVwzjSd6FZK4DDyjfU3pbEVjut3wytGEAn9af6sNMmyCnf2MX5vLovWs9rU8av61wD4z7HTsXyGFx4K75N4Go249Hpe9SKAT6HxhRc3yvj63krPLiQV5yMuH2UeMUXBDekUQyNmBEdn9wrur7mLqB67Bc6tcc2PP8XApBCdWJHvHjN4FktSpaG5vbCqoZbLD1oCbk36q2x9s6XM8pydVqD1J9P3nTbfgMb5pJCTFjNtgKeuKv6wjfJeA9jF1VhcJQisfsahgv9MvZ9M8FJpZTq1zKUhYDCRnZxUkraoMS5yNNVdDzaUckKEDthqik7BMWCWT79vq7uVgMwEvGwGi76gtoMg1159bbPMLZ4bdPVfhH2S9QjPrzQfwZSrzB2YeVPjWpaXDeLDity5H8n1NK2oniAQR6gE71n81neSptsuhV6o6QpQ89AU8y57XmEsou4VEryn8vUxBHhULLxrLNUouxyWamCeFiDjk5cSN6koQsf9BYKSNTPFTrwjTKForDokMhcPdMtFktKwjv7u9UEGcY4MKvNzZZkc77gHiP8bqVtdNNoLpTFUC5SZ9i7bKdHvK12HpSy7yzzPeMXJ9UwhLxkok1g81ngTbN1yxRhvYXyHZFtguCR9kvGojDjka91MTBtk551qDw9eCn2xZT9U8jqzBCjdpvSg3mRWKMPnYAGB7m7u1ye165wyGFvzcHAx3vtXjxAqLUeKYZCjv2m6V9D2Y4qH1TQNddWqH14T1JVMis971UCH9Ddpj6a3387oUnufD1P6HZN2ieJCvptrmbGVvxJYYSvmVf1dkwbtqurDRNWD7TJ7gf6iqSP549C9bxP4GpLt3ygjHmMtcuUzstBuztvunJUnQhfnJxqU6LjRdsFzm53wGWgXNxab7ZvQcPyLwsevn1b98FGPnVpS5iY4LjmqW4ugrC6HgrbsjrXiKzR1yZKhLQkCbLzPoaHb8iB5iBnCr7d4yf5CtfpFRqgoqMFdK5LNZYmDX4HzUKN6A7wC3gGiSRFTLcgGZeSMkB5Pa61CZBU7WCQgFxykycE9HRA7PiQa496GWDCV15teToCpFRsAa6jDmR1MGXPeLRqQgve49VXnQN5FL7c1VuEv5SWjeTuCnMB47DJKBaP7eKJNKgLwETALzSCMF3nRiRgeb15kfoS4BbrJ5yupjrvwmbmvNg1AYFFS5sYNWft7K8v87wQvBakRtGP71Kp8NX77XFtu6xdB7sR6jpfC6qJPyB9akWNXgCrWy9kE4ih42gwAZdUugNZ9YtEsgRM3pwb6qJhkAPyEJtrxrja859PCAgqPSQiPQN33PaMkgQ6HJknu8CrjKRiXAycZ16KLUkHV64TNhEjPTcX1a7rqpD131AYMWX8d7CCdc9Ys7RUb6BwguuNSh8rJK3x4AkMDSUsaE8ynKvpC7RXZpJ9Nxfhd +apiServer: + port: "8080" +space: + gcTTL: 60 + syncPeriod: 10 +nodes: + - peerId: 12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1 + address: 127.0.0.1:4430 + signingKey: 3id6ddLcoNoe9rDgGM88ET8T6TnvHm5GFqFdN6kBzn7Q8d6VUGgjeT59CNWFiaofdeRnHBvX2A5ZacMXvfwaYEFuCbug + encryptionKey: JgG4CcCbae1qEpe7mKpBzsHjZhXUmDSNVNX2B1gxFZsJyMX4V6kBQUott9zRWyeXaW1ZmpzuxDXnwSQpAnNurhXyGa9iQaAPqzY9A9VWBPD33Yy1eW7TRuVemzToh8jJQKQKnZNbF8ucTWV9qahusKzyvN8uyhrqoW2tAPfA9S3E3ognCuqbLSW6yjE2rBKayvyS1BVwzjSd6FZK4DDyjfU3pbEVjut3wytGEAn9af6sNMmyCnf2MX5vLovWs9rU8av61wD4z7HTsXyGFx4K75N4Go249Hpe9SKAT6HxhRc3yvj63krPLiQV5yMuH2UeMUXBDekUQyNmBEdn9wrur7mLqB67Bc6tcc2PP8XApBCdWJHvHjN4FktSpaG5vbCqoZbLD1oCbk36q2x9s6XM8pydVqD1J9P3nTbfgMb5pJCTFjNtgKeuKv6wjfJeA9jF1VhcJQisfsahgv9MvZ9M8FJpZTq1zKUhYDCRnZxUkraoMS5yNNVdDzaUckKEDthqik7BMWCWT79vq7uVgMwEvGwGi76gtoMg1159bbPMLZ4bdPVfhH2S9QjPrzQfwZSrzB2YeVPjWpaXDeLDity5H8n1NK2oniAQR6gE71n81neSptsuhV6o6QpQ89AU8y57XmEsou4VEryn8vUxBHhULLxrLNUouxyWamCeFiDjk5cSN6koQsf9BYKSNTPFTrwjTKForDokMhcPdMtFktKwjv7u9UEGcY4MKvNzZZkc77gHiP8bqVtdNNoLpTFUC5SZ9i7bKdHvK12HpSy7yzzPeMXJ9UwhLxkok1g81ngTbN1yxRhvYXyHZFtguCR9kvGojDjka91MTBtk551qDw9eCn2xZT9U8jqzBCjdpvSg3mRWKMPnYAGB7m7u1ye165wyGFvzcHAx3vtXjxAqLUeKYZCjv2m6V9D2Y4qH1TQNddWqH14T1JVMis971UCH9Ddpj6a3387oUnufD1P6HZN2ieJCvptrmbGVvxJYYSvmVf1dkwbtqurDRNWD7TJ7gf6iqSP549C9bxP4GpLt3ygjHmMtcuUzstBuztvunJUnQhfnJxqU6LjRdsFzm53wGWgXNxab7ZvQcPyLwsevn1b98FGPnVpS5iY4LjmqW4ugrC6HgrbsjrXiKzR1yZKhLQkCbLzPoaHb8iB5iBnCr7d4yf5CtfpFRqgoqMFdK5LNZYmDX4HzUKN6A7wC3gGiSRFTLcgGZeSMkB5Pa61CZBU7WCQgFxykycE9HRA7PiQa496GWDCV15teToCpFRsAa6jDmR1MGXPeLRqQgve49VXnQN5FL7c1VuEv5SWjeTuCnMB47DJKBaP7eKJNKgLwETALzSCMF3nRiRgeb15kfoS4BbrJ5yupjrvwmbmvNg1AYFFS5sYNWft7K8v87wQvBakRtGP71Kp8NX77XFtu6xdB7sR6jpfC6qJPyB9akWNXgCrWy9kE4ih42gwAZdUugNZ9YtEsgRM3pwb6qJhkAPyEJtrxrja859PCAgqPSQiPQN33PaMkgQ6HJknu8CrjKRiXAycZ16KLUkHV64TNhEjPTcX1a7rqpD131AYMWX8d7CCdc9Ys7RUb6BwguuNSh8rJK3x4AkMDSUsaE8ynKvpC7RXZpJ9Nxfhd + - peerId: 12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw + address: 127.0.0.1:4432 + signingKey: 3iiLPj6wMUQpPwTBNZcUgkbXub1jumg4AEV9LfMyFHZVc84GLyAjVbVvH6EAGhcNrxRxL82aW4BimhDZCpLsRCqx5vwj + encryptionKey: JgG4CcCbae1qEpe7mKXzp7m5hNc56SSyZd9DwUaEStKJrq7RToAC2Vgd3i6hKRwa58zCWeN6Wjc3o6qrdKPEPRvcyEPysamajVo5mdQiUgWAmr97pGEsyjuRjQoC2GY2LvLiEQxEgwFgJxKGMHMiaWMtDfxCDUaDEm4bu5RdMhqRZekAWho6c3WoEeruSr14iX1TrocFNfBkBY7CjEw8kcywXCTNgtvhb2Qiwgj5AxEF4wyw4bzaNA9ctXb1hoHPFVMu6C51pkFY7jUD9zwyH3ukgnAewkGAcPNbKmaTAtMosKRVaAN97mAwXh2VRt1hWmRvVk7r76EjnVKhD4vbsKZc56RVcHTVWRVdhU7FGyPsiE5rSQAz1JQGYzxnZpX7EG77CyrmUGyfueVfRHhwY2oq8A4uQCRaQxSaJHYLowjXSxh8DQ2V6MTqyzti32C27utBYdHzLVCJSGkmdzGwrFcHqsq7nLDxmvJVErPvyReixEe8kFmqopJ3e6LLm8WdYw9K6JYBjXnEfwPzm7Von9sf3dcaGDUHYfttMyeke7fAXJkvPRje69hYVyzdQGAauuojzGkkvQWCSMK1KCMNMznRaPDCNvofrQhYrub24WhmwpKhorufdfW8Cb4T6reBDCtaWVsbuinjtL6F6Sui5aYHJFLJ6e4pPewr1P4EuZYRbMBZwN5KvDLhTGLBuBnaTqUUdF6bj2U22NoRYMogiHiftqKqiexKNDXX1Zg9RQEvxgjuVo6SBW42mVEA8agrLhruRqCmiduJxVrfqLNGeYXHXrcmMEgW7uosJbPXvTcfRvdFWS1ov7oSALvj6vhDQ28Yi9D2ETNdNsfVWAFQuwvPpW7CHQGXTitprVbqH8JYxNZuGygcLmr5efbB22Vzu4ntd1HoraQpG12qeDEUA7tXYUpoYyuSdWwKPjSAMtaQcCSfVrhKQHQuKJargrVrez8vjWuwLfvSucV7ZHe7gjqvYgULdE1ubRCRSd7DuLjEN2Vd6obzV2c3MRet7ZSf4Sp88WM5AuTyW7BjArBc4S3gUQ8rYaiZ8Tu7NCxkEzbFwWRaemZkwfvcsX3XxqjyF37tFSGkEqE5kuBvpZW72675LkDffj7kH1zA8yE6dVujJjWsNYVFJWndUtz5Vy2KCdZAbBgq19q4AtsxWPodU2N3yZXzFAFAzTrxS6V4P7Scpdau1avgRvHLcBQPunA37xaYMy8YMifJwtmRY25mnAQwZAk3eANk7tXwZd58SDnciLNvARJvwKzTQBXcshkwyy52SX8XmXDJsPnRLaHmiYBJ63Yzr5XpZuuAtxb9qrWG2NHCNxfomHokWacV1hjZPPd6ZxT1FuRozB6Qt2NLcyqY7bnTcQJb1jPUaTAGXXCR8WVmmmYo2fDQe8CdBmgyPvbzNTEJUyScBz4RdycB5PZap4SurJCWtHbuMyQbQUB6jJgURDstfXS5Akfe4oruNq9rnYcNtnsDJPtrhXHBqzDizmf1BDxR5FB2RCxzCgeAfg8WQ1Ug9PVAGTzob6ZqCrGXzWXEUniZnf1vjr7QhGKBYXEX9SWDoSMUpP4FreVDTnx15ijRZTV3p8xG5fE9e36TnugRVvTyq7XzmyPBjW2r66f1bior diff --git a/etc/configs/12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw.yml b/etc/configs/12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw.yml new file mode 100755 index 00000000..eb88a19b --- /dev/null +++ b/etc/configs/12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw.yml @@ -0,0 +1,25 @@ +anytype: + swarmKey: /key/swarm/psk/1.0.0/base16/209992e611c27d5dce8fbd2e7389f6b51da9bee980992ef60739460b536139ec +grpcServer: + listenAddrs: + - 127.0.0.1:4432 + - 127.0.0.1:4433 + tls: false +account: + peerId: 12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw + signingKey: 3iiLPj6wMUQpPwTBNZcUgkbXub1jumg4AEV9LfMyFHZVc84GLyAjVbVvH6EAGhcNrxRxL82aW4BimhDZCpLsRCqx5vwj + encryptionKey: JgG4CcCbae1qEpe7mKXzp7m5hNc56SSyZd9DwUaEStKJrq7RToAC2Vgd3i6hKRwa58zCWeN6Wjc3o6qrdKPEPRvcyEPysamajVo5mdQiUgWAmr97pGEsyjuRjQoC2GY2LvLiEQxEgwFgJxKGMHMiaWMtDfxCDUaDEm4bu5RdMhqRZekAWho6c3WoEeruSr14iX1TrocFNfBkBY7CjEw8kcywXCTNgtvhb2Qiwgj5AxEF4wyw4bzaNA9ctXb1hoHPFVMu6C51pkFY7jUD9zwyH3ukgnAewkGAcPNbKmaTAtMosKRVaAN97mAwXh2VRt1hWmRvVk7r76EjnVKhD4vbsKZc56RVcHTVWRVdhU7FGyPsiE5rSQAz1JQGYzxnZpX7EG77CyrmUGyfueVfRHhwY2oq8A4uQCRaQxSaJHYLowjXSxh8DQ2V6MTqyzti32C27utBYdHzLVCJSGkmdzGwrFcHqsq7nLDxmvJVErPvyReixEe8kFmqopJ3e6LLm8WdYw9K6JYBjXnEfwPzm7Von9sf3dcaGDUHYfttMyeke7fAXJkvPRje69hYVyzdQGAauuojzGkkvQWCSMK1KCMNMznRaPDCNvofrQhYrub24WhmwpKhorufdfW8Cb4T6reBDCtaWVsbuinjtL6F6Sui5aYHJFLJ6e4pPewr1P4EuZYRbMBZwN5KvDLhTGLBuBnaTqUUdF6bj2U22NoRYMogiHiftqKqiexKNDXX1Zg9RQEvxgjuVo6SBW42mVEA8agrLhruRqCmiduJxVrfqLNGeYXHXrcmMEgW7uosJbPXvTcfRvdFWS1ov7oSALvj6vhDQ28Yi9D2ETNdNsfVWAFQuwvPpW7CHQGXTitprVbqH8JYxNZuGygcLmr5efbB22Vzu4ntd1HoraQpG12qeDEUA7tXYUpoYyuSdWwKPjSAMtaQcCSfVrhKQHQuKJargrVrez8vjWuwLfvSucV7ZHe7gjqvYgULdE1ubRCRSd7DuLjEN2Vd6obzV2c3MRet7ZSf4Sp88WM5AuTyW7BjArBc4S3gUQ8rYaiZ8Tu7NCxkEzbFwWRaemZkwfvcsX3XxqjyF37tFSGkEqE5kuBvpZW72675LkDffj7kH1zA8yE6dVujJjWsNYVFJWndUtz5Vy2KCdZAbBgq19q4AtsxWPodU2N3yZXzFAFAzTrxS6V4P7Scpdau1avgRvHLcBQPunA37xaYMy8YMifJwtmRY25mnAQwZAk3eANk7tXwZd58SDnciLNvARJvwKzTQBXcshkwyy52SX8XmXDJsPnRLaHmiYBJ63Yzr5XpZuuAtxb9qrWG2NHCNxfomHokWacV1hjZPPd6ZxT1FuRozB6Qt2NLcyqY7bnTcQJb1jPUaTAGXXCR8WVmmmYo2fDQe8CdBmgyPvbzNTEJUyScBz4RdycB5PZap4SurJCWtHbuMyQbQUB6jJgURDstfXS5Akfe4oruNq9rnYcNtnsDJPtrhXHBqzDizmf1BDxR5FB2RCxzCgeAfg8WQ1Ug9PVAGTzob6ZqCrGXzWXEUniZnf1vjr7QhGKBYXEX9SWDoSMUpP4FreVDTnx15ijRZTV3p8xG5fE9e36TnugRVvTyq7XzmyPBjW2r66f1bior +apiServer: + port: "8081" +space: + gcTTL: 60 + syncPeriod: 10 +nodes: + - peerId: 12D3KooWMHuhZgK2skkLrvL51QQTXaXQKYy2QqfvPNBFnzR2ubA1 + address: 127.0.0.1:4430 + signingKey: 3id6ddLcoNoe9rDgGM88ET8T6TnvHm5GFqFdN6kBzn7Q8d6VUGgjeT59CNWFiaofdeRnHBvX2A5ZacMXvfwaYEFuCbug + encryptionKey: JgG4CcCbae1qEpe7mKpBzsHjZhXUmDSNVNX2B1gxFZsJyMX4V6kBQUott9zRWyeXaW1ZmpzuxDXnwSQpAnNurhXyGa9iQaAPqzY9A9VWBPD33Yy1eW7TRuVemzToh8jJQKQKnZNbF8ucTWV9qahusKzyvN8uyhrqoW2tAPfA9S3E3ognCuqbLSW6yjE2rBKayvyS1BVwzjSd6FZK4DDyjfU3pbEVjut3wytGEAn9af6sNMmyCnf2MX5vLovWs9rU8av61wD4z7HTsXyGFx4K75N4Go249Hpe9SKAT6HxhRc3yvj63krPLiQV5yMuH2UeMUXBDekUQyNmBEdn9wrur7mLqB67Bc6tcc2PP8XApBCdWJHvHjN4FktSpaG5vbCqoZbLD1oCbk36q2x9s6XM8pydVqD1J9P3nTbfgMb5pJCTFjNtgKeuKv6wjfJeA9jF1VhcJQisfsahgv9MvZ9M8FJpZTq1zKUhYDCRnZxUkraoMS5yNNVdDzaUckKEDthqik7BMWCWT79vq7uVgMwEvGwGi76gtoMg1159bbPMLZ4bdPVfhH2S9QjPrzQfwZSrzB2YeVPjWpaXDeLDity5H8n1NK2oniAQR6gE71n81neSptsuhV6o6QpQ89AU8y57XmEsou4VEryn8vUxBHhULLxrLNUouxyWamCeFiDjk5cSN6koQsf9BYKSNTPFTrwjTKForDokMhcPdMtFktKwjv7u9UEGcY4MKvNzZZkc77gHiP8bqVtdNNoLpTFUC5SZ9i7bKdHvK12HpSy7yzzPeMXJ9UwhLxkok1g81ngTbN1yxRhvYXyHZFtguCR9kvGojDjka91MTBtk551qDw9eCn2xZT9U8jqzBCjdpvSg3mRWKMPnYAGB7m7u1ye165wyGFvzcHAx3vtXjxAqLUeKYZCjv2m6V9D2Y4qH1TQNddWqH14T1JVMis971UCH9Ddpj6a3387oUnufD1P6HZN2ieJCvptrmbGVvxJYYSvmVf1dkwbtqurDRNWD7TJ7gf6iqSP549C9bxP4GpLt3ygjHmMtcuUzstBuztvunJUnQhfnJxqU6LjRdsFzm53wGWgXNxab7ZvQcPyLwsevn1b98FGPnVpS5iY4LjmqW4ugrC6HgrbsjrXiKzR1yZKhLQkCbLzPoaHb8iB5iBnCr7d4yf5CtfpFRqgoqMFdK5LNZYmDX4HzUKN6A7wC3gGiSRFTLcgGZeSMkB5Pa61CZBU7WCQgFxykycE9HRA7PiQa496GWDCV15teToCpFRsAa6jDmR1MGXPeLRqQgve49VXnQN5FL7c1VuEv5SWjeTuCnMB47DJKBaP7eKJNKgLwETALzSCMF3nRiRgeb15kfoS4BbrJ5yupjrvwmbmvNg1AYFFS5sYNWft7K8v87wQvBakRtGP71Kp8NX77XFtu6xdB7sR6jpfC6qJPyB9akWNXgCrWy9kE4ih42gwAZdUugNZ9YtEsgRM3pwb6qJhkAPyEJtrxrja859PCAgqPSQiPQN33PaMkgQ6HJknu8CrjKRiXAycZ16KLUkHV64TNhEjPTcX1a7rqpD131AYMWX8d7CCdc9Ys7RUb6BwguuNSh8rJK3x4AkMDSUsaE8ynKvpC7RXZpJ9Nxfhd + - peerId: 12D3KooWT3c7Y5zvWhhjSxd5Ve3GKZi6WCsG6JHxcxgXixRFdBbw + address: 127.0.0.1:4432 + signingKey: 3iiLPj6wMUQpPwTBNZcUgkbXub1jumg4AEV9LfMyFHZVc84GLyAjVbVvH6EAGhcNrxRxL82aW4BimhDZCpLsRCqx5vwj + encryptionKey: JgG4CcCbae1qEpe7mKXzp7m5hNc56SSyZd9DwUaEStKJrq7RToAC2Vgd3i6hKRwa58zCWeN6Wjc3o6qrdKPEPRvcyEPysamajVo5mdQiUgWAmr97pGEsyjuRjQoC2GY2LvLiEQxEgwFgJxKGMHMiaWMtDfxCDUaDEm4bu5RdMhqRZekAWho6c3WoEeruSr14iX1TrocFNfBkBY7CjEw8kcywXCTNgtvhb2Qiwgj5AxEF4wyw4bzaNA9ctXb1hoHPFVMu6C51pkFY7jUD9zwyH3ukgnAewkGAcPNbKmaTAtMosKRVaAN97mAwXh2VRt1hWmRvVk7r76EjnVKhD4vbsKZc56RVcHTVWRVdhU7FGyPsiE5rSQAz1JQGYzxnZpX7EG77CyrmUGyfueVfRHhwY2oq8A4uQCRaQxSaJHYLowjXSxh8DQ2V6MTqyzti32C27utBYdHzLVCJSGkmdzGwrFcHqsq7nLDxmvJVErPvyReixEe8kFmqopJ3e6LLm8WdYw9K6JYBjXnEfwPzm7Von9sf3dcaGDUHYfttMyeke7fAXJkvPRje69hYVyzdQGAauuojzGkkvQWCSMK1KCMNMznRaPDCNvofrQhYrub24WhmwpKhorufdfW8Cb4T6reBDCtaWVsbuinjtL6F6Sui5aYHJFLJ6e4pPewr1P4EuZYRbMBZwN5KvDLhTGLBuBnaTqUUdF6bj2U22NoRYMogiHiftqKqiexKNDXX1Zg9RQEvxgjuVo6SBW42mVEA8agrLhruRqCmiduJxVrfqLNGeYXHXrcmMEgW7uosJbPXvTcfRvdFWS1ov7oSALvj6vhDQ28Yi9D2ETNdNsfVWAFQuwvPpW7CHQGXTitprVbqH8JYxNZuGygcLmr5efbB22Vzu4ntd1HoraQpG12qeDEUA7tXYUpoYyuSdWwKPjSAMtaQcCSfVrhKQHQuKJargrVrez8vjWuwLfvSucV7ZHe7gjqvYgULdE1ubRCRSd7DuLjEN2Vd6obzV2c3MRet7ZSf4Sp88WM5AuTyW7BjArBc4S3gUQ8rYaiZ8Tu7NCxkEzbFwWRaemZkwfvcsX3XxqjyF37tFSGkEqE5kuBvpZW72675LkDffj7kH1zA8yE6dVujJjWsNYVFJWndUtz5Vy2KCdZAbBgq19q4AtsxWPodU2N3yZXzFAFAzTrxS6V4P7Scpdau1avgRvHLcBQPunA37xaYMy8YMifJwtmRY25mnAQwZAk3eANk7tXwZd58SDnciLNvARJvwKzTQBXcshkwyy52SX8XmXDJsPnRLaHmiYBJ63Yzr5XpZuuAtxb9qrWG2NHCNxfomHokWacV1hjZPPd6ZxT1FuRozB6Qt2NLcyqY7bnTcQJb1jPUaTAGXXCR8WVmmmYo2fDQe8CdBmgyPvbzNTEJUyScBz4RdycB5PZap4SurJCWtHbuMyQbQUB6jJgURDstfXS5Akfe4oruNq9rnYcNtnsDJPtrhXHBqzDizmf1BDxR5FB2RCxzCgeAfg8WQ1Ug9PVAGTzob6ZqCrGXzWXEUniZnf1vjr7QhGKBYXEX9SWDoSMUpP4FreVDTnx15ijRZTV3p8xG5fE9e36TnugRVvTyq7XzmyPBjW2r66f1bior diff --git a/pkg/ocache/ocache.go b/pkg/ocache/ocache.go index 5e10508a..4f0cf0ba 100644 --- a/pkg/ocache/ocache.go +++ b/pkg/ocache/ocache.go @@ -27,9 +27,9 @@ type LoadFunc func(ctx context.Context, id string) (value Object, err error) type Option func(*oCache) -var WithLogServiceName = func(name string) Option { +var WithLogger = func(l *zap.SugaredLogger) Option { return func(cache *oCache) { - cache.log = cache.log.With("service_name", name) + cache.log = l } } diff --git a/service/net/pool/pool.go b/service/net/pool/pool.go index cbd0973a..6b7cfa67 100644 --- a/service/net/pool/pool.go +++ b/service/net/pool/pool.go @@ -9,6 +9,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" "math/rand" + "time" ) const ( @@ -43,7 +44,7 @@ func (p *pool) Init(ctx context.Context, a *app.App) (err error) { dialer := a.MustComponent(dialer.CName).(dialer.Dialer) p.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) { return dialer.Dial(ctx, id) - }) + }, ocache.WithLogger(log.Sugar()), ocache.WithGCPeriod(time.Minute), ocache.WithTTL(time.Minute*5)) return nil } diff --git a/service/net/rpc/server/drpcserver.go b/service/net/rpc/server/drpcserver.go index 1af9af7b..975c270f 100644 --- a/service/net/rpc/server/drpcserver.go +++ b/service/net/rpc/server/drpcserver.go @@ -7,12 +7,13 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure" + "github.com/zeebo/errs" "go.uber.org/zap" + "io" "net" "storj.io/drpc" "storj.io/drpc/drpcmux" "storj.io/drpc/drpcserver" - "strings" "time" ) @@ -103,7 +104,7 @@ func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) { l := log.With(zap.String("remoteAddr", conn.RemoteAddr().String())).With(zap.String("localAddr", conn.LocalAddr().String())) l.Debug("connection opened") if err := s.drpcServer.ServeOne(ctx, conn); err != nil { - if err == context.Canceled || strings.Contains(err.Error(), "EOF") { + if errs.Is(err, context.Canceled) || errs.Is(err, io.EOF) { l.Debug("connection closed") } else { l.Warn("serve connection error", zap.Error(err)) diff --git a/service/space/remotediff/remotediff.go b/service/space/remotediff/remotediff.go index df10aec6..2ba6769a 100644 --- a/service/space/remotediff/remotediff.go +++ b/service/space/remotediff/remotediff.go @@ -74,7 +74,7 @@ func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesync.HeadS return } - var rangeResp = &spacesync.HeadSyncResponse{ + resp = &spacesync.HeadSyncResponse{ Results: make([]*spacesync.HeadSyncResult, 0, len(res)), } for _, rangeRes := range res { @@ -88,11 +88,11 @@ func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesync.HeadS }) } } - rangeResp.Results = append(rangeResp.Results, &spacesync.HeadSyncResult{ + resp.Results = append(resp.Results, &spacesync.HeadSyncResult{ Hash: rangeRes.Hash, Elements: elements, Count: uint32(rangeRes.Count), }) } - return rangeResp, nil + return } diff --git a/service/space/rpc.go b/service/space/rpc.go new file mode 100644 index 00000000..db056079 --- /dev/null +++ b/service/space/rpc.go @@ -0,0 +1,18 @@ +package space + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" +) + +type rpcServer struct { + s *service +} + +func (r rpcServer) HeadSync(ctx context.Context, request *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) { + sp, err := r.s.get(ctx, request.SpaceId) + if err != nil { + return nil, err + } + return sp.HeadSync(ctx, request) +} diff --git a/service/space/service.go b/service/space/service.go index 183b5425..ae9be5a4 100644 --- a/service/space/service.go +++ b/service/space/service.go @@ -2,7 +2,6 @@ package space import ( "context" - "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" @@ -11,7 +10,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" - "storj.io/drpc/drpcerr" "time" ) @@ -24,7 +22,6 @@ func New() Service { } type Service interface { - spacesync.DRPCSpaceServer app.ComponentRunnable } @@ -40,9 +37,8 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) { s.pool = a.MustComponent(pool.CName).(pool.Pool) s.confService = a.MustComponent(configuration.CName).(configuration.Service) ttlSec := time.Second * time.Duration(s.conf.GCTTL) - s.cache = ocache.New(s.loadSpace, ocache.WithTTL(ttlSec), ocache.WithGCPeriod(time.Minute)) - spacesync.DRPCRegisterSpace(a.MustComponent(server.CName).(server.DRPCServer), s) - + s.cache = ocache.New(s.loadSpace, ocache.WithTTL(ttlSec), ocache.WithGCPeriod(time.Minute), ocache.WithLogger(log.Sugar())) + spacesync.DRPCRegisterSpace(a.MustComponent(server.CName).(server.DRPCServer), rpcServer{s}) return nil } @@ -51,6 +47,10 @@ func (s *service) Name() (name string) { } func (s *service) Run(ctx context.Context) (err error) { + go func() { + time.Sleep(time.Second * 10) + s.get(ctx, "testSpace") + }() return } @@ -71,10 +71,6 @@ func (s *service) get(ctx context.Context, id string) (Space, error) { return obj.(Space), nil } -func (s *service) HeadSync(ctx context.Context, request *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) { - return nil, drpcerr.WithCode(fmt.Errorf("check"), 42) -} - func (s *service) Close(ctx context.Context) (err error) { return s.cache.Close() } diff --git a/service/space/space.go b/service/space/space.go index 169fd408..7be85ef4 100644 --- a/service/space/space.go +++ b/service/space/space.go @@ -5,7 +5,9 @@ import ( "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/remotediff" + "github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync" "go.uber.org/zap" "math/rand" "sync" @@ -14,6 +16,9 @@ import ( type Space interface { Id() string + + HeadSync(ctx context.Context, req *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) + Close() error } @@ -44,21 +49,26 @@ func (s *space) Run(ctx context.Context) error { return nil } +func (s *space) HeadSync(ctx context.Context, req *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) { + return remotediff.HandlerRangeRequest(ctx, s.diff, req) +} + func (s *space) testFill() { var n = 1000 var els = make([]ldiff.Element, 0, n) rand.Seed(time.Now().UnixNano()) for i := 0; i < n; i++ { if rand.Intn(n) > 2 { - id := fmt.Sprintf("%s.%d", s.id, n) + id := fmt.Sprintf("%s.%d", s.id, i) head := "head." + id - if rand.Intn(n) > 2 { + if rand.Intn(n) > n-100 { head += ".modified" } - els = append(els, ldiff.Element{ + el := ldiff.Element{ Id: id, Head: head, - }) + } + els = append(els, el) } } s.diff.Set(els...) @@ -80,6 +90,7 @@ func (s *space) syncLoop() { for { select { case <-s.syncCtx.Done(): + return case <-ticker.C: doSync() } @@ -88,45 +99,40 @@ func (s *space) syncLoop() { } func (s *space) sync(ctx context.Context) error { - peerIds, err := s.peerIds(ctx) + peers, err := s.getPeers(ctx) if err != nil { return err } - for _, peerId := range peerIds { - if err := s.syncWithPeer(ctx, peerId); err != nil { - log.Error("can't sync with peer", zap.String("peer", peerId), zap.Error(err)) + for _, p := range peers { + if err := s.syncWithPeer(ctx, p); err != nil { + log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) } } return nil } -func (s *space) syncWithPeer(ctx context.Context, peerId string) (err error) { - rdiff := remotediff.NewRemoteDiff(s.s.pool, peerId, s.id) +func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { + cl := spacesync.NewDRPCSpaceClient(p) + rdiff := remotediff.NewRemoteDiff(s.id, cl) newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff) if err != nil { return nil } - log.Info("sync done:", zap.Strings("newIds", newIds), zap.Strings("changedIds", changedIds), zap.Strings("removedIds", removedIds)) + log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds))) return } -func (s *space) peerIds(ctx context.Context) (peerIds []string, err error) { +func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) { if s.conf.IsResponsible(s.id) { - peers, err := s.conf.AllPeers(ctx, s.id) - if err != nil { - return nil, err - } - for _, p := range peers { - peerIds = append(peerIds, p.Id()) - } + return s.conf.AllPeers(ctx, s.id) } else { - peer, err := s.conf.OnePeer(ctx, s.id) + var p peer.Peer + p, err = s.conf.OnePeer(ctx, s.id) if err != nil { return nil, err } - peerIds = append(peerIds, peer.Id()) + return []peer.Peer{p}, nil } - return } func (s *space) Close() error {