ctx logger + streapool add/removeTags + space subscribe message
This commit is contained in:
parent
09aee68bd7
commit
ddd20ae5b5
55
app/logger/ctxfiled.go
Normal file
55
app/logger/ctxfiled.go
Normal file
@ -0,0 +1,55 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type ctxKey uint
|
||||
|
||||
const (
|
||||
ctxKeyFields ctxKey = iota
|
||||
)
|
||||
|
||||
func WithCtx(ctx context.Context, l *zap.Logger) *zap.Logger {
|
||||
return l.With(CtxGetFields(ctx)...)
|
||||
}
|
||||
|
||||
func CtxWithFields(ctx context.Context, fields ...zap.Field) context.Context {
|
||||
existingFields := CtxGetFields(ctx)
|
||||
if existingFields != nil {
|
||||
existingFields = append(existingFields, fields...)
|
||||
}
|
||||
return context.WithValue(ctx, ctxKeyFields, fields)
|
||||
}
|
||||
|
||||
func CtxGetFields(ctx context.Context) (fields []zap.Field) {
|
||||
if v := ctx.Value(ctxKeyFields); v != nil {
|
||||
return v.([]zap.Field)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type CtxLogger struct {
|
||||
*zap.Logger
|
||||
}
|
||||
|
||||
func (cl CtxLogger) DebugCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
cl.Logger.Debug(msg, append(CtxGetFields(ctx), fields...)...)
|
||||
}
|
||||
|
||||
func (cl CtxLogger) InfoCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
cl.Logger.Info(msg, append(CtxGetFields(ctx), fields...)...)
|
||||
}
|
||||
|
||||
func (cl CtxLogger) WarnCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
cl.Logger.Warn(msg, append(CtxGetFields(ctx), fields...)...)
|
||||
}
|
||||
|
||||
func (cl CtxLogger) ErrorCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||
cl.Logger.Error(msg, append(CtxGetFields(ctx), fields...)...)
|
||||
}
|
||||
|
||||
func (cl CtxLogger) With(fields ...zap.Field) CtxLogger {
|
||||
return CtxLogger{cl.Logger.With(fields...)}
|
||||
}
|
||||
@ -9,7 +9,7 @@ var (
|
||||
mu sync.Mutex
|
||||
defaultLogger *zap.Logger
|
||||
levels = make(map[string]zap.AtomicLevel)
|
||||
loggers = make(map[string]*zap.Logger)
|
||||
loggers = make(map[string]CtxLogger)
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -22,7 +22,7 @@ func SetDefault(l *zap.Logger) {
|
||||
defer mu.Unlock()
|
||||
*defaultLogger = *l
|
||||
for name, l := range loggers {
|
||||
*l = *defaultLogger.Named(name)
|
||||
*l.Logger = *defaultLogger.Named(name)
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,13 +38,14 @@ func Default() *zap.Logger {
|
||||
return defaultLogger
|
||||
}
|
||||
|
||||
func NewNamed(name string, fields ...zap.Field) *zap.Logger {
|
||||
func NewNamed(name string, fields ...zap.Field) CtxLogger {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
l := defaultLogger.Named(name)
|
||||
if len(fields) > 0 {
|
||||
l = l.With(fields...)
|
||||
}
|
||||
loggers[name] = l
|
||||
return l
|
||||
ctxL := CtxLogger{l}
|
||||
loggers[name] = ctxL
|
||||
return ctxL
|
||||
}
|
||||
|
||||
@ -22,6 +22,9 @@ func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter
|
||||
}
|
||||
|
||||
func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
|
||||
if object == nil {
|
||||
panic("nil object")
|
||||
}
|
||||
c.reservedObjects = append(c.reservedObjects, object)
|
||||
}
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/ldiff"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||
@ -32,7 +33,7 @@ func newDiffSyncer(
|
||||
storage spacestorage.SpaceStorage,
|
||||
clientFactory spacesyncproto.ClientFactory,
|
||||
syncStatus syncstatus.StatusUpdater,
|
||||
log *zap.Logger) DiffSyncer {
|
||||
log logger.CtxLogger) DiffSyncer {
|
||||
return &diffSyncer{
|
||||
diff: diff,
|
||||
spaceId: spaceId,
|
||||
@ -52,7 +53,7 @@ type diffSyncer struct {
|
||||
cache treegetter.TreeGetter
|
||||
storage spacestorage.SpaceStorage
|
||||
clientFactory spacesyncproto.ClientFactory
|
||||
log *zap.Logger
|
||||
log logger.CtxLogger
|
||||
deletionState deletionstate.DeletionState
|
||||
syncStatus syncstatus.StatusUpdater
|
||||
}
|
||||
@ -96,7 +97,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
||||
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||
d.log.Info("diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -131,18 +132,23 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||
zap.Int("changedIds", len(changedIds)),
|
||||
zap.Int("removedIds", len(removedIds)),
|
||||
zap.Int("already deleted ids", totalLen-len(filteredIds)))
|
||||
zap.Int("already deleted ids", totalLen-len(filteredIds)),
|
||||
zap.String("peerId", p.Id()),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
||||
ctx = logger.CtxWithFields(ctx, zap.String("op", "pingTrees"))
|
||||
for _, tId := range trees {
|
||||
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
|
||||
if err != nil {
|
||||
d.log.InfoCtx(ctx, "can't load tree", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
syncTree, ok := tree.(synctree.SyncTree)
|
||||
if !ok {
|
||||
d.log.InfoCtx(ctx, "not a sync tree", zap.String("objectId", tId))
|
||||
continue
|
||||
}
|
||||
// the idea why we call it directly is that if we try to get it from cache
|
||||
|
||||
@ -4,6 +4,7 @@ package headsync
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/app/ldiff"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
||||
@ -38,7 +39,7 @@ type headSync struct {
|
||||
periodicSync periodicsync.PeriodicSync
|
||||
storage spacestorage.SpaceStorage
|
||||
diff ldiff.Diff
|
||||
log *zap.Logger
|
||||
log logger.CtxLogger
|
||||
syncer DiffSyncer
|
||||
|
||||
syncPeriod int
|
||||
@ -51,7 +52,7 @@ func NewHeadSync(
|
||||
confConnector confconnector.ConfConnector,
|
||||
cache treegetter.TreeGetter,
|
||||
syncStatus syncstatus.StatusUpdater,
|
||||
log *zap.Logger) HeadSync {
|
||||
log logger.CtxLogger) HeadSync {
|
||||
|
||||
diff := ldiff.New(16, 16)
|
||||
l := log.With(zap.String("spaceId", spaceId))
|
||||
|
||||
@ -54,7 +54,7 @@ type syncTree struct {
|
||||
isDeleted bool
|
||||
}
|
||||
|
||||
var log = logger.NewNamed("commonspace.synctree").Sugar()
|
||||
var log = logger.NewNamed("commonspace.synctree")
|
||||
|
||||
var buildObjectTree = objecttree.BuildObjectTree
|
||||
var createSyncClient = newSyncClient
|
||||
@ -150,7 +150,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
||||
}
|
||||
|
||||
// basically building tree with in-memory storage and validating that it was without errors
|
||||
log.With(zap.String("id", id)).Debug("validating tree")
|
||||
log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree")
|
||||
err = objecttree.ValidateRawTree(payload, deps.AclList)
|
||||
if err != nil {
|
||||
return
|
||||
@ -200,7 +200,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
||||
// send to everybody, because everybody should know that the node or client got new tree
|
||||
if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil {
|
||||
log.Error("broadcast error", zap.Error(e))
|
||||
log.ErrorCtx(ctx, "broadcast error", zap.Error(e))
|
||||
}
|
||||
}
|
||||
return
|
||||
@ -271,7 +271,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
|
||||
}
|
||||
|
||||
func (s *syncTree) Delete() (err error) {
|
||||
log.With("id", s.Id()).Debug("deleting sync tree")
|
||||
log.With(zap.String("id", s.Id())).Debug("deleting sync tree")
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if err = s.checkAlive(); err != nil {
|
||||
@ -286,7 +286,7 @@ func (s *syncTree) Delete() (err error) {
|
||||
}
|
||||
|
||||
func (s *syncTree) Close() (err error) {
|
||||
log.With("id", s.Id()).Debug("closing sync tree")
|
||||
log.With(zap.String("id", s.Id())).Debug("closing sync tree")
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if s.isClosed {
|
||||
|
||||
@ -82,29 +82,30 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
||||
objTree = s.objTree
|
||||
)
|
||||
|
||||
log := log.With("senderId", senderId).
|
||||
With("heads", objTree.Heads()).
|
||||
With("treeId", objTree.Id())
|
||||
log.Debug("received head update message")
|
||||
log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()))
|
||||
log.DebugCtx(ctx, "received head update message")
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("head update finished with error")
|
||||
} else if fullRequest != nil {
|
||||
log.Debug("sending full sync request")
|
||||
log.DebugCtx(ctx, "sending full sync request")
|
||||
} else {
|
||||
if !isEmptyUpdate {
|
||||
log.Debug("head update finished correctly")
|
||||
log.DebugCtx(ctx, "head update finished correctly")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// isEmptyUpdate is sent when the tree is brought up from cache
|
||||
if isEmptyUpdate {
|
||||
log.With("treeId", objTree.Id()).Debug("is empty update")
|
||||
if slice.UnsortedEquals(objTree.Heads(), update.Heads) {
|
||||
|
||||
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
|
||||
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
|
||||
if headEquals {
|
||||
return
|
||||
}
|
||||
|
||||
// we need to sync in any case
|
||||
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
|
||||
if err != nil {
|
||||
@ -149,20 +150,17 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
||||
objTree = s.objTree
|
||||
)
|
||||
|
||||
log := log.With("senderId", senderId).
|
||||
With("heads", request.Heads).
|
||||
With("treeId", s.objTree.Id()).
|
||||
With("replyId", replyId)
|
||||
log.Debug("received full sync request message")
|
||||
log := log.With(zap.String("senderId", senderId), zap.Strings("heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId))
|
||||
log.DebugCtx(ctx, "received full sync request message")
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("full sync request finished with error")
|
||||
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
|
||||
|
||||
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId)
|
||||
return
|
||||
} else if fullResponse != nil {
|
||||
log.Debug("full sync response sent")
|
||||
log.DebugCtx(ctx, "full sync response sent")
|
||||
}
|
||||
}()
|
||||
|
||||
@ -190,16 +188,14 @@ func (s *syncTreeHandler) handleFullSyncResponse(
|
||||
var (
|
||||
objTree = s.objTree
|
||||
)
|
||||
log := log.With("senderId", senderId).
|
||||
With("heads", response.Heads).
|
||||
With("treeId", s.objTree.Id())
|
||||
log.Debug("received full sync response message")
|
||||
log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()))
|
||||
log.DebugCtx(ctx, "received full sync response message")
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("full sync response failed")
|
||||
log.With(zap.Error(err)).DebugCtx(ctx, "full sync response failed")
|
||||
} else {
|
||||
log.Debug("full sync response succeeded")
|
||||
log.DebugCtx(ctx, "full sync response succeeded")
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@ -59,7 +59,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
||||
defer cancel()
|
||||
newCounter := s.counter.Add(1)
|
||||
msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
||||
log.Info("mpool sendSync", zap.String("replyId", msg.ReplyId))
|
||||
log.InfoCtx(ctx, "mpool sendSync", zap.String("replyId", msg.ReplyId))
|
||||
s.waitersMx.Lock()
|
||||
waiter := responseWaiter{
|
||||
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
|
||||
@ -77,7 +77,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting")
|
||||
log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting")
|
||||
err = ctx.Err()
|
||||
case reply = <-waiter.ch:
|
||||
// success
|
||||
@ -87,42 +87,27 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
||||
|
||||
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
s.updateLastUsage()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Warn("ctx.Done")
|
||||
default:
|
||||
}
|
||||
return s.StreamManager.SendPeer(ctx, peerId, msg)
|
||||
}
|
||||
|
||||
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
s.updateLastUsage()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Warn("ctx.Done")
|
||||
default:
|
||||
}
|
||||
return s.StreamManager.SendResponsible(ctx, msg)
|
||||
}
|
||||
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
s.updateLastUsage()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Warn("ctx.Done")
|
||||
default:
|
||||
}
|
||||
return s.StreamManager.Broadcast(ctx, msg)
|
||||
}
|
||||
|
||||
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
s.updateLastUsage()
|
||||
if msg.ReplyId != "" {
|
||||
log.Info("mpool receive reply", zap.String("replyId", msg.ReplyId))
|
||||
log.InfoCtx(ctx, "mpool receive reply", zap.String("replyId", msg.ReplyId))
|
||||
// we got reply, send it to waiter
|
||||
if s.stopWaiter(msg) {
|
||||
return
|
||||
}
|
||||
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
|
||||
log.DebugCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId))
|
||||
}
|
||||
return s.messageHandler(ctx, senderId, msg)
|
||||
}
|
||||
|
||||
@ -81,7 +81,7 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
|
||||
}
|
||||
|
||||
func (s *objectSync) handleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).Debug("handling message")
|
||||
log.With(zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).DebugCtx(ctx, "handling message")
|
||||
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@ -59,8 +59,8 @@ message ObjectSyncMessage {
|
||||
string replyId = 2;
|
||||
bytes payload = 3;
|
||||
string objectId = 4;
|
||||
// string identity = 5;
|
||||
// string peerSignature = 6;
|
||||
// string identity = 5;
|
||||
// string peerSignature = 6;
|
||||
}
|
||||
|
||||
// SpacePushRequest is a request to add space on a node containing only one acl record
|
||||
@ -134,3 +134,13 @@ message SettingsData {
|
||||
SpaceSettingsSnapshot snapshot = 2;
|
||||
}
|
||||
|
||||
// SpaceSubscription contains in ObjectSyncMessage.Payload and indicates that we need to subscribe or unsubscribe the current stream to this space
|
||||
enum SpaceSubscriptionAction {
|
||||
Subscribe = 0;
|
||||
Unsubscribe = 1;
|
||||
}
|
||||
|
||||
message SpaceSubscription {
|
||||
repeated string spaceIds = 1;
|
||||
SpaceSubscriptionAction action = 2;
|
||||
}
|
||||
|
||||
@ -56,6 +56,32 @@ func (ErrCodes) EnumDescriptor() ([]byte, []int) {
|
||||
return fileDescriptor_80e49f1f4ac27799, []int{0}
|
||||
}
|
||||
|
||||
// SpaceSubscription contains in ObjectSyncMessage.Payload and indicates that we need to subscribe or unsubscribe the current stream to this space
|
||||
type SpaceSubscriptionAction int32
|
||||
|
||||
const (
|
||||
SpaceSubscriptionAction_Subscribe SpaceSubscriptionAction = 0
|
||||
SpaceSubscriptionAction_Unsubscribe SpaceSubscriptionAction = 1
|
||||
)
|
||||
|
||||
var SpaceSubscriptionAction_name = map[int32]string{
|
||||
0: "Subscribe",
|
||||
1: "Unsubscribe",
|
||||
}
|
||||
|
||||
var SpaceSubscriptionAction_value = map[string]int32{
|
||||
"Subscribe": 0,
|
||||
"Unsubscribe": 1,
|
||||
}
|
||||
|
||||
func (x SpaceSubscriptionAction) String() string {
|
||||
return proto.EnumName(SpaceSubscriptionAction_name, int32(x))
|
||||
}
|
||||
|
||||
func (SpaceSubscriptionAction) EnumDescriptor() ([]byte, []int) {
|
||||
return fileDescriptor_80e49f1f4ac27799, []int{1}
|
||||
}
|
||||
|
||||
// HeadSyncRange presenting a request for one range
|
||||
type HeadSyncRange struct {
|
||||
From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"`
|
||||
@ -1047,8 +1073,61 @@ func (m *SettingsData) GetSnapshot() *SpaceSettingsSnapshot {
|
||||
return nil
|
||||
}
|
||||
|
||||
type SpaceSubscription struct {
|
||||
SpaceIds []string `protobuf:"bytes,1,rep,name=spaceIds,proto3" json:"spaceIds,omitempty"`
|
||||
Action SpaceSubscriptionAction `protobuf:"varint,2,opt,name=action,proto3,enum=spacesync.SpaceSubscriptionAction" json:"action,omitempty"`
|
||||
}
|
||||
|
||||
func (m *SpaceSubscription) Reset() { *m = SpaceSubscription{} }
|
||||
func (m *SpaceSubscription) String() string { return proto.CompactTextString(m) }
|
||||
func (*SpaceSubscription) ProtoMessage() {}
|
||||
func (*SpaceSubscription) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_80e49f1f4ac27799, []int{18}
|
||||
}
|
||||
func (m *SpaceSubscription) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *SpaceSubscription) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_SpaceSubscription.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *SpaceSubscription) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_SpaceSubscription.Merge(m, src)
|
||||
}
|
||||
func (m *SpaceSubscription) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *SpaceSubscription) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_SpaceSubscription.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_SpaceSubscription proto.InternalMessageInfo
|
||||
|
||||
func (m *SpaceSubscription) GetSpaceIds() []string {
|
||||
if m != nil {
|
||||
return m.SpaceIds
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *SpaceSubscription) GetAction() SpaceSubscriptionAction {
|
||||
if m != nil {
|
||||
return m.Action
|
||||
}
|
||||
return SpaceSubscriptionAction_Subscribe
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("spacesync.ErrCodes", ErrCodes_name, ErrCodes_value)
|
||||
proto.RegisterEnum("spacesync.SpaceSubscriptionAction", SpaceSubscriptionAction_name, SpaceSubscriptionAction_value)
|
||||
proto.RegisterType((*HeadSyncRange)(nil), "spacesync.HeadSyncRange")
|
||||
proto.RegisterType((*HeadSyncResult)(nil), "spacesync.HeadSyncResult")
|
||||
proto.RegisterType((*HeadSyncResultElement)(nil), "spacesync.HeadSyncResultElement")
|
||||
@ -1067,6 +1146,7 @@ func init() {
|
||||
proto.RegisterType((*ObjectDelete)(nil), "spacesync.ObjectDelete")
|
||||
proto.RegisterType((*SpaceSettingsSnapshot)(nil), "spacesync.SpaceSettingsSnapshot")
|
||||
proto.RegisterType((*SettingsData)(nil), "spacesync.SettingsData")
|
||||
proto.RegisterType((*SpaceSubscription)(nil), "spacesync.SpaceSubscription")
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -1074,64 +1154,68 @@ func init() {
|
||||
}
|
||||
|
||||
var fileDescriptor_80e49f1f4ac27799 = []byte{
|
||||
// 903 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x6f, 0x1b, 0x45,
|
||||
0x14, 0xf7, 0x6e, 0x9c, 0x26, 0x7e, 0xd9, 0x3a, 0xdb, 0x69, 0x0a, 0x8b, 0x1b, 0xb9, 0xd6, 0x1e,
|
||||
0x50, 0xc4, 0xa1, 0x7f, 0x52, 0x04, 0x42, 0xc0, 0x81, 0x26, 0x2e, 0x5d, 0xa1, 0x92, 0x6a, 0x0c,
|
||||
0x42, 0x42, 0x02, 0x69, 0xba, 0xfb, 0x62, 0x2f, 0x5a, 0xcf, 0x2c, 0x3b, 0x63, 0x1a, 0x1f, 0x38,
|
||||
0x70, 0xe2, 0xca, 0x57, 0xe0, 0x3b, 0xf0, 0x21, 0x38, 0xf6, 0xc8, 0x11, 0x25, 0x5f, 0x04, 0xcd,
|
||||
0xec, 0x5f, 0xdb, 0x9b, 0x1c, 0xb8, 0x38, 0x33, 0xef, 0xcf, 0xef, 0xfd, 0xde, 0x9b, 0x99, 0xdf,
|
||||
0x06, 0x9e, 0x84, 0x62, 0x3e, 0x17, 0x5c, 0xa6, 0x2c, 0xc4, 0x47, 0xe6, 0x57, 0x2e, 0x79, 0x98,
|
||||
0x66, 0x42, 0x89, 0x47, 0xe6, 0x57, 0xd6, 0xd6, 0x87, 0xc6, 0x40, 0x7a, 0x95, 0xc1, 0x0f, 0xe0,
|
||||
0xf6, 0x0b, 0x64, 0xd1, 0x64, 0xc9, 0x43, 0xca, 0xf8, 0x14, 0x09, 0x81, 0xee, 0x79, 0x26, 0xe6,
|
||||
0x9e, 0x35, 0xb2, 0x8e, 0xba, 0xd4, 0xac, 0x49, 0x1f, 0x6c, 0x25, 0x3c, 0xdb, 0x58, 0x6c, 0x25,
|
||||
0xc8, 0x01, 0x6c, 0x27, 0xf1, 0x3c, 0x56, 0xde, 0xd6, 0xc8, 0x3a, 0xba, 0x4d, 0xf3, 0x8d, 0x7f,
|
||||
0x01, 0xfd, 0x0a, 0x0a, 0xe5, 0x22, 0x51, 0x1a, 0x6b, 0xc6, 0xe4, 0xcc, 0x60, 0x39, 0xd4, 0xac,
|
||||
0xc9, 0x67, 0xb0, 0x8b, 0x09, 0xce, 0x91, 0x2b, 0xe9, 0xd9, 0xa3, 0xad, 0xa3, 0xbd, 0xe3, 0xd1,
|
||||
0xc3, 0x9a, 0xdf, 0x2a, 0xc0, 0x38, 0x0f, 0xa4, 0x55, 0x86, 0xae, 0x1c, 0x8a, 0x05, 0xaf, 0x2a,
|
||||
0x9b, 0x8d, 0xff, 0x29, 0xdc, 0x6b, 0x4d, 0xd4, 0xc4, 0xe3, 0xc8, 0x94, 0xef, 0x51, 0x3b, 0x8e,
|
||||
0x0c, 0x21, 0x64, 0x91, 0x69, 0xa5, 0x47, 0xcd, 0xda, 0xff, 0x01, 0xf6, 0xeb, 0xe4, 0x9f, 0x17,
|
||||
0x28, 0x15, 0xf1, 0x60, 0xc7, 0x50, 0x0a, 0xca, 0xdc, 0x72, 0x4b, 0x1e, 0xc3, 0xad, 0x4c, 0x8f,
|
||||
0xa9, 0xe4, 0xee, 0xb5, 0x71, 0xd7, 0x01, 0xb4, 0x88, 0xf3, 0xbf, 0x04, 0xb7, 0xc1, 0x2d, 0x15,
|
||||
0x5c, 0x22, 0x79, 0x0a, 0x3b, 0x99, 0xe1, 0x29, 0x3d, 0xcb, 0xc0, 0xbc, 0x77, 0xed, 0x08, 0x68,
|
||||
0x19, 0xe9, 0xff, 0x0a, 0x77, 0xce, 0x5e, 0xff, 0x84, 0xa1, 0xd2, 0xce, 0x97, 0x28, 0x25, 0x9b,
|
||||
0xe2, 0x0d, 0x4c, 0x3d, 0x5d, 0x23, 0x4d, 0x96, 0x41, 0xd9, 0x6d, 0xb9, 0xd5, 0x9e, 0x94, 0x2d,
|
||||
0x13, 0xc1, 0x22, 0x33, 0x45, 0x87, 0x96, 0x5b, 0x32, 0x80, 0x5d, 0x61, 0x4a, 0x04, 0x91, 0xd7,
|
||||
0x35, 0x49, 0xd5, 0xde, 0x1f, 0x83, 0x3b, 0xd1, 0xd0, 0xaf, 0x16, 0x72, 0x56, 0xce, 0xe9, 0x49,
|
||||
0x8d, 0xa4, 0xab, 0xef, 0x1d, 0xbf, 0xdb, 0xe8, 0x23, 0x8f, 0xce, 0xdd, 0x55, 0x09, 0xff, 0x2e,
|
||||
0xdc, 0x69, 0xc0, 0xe4, 0xf3, 0xf0, 0xfd, 0x0a, 0x3b, 0x49, 0x4a, 0xec, 0xb5, 0xa3, 0xf3, 0x9f,
|
||||
0x57, 0x89, 0x3a, 0xa6, 0x18, 0xe4, 0xff, 0x20, 0xf0, 0x9b, 0x0d, 0x4e, 0xd3, 0x43, 0xbe, 0x80,
|
||||
0x3d, 0x93, 0xa3, 0xe7, 0x8e, 0x59, 0x81, 0xf3, 0xa0, 0x81, 0x43, 0xd9, 0x9b, 0x49, 0x1d, 0xf0,
|
||||
0x5d, 0xac, 0x66, 0x41, 0x44, 0x9b, 0x39, 0x64, 0x08, 0xc0, 0xc2, 0xa4, 0x00, 0x34, 0xe3, 0x76,
|
||||
0x68, 0xc3, 0x42, 0x7c, 0x70, 0xea, 0x5d, 0x90, 0x8f, 0xbd, 0x47, 0x57, 0x6c, 0xe4, 0x18, 0x0e,
|
||||
0x0c, 0xe4, 0x04, 0x95, 0x8a, 0xf9, 0x54, 0x96, 0x68, 0x5d, 0x83, 0xd6, 0xea, 0x23, 0x1f, 0xc1,
|
||||
0x3b, 0x6d, 0xf6, 0x20, 0xf2, 0xb6, 0x4d, 0x85, 0x6b, 0xbc, 0xfe, 0x9f, 0x16, 0xec, 0x35, 0x5a,
|
||||
0xd2, 0xe7, 0x1e, 0x47, 0xc8, 0x55, 0xac, 0x96, 0xc5, 0x5b, 0xad, 0xf6, 0xe4, 0x10, 0x7a, 0x2a,
|
||||
0x9e, 0xa3, 0x54, 0x6c, 0x9e, 0x9a, 0xd6, 0xb6, 0x68, 0x6d, 0xd0, 0x5e, 0x53, 0xe3, 0x9b, 0x65,
|
||||
0x8a, 0x45, 0x5b, 0xb5, 0x81, 0xbc, 0x0f, 0x7d, 0x7d, 0xe9, 0xe2, 0x90, 0xa9, 0x58, 0xf0, 0xaf,
|
||||
0x70, 0x69, 0xba, 0xe9, 0xd2, 0x35, 0xab, 0x7e, 0x96, 0x12, 0x31, 0x67, 0xed, 0x50, 0xb3, 0xf6,
|
||||
0x5f, 0x41, 0x7f, 0x75, 0xf0, 0x64, 0xb4, 0x79, 0x50, 0xce, 0xea, 0x39, 0x68, 0x36, 0xf1, 0x94,
|
||||
0x33, 0xb5, 0xc8, 0xb0, 0x38, 0x86, 0xda, 0xe0, 0x9f, 0xc2, 0x41, 0xdb, 0x51, 0xea, 0xac, 0x8c,
|
||||
0xbd, 0x59, 0x41, 0xad, 0x0d, 0xc5, 0x3d, 0xb4, 0xab, 0x7b, 0xf8, 0x23, 0x1c, 0x4c, 0x9a, 0x53,
|
||||
0x3d, 0x11, 0x5c, 0x69, 0xa9, 0xf9, 0x1c, 0x9c, 0xfc, 0xad, 0x9c, 0x62, 0x82, 0x0a, 0x5b, 0xee,
|
||||
0xe3, 0x59, 0xc3, 0xfd, 0xa2, 0x43, 0x57, 0xc2, 0x9f, 0xed, 0xc0, 0xf6, 0x2f, 0x2c, 0x59, 0xa0,
|
||||
0x3f, 0x04, 0xa7, 0x19, 0xb8, 0xf1, 0x0e, 0x3e, 0x86, 0x7b, 0x2b, 0xf5, 0x27, 0x9c, 0xa5, 0x72,
|
||||
0x26, 0x94, 0xbe, 0x84, 0x91, 0x49, 0x89, 0x82, 0x28, 0xd7, 0x95, 0x1e, 0x6d, 0x58, 0xfc, 0xdf,
|
||||
0x2d, 0x70, 0xca, 0xa4, 0x53, 0xa6, 0x18, 0xf9, 0x04, 0x76, 0xc2, 0x9c, 0x7c, 0xa1, 0x42, 0x0f,
|
||||
0xd6, 0x1f, 0xcf, 0x5a, 0x8f, 0xb4, 0x8c, 0xd7, 0x22, 0x2e, 0x8b, 0xba, 0x66, 0x34, 0xab, 0x22,
|
||||
0xde, 0xca, 0x8f, 0x56, 0x19, 0x1f, 0x84, 0xb0, 0x3b, 0xce, 0xb2, 0x13, 0x11, 0xa1, 0x24, 0x7d,
|
||||
0x80, 0x6f, 0x39, 0x5e, 0xa4, 0x18, 0x2a, 0x8c, 0xdc, 0x0e, 0x71, 0x8b, 0xd7, 0xf9, 0x32, 0x96,
|
||||
0x32, 0xe6, 0x53, 0xd7, 0x22, 0xfb, 0xc5, 0x5d, 0x1d, 0x5f, 0xc4, 0x52, 0x49, 0xd7, 0x26, 0x77,
|
||||
0x61, 0xdf, 0x18, 0xbe, 0x16, 0x2a, 0xe0, 0x27, 0x2c, 0x9c, 0xa1, 0xbb, 0xa5, 0xa3, 0xc6, 0x59,
|
||||
0x26, 0xb2, 0xb3, 0xf3, 0x73, 0x89, 0xca, 0x8d, 0x8e, 0xff, 0xb2, 0xa1, 0x97, 0x13, 0x59, 0xf2,
|
||||
0x90, 0x9c, 0xc0, 0x6e, 0xa9, 0xab, 0x64, 0xd0, 0x2a, 0xb6, 0x46, 0x75, 0x06, 0xf7, 0xdb, 0x85,
|
||||
0x38, 0x57, 0x9b, 0xe7, 0x05, 0xa2, 0xd6, 0x2e, 0x72, 0x7f, 0x43, 0x69, 0x6a, 0x61, 0x1c, 0x1c,
|
||||
0xb6, 0x3b, 0x37, 0x70, 0x92, 0xa4, 0x0d, 0xa7, 0x12, 0xc1, 0x36, 0x9c, 0x86, 0xfa, 0x51, 0x70,
|
||||
0xeb, 0x2f, 0xc2, 0x44, 0x65, 0xc8, 0xe6, 0xe4, 0x70, 0xe3, 0xc2, 0x35, 0x3e, 0x17, 0x83, 0x1b,
|
||||
0xbd, 0x47, 0xd6, 0x63, 0xeb, 0xd9, 0x87, 0x7f, 0x5f, 0x0e, 0xad, 0xb7, 0x97, 0x43, 0xeb, 0xdf,
|
||||
0xcb, 0xa1, 0xf5, 0xc7, 0xd5, 0xb0, 0xf3, 0xf6, 0x6a, 0xd8, 0xf9, 0xe7, 0x6a, 0xd8, 0xf9, 0x7e,
|
||||
0x70, 0xfd, 0xff, 0x19, 0xaf, 0x6f, 0x99, 0x3f, 0x4f, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x24,
|
||||
0x0c, 0x9c, 0xee, 0x8c, 0x08, 0x00, 0x00,
|
||||
// 966 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x6e, 0xdb, 0x46,
|
||||
0x10, 0x16, 0x69, 0xf9, 0x47, 0x63, 0x5a, 0x66, 0x36, 0x4e, 0xc3, 0x2a, 0x86, 0x22, 0xec, 0xa1,
|
||||
0x30, 0x72, 0xc8, 0x8f, 0x52, 0xb4, 0x48, 0x7f, 0x0e, 0x89, 0xad, 0x34, 0x44, 0x91, 0x3a, 0x58,
|
||||
0x35, 0x28, 0x50, 0xa0, 0x05, 0xd6, 0xe4, 0x5a, 0x62, 0x4b, 0x2d, 0x59, 0xee, 0xaa, 0xb1, 0x0e,
|
||||
0x3d, 0xf4, 0xd4, 0x6b, 0x5f, 0xa1, 0xef, 0xd0, 0x87, 0xe8, 0x31, 0xc7, 0x1e, 0x0b, 0xfb, 0x45,
|
||||
0x8a, 0x5d, 0x2e, 0x7f, 0x64, 0x51, 0x39, 0xe4, 0x22, 0x73, 0xbe, 0x99, 0xf9, 0xe6, 0xdb, 0xd9,
|
||||
0xdd, 0x59, 0xc3, 0xa3, 0x20, 0x99, 0xcd, 0x12, 0x2e, 0x52, 0x1a, 0xb0, 0x07, 0xfa, 0x57, 0x2c,
|
||||
0x78, 0x90, 0x66, 0x89, 0x4c, 0x1e, 0xe8, 0x5f, 0x51, 0xa1, 0xf7, 0x35, 0x80, 0x3a, 0x25, 0x80,
|
||||
0x7d, 0xd8, 0x7b, 0xc1, 0x68, 0x38, 0x5e, 0xf0, 0x80, 0x50, 0x3e, 0x61, 0x08, 0x41, 0xfb, 0x3c,
|
||||
0x4b, 0x66, 0x9e, 0x35, 0xb0, 0x8e, 0xda, 0x44, 0x7f, 0xa3, 0x2e, 0xd8, 0x32, 0xf1, 0x6c, 0x8d,
|
||||
0xd8, 0x32, 0x41, 0x07, 0xb0, 0x19, 0x47, 0xb3, 0x48, 0x7a, 0x1b, 0x03, 0xeb, 0x68, 0x8f, 0xe4,
|
||||
0x06, 0xbe, 0x80, 0x6e, 0x49, 0xc5, 0xc4, 0x3c, 0x96, 0x8a, 0x6b, 0x4a, 0xc5, 0x54, 0x73, 0x39,
|
||||
0x44, 0x7f, 0xa3, 0x2f, 0x60, 0x87, 0xc5, 0x6c, 0xc6, 0xb8, 0x14, 0x9e, 0x3d, 0xd8, 0x38, 0xda,
|
||||
0x1d, 0x0e, 0xee, 0x57, 0xfa, 0x96, 0x09, 0x46, 0x79, 0x20, 0x29, 0x33, 0x54, 0xe5, 0x20, 0x99,
|
||||
0xf3, 0xb2, 0xb2, 0x36, 0xf0, 0xe7, 0x70, 0xab, 0x31, 0x51, 0x09, 0x8f, 0x42, 0x5d, 0xbe, 0x43,
|
||||
0xec, 0x28, 0xd4, 0x82, 0x18, 0x0d, 0xf5, 0x52, 0x3a, 0x44, 0x7f, 0xe3, 0x1f, 0x60, 0xbf, 0x4a,
|
||||
0xfe, 0x65, 0xce, 0x84, 0x44, 0x1e, 0x6c, 0x6b, 0x49, 0x7e, 0x91, 0x5b, 0x98, 0xe8, 0x21, 0x6c,
|
||||
0x65, 0xaa, 0x4d, 0x85, 0x76, 0xaf, 0x49, 0xbb, 0x0a, 0x20, 0x26, 0x0e, 0x7f, 0x05, 0x6e, 0x4d,
|
||||
0x5b, 0x9a, 0x70, 0xc1, 0xd0, 0x63, 0xd8, 0xce, 0xb4, 0x4e, 0xe1, 0x59, 0x9a, 0xe6, 0xc3, 0xb5,
|
||||
0x2d, 0x20, 0x45, 0x24, 0xfe, 0x0d, 0x6e, 0x9c, 0x9e, 0xfd, 0xc4, 0x02, 0xa9, 0x9c, 0x2f, 0x99,
|
||||
0x10, 0x74, 0xc2, 0xde, 0xa1, 0xd4, 0x53, 0x35, 0xd2, 0x78, 0xe1, 0x17, 0xab, 0x2d, 0x4c, 0xe5,
|
||||
0x49, 0xe9, 0x22, 0x4e, 0x68, 0xa8, 0xbb, 0xe8, 0x90, 0xc2, 0x44, 0x3d, 0xd8, 0x49, 0x74, 0x09,
|
||||
0x3f, 0xf4, 0xda, 0x3a, 0xa9, 0xb4, 0xf1, 0x08, 0xdc, 0xb1, 0xa2, 0x7e, 0x35, 0x17, 0xd3, 0xa2,
|
||||
0x4f, 0x8f, 0x2a, 0x26, 0x55, 0x7d, 0x77, 0x78, 0xbb, 0xb6, 0x8e, 0x3c, 0x3a, 0x77, 0x97, 0x25,
|
||||
0xf0, 0x4d, 0xb8, 0x51, 0xa3, 0xc9, 0xfb, 0x81, 0x71, 0xc9, 0x1d, 0xc7, 0x05, 0xf7, 0xb5, 0xad,
|
||||
0xc3, 0xcf, 0xcb, 0x44, 0x15, 0x63, 0x1a, 0xf9, 0x1e, 0x02, 0x7e, 0xb7, 0xc1, 0xa9, 0x7b, 0xd0,
|
||||
0x53, 0xd8, 0xd5, 0x39, 0xaa, 0xef, 0x2c, 0x33, 0x3c, 0x77, 0x6b, 0x3c, 0x84, 0xbe, 0x19, 0x57,
|
||||
0x01, 0xdf, 0x45, 0x72, 0xea, 0x87, 0xa4, 0x9e, 0x83, 0xfa, 0x00, 0x34, 0x88, 0x0d, 0xa1, 0x6e,
|
||||
0xb7, 0x43, 0x6a, 0x08, 0xc2, 0xe0, 0x54, 0x96, 0x9f, 0xb7, 0xbd, 0x43, 0x96, 0x30, 0x34, 0x84,
|
||||
0x03, 0x4d, 0x39, 0x66, 0x52, 0x46, 0x7c, 0x22, 0x0a, 0xb6, 0xb6, 0x66, 0x6b, 0xf4, 0xa1, 0x4f,
|
||||
0xe0, 0x83, 0x26, 0xdc, 0x0f, 0xbd, 0x4d, 0x5d, 0x61, 0x8d, 0x17, 0xff, 0x65, 0xc1, 0x6e, 0x6d,
|
||||
0x49, 0x6a, 0xdf, 0xa3, 0x90, 0x71, 0x19, 0xc9, 0x85, 0xb9, 0xab, 0xa5, 0x8d, 0x0e, 0xa1, 0x23,
|
||||
0xa3, 0x19, 0x13, 0x92, 0xce, 0x52, 0xbd, 0xb4, 0x0d, 0x52, 0x01, 0xca, 0xab, 0x6b, 0x7c, 0xbb,
|
||||
0x48, 0x99, 0x59, 0x56, 0x05, 0xa0, 0x8f, 0xa0, 0xab, 0x0e, 0x5d, 0x14, 0x50, 0x19, 0x25, 0xfc,
|
||||
0x6b, 0xb6, 0xd0, 0xab, 0x69, 0x93, 0x6b, 0xa8, 0xba, 0x96, 0x82, 0xb1, 0x5c, 0xb5, 0x43, 0xf4,
|
||||
0x37, 0x7e, 0x05, 0xdd, 0xe5, 0xc6, 0xa3, 0xc1, 0xea, 0x46, 0x39, 0xcb, 0xfb, 0xa0, 0xd4, 0x44,
|
||||
0x13, 0x4e, 0xe5, 0x3c, 0x63, 0x66, 0x1b, 0x2a, 0x00, 0x9f, 0xc0, 0x41, 0xd3, 0x56, 0xaa, 0xac,
|
||||
0x8c, 0xbe, 0x59, 0x62, 0xad, 0x00, 0x73, 0x0e, 0xed, 0xf2, 0x1c, 0xfe, 0x08, 0x07, 0xe3, 0x7a,
|
||||
0x57, 0x8f, 0x13, 0x2e, 0xd5, 0xa8, 0xf9, 0x12, 0x9c, 0xfc, 0xae, 0x9c, 0xb0, 0x98, 0x49, 0xd6,
|
||||
0x70, 0x1e, 0x4f, 0x6b, 0xee, 0x17, 0x2d, 0xb2, 0x14, 0xfe, 0x6c, 0x1b, 0x36, 0x7f, 0xa5, 0xf1,
|
||||
0x9c, 0xe1, 0x3e, 0x38, 0xf5, 0xc0, 0x95, 0x7b, 0xf0, 0x29, 0xdc, 0x5a, 0xaa, 0x3f, 0xe6, 0x34,
|
||||
0x15, 0xd3, 0x44, 0xaa, 0x43, 0x18, 0xea, 0x94, 0xd0, 0x0f, 0xf3, 0xb9, 0xd2, 0x21, 0x35, 0x04,
|
||||
0xff, 0x61, 0x81, 0x53, 0x24, 0x9d, 0x50, 0x49, 0xd1, 0x13, 0xd8, 0x0e, 0x72, 0xf1, 0x66, 0x0a,
|
||||
0xdd, 0xbd, 0x7e, 0x79, 0xae, 0xad, 0x91, 0x14, 0xf1, 0x6a, 0x88, 0x0b, 0x53, 0x57, 0xb7, 0x66,
|
||||
0x79, 0x88, 0x37, 0xea, 0x23, 0x65, 0x06, 0xfe, 0xd9, 0x5c, 0xe5, 0xf1, 0xfc, 0x4c, 0x04, 0x59,
|
||||
0x94, 0xaa, 0x63, 0xa0, 0xce, 0xa0, 0x19, 0x5d, 0x85, 0xf8, 0xd2, 0x46, 0x9f, 0xc1, 0x16, 0x0d,
|
||||
0x54, 0x94, 0x2e, 0xd6, 0x1d, 0xe2, 0x95, 0x62, 0x35, 0xa6, 0xa7, 0x3a, 0x92, 0x98, 0x8c, 0x7b,
|
||||
0x01, 0xec, 0x8c, 0xb2, 0xec, 0x38, 0x09, 0x99, 0x40, 0x5d, 0x80, 0xd7, 0x9c, 0x5d, 0xa4, 0x2c,
|
||||
0x90, 0x2c, 0x74, 0x5b, 0xc8, 0x35, 0xa3, 0xe0, 0x65, 0x24, 0x44, 0xc4, 0x27, 0xae, 0x85, 0xf6,
|
||||
0xcd, 0xc5, 0x18, 0x5d, 0x44, 0x42, 0x0a, 0xd7, 0x46, 0x37, 0x61, 0x5f, 0x03, 0xdf, 0x24, 0xd2,
|
||||
0xe7, 0xc7, 0x34, 0x98, 0x32, 0x77, 0x43, 0x45, 0x8d, 0xb2, 0x2c, 0xc9, 0x4e, 0xcf, 0xcf, 0x05,
|
||||
0x93, 0x6e, 0x78, 0xef, 0x09, 0xdc, 0x5e, 0xa3, 0x03, 0xed, 0x41, 0xc7, 0xa0, 0x67, 0xcc, 0x6d,
|
||||
0xa9, 0xd4, 0xd7, 0x5c, 0x94, 0x80, 0x35, 0xfc, 0xdb, 0x86, 0x4e, 0x9e, 0xbb, 0xe0, 0x01, 0x3a,
|
||||
0x86, 0x9d, 0x62, 0xfe, 0xa3, 0x5e, 0xe3, 0xa3, 0xa0, 0xa7, 0x63, 0xef, 0x4e, 0xf3, 0x83, 0x91,
|
||||
0x4f, 0xc5, 0xe7, 0x86, 0x51, 0xcd, 0x58, 0x74, 0x67, 0x65, 0x22, 0x56, 0x03, 0xbc, 0x77, 0xd8,
|
||||
0xec, 0x5c, 0xe1, 0x89, 0xe3, 0x26, 0x9e, 0x72, 0x58, 0x37, 0xf1, 0xd4, 0xa6, 0x34, 0x01, 0xb7,
|
||||
0x7a, 0xb9, 0xc6, 0x32, 0x63, 0x74, 0x86, 0x0e, 0x57, 0x2e, 0x46, 0xed, 0x59, 0xeb, 0xbd, 0xd3,
|
||||
0x7b, 0x64, 0x3d, 0xb4, 0x9e, 0x7d, 0xfc, 0xcf, 0x65, 0xdf, 0x7a, 0x7b, 0xd9, 0xb7, 0xfe, 0xbb,
|
||||
0xec, 0x5b, 0x7f, 0x5e, 0xf5, 0x5b, 0x6f, 0xaf, 0xfa, 0xad, 0x7f, 0xaf, 0xfa, 0xad, 0xef, 0x7b,
|
||||
0xeb, 0xff, 0x1f, 0x3a, 0xdb, 0xd2, 0x7f, 0x1e, 0xff, 0x1f, 0x00, 0x00, 0xff, 0xff, 0x00, 0xa7,
|
||||
0x75, 0x0a, 0x34, 0x09, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {
|
||||
@ -1868,6 +1952,43 @@ func (m *SettingsData) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *SpaceSubscription) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *SpaceSubscription) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *SpaceSubscription) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.Action != 0 {
|
||||
i = encodeVarintSpacesync(dAtA, i, uint64(m.Action))
|
||||
i--
|
||||
dAtA[i] = 0x10
|
||||
}
|
||||
if len(m.SpaceIds) > 0 {
|
||||
for iNdEx := len(m.SpaceIds) - 1; iNdEx >= 0; iNdEx-- {
|
||||
i -= len(m.SpaceIds[iNdEx])
|
||||
copy(dAtA[i:], m.SpaceIds[iNdEx])
|
||||
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.SpaceIds[iNdEx])))
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovSpacesync(v)
|
||||
base := offset
|
||||
@ -2204,6 +2325,24 @@ func (m *SettingsData) Size() (n int) {
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *SpaceSubscription) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if len(m.SpaceIds) > 0 {
|
||||
for _, s := range m.SpaceIds {
|
||||
l = len(s)
|
||||
n += 1 + l + sovSpacesync(uint64(l))
|
||||
}
|
||||
}
|
||||
if m.Action != 0 {
|
||||
n += 1 + sovSpacesync(uint64(m.Action))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovSpacesync(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
@ -4261,6 +4400,107 @@ func (m *SettingsData) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *SpaceSubscription) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowSpacesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: SpaceSubscription: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: SpaceSubscription: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field SpaceIds", wireType)
|
||||
}
|
||||
var stringLen uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowSpacesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
intStringLen := int(stringLen)
|
||||
if intStringLen < 0 {
|
||||
return ErrInvalidLengthSpacesync
|
||||
}
|
||||
postIndex := iNdEx + intStringLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthSpacesync
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.SpaceIds = append(m.SpaceIds, string(dAtA[iNdEx:postIndex]))
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType)
|
||||
}
|
||||
m.Action = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowSpacesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Action |= SpaceSubscriptionAction(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipSpacesync(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthSpacesync
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipSpacesync(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
|
||||
@ -74,11 +74,11 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro
|
||||
conn drpc.Conn
|
||||
sc sec.SecureConn
|
||||
)
|
||||
log.Warn("dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
|
||||
log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
|
||||
for _, addr := range addrs {
|
||||
conn, sc, err = d.handshake(ctx, addr)
|
||||
if err != nil {
|
||||
log.Info("can't connect to host", zap.String("addr", addr), zap.Error(err))
|
||||
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
17
net/streampool/context.go
Normal file
17
net/streampool/context.go
Normal file
@ -0,0 +1,17 @@
|
||||
package streampool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
)
|
||||
|
||||
type streamCtxKey uint
|
||||
|
||||
const (
|
||||
streamCtxKeyStreamId streamCtxKey = iota
|
||||
)
|
||||
|
||||
func streamCtx(ctx context.Context, streamId uint32, peerId string) context.Context {
|
||||
ctx = peer.CtxWithPeerId(ctx, peerId)
|
||||
return context.WithValue(ctx, streamCtxKeyStreamId, streamId)
|
||||
}
|
||||
@ -1,6 +1,8 @@
|
||||
package streampool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/drpc"
|
||||
"sync/atomic"
|
||||
@ -12,7 +14,7 @@ type stream struct {
|
||||
pool *streamPool
|
||||
streamId uint32
|
||||
closed atomic.Bool
|
||||
l *zap.Logger
|
||||
l logger.CtxLogger
|
||||
tags []string
|
||||
}
|
||||
|
||||
@ -34,7 +36,9 @@ func (sr *stream) readLoop() error {
|
||||
sr.l.Info("msg receive error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if err := sr.pool.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil {
|
||||
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
|
||||
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "streamMessage"), zap.String("peerId", sr.peerId))
|
||||
if err := sr.pool.HandleMessage(ctx, sr.peerId, msg); err != nil {
|
||||
sr.l.Info("msg handle error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package streampool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/pool"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
@ -33,6 +34,10 @@ type StreamPool interface {
|
||||
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
|
||||
// Broadcast sends a message to all peers with given tags. Works async.
|
||||
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
|
||||
// AddTagsCtx adds tags to stream, stream will be extracted from ctx
|
||||
AddTagsCtx(ctx context.Context, tags ...string) error
|
||||
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
|
||||
RemoveTagsCtx(ctx context.Context, tags ...string) error
|
||||
// Close closes all streams
|
||||
Close() error
|
||||
}
|
||||
@ -142,7 +147,7 @@ func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message)
|
||||
}
|
||||
for _, st := range streams {
|
||||
if err = st.write(msg); err != nil {
|
||||
st.l.Info("sendOne write error", zap.Error(err), zap.Int("streams", len(streams)))
|
||||
st.l.InfoCtx(ctx, "sendOne write error", zap.Error(err), zap.Int("streams", len(streams)))
|
||||
// continue with next stream
|
||||
continue
|
||||
} else {
|
||||
@ -224,7 +229,7 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
||||
for _, st := range streams {
|
||||
funcs = append(funcs, func() {
|
||||
if e := st.write(msg); e != nil {
|
||||
log.Debug("broadcast write error", zap.Error(e))
|
||||
log.DebugCtx(ctx, "broadcast write error", zap.Error(e))
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -234,6 +239,58 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
||||
return s.exec.Add(ctx, funcs...)
|
||||
}
|
||||
|
||||
func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {
|
||||
streamId, ok := ctx.Value(streamCtxKeyStreamId).(uint32)
|
||||
if !ok {
|
||||
return fmt.Errorf("context without streamId")
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
st, ok := s.streams[streamId]
|
||||
if !ok {
|
||||
return fmt.Errorf("stream not found")
|
||||
}
|
||||
var newTags = make([]string, 0, len(tags))
|
||||
for _, newTag := range tags {
|
||||
if !slices.Contains(st.tags, newTag) {
|
||||
newTags = append(newTags, newTag)
|
||||
}
|
||||
}
|
||||
st.tags = append(st.tags, newTags...)
|
||||
for _, newTag := range tags {
|
||||
s.streamIdsByTag[newTag] = append(s.streamIdsByTag[newTag], streamId)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamPool) RemoveTagsCtx(ctx context.Context, tags ...string) error {
|
||||
streamId, ok := ctx.Value(streamCtxKeyStreamId).(uint32)
|
||||
if !ok {
|
||||
return fmt.Errorf("context without streamId")
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
st, ok := s.streams[streamId]
|
||||
if !ok {
|
||||
return fmt.Errorf("stream not found")
|
||||
}
|
||||
|
||||
var filtered = st.tags[:0]
|
||||
var toRemove = make([]string, 0, len(tags))
|
||||
for _, t := range st.tags {
|
||||
if slices.Contains(tags, t) {
|
||||
toRemove = append(toRemove, t)
|
||||
} else {
|
||||
filtered = append(filtered, t)
|
||||
}
|
||||
}
|
||||
st.tags = filtered
|
||||
for _, t := range toRemove {
|
||||
removeStream(s.streamIdsByTag, t, streamId)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamPool) removeStream(streamId uint32) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@ -242,23 +299,9 @@ func (s *streamPool) removeStream(streamId uint32) {
|
||||
log.Fatal("removeStream: stream does not exist", zap.Uint32("streamId", streamId))
|
||||
}
|
||||
|
||||
var removeStream = func(m map[string][]uint32, key string) {
|
||||
streamIds := m[key]
|
||||
idx := slices.Index(streamIds, streamId)
|
||||
if idx == -1 {
|
||||
log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId))
|
||||
}
|
||||
streamIds = slices.Delete(streamIds, idx, idx+1)
|
||||
if len(streamIds) == 0 {
|
||||
delete(m, key)
|
||||
} else {
|
||||
m[key] = streamIds
|
||||
}
|
||||
}
|
||||
|
||||
removeStream(s.streamIdsByPeer, st.peerId)
|
||||
removeStream(s.streamIdsByPeer, st.peerId, streamId)
|
||||
for _, tag := range st.tags {
|
||||
removeStream(s.streamIdsByTag, tag)
|
||||
removeStream(s.streamIdsByTag, tag, streamId)
|
||||
}
|
||||
|
||||
delete(s.streams, streamId)
|
||||
@ -279,8 +322,8 @@ func (s *streamPool) handleMessageLoop() {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err = s.handler.HandleMessage(context.Background(), hm.peerId, hm.msg); err != nil {
|
||||
log.Warn("handle message error", zap.Error(err))
|
||||
if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil {
|
||||
log.WarnCtx(hm.ctx, "handle message error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -288,3 +331,17 @@ func (s *streamPool) handleMessageLoop() {
|
||||
func (s *streamPool) Close() (err error) {
|
||||
return s.exec.Close()
|
||||
}
|
||||
|
||||
func removeStream(m map[string][]uint32, key string, streamId uint32) {
|
||||
streamIds := m[key]
|
||||
idx := slices.Index(streamIds, streamId)
|
||||
if idx == -1 {
|
||||
log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId))
|
||||
}
|
||||
streamIds = slices.Delete(streamIds, idx, idx+1)
|
||||
if len(streamIds) == 0 {
|
||||
delete(m, key)
|
||||
} else {
|
||||
m[key] = streamIds
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,6 +165,28 @@ func TestStreamPool_SendById(t *testing.T) {
|
||||
assert.Equal(t, "test", msg.ReqData)
|
||||
}
|
||||
|
||||
func TestStreamPool_Tags(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish(t)
|
||||
|
||||
s1, _ := newClientStream(t, fx, "p1")
|
||||
defer s1.Close()
|
||||
fx.AddStream("p1", s1, "t1")
|
||||
|
||||
s2, _ := newClientStream(t, fx, "p2")
|
||||
defer s1.Close()
|
||||
fx.AddStream("p2", s2, "t2")
|
||||
|
||||
err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []uint32{1}, fx.StreamPool.(*streamPool).streamIdsByTag["t3"])
|
||||
|
||||
err = fx.RemoveTagsCtx(streamCtx(ctx, 2, "p2"), "t2")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, fx.StreamPool.(*streamPool).streamIdsByTag["t2"], 0)
|
||||
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
fx := &fixture{}
|
||||
ts := rpctest.NewTestServer()
|
||||
|
||||
@ -3,6 +3,7 @@ package periodicsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
@ -14,8 +15,9 @@ type PeriodicSync interface {
|
||||
|
||||
type SyncerFunc func(ctx context.Context) error
|
||||
|
||||
func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l *zap.Logger) PeriodicSync {
|
||||
func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l logger.CtxLogger) PeriodicSync {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "periodicSync"))
|
||||
return &periodicSync{
|
||||
syncer: syncer,
|
||||
log: l,
|
||||
@ -28,7 +30,7 @@ func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc
|
||||
}
|
||||
|
||||
type periodicSync struct {
|
||||
log *zap.Logger
|
||||
log logger.CtxLogger
|
||||
syncer SyncerFunc
|
||||
syncCtx context.Context
|
||||
syncCancel context.CancelFunc
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user