Merge pull request #6 from anytypeio/space-diff

This commit is contained in:
Mikhail Rakhmanov 2022-09-07 12:38:17 +02:00 committed by GitHub
commit 2e1c2cbc6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 3186 additions and 764 deletions

View File

@ -32,8 +32,9 @@ protos-go:
$(GOGO_START) protoc --gogofaster_out=:. $(P_TREE_STORAGE_PATH_PB)/protos/*.proto; mv $(P_TREE_STORAGE_PATH_PB)/protos/*.go $(P_TREE_STORAGE_PATH_PB)
$(GOGO_START) protoc --gogofaster_out=:. $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.proto; mv $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.go $(P_PLAINTEXT_CHANGES_PATH_PB)
$(eval PKGMAP := $$(P_ACL_CHANGES),$$(P_TREE_CHANGES))
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP),plugins=grpc:. $(P_SYNC_CHANGES_PATH_PB)/proto/*.proto
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. $(P_SYNC_CHANGES_PATH_PB)/proto/*.proto
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. service/space/spacesync/protos/*.proto
build:
@$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app))
go build -o bin/anytype-node -ldflags "$(FLAGS)" cmd/node/node.go
go build -v -o bin/anytype-node -ldflags "$(FLAGS)" cmd/node/node.go

View File

@ -9,6 +9,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/api"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server"
@ -97,7 +98,7 @@ func Bootstrap(a *app.App) {
Register(server.New()).
Register(dialer.New()).
Register(pool.NewPool()).
//Register(&example.Example{})
Register(configuration.New()).
Register(document.New()).
Register(message.New()).
Register(requesthandler.New()).

View File

@ -29,6 +29,7 @@ type Config struct {
Account Account `yaml:"account"`
APIServer APIServer `yaml:"apiServer"`
Nodes []Node `yaml:"nodes"`
Space Space `yaml:"space"`
}
func (c *Config) Init(ctx context.Context, a *app.App) (err error) {

6
config/space.go Normal file
View File

@ -0,0 +1,6 @@
package config
type Space struct {
GCTTL int `json:"gcTTL"`
SyncPeriod int `json:"syncPeriod"`
}

12
go.mod
View File

@ -4,22 +4,27 @@ go 1.18
require (
github.com/awalterschulze/gographviz v0.0.0-20190522210029-fa59802746ab
github.com/cheggaaa/mb v1.0.3
github.com/cespare/xxhash v1.1.0
github.com/goccy/go-graphviz v0.0.9
github.com/gogo/protobuf v1.3.2
github.com/huandu/skiplist v1.2.0
github.com/ipfs/go-cid v0.1.0
github.com/libp2p/go-libp2p v0.20.3
github.com/libp2p/go-libp2p-core v0.16.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.1.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.0
github.com/zeebo/blake3 v0.2.3
go.uber.org/zap v1.21.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/yaml.v3 v3.0.1
storj.io/drpc v0.0.32
)
require (
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 // indirect
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
@ -27,7 +32,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/fogleman/gg v1.3.0 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
@ -48,7 +53,6 @@ require (
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.6 // indirect
)

29
go.sum
View File

@ -1,3 +1,8 @@
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 h1:kMPPZYmJgbs4AJfodbg2OCXg5cp+9LPAJcLZJqmcghk=
github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232/go.mod h1:+PeHBAWp7gUh/yw6uAauKc5ku0w4cFNg6DUddGxoGq0=
github.com/awalterschulze/gographviz v0.0.0-20190522210029-fa59802746ab h1:+cdNqtOJWjvepyhxy23G7z7vmpYCoC65AP0nqi1f53s=
github.com/awalterschulze/gographviz v0.0.0-20190522210029-fa59802746ab/go.mod h1:GEV5wmg4YquNw7v1kkyoX9etIk8yVmXj+AkDHuuETHs=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@ -8,8 +13,8 @@ github.com/btcsuite/btcd/btcec/v2 v2.1.3 h1:xM/n3yIhHAhHy04z4i43C8p4ehixJZMsnrVJ
github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/cheggaaa/mb v1.0.3 h1:03ksWum+6kHclB+kjwKMaBtgl5gtNYUwNpxsHQciKe8=
github.com/cheggaaa/mb v1.0.3/go.mod h1:NUl0GBtFLlfg2o6iZwxzcG7Lslc2wV/ADTFbLXtVPE4=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4rUs6kA=
github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -27,6 +32,10 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c=
github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U=
github.com/huandu/skiplist v1.2.0 h1:gox56QD77HzSC0w+Ws3MH3iie755GBJU1OER3h5VsYw=
github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-cid v0.1.0 h1:YN33LQulcRHjfom/i25yoOZR4Telp1Hr/2RU3d0PnC0=
github.com/ipfs/go-cid v0.1.0/go.mod h1:rH5/Xv83Rfy8Rw6xG+id3DYAMUVmem1MowoKwdXmN2o=
@ -34,8 +43,9 @@ github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJS
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -85,18 +95,28 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg=
github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ=
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo=
github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
@ -164,6 +184,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

294
pkg/ldiff/diff.go Normal file
View File

@ -0,0 +1,294 @@
// Package ldiff provides a container of elements with fixed id and changeable content.
// Diff can calculate the difference with another diff container (you can make it remote) with minimum hops and traffic.
package ldiff
import (
"bytes"
"context"
"errors"
"github.com/cespare/xxhash"
"github.com/huandu/skiplist"
"github.com/zeebo/blake3"
"math"
"sync"
)
// New creates new Diff container
//
// divideFactor - means how many hashes you want to ask for once
//
// it must be 2 or greater
// normal value usually between 4 and 64
//
// compareThreshold - means the maximum count of elements remote diff will send directly
//
// if elements under range will be more - remote diff will send only hash
// it must be 1 or greater
// normal value between 8 and 64
//
// Less threshold and divideFactor - less traffic but more requests
func New(divideFactor, compareThreshold int) Diff {
if divideFactor < 2 {
divideFactor = 2
}
if compareThreshold < 1 {
compareThreshold = 1
}
d := &diff{
divideFactor: divideFactor,
compareThreshold: compareThreshold,
}
d.sl = skiplist.New(d)
return d
}
var hashersPool = &sync.Pool{
New: func() any {
return blake3.New()
},
}
var ErrElementNotFound = errors.New("element not found")
// Element of data
type Element struct {
Id string
Head string
}
// Range request to get RangeResult
type Range struct {
From, To uint64
Limit int
}
// RangeResult response for Range
type RangeResult struct {
Hash []byte
Elements []Element
Count int
}
type element struct {
Element
hash uint64
}
// Diff contains elements and can compare it with Remote diff
type Diff interface {
Remote
// Set adds or update elements in container
Set(elements ...Element)
// RemoveId removes element by id
RemoveId(id string) error
// Diff makes diff with remote container
Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error)
}
// Remote interface for using in the Diff
type Remote interface {
// Ranges calculates given ranges and return results
Ranges(ctx context.Context, ranges []Range, resBuf []RangeResult) (results []RangeResult, err error)
}
type diff struct {
sl *skiplist.SkipList
divideFactor int
compareThreshold int
mu sync.RWMutex
}
// Compare implements skiplist interface
func (d *diff) Compare(lhs, rhs interface{}) int {
lhe := lhs.(*element)
rhe := rhs.(*element)
if lhe.Id == rhe.Id {
return 0
}
if lhe.hash > rhe.hash {
return 1
} else if lhe.hash < rhe.hash {
return -1
}
if lhe.Id > rhe.Id {
return 1
} else {
return -1
}
}
// CalcScore implements skiplist interface
func (d *diff) CalcScore(key interface{}) float64 {
return 0
}
// Set adds or update element in container
func (d *diff) Set(elements ...Element) {
d.mu.Lock()
defer d.mu.Unlock()
for _, e := range elements {
el := &element{Element: e, hash: xxhash.Sum64([]byte(e.Id))}
d.sl.Remove(el)
d.sl.Set(el, nil)
}
}
// RemoveId removes element by id
func (d *diff) RemoveId(id string) error {
d.mu.Lock()
defer d.mu.Unlock()
el := &element{Element: Element{
Id: id,
}, hash: xxhash.Sum64([]byte(id))}
if d.sl.Remove(el) == nil {
return ErrElementNotFound
}
return nil
}
func (d *diff) getRange(r Range) (rr RangeResult) {
hasher := hashersPool.Get().(*blake3.Hasher)
defer hashersPool.Put(hasher)
hasher.Reset()
el := d.sl.Find(&element{hash: r.From})
rr.Elements = make([]Element, 0, r.Limit)
var overfill bool
for el != nil && el.Key().(*element).hash <= r.To {
elem := el.Key().(*element).Element
el = el.Next()
hasher.WriteString(elem.Id)
hasher.WriteString(elem.Head)
rr.Count++
if !overfill {
if len(rr.Elements) < r.Limit {
rr.Elements = append(rr.Elements, elem)
}
if len(rr.Elements) == r.Limit && el != nil {
overfill = true
}
}
}
if overfill {
rr.Elements = nil
}
rr.Hash = hasher.Sum(nil)
return
}
// Ranges calculates given ranges and return results
func (d *diff) Ranges(ctx context.Context, ranges []Range, resBuf []RangeResult) (results []RangeResult, err error) {
d.mu.RLock()
defer d.mu.RUnlock()
results = resBuf[:0]
for _, r := range ranges {
results = append(results, d.getRange(r))
}
return
}
type diffCtx struct {
newIds, changedIds, removedIds []string
toSend, prepare []Range
myRes, otherRes []RangeResult
}
var errMismatched = errors.New("query and results mismatched")
// Diff makes diff with remote container
func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) {
d.mu.RLock()
defer d.mu.RUnlock()
dctx := &diffCtx{}
dctx.toSend = append(dctx.toSend, Range{
From: 0,
To: math.MaxUint64,
Limit: d.compareThreshold,
})
for len(dctx.toSend) > 0 {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
if dctx.myRes, err = d.Ranges(ctx, dctx.toSend, dctx.myRes); err != nil {
return
}
if dctx.otherRes, err = dl.Ranges(ctx, dctx.toSend, dctx.otherRes); err != nil {
return
}
if len(dctx.otherRes) != len(dctx.toSend) || len(dctx.myRes) != len(dctx.toSend) {
err = errMismatched
return
}
for i, r := range dctx.toSend {
d.compareResults(dctx, r, dctx.myRes[i], dctx.otherRes[i])
}
dctx.toSend, dctx.prepare = dctx.prepare, dctx.toSend
dctx.prepare = dctx.prepare[:0]
}
return dctx.newIds, dctx.changedIds, dctx.removedIds, nil
}
func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResult) {
// both hash equals - do nothing
if bytes.Equal(myRes.Hash, otherRes.Hash) {
return
}
// both has elements
if len(myRes.Elements) == myRes.Count && len(otherRes.Elements) == otherRes.Count {
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
return
}
// make more queries
divideFactor := uint64(d.divideFactor)
perRange := (r.To - r.From) / divideFactor
align := ((r.To-r.From)%divideFactor + 1) % divideFactor
if align == 0 {
perRange += 1
}
var j = r.From
for i := 0; i < d.divideFactor; i++ {
if i == d.divideFactor-1 {
perRange += align
}
dctx.prepare = append(dctx.prepare, Range{From: j, To: j + perRange - 1, Limit: r.Limit})
j += perRange
}
return
}
func (d *diff) compareElements(dctx *diffCtx, my, other []Element) {
find := func(list []Element, targetEl Element) (has, eq bool) {
for _, el := range list {
if el.Id == targetEl.Id {
return true, el.Head == targetEl.Head
}
}
return false, false
}
for _, el := range my {
has, eq := find(other, el)
if !has {
dctx.removedIds = append(dctx.removedIds, el.Id)
continue
} else {
if !eq {
dctx.changedIds = append(dctx.changedIds, el.Id)
}
}
}
for _, el := range other {
if has, _ := find(my, el); !has {
dctx.newIds = append(dctx.newIds, el.Id)
}
}
}

140
pkg/ldiff/diff_test.go Normal file
View File

@ -0,0 +1,140 @@
package ldiff
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/mgo.v2/bson"
"math"
"testing"
)
func TestDiff_fillRange(t *testing.T) {
d := New(4, 4).(*diff)
for i := 0; i < 10; i++ {
el := Element{
Id: fmt.Sprint(i),
Head: fmt.Sprint("h", i),
}
d.Set(el)
}
t.Log(d.sl.Len())
t.Run("elements", func(t *testing.T) {
r := Range{From: 0, To: math.MaxUint64, Limit: 10}
res := d.getRange(r)
assert.NotNil(t, res.Hash)
assert.Len(t, res.Elements, 10)
})
t.Run("hash", func(t *testing.T) {
r := Range{From: 0, To: math.MaxUint64, Limit: 9}
res := d.getRange(r)
t.Log(len(res.Elements))
assert.NotNil(t, res.Hash)
assert.Nil(t, res.Elements)
})
}
func TestDiff_Diff(t *testing.T) {
ctx := context.Background()
t.Run("basic", func(t *testing.T) {
d1 := New(16, 16)
d2 := New(16, 16)
for i := 0; i < 1000; i++ {
id := fmt.Sprint(i)
head := bson.NewObjectId().Hex()
d1.Set(Element{
Id: id,
Head: head,
})
d2.Set(Element{
Id: id,
Head: head,
})
}
newIds, changedIds, removedIds, err := d1.Diff(ctx, d2)
require.NoError(t, err)
assert.Len(t, newIds, 0)
assert.Len(t, changedIds, 0)
assert.Len(t, removedIds, 0)
d2.Set(Element{
Id: "newD1",
Head: "newD1",
})
d2.Set(Element{
Id: "1",
Head: "changed",
})
require.NoError(t, d2.RemoveId("0"))
newIds, changedIds, removedIds, err = d1.Diff(ctx, d2)
require.NoError(t, err)
assert.Len(t, newIds, 1)
assert.Len(t, changedIds, 1)
assert.Len(t, removedIds, 1)
})
t.Run("empty", func(t *testing.T) {
d1 := New(16, 16)
d2 := New(16, 16)
newIds, changedIds, removedIds, err := d1.Diff(ctx, d2)
require.NoError(t, err)
assert.Len(t, newIds, 0)
assert.Len(t, changedIds, 0)
assert.Len(t, removedIds, 0)
})
t.Run("one empty", func(t *testing.T) {
d1 := New(4, 4)
d2 := New(4, 4)
for i := 0; i < 10; i++ {
d2.Set(Element{
Id: fmt.Sprint(i),
Head: bson.NewObjectId().Hex(),
})
}
newIds, changedIds, removedIds, err := d1.Diff(ctx, d2)
require.NoError(t, err)
assert.Len(t, newIds, 10)
assert.Len(t, changedIds, 0)
assert.Len(t, removedIds, 0)
})
t.Run("context cancel", func(t *testing.T) {
d1 := New(4, 4)
d2 := New(4, 4)
for i := 0; i < 10; i++ {
d2.Set(Element{
Id: fmt.Sprint(i),
Head: bson.NewObjectId().Hex(),
})
}
var cancel func()
ctx, cancel = context.WithCancel(ctx)
cancel()
_, _, _, err := d1.Diff(ctx, d2)
assert.ErrorIs(t, err, context.Canceled)
})
}
func BenchmarkDiff_Ranges(b *testing.B) {
d := New(16, 16)
for i := 0; i < 10000; i++ {
id := fmt.Sprint(i)
head := bson.NewObjectId().Hex()
d.Set(Element{
Id: id,
Head: head,
})
}
ctx := context.Background()
b.ResetTimer()
b.ReportAllocs()
var resBuf []RangeResult
var ranges = []Range{{From: 0, To: math.MaxUint64, Limit: 10}}
for i := 0; i < b.N; i++ {
d.Ranges(ctx, ranges, resBuf)
resBuf = resBuf[:0]
}
}

View File

@ -0,0 +1,77 @@
package configuration
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-chash"
)
func New() Service {
return new(service)
}
type Configuration interface {
// Id returns current configuration id
Id() string
// AllPeers returns all peers by spaceId except current account
AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error)
// OnePeer returns one of peer for spaceId
OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error)
// NodeIds returns list of peerId for given spaceId
NodeIds(spaceId string) []string
// IsResponsible checks if current account responsible for given spaceId
IsResponsible(spaceId string) bool
}
type configuration struct {
id string
accountId string
pool pool.Pool
chash chash.CHash
}
func (c *configuration) Id() string {
return c.id
}
func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) {
nodeIds := c.NodeIds(spaceId)
peers = make([]peer.Peer, 0, len(nodeIds))
for _, id := range nodeIds {
p, e := c.pool.DialAndAddPeer(ctx, id)
if e == nil {
peers = append(peers, p)
}
}
if len(peers) == 0 {
return nil, fmt.Errorf("unable to connect to any node")
}
return
}
func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) {
nodeIds := c.NodeIds(spaceId)
return c.pool.GetOrDialOneOf(ctx, nodeIds)
}
func (c *configuration) NodeIds(spaceId string) []string {
members := c.chash.GetMembers(spaceId)
res := make([]string, 0, len(members))
for _, m := range members {
if m.Id() != c.accountId {
res = append(res, m.Id())
}
}
return res
}
func (c *configuration) IsResponsible(spaceId string) bool {
for _, m := range c.chash.GetMembers(spaceId) {
if m.Id() == c.accountId {
return true
}
}
return false
}

View File

@ -0,0 +1,73 @@
package configuration
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
"github.com/anytypeio/go-chash"
)
const CName = "configuration"
const (
partitionCount = 3000
replicationFactor = 3
)
var log = logger.NewNamed(CName)
type Service interface {
GetLast() Configuration
GetById(id string) Configuration
app.Component
}
type service struct {
accountId string
pool pool.Pool
last Configuration
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
conf := a.MustComponent(config.CName).(*config.Config)
s.accountId = conf.Account.PeerId
s.pool = a.MustComponent(pool.CName).(pool.Pool)
configNodes := a.MustComponent(node.CName).(node.Service).Nodes()
config := &configuration{
id: "config",
accountId: s.accountId,
pool: s.pool,
}
if config.chash, err = chash.New(chash.Config{
PartitionCount: partitionCount,
ReplicationFactor: replicationFactor,
}); err != nil {
return
}
members := make([]chash.Member, 0, len(configNodes))
for _, n := range configNodes {
members = append(members, n)
}
if err = config.chash.AddMembers(members...); err != nil {
return
}
s.last = config
return
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) GetLast() Configuration {
return s.last
}
func (s *service) GetById(id string) Configuration {
//TODO implement me
panic("implement me")
}

View File

@ -1,84 +0,0 @@
package example
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"go.uber.org/zap"
"strings"
"time"
)
var log = logger.NewNamed("example")
type Example struct {
pool pool.Pool
peerConf config.PeerList
}
func (e *Example) Init(ctx context.Context, a *app.App) (err error) {
e.pool = a.MustComponent(pool.CName).(pool.Pool)
e.peerConf = a.MustComponent(config.CName).(*config.Config).PeerList
// subscribe for sync messages
e.pool.AddHandler(syncproto.MessageType_MessageTypeSync, e.syncHandler)
return
}
func (e *Example) Name() (name string) {
return "example"
}
func (e *Example) Run(ctx context.Context) (err error) {
// dial manually with all peers
for _, rp := range e.peerConf.Remote {
if er := e.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil {
log.Info("can't dial to peer", zap.Error(er))
} else {
log.Info("connected with peer", zap.String("peerId", rp.PeerId))
}
}
go e.doRequests()
return nil
}
func (e *Example) syncHandler(ctx context.Context, msg *pool.Message) (err error) {
data := string(msg.Data) // you need unmarshal this bytes
log.Info("msg received", zap.String("peerId", msg.Peer().Id()), zap.String("data", data))
if strings.HasPrefix(data, "ack:") {
if err = msg.Ack(); err != nil {
log.Error("ack error", zap.Error(err))
}
} else if strings.HasPrefix(data, "ackErr:") {
if err = msg.AckError(42, "ack error description"); err != nil {
log.Error("ackErr error", zap.Error(err))
}
} else if strings.HasPrefix(data, "reply:") {
if err = msg.Reply([]byte("reply for:" + strings.TrimPrefix(data, "reply:"))); err != nil {
log.Error("reply error", zap.Error(err))
}
}
return nil
}
func (e *Example) doRequests() {
time.Sleep(time.Second)
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
st := time.Now()
err := e.pool.SendAndWait(ctx, e.peerConf.Remote[0].PeerId, &syncproto.Message{
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
Data: []byte("ack: something"),
})
log.Info("sent with ack:", zap.Error(err), zap.Duration("dur", time.Since(st)))
}
func (e *Example) Close(ctx context.Context) (err error) {
return
}

View File

@ -0,0 +1,32 @@
package handler
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
)
var log = logger.NewNamed("replyHandler")
type ReplyHandler interface {
Handle(ctx context.Context, req []byte) (rep proto.Marshaler, err error)
}
type Reply struct {
ReplyHandler
}
func (r Reply) Handle(ctx context.Context, msg *pool.Message) error {
rep, e := r.ReplyHandler.Handle(ctx, msg.GetData())
if msg.GetHeader().RequestId == 0 {
if e != nil {
log.Error("handler returned error", zap.Error(e))
} else if rep != nil {
log.Debug("sender didn't expect a reply, but the handler made")
}
return nil
}
return msg.ReplyType(msg.GetHeader().GetType(), rep)
}

View File

@ -1,8 +1,10 @@
package pool
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"gopkg.in/mgo.v2/bson"
)
@ -28,6 +30,22 @@ func (m *Message) Reply(data []byte) (err error) {
return m.peer.Send(rep)
}
func (m *Message) ReplyType(tp syncproto.MessageType, data proto.Marshaler) (err error) {
dataBytes, err := data.Marshal()
if err != nil {
return err
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: tp,
},
Data: dataBytes,
}
return m.peer.Send(rep)
}
func (m *Message) Ack() (err error) {
ack := &syncproto.System{
Ack: &syncproto.SystemAck{},
@ -95,3 +113,24 @@ func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (
}
return m.peer.Send(rep)
}
func (m *Message) IsAck() (err error) {
if tp := m.GetHeader().GetType(); tp != syncproto.MessageType_MessageTypeSystem {
return fmt.Errorf("unexpected message type in response: %v, want System", tp)
}
sys := &syncproto.System{}
if err = sys.Unmarshal(m.GetData()); err != nil {
return
}
if ack := sys.Ack; ack != nil {
if ack.Error != nil {
return fmt.Errorf("response error: code=%d; descriptipon=%s", ack.Error.Code, ack.Error.Description)
}
return nil
}
return fmt.Errorf("received not ack response")
}
func (m *Message) UnmarshalData(msg proto.Unmarshaler) error {
return msg.Unmarshal(m.Data)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
"math/rand"
"sync"
"sync/atomic"
)
@ -34,13 +35,15 @@ func NewPool() Pool {
type Handler func(ctx context.Context, msg *Message) (err error)
type Pool interface {
DialAndAddPeer(ctx context.Context, id string) (err error)
AddAndReadPeer(peer peer.Peer) (err error)
AddHandler(msgType syncproto.MessageType, h Handler)
AddPeerIdToGroup(peerId, groupId string) (err error)
RemovePeerIdFromGroup(peerId, groupId string) (err error)
DialAndAddPeer(ctx context.Context, id string) (peer.Peer, error)
GetOrDialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error)
SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error)
SendAndWaitResponse(ctx context.Context, id string, s *syncproto.Message) (resp *Message, err error)
Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error)
app.ComponentRunnable
@ -88,25 +91,29 @@ func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) {
p.handlers[msgType] = append(p.handlers[msgType], h)
}
func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (err error) {
func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (peer.Peer, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrPoolClosed
return nil, ErrPoolClosed
}
if _, ok := p.peersById[peerId]; ok {
return nil
return p.dialAndAdd(ctx, peerId)
}
func (p *pool) dialAndAdd(ctx context.Context, peerId string) (peer.Peer, error) {
if peer, ok := p.peersById[peerId]; ok {
return peer.peer, nil
}
peer, err := p.dialer.Dial(ctx, peerId)
if err != nil {
return
return nil, err
}
p.peersById[peer.Id()] = &peerEntry{
peer: peer,
}
p.wg.Add(1)
go p.readPeerLoop(peer)
return nil
return peer, nil
}
func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) {
@ -154,6 +161,14 @@ func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) {
}
func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) {
resp, err := p.SendAndWaitResponse(ctx, peerId, msg)
if err != nil {
return
}
return resp.IsAck()
}
func (p *pool) SendAndWaitResponse(ctx context.Context, peerId string, msg *syncproto.Message) (resp *Message, err error) {
defer func() {
if err != nil {
log.With(
@ -191,7 +206,10 @@ func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Me
case rep := <-ch:
if rep.Error != nil {
err = rep.Error
return
}
resp = rep.Message
return
case <-ctx.Done():
log.Debug("context done in SendAndWait")
err = ctx.Err()
@ -199,6 +217,38 @@ func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Me
return
}
func (p *pool) GetOrDialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
p.mu.RLock()
if p.closed {
p.mu.RUnlock()
return nil, ErrPoolClosed
}
for _, peerId := range peerIds {
peer, ok := p.peersById[peerId]
if ok {
p.mu.RUnlock()
return peer.peer, nil
}
}
p.mu.RUnlock()
rand.Shuffle(len(peerIds), func(i, j int) {
peerIds[i], peerIds[j] = peerIds[j], peerIds[i]
})
p.mu.Lock()
defer p.mu.Unlock()
var lastErr error
for _, peerId := range peerIds {
peer, err := p.dialAndAdd(ctx, peerId)
if err != nil {
lastErr = err
continue
} else {
return peer, nil
}
}
return nil, lastErr
}
func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) {
//TODO implement me
panic("implement me")

View File

@ -24,6 +24,14 @@ type Node struct {
EncryptionKeyString string
}
func (n *Node) Id() string {
return n.PeerId
}
func (n *Node) Capacity() float64 {
return 1
}
func New() app.Component {
return &service{}
}

View File

@ -0,0 +1,125 @@
package remotediff
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
)
func NewRemoteDiff(p pool.Pool, peerId, spaceId string) ldiff.Remote {
return remote{
pool: p,
peerId: peerId,
spaceId: spaceId,
}
}
type remote struct {
pool pool.Pool
peerId string
spaceId string
}
func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) {
results = resBuf[:0]
pbRanges := make([]*spacesync.DiffRangeRequestRange, 0, len(ranges))
for _, rg := range ranges {
pbRanges = append(pbRanges, &spacesync.DiffRangeRequestRange{
From: rg.From,
To: rg.To,
Limit: uint32(rg.Limit),
})
}
req := &spacesync.Space{
SpaceId: r.spaceId,
Message: &spacesync.SpaceContent{
Value: &spacesync.SpaceContentValueOfDiffRange{
DiffRange: &spacesync.DiffRange{
Request: &spacesync.DiffRangeRequest{
Ranges: pbRanges,
},
},
},
},
}
msg, err := req.Marshal()
if err != nil {
return
}
resp, err := r.pool.SendAndWaitResponse(ctx, r.peerId, &syncproto.Message{
Header: &syncproto.Header{
Type: syncproto.MessageType_MessageTypeSpace,
},
Data: msg,
})
if err != nil {
return
}
var spaceResp = &spacesync.Space{}
if err = resp.UnmarshalData(spaceResp); err != nil {
return
}
rangeResp := spaceResp.GetMessage().GetDiffRange().GetResponse()
if rangeResp != nil {
return nil, fmt.Errorf("got nil response")
}
for _, rr := range rangeResp.Results {
var elms []ldiff.Element
if len(rr.Elements) > 0 {
elms = make([]ldiff.Element, 0, len(rr.Elements))
}
results = append(results, ldiff.RangeResult{
Hash: rr.Hash,
Elements: elms,
Count: int(rr.Count),
})
}
return
}
func HandlerRangeRequest(ctx context.Context, d ldiff.Diff, diffRange *spacesync.DiffRange) (resp *spacesync.DiffRange, err error) {
req := diffRange.GetRequest()
if req != nil {
return nil, fmt.Errorf("received nil request")
}
ranges := make([]ldiff.Range, 0, len(req.Ranges))
for _, reqRange := range req.Ranges {
ranges = append(ranges, ldiff.Range{
From: reqRange.From,
To: reqRange.To,
Limit: int(reqRange.Limit),
})
}
res, err := d.Ranges(ctx, ranges, nil)
if err != nil {
return
}
var rangeResp = &spacesync.DiffRangeResponse{
Results: make([]*spacesync.DiffRangeResponseResult, len(res)),
}
for _, rangeRes := range res {
var elements []*spacesync.DiffRangeResponseResultElement
if len(rangeRes.Elements) > 0 {
elements = make([]*spacesync.DiffRangeResponseResultElement, 0, len(rangeRes.Elements))
for _, el := range rangeRes.Elements {
elements = append(elements, &spacesync.DiffRangeResponseResultElement{
Id: el.Id,
Head: el.Head,
})
}
}
rangeResp.Results = append(rangeResp.Results, &spacesync.DiffRangeResponseResult{
Hash: rangeRes.Hash,
Elements: elements,
Count: uint32(rangeRes.Count),
})
}
return &spacesync.DiffRange{
Response: rangeResp,
}, nil
}

92
service/space/service.go Normal file
View File

@ -0,0 +1,92 @@
package space
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool/handler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"time"
)
const CName = "space"
var log = logger.NewNamed(CName)
func New() Service {
return new(service)
}
type Service interface {
handler.ReplyHandler
app.ComponentRunnable
}
type service struct {
conf config.Space
cache ocache.OCache
pool pool.Pool
confService configuration.Service
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
s.conf = a.MustComponent(config.CName).(*config.Config).Space
s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.confService = a.MustComponent(configuration.CName).(configuration.Service)
ttlSec := time.Second * time.Duration(s.conf.GCTTL)
s.cache = ocache.New(s.loadSpace, ocache.WithTTL(ttlSec), ocache.WithGCPeriod(time.Minute))
s.pool.AddHandler(syncproto.MessageType_MessageTypeSpace, handler.Reply{ReplyHandler: s}.Handle)
return nil
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(ctx context.Context) (err error) {
return
}
func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) {
// TODO: load from database here
sp := &space{s: s, id: id, conf: s.confService.GetLast()}
if err = sp.Run(ctx); err != nil {
return nil, err
}
return sp, nil
}
func (s *service) get(ctx context.Context, id string) (Space, error) {
obj, err := s.cache.Get(ctx, id)
if err != nil {
return nil, err
}
return obj.(Space), nil
}
func (s *service) Handle(ctx context.Context, data []byte) (resp proto.Marshaler, err error) {
var spaceReq = &spacesync.Space{}
if err = spaceReq.Unmarshal(data); err != nil {
return
}
if spaceReq.SpaceId != "" {
sp, err := s.get(ctx, spaceReq.SpaceId)
if err != nil {
return
}
return sp.Handle(ctx, spaceReq)
}
return nil, fmt.Errorf("unexpected space message")
}
func (s *service) Close(ctx context.Context) (err error) {
return s.cache.Close()
}

154
service/space/space.go Normal file
View File

@ -0,0 +1,154 @@
package space
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/remotediff"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync"
"go.uber.org/zap"
"math/rand"
"sync"
"time"
)
type Space interface {
Id() string
Handle(ctx context.Context, msg *spacesync.Space) (repl *spacesync.Space, err error)
Close() error
}
//
type space struct {
id string
conf configuration.Configuration
diff ldiff.Diff
diffHandler func()
syncCtx context.Context
syncCancel func()
syncLoopDone chan struct{}
s *service
mu sync.RWMutex
}
func (s *space) Id() string {
return s.id
}
func (s *space) Run(ctx context.Context) error {
s.diff = ldiff.New(16, 16)
s.syncCtx, s.syncCancel = context.WithCancel(context.Background())
s.syncLoopDone = make(chan struct{})
s.testFill()
go s.syncLoop()
return nil
}
func (s *space) testFill() {
var n = 1000
var els = make([]ldiff.Element, 0, n)
rand.Seed(time.Now().UnixNano())
for i := 0; i < n; i++ {
if rand.Intn(n) > 2 {
id := fmt.Sprintf("%s.%d", s.id, n)
head := "head." + id
if rand.Intn(n) > 2 {
head += ".modified"
}
els = append(els, ldiff.Element{
Id: id,
Head: head,
})
}
}
s.diff.Set(els...)
}
func (s *space) Handle(ctx context.Context, msg *spacesync.Space) (repl *spacesync.Space, err error) {
if diffRange := msg.GetMessage().GetDiffRange(); diffRange != nil {
resp, er := remotediff.HandlerRangeRequest(ctx, s.diff, diffRange)
if er != nil {
return nil, er
}
return &spacesync.Space{SpaceId: s.id, Message: &spacesync.SpaceContent{
Value: &spacesync.SpaceContentValueOfDiffRange{
DiffRange: resp,
},
}}, nil
}
return nil, fmt.Errorf("unexpected request")
}
func (s *space) syncLoop() {
defer close(s.syncLoopDone)
doSync := func() {
ctx, cancel := context.WithTimeout(s.syncCtx, time.Minute)
defer cancel()
if err := s.sync(ctx); err != nil {
log.Error("periodic sync error", zap.Error(err), zap.String("spaceId", s.id))
}
}
doSync()
if s.s.conf.SyncPeriod > 0 {
ticker := time.NewTicker(time.Second * time.Duration(s.s.conf.SyncPeriod))
defer ticker.Stop()
for {
select {
case <-s.syncCtx.Done():
case <-ticker.C:
doSync()
}
}
}
}
func (s *space) sync(ctx context.Context) error {
peerIds, err := s.peerIds(ctx)
if err != nil {
return err
}
for _, peerId := range peerIds {
if err := s.syncWithPeer(ctx, peerId); err != nil {
log.Error("can't sync with peer", zap.String("peer", peerId), zap.Error(err))
}
}
return nil
}
func (s *space) syncWithPeer(ctx context.Context, peerId string) (err error) {
rdiff := remotediff.NewRemoteDiff(s.s.pool, peerId, s.id)
newIds, changedIds, removedIds, err := s.diff.Diff(ctx, rdiff)
if err != nil {
return nil
}
log.Info("sync done:", zap.Strings("newIds", newIds), zap.Strings("changedIds", changedIds), zap.Strings("removedIds", removedIds))
return
}
func (s *space) peerIds(ctx context.Context) (peerIds []string, err error) {
if s.conf.IsResponsible(s.id) {
peers, err := s.conf.AllPeers(ctx, s.id)
if err != nil {
return nil, err
}
for _, p := range peers {
peerIds = append(peerIds, p.Id())
}
} else {
peer, err := s.conf.OnePeer(ctx, s.id)
if err != nil {
return nil, err
}
peerIds = append(peerIds, peer.Id())
}
return
}
func (s *space) Close() error {
s.syncCancel()
<-s.syncLoopDone
return nil
}

View File

@ -0,0 +1,41 @@
syntax = "proto3";
package anytype;
option go_package = "service/space/spacesync";
message Space {
string spaceId = 1;
Content message = 2;
message Content {
oneof value {
DiffRange diffRange = 1;
}
}
}
message DiffRange {
Request request = 1;
Response response = 2;
message Request {
repeated Range ranges = 1;
message Range {
uint64 from = 1;
uint64 to = 2;
uint32 limit = 3;
}
}
message Response {
repeated Result results = 1;
message Result {
bytes hash = 1;
repeated Element elements = 2;
uint32 count = 3;
message Element {
string id = 1;
string head = 2;
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -75,7 +75,7 @@ func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err err
}
func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) {
err = s.pool.DialAndAddPeer(context.Background(), peerId)
_, err = s.pool.DialAndAddPeer(context.Background(), peerId)
if err != nil {
return
}

View File

@ -20,7 +20,7 @@ message Header {
enum MessageType {
MessageTypeSystem = 0;
MessageTypeSubscription = 1;
MessageTypeSpace = 1;
MessageTypeSync = 2;
}
@ -49,18 +49,6 @@ message System {
}
}
message Subscription {
SubscribeSpace subscribeSpace = 1;
UnsubscribeSpace unsubscribeSpace = 2;
message SubscribeSpace {
string spaceId = 1;
}
message UnsubscribeSpace {
string spaceId = 1;
}
}
message Sync {
string spaceId = 1;
ContentValue message = 2;

View File

@ -27,21 +27,21 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type MessageType int32
const (
MessageType_MessageTypeSystem MessageType = 0
MessageType_MessageTypeSubscription MessageType = 1
MessageType_MessageTypeSync MessageType = 2
MessageType_MessageTypeSystem MessageType = 0
MessageType_MessageTypeSpace MessageType = 1
MessageType_MessageTypeSync MessageType = 2
)
var MessageType_name = map[int32]string{
0: "MessageTypeSystem",
1: "MessageTypeSubscription",
1: "MessageTypeSpace",
2: "MessageTypeSync",
}
var MessageType_value = map[string]int32{
"MessageTypeSystem": 0,
"MessageTypeSubscription": 1,
"MessageTypeSync": 2,
"MessageTypeSystem": 0,
"MessageTypeSpace": 1,
"MessageTypeSync": 2,
}
func (x MessageType) String() string {
@ -449,146 +449,6 @@ func (m *SystemError) GetDescription() string {
return ""
}
type Subscription struct {
SubscribeSpace *SubscriptionSubscribeSpace `protobuf:"bytes,1,opt,name=subscribeSpace,proto3" json:"subscribeSpace,omitempty"`
UnsubscribeSpace *SubscriptionUnsubscribeSpace `protobuf:"bytes,2,opt,name=unsubscribeSpace,proto3" json:"unsubscribeSpace,omitempty"`
}
func (m *Subscription) Reset() { *m = Subscription{} }
func (m *Subscription) String() string { return proto.CompactTextString(m) }
func (*Subscription) ProtoMessage() {}
func (*Subscription) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{3}
}
func (m *Subscription) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Subscription) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Subscription.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Subscription) XXX_Merge(src proto.Message) {
xxx_messageInfo_Subscription.Merge(m, src)
}
func (m *Subscription) XXX_Size() int {
return m.Size()
}
func (m *Subscription) XXX_DiscardUnknown() {
xxx_messageInfo_Subscription.DiscardUnknown(m)
}
var xxx_messageInfo_Subscription proto.InternalMessageInfo
func (m *Subscription) GetSubscribeSpace() *SubscriptionSubscribeSpace {
if m != nil {
return m.SubscribeSpace
}
return nil
}
func (m *Subscription) GetUnsubscribeSpace() *SubscriptionUnsubscribeSpace {
if m != nil {
return m.UnsubscribeSpace
}
return nil
}
type SubscriptionSubscribeSpace struct {
SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"`
}
func (m *SubscriptionSubscribeSpace) Reset() { *m = SubscriptionSubscribeSpace{} }
func (m *SubscriptionSubscribeSpace) String() string { return proto.CompactTextString(m) }
func (*SubscriptionSubscribeSpace) ProtoMessage() {}
func (*SubscriptionSubscribeSpace) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{3, 0}
}
func (m *SubscriptionSubscribeSpace) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SubscriptionSubscribeSpace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SubscriptionSubscribeSpace.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SubscriptionSubscribeSpace) XXX_Merge(src proto.Message) {
xxx_messageInfo_SubscriptionSubscribeSpace.Merge(m, src)
}
func (m *SubscriptionSubscribeSpace) XXX_Size() int {
return m.Size()
}
func (m *SubscriptionSubscribeSpace) XXX_DiscardUnknown() {
xxx_messageInfo_SubscriptionSubscribeSpace.DiscardUnknown(m)
}
var xxx_messageInfo_SubscriptionSubscribeSpace proto.InternalMessageInfo
func (m *SubscriptionSubscribeSpace) GetSpaceId() string {
if m != nil {
return m.SpaceId
}
return ""
}
type SubscriptionUnsubscribeSpace struct {
SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"`
}
func (m *SubscriptionUnsubscribeSpace) Reset() { *m = SubscriptionUnsubscribeSpace{} }
func (m *SubscriptionUnsubscribeSpace) String() string { return proto.CompactTextString(m) }
func (*SubscriptionUnsubscribeSpace) ProtoMessage() {}
func (*SubscriptionUnsubscribeSpace) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{3, 1}
}
func (m *SubscriptionUnsubscribeSpace) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SubscriptionUnsubscribeSpace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SubscriptionUnsubscribeSpace.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SubscriptionUnsubscribeSpace) XXX_Merge(src proto.Message) {
xxx_messageInfo_SubscriptionUnsubscribeSpace.Merge(m, src)
}
func (m *SubscriptionUnsubscribeSpace) XXX_Size() int {
return m.Size()
}
func (m *SubscriptionUnsubscribeSpace) XXX_DiscardUnknown() {
xxx_messageInfo_SubscriptionUnsubscribeSpace.DiscardUnknown(m)
}
var xxx_messageInfo_SubscriptionUnsubscribeSpace proto.InternalMessageInfo
func (m *SubscriptionUnsubscribeSpace) GetSpaceId() string {
if m != nil {
return m.SpaceId
}
return ""
}
type Sync struct {
SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"`
Message *SyncContentValue `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
@ -598,7 +458,7 @@ func (m *Sync) Reset() { *m = Sync{} }
func (m *Sync) String() string { return proto.CompactTextString(m) }
func (*Sync) ProtoMessage() {}
func (*Sync) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{4}
return fileDescriptor_4b28dfdd48a89166, []int{3}
}
func (m *Sync) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -653,7 +513,7 @@ func (m *SyncContentValue) Reset() { *m = SyncContentValue{} }
func (m *SyncContentValue) String() string { return proto.CompactTextString(m) }
func (*SyncContentValue) ProtoMessage() {}
func (*SyncContentValue) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{4, 0}
return fileDescriptor_4b28dfdd48a89166, []int{3, 0}
}
func (m *SyncContentValue) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -751,7 +611,7 @@ func (m *SyncHeadUpdate) Reset() { *m = SyncHeadUpdate{} }
func (m *SyncHeadUpdate) String() string { return proto.CompactTextString(m) }
func (*SyncHeadUpdate) ProtoMessage() {}
func (*SyncHeadUpdate) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{4, 1}
return fileDescriptor_4b28dfdd48a89166, []int{3, 1}
}
func (m *SyncHeadUpdate) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -822,7 +682,7 @@ func (m *SyncFull) Reset() { *m = SyncFull{} }
func (m *SyncFull) String() string { return proto.CompactTextString(m) }
func (*SyncFull) ProtoMessage() {}
func (*SyncFull) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{4, 2}
return fileDescriptor_4b28dfdd48a89166, []int{3, 2}
}
func (m *SyncFull) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -864,7 +724,7 @@ func (m *SyncFullRequest) Reset() { *m = SyncFullRequest{} }
func (m *SyncFullRequest) String() string { return proto.CompactTextString(m) }
func (*SyncFullRequest) ProtoMessage() {}
func (*SyncFullRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{4, 2, 0}
return fileDescriptor_4b28dfdd48a89166, []int{3, 2, 0}
}
func (m *SyncFullRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -940,7 +800,7 @@ func (m *SyncFullResponse) Reset() { *m = SyncFullResponse{} }
func (m *SyncFullResponse) String() string { return proto.CompactTextString(m) }
func (*SyncFullResponse) ProtoMessage() {}
func (*SyncFullResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_4b28dfdd48a89166, []int{4, 2, 1}
return fileDescriptor_4b28dfdd48a89166, []int{3, 2, 1}
}
func (m *SyncFullResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -1014,9 +874,6 @@ func init() {
proto.RegisterType((*SystemPing)(nil), "anytype.System.Ping")
proto.RegisterType((*SystemAck)(nil), "anytype.System.Ack")
proto.RegisterType((*SystemError)(nil), "anytype.System.Error")
proto.RegisterType((*Subscription)(nil), "anytype.Subscription")
proto.RegisterType((*SubscriptionSubscribeSpace)(nil), "anytype.Subscription.SubscribeSpace")
proto.RegisterType((*SubscriptionUnsubscribeSpace)(nil), "anytype.Subscription.UnsubscribeSpace")
proto.RegisterType((*Sync)(nil), "anytype.Sync")
proto.RegisterType((*SyncContentValue)(nil), "anytype.Sync.ContentValue")
proto.RegisterType((*SyncHeadUpdate)(nil), "anytype.Sync.HeadUpdate")
@ -1028,62 +885,57 @@ func init() {
func init() { proto.RegisterFile("syncproto/proto/sync.proto", fileDescriptor_4b28dfdd48a89166) }
var fileDescriptor_4b28dfdd48a89166 = []byte{
// 868 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x56, 0xd1, 0x8e, 0xda, 0x46,
0x14, 0x65, 0xc0, 0x40, 0xb8, 0x20, 0xd6, 0x9d, 0x24, 0xad, 0xeb, 0x44, 0x08, 0xa1, 0xb4, 0x45,
0x69, 0xe4, 0x8d, 0x68, 0xa3, 0x4a, 0x7d, 0x4b, 0xb6, 0xbb, 0x02, 0x35, 0x05, 0x34, 0xc0, 0x56,
0xea, 0x4b, 0x34, 0xd8, 0x13, 0x40, 0x78, 0xc7, 0xae, 0xc7, 0xb4, 0xe5, 0x17, 0xfa, 0x94, 0x6f,
0xe8, 0x37, 0x54, 0x6a, 0xd5, 0x2f, 0xe8, 0x63, 0x1e, 0xfb, 0x58, 0xed, 0x4a, 0xfd, 0x88, 0xf6,
0xa5, 0x9a, 0x19, 0x1b, 0x7b, 0x9d, 0xcd, 0x07, 0xe4, 0x01, 0x98, 0x7b, 0xee, 0x39, 0xd7, 0xe7,
0xce, 0x30, 0x17, 0xc0, 0x16, 0x7b, 0xee, 0x86, 0x51, 0x10, 0x07, 0xc7, 0xfa, 0x5d, 0xc6, 0x8e,
0x5a, 0xe2, 0x3a, 0xe5, 0xfb, 0x78, 0x1f, 0x32, 0xfb, 0x71, 0xb8, 0x5d, 0x1d, 0x53, 0xd7, 0x97,
0x2f, 0x77, 0x4d, 0xf9, 0x8a, 0x09, 0xb9, 0x0c, 0x97, 0x5a, 0x23, 0x72, 0xb8, 0x96, 0xda, 0x8f,
0x52, 0x45, 0x1c, 0x31, 0x26, 0xe2, 0x20, 0xa2, 0x2b, 0xa6, 0xd6, 0x99, 0x46, 0x46, 0x9a, 0xdd,
0x3b, 0x83, 0xfa, 0x37, 0x4c, 0x08, 0xba, 0x62, 0xf8, 0x13, 0xa8, 0xad, 0x19, 0xf5, 0x58, 0x64,
0xa1, 0x2e, 0xea, 0x37, 0x07, 0x47, 0x4e, 0x62, 0xc2, 0x19, 0x2a, 0x98, 0x24, 0x69, 0x8c, 0xc1,
0xf0, 0x68, 0x4c, 0xad, 0x72, 0x17, 0xf5, 0x5b, 0x44, 0xad, 0x7b, 0xbf, 0x20, 0xa8, 0x69, 0x1a,
0xb6, 0xa0, 0x1e, 0x47, 0xd4, 0x65, 0x23, 0x4f, 0x15, 0x6a, 0x91, 0x34, 0xc4, 0xf7, 0xa1, 0x11,
0xb1, 0xef, 0x77, 0x4c, 0xc4, 0x23, 0x4f, 0xa9, 0x0d, 0x92, 0x01, 0x52, 0x17, 0xb1, 0xd0, 0xdf,
0x8f, 0x3c, 0xab, 0xa2, 0x72, 0x69, 0x88, 0xfb, 0x60, 0x48, 0x1f, 0x96, 0xd1, 0x45, 0xfd, 0xf6,
0xe0, 0xce, 0xc1, 0x57, 0xe2, 0x7c, 0xbe, 0x0f, 0x19, 0x51, 0x0c, 0xf9, 0x04, 0x8f, 0x2d, 0x77,
0xab, 0x11, 0x7f, 0x19, 0x58, 0xd5, 0x2e, 0xea, 0x37, 0x48, 0x06, 0xf4, 0x7e, 0xad, 0x40, 0x6d,
0xb6, 0x17, 0x31, 0xbb, 0xc0, 0x5f, 0x40, 0x63, 0x4d, 0xb9, 0x27, 0xd6, 0x74, 0xcb, 0x92, 0x7e,
0x3f, 0x3c, 0xd4, 0xd5, 0x1c, 0x67, 0x98, 0x12, 0x48, 0xc6, 0x95, 0x5e, 0xc2, 0x0d, 0x5f, 0x29,
0xfb, 0xcd, 0x9c, 0x97, 0x44, 0x33, 0xdd, 0xf0, 0x15, 0x51, 0x0c, 0xfc, 0x11, 0x54, 0xa8, 0xbb,
0x55, 0xbd, 0x34, 0x07, 0xb7, 0x8b, 0xc4, 0xa7, 0xee, 0x96, 0xc8, 0xbc, 0xfd, 0x04, 0x1a, 0xc3,
0x5c, 0xf5, 0x23, 0x75, 0x2e, 0x6e, 0xe0, 0x9f, 0xb3, 0x48, 0x6c, 0x02, 0xae, 0xcc, 0x35, 0x48,
0x11, 0xb6, 0x7b, 0x60, 0xc8, 0x67, 0x61, 0x1b, 0x6e, 0xed, 0xf8, 0xe6, 0xa7, 0xf9, 0xe6, 0x42,
0xf7, 0x61, 0x90, 0x43, 0x6c, 0x0f, 0xa0, 0xf2, 0xd4, 0xdd, 0xe2, 0x4f, 0xa1, 0xca, 0xa2, 0x28,
0x88, 0x12, 0xcf, 0x77, 0x8b, 0x56, 0x4e, 0x65, 0x92, 0x68, 0x8e, 0xfd, 0x0a, 0x41, 0x55, 0x01,
0xd8, 0x01, 0xc3, 0x0d, 0x3c, 0x5d, 0xb5, 0x3d, 0xb0, 0x6f, 0x54, 0x39, 0x27, 0x81, 0xc7, 0x88,
0xe2, 0xe1, 0x2e, 0x34, 0x3d, 0x26, 0xdc, 0x68, 0x13, 0xc6, 0xd2, 0x77, 0x59, 0xf9, 0xce, 0x43,
0xbd, 0x27, 0x60, 0x48, 0x3e, 0x6e, 0x42, 0x7d, 0x31, 0xfe, 0x7a, 0x3c, 0xf9, 0x76, 0x6c, 0x96,
0x70, 0x17, 0xee, 0x2f, 0xc6, 0xb3, 0xc5, 0x74, 0x3a, 0x21, 0xf3, 0xd3, 0xaf, 0x5e, 0x4c, 0xc9,
0x64, 0x3e, 0x39, 0x99, 0x3c, 0x7f, 0x71, 0x7e, 0x4a, 0x66, 0xa3, 0xc9, 0xd8, 0x84, 0xde, 0xcf,
0x65, 0x68, 0xcd, 0x76, 0xcb, 0x43, 0x1d, 0xfc, 0x1c, 0xda, 0x42, 0xc7, 0x4b, 0x36, 0x0b, 0xa9,
0x9b, 0x9e, 0xe0, 0x83, 0xcc, 0x63, 0x8e, 0x9e, 0x06, 0x09, 0x97, 0x14, 0xb4, 0x98, 0x80, 0xb9,
0xe3, 0x85, 0x7a, 0x7a, 0xa7, 0x3e, 0xbe, 0xb9, 0xde, 0xa2, 0xc0, 0x26, 0x6f, 0xe8, 0xed, 0x87,
0xd0, 0xbe, 0xfe, 0x54, 0xf9, 0xed, 0x16, 0x61, 0x76, 0x2b, 0x1a, 0x24, 0x0d, 0xed, 0x47, 0x60,
0x16, 0x2b, 0xbe, 0x9d, 0xdd, 0xfb, 0xb7, 0x06, 0xc6, 0x6c, 0xcf, 0xdd, 0xb7, 0x53, 0xf0, 0xe7,
0x50, 0xbf, 0xd0, 0x37, 0x23, 0xe9, 0x23, 0x7f, 0x76, 0xdc, 0x75, 0x4e, 0x02, 0x1e, 0x33, 0x1e,
0x9f, 0x53, 0x7f, 0xc7, 0x48, 0x4a, 0xb5, 0xff, 0x41, 0xd0, 0xca, 0x67, 0xf0, 0x97, 0x00, 0xf2,
0xc2, 0x2f, 0x42, 0x8f, 0xc6, 0xe9, 0x0e, 0x5b, 0xd7, 0x2b, 0x0d, 0x0f, 0xf9, 0x61, 0x89, 0xe4,
0xd8, 0xf8, 0x0c, 0x8e, 0x5e, 0xee, 0x7c, 0x5f, 0x92, 0x88, 0xbe, 0xe0, 0x37, 0x5b, 0x39, 0xdb,
0xf9, 0xbe, 0x93, 0x30, 0x86, 0x25, 0x52, 0x14, 0xe1, 0x11, 0x98, 0x19, 0x24, 0xc2, 0x80, 0x0b,
0x96, 0x5c, 0xa8, 0x7b, 0x37, 0x16, 0xd2, 0x94, 0x61, 0x89, 0xbc, 0x21, 0x7b, 0x56, 0x87, 0xea,
0x0f, 0xb2, 0x2f, 0xfb, 0x0f, 0x04, 0x90, 0x19, 0xc7, 0x77, 0xa0, 0x2a, 0x8d, 0x0b, 0x0b, 0x75,
0x2b, 0xfd, 0x06, 0xd1, 0x01, 0xee, 0x43, 0x3d, 0x19, 0xab, 0x56, 0xb9, 0x5b, 0xe9, 0x37, 0x07,
0x6d, 0x87, 0xba, 0xbe, 0x43, 0xe8, 0x8f, 0x27, 0x0a, 0x26, 0x69, 0x1a, 0xbf, 0x0f, 0x35, 0x39,
0x4f, 0x93, 0xa9, 0xd5, 0x20, 0x49, 0x84, 0x7b, 0xd0, 0x12, 0x9c, 0x86, 0x62, 0x1d, 0xc4, 0x53,
0x1a, 0xaf, 0x2d, 0x43, 0x95, 0xbf, 0x86, 0xe1, 0xc7, 0x00, 0x92, 0xad, 0x07, 0xa7, 0x9a, 0x57,
0xcd, 0x81, 0xe9, 0xa8, 0xf1, 0x3c, 0x3f, 0xe0, 0x24, 0xc7, 0xb1, 0xff, 0x2b, 0x83, 0x21, 0x7b,
0xb5, 0x7f, 0x43, 0x50, 0x4f, 0x77, 0xe9, 0xdd, 0x6a, 0xe1, 0x77, 0x04, 0xb7, 0xd2, 0x53, 0x79,
0xb7, 0xac, 0x3f, 0x3c, 0x87, 0x66, 0xee, 0x37, 0x07, 0xdf, 0x85, 0xf7, 0x72, 0xa1, 0x9e, 0x8b,
0x66, 0x09, 0xdf, 0x83, 0x0f, 0xf2, 0x70, 0x6e, 0x74, 0x98, 0x08, 0xdf, 0x86, 0xa3, 0x6b, 0x1a,
0xee, 0x9a, 0xe5, 0x67, 0x0f, 0xfe, 0xbc, 0xec, 0xa0, 0xd7, 0x97, 0x1d, 0xf4, 0xf7, 0x65, 0x07,
0xbd, 0xba, 0xea, 0x94, 0x5e, 0x5f, 0x75, 0x4a, 0x7f, 0x5d, 0x75, 0x4a, 0xdf, 0xc1, 0xf1, 0xe1,
0x5f, 0xc2, 0xb2, 0xa6, 0x3e, 0x3e, 0xfb, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x7a, 0xad, 0xe7, 0x46,
0x39, 0x08, 0x00, 0x00,
// 799 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x55, 0xd1, 0x6e, 0xe3, 0x44,
0x14, 0xf5, 0x24, 0x4e, 0xbc, 0xbe, 0xae, 0x5a, 0x33, 0xdb, 0x45, 0xc6, 0xac, 0x22, 0x2b, 0x02,
0x61, 0x01, 0x72, 0x57, 0x81, 0x15, 0x12, 0x6f, 0xbb, 0xa1, 0x55, 0x22, 0x20, 0x89, 0x26, 0x49,
0x91, 0x78, 0x59, 0x4d, 0xed, 0xd9, 0x24, 0x8a, 0x3b, 0x36, 0x1e, 0x07, 0xc8, 0x5f, 0xec, 0x37,
0xf0, 0x0d, 0x48, 0x20, 0xbe, 0x80, 0xc7, 0x7d, 0xe4, 0x11, 0xb5, 0x12, 0x1f, 0x01, 0x2f, 0x68,
0xc6, 0x76, 0xe2, 0x66, 0xfb, 0x03, 0xfb, 0xd0, 0x66, 0xee, 0xb9, 0xe7, 0x5c, 0x9f, 0x1b, 0xdf,
0xb9, 0x01, 0x57, 0x6c, 0x79, 0x98, 0x66, 0x49, 0x9e, 0x9c, 0x15, 0xff, 0x65, 0x1c, 0xa8, 0x23,
0x36, 0x28, 0xdf, 0xe6, 0xdb, 0x94, 0xb9, 0x4f, 0xd2, 0xf5, 0xe2, 0x8c, 0x86, 0xb1, 0xfc, 0x0b,
0x97, 0x94, 0x2f, 0x98, 0x90, 0xc7, 0xf4, 0xaa, 0xd0, 0x88, 0x1a, 0x5e, 0x48, 0xdd, 0x4f, 0x2b,
0x45, 0x9e, 0x31, 0x26, 0xf2, 0x24, 0xa3, 0x0b, 0xa6, 0xce, 0x7b, 0x8d, 0x8c, 0x0a, 0x76, 0xf7,
0x02, 0x8c, 0x6f, 0x99, 0x10, 0x74, 0xc1, 0xf0, 0x47, 0xd0, 0x5e, 0x32, 0x1a, 0xb1, 0xcc, 0x41,
0x1e, 0xf2, 0xad, 0xde, 0x49, 0x50, 0x9a, 0x08, 0x06, 0x0a, 0x26, 0x65, 0x1a, 0x63, 0xd0, 0x23,
0x9a, 0x53, 0xa7, 0xe1, 0x21, 0xff, 0x88, 0xa8, 0x73, 0xf7, 0x17, 0x04, 0xed, 0x82, 0x86, 0x1d,
0x30, 0xf2, 0x8c, 0x86, 0x6c, 0x18, 0xa9, 0x42, 0x47, 0xa4, 0x0a, 0xf1, 0x63, 0x30, 0x33, 0xf6,
0xc3, 0x86, 0x89, 0x7c, 0x18, 0x29, 0xb5, 0x4e, 0xf6, 0x80, 0xd4, 0x65, 0x2c, 0x8d, 0xb7, 0xc3,
0xc8, 0x69, 0xaa, 0x5c, 0x15, 0x62, 0x1f, 0x74, 0xe9, 0xc3, 0xd1, 0x3d, 0xe4, 0x1f, 0xf7, 0x4e,
0x77, 0xbe, 0x4a, 0xe7, 0xb3, 0x6d, 0xca, 0x88, 0x62, 0xc8, 0x27, 0x44, 0xec, 0x6a, 0xb3, 0x18,
0xf2, 0x97, 0x89, 0xd3, 0xf2, 0x90, 0x6f, 0x92, 0x3d, 0xd0, 0xfd, 0xb5, 0x09, 0xed, 0xe9, 0x56,
0xe4, 0xec, 0x1a, 0x7f, 0x01, 0xe6, 0x92, 0xf2, 0x48, 0x2c, 0xe9, 0x9a, 0x95, 0xfd, 0xbe, 0xb7,
0xab, 0x5b, 0x70, 0x82, 0x41, 0x45, 0x20, 0x7b, 0xae, 0xf4, 0x92, 0xae, 0xf8, 0x42, 0xd9, 0xb7,
0x6a, 0x5e, 0x4a, 0xcd, 0x64, 0xc5, 0x17, 0x44, 0x31, 0xf0, 0x87, 0xd0, 0xa4, 0xe1, 0x5a, 0xf5,
0x62, 0xf5, 0x1e, 0x1e, 0x12, 0x9f, 0x85, 0x6b, 0x22, 0xf3, 0xee, 0x53, 0x30, 0x07, 0xb5, 0xea,
0x27, 0xea, 0xbd, 0x84, 0x49, 0x7c, 0xc9, 0x32, 0xb1, 0x4a, 0xb8, 0x32, 0x67, 0x92, 0x43, 0xd8,
0xed, 0x82, 0x2e, 0x9f, 0x85, 0x5d, 0x78, 0xb0, 0xe1, 0xab, 0x9f, 0x67, 0xab, 0xeb, 0xa2, 0x0f,
0x9d, 0xec, 0x62, 0xb7, 0x07, 0xcd, 0x67, 0xe1, 0x1a, 0x7f, 0x02, 0x2d, 0x96, 0x65, 0x49, 0x56,
0x7a, 0x7e, 0x74, 0x68, 0xe5, 0x5c, 0x26, 0x49, 0xc1, 0x71, 0x5f, 0x21, 0x68, 0x29, 0x00, 0x07,
0xa0, 0x87, 0x49, 0x54, 0x54, 0x3d, 0xee, 0xb9, 0xf7, 0xaa, 0x82, 0x7e, 0x12, 0x31, 0xa2, 0x78,
0xd8, 0x03, 0x2b, 0x62, 0x22, 0xcc, 0x56, 0x69, 0x2e, 0x7d, 0x37, 0x94, 0xef, 0x3a, 0xd4, 0x7d,
0x0a, 0xba, 0xe4, 0x63, 0x0b, 0x8c, 0xf9, 0xe8, 0xeb, 0xd1, 0xf8, 0xbb, 0x91, 0xad, 0x61, 0x0f,
0x1e, 0xcf, 0x47, 0xd3, 0xf9, 0x64, 0x32, 0x26, 0xb3, 0xf3, 0xaf, 0x5e, 0x4c, 0xc8, 0x78, 0x36,
0xee, 0x8f, 0xbf, 0x79, 0x71, 0x79, 0x4e, 0xa6, 0xc3, 0xf1, 0xc8, 0x86, 0xee, 0xbf, 0x6d, 0xd0,
0xa7, 0x5b, 0x1e, 0xca, 0x09, 0x11, 0xe9, 0x7e, 0xb2, 0x4c, 0x52, 0x85, 0xf8, 0x73, 0x30, 0xae,
0x8b, 0x61, 0x28, 0x9b, 0xac, 0xdb, 0xe5, 0x61, 0xd0, 0x4f, 0x78, 0xce, 0x78, 0x7e, 0x49, 0xe3,
0x0d, 0x23, 0x15, 0xd5, 0xfd, 0x07, 0xc1, 0x51, 0x3d, 0x83, 0xbf, 0x04, 0x90, 0x33, 0x3e, 0x4f,
0x23, 0x9a, 0x57, 0x63, 0xe1, 0xdc, 0xad, 0x34, 0xd8, 0xe5, 0x07, 0x1a, 0xa9, 0xb1, 0xf1, 0x05,
0x9c, 0xbc, 0xdc, 0xc4, 0xb1, 0x24, 0x91, 0x62, 0xa6, 0xef, 0xb7, 0x72, 0xb1, 0x89, 0xe3, 0xa0,
0x64, 0x0c, 0x34, 0x72, 0x28, 0xc2, 0x43, 0xb0, 0xf7, 0x90, 0x48, 0x13, 0x2e, 0x58, 0x39, 0x43,
0xef, 0xdf, 0x5b, 0xa8, 0xa0, 0x0c, 0x34, 0xf2, 0x86, 0xec, 0xb9, 0x01, 0xad, 0x1f, 0x65, 0x5f,
0xee, 0x1f, 0x08, 0x60, 0x6f, 0x1c, 0x9f, 0x42, 0x4b, 0x1a, 0x17, 0x0e, 0xf2, 0x9a, 0xbe, 0x49,
0x8a, 0x00, 0xfb, 0x60, 0x94, 0x9b, 0xc4, 0x69, 0x78, 0x4d, 0xdf, 0xea, 0x1d, 0x07, 0x34, 0x8c,
0x03, 0x42, 0x7f, 0xea, 0x2b, 0x98, 0x54, 0x69, 0xfc, 0x2e, 0xb4, 0xe5, 0x0a, 0x29, 0x2f, 0xaa,
0x49, 0xca, 0x08, 0x77, 0xe1, 0x48, 0x70, 0x9a, 0x8a, 0x65, 0x92, 0x4f, 0x68, 0xbe, 0x74, 0x74,
0x55, 0xfe, 0x0e, 0x86, 0x9f, 0x00, 0x48, 0x76, 0xb1, 0x2b, 0xd4, 0x15, 0xb5, 0x7a, 0x76, 0xa0,
0x36, 0xd2, 0x6c, 0x87, 0x93, 0x1a, 0xc7, 0xfd, 0xaf, 0x01, 0xba, 0xec, 0xd5, 0xfd, 0x0d, 0x81,
0x51, 0x7d, 0x4b, 0x6f, 0x57, 0x0b, 0xbf, 0x23, 0x78, 0x50, 0xbd, 0x95, 0xb7, 0xcb, 0xfa, 0xc7,
0x63, 0xb0, 0x6a, 0x6b, 0x16, 0x3f, 0x82, 0x77, 0x6a, 0x61, 0xb1, 0x0a, 0x6c, 0x0d, 0x9f, 0x82,
0x5d, 0x87, 0xe5, 0xad, 0xb4, 0x11, 0x7e, 0x08, 0x27, 0x77, 0xc8, 0x3c, 0xb4, 0x1b, 0xcf, 0x3f,
0xf8, 0xf3, 0xa6, 0x83, 0x5e, 0xdf, 0x74, 0xd0, 0xdf, 0x37, 0x1d, 0xf4, 0xea, 0xb6, 0xa3, 0xbd,
0xbe, 0xed, 0x68, 0x7f, 0xdd, 0x76, 0xb4, 0xef, 0xe1, 0x6c, 0xf7, 0x8b, 0x78, 0xd5, 0x56, 0x1f,
0x9f, 0xfd, 0x1f, 0x00, 0x00, 0xff, 0xff, 0x71, 0xf4, 0xb0, 0xb9, 0x25, 0x07, 0x00, 0x00,
}
func (m *Message) Marshal() (dAtA []byte, err error) {
@ -1367,113 +1219,6 @@ func (m *SystemError) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *Subscription) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Subscription) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Subscription) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.UnsubscribeSpace != nil {
{
size, err := m.UnsubscribeSpace.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSync(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
if m.SubscribeSpace != nil {
{
size, err := m.SubscribeSpace.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSync(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SubscriptionSubscribeSpace) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SubscriptionSubscribeSpace) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SubscriptionSubscribeSpace) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.SpaceId) > 0 {
i -= len(m.SpaceId)
copy(dAtA[i:], m.SpaceId)
i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SubscriptionUnsubscribeSpace) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SubscriptionUnsubscribeSpace) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SubscriptionUnsubscribeSpace) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.SpaceId) > 0 {
i -= len(m.SpaceId)
copy(dAtA[i:], m.SpaceId)
i = encodeVarintSync(dAtA, i, uint64(len(m.SpaceId)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *Sync) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -1985,49 +1730,6 @@ func (m *SystemError) Size() (n int) {
return n
}
func (m *Subscription) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.SubscribeSpace != nil {
l = m.SubscribeSpace.Size()
n += 1 + l + sovSync(uint64(l))
}
if m.UnsubscribeSpace != nil {
l = m.UnsubscribeSpace.Size()
n += 1 + l + sovSync(uint64(l))
}
return n
}
func (m *SubscriptionSubscribeSpace) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.SpaceId)
if l > 0 {
n += 1 + l + sovSync(uint64(l))
}
return n
}
func (m *SubscriptionUnsubscribeSpace) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.SpaceId)
if l > 0 {
n += 1 + l + sovSync(uint64(l))
}
return n
}
func (m *Sync) Size() (n int) {
if m == nil {
return 0
@ -3002,292 +2704,6 @@ func (m *SystemError) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *Subscription) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Subscription: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Subscription: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SubscribeSpace", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSync
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.SubscribeSpace == nil {
m.SubscribeSpace = &SubscriptionSubscribeSpace{}
}
if err := m.SubscribeSpace.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field UnsubscribeSpace", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSync
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.UnsubscribeSpace == nil {
m.UnsubscribeSpace = &SubscriptionUnsubscribeSpace{}
}
if err := m.UnsubscribeSpace.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSync(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *SubscriptionSubscribeSpace) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SubscribeSpace: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SubscribeSpace: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSync
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SpaceId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSync(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *SubscriptionUnsubscribeSpace) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: UnsubscribeSpace: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: UnsubscribeSpace: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SpaceId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSync
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SpaceId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSync(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Sync) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0