diff --git a/app/logger/ctxfiled.go b/app/logger/ctxfiled.go new file mode 100644 index 00000000..83d8bba4 --- /dev/null +++ b/app/logger/ctxfiled.go @@ -0,0 +1,55 @@ +package logger + +import ( + "context" + "go.uber.org/zap" +) + +type ctxKey uint + +const ( + ctxKeyFields ctxKey = iota +) + +func WithCtx(ctx context.Context, l *zap.Logger) *zap.Logger { + return l.With(CtxGetFields(ctx)...) +} + +func CtxWithFields(ctx context.Context, fields ...zap.Field) context.Context { + existingFields := CtxGetFields(ctx) + if existingFields != nil { + existingFields = append(existingFields, fields...) + } + return context.WithValue(ctx, ctxKeyFields, fields) +} + +func CtxGetFields(ctx context.Context) (fields []zap.Field) { + if v := ctx.Value(ctxKeyFields); v != nil { + return v.([]zap.Field) + } + return +} + +type CtxLogger struct { + *zap.Logger +} + +func (cl CtxLogger) DebugCtx(ctx context.Context, msg string, fields ...zap.Field) { + cl.Logger.Debug(msg, append(CtxGetFields(ctx), fields...)...) +} + +func (cl CtxLogger) InfoCtx(ctx context.Context, msg string, fields ...zap.Field) { + cl.Logger.Info(msg, append(CtxGetFields(ctx), fields...)...) +} + +func (cl CtxLogger) WarnCtx(ctx context.Context, msg string, fields ...zap.Field) { + cl.Logger.Warn(msg, append(CtxGetFields(ctx), fields...)...) +} + +func (cl CtxLogger) ErrorCtx(ctx context.Context, msg string, fields ...zap.Field) { + cl.Logger.Error(msg, append(CtxGetFields(ctx), fields...)...) +} + +func (cl CtxLogger) With(fields ...zap.Field) CtxLogger { + return CtxLogger{cl.Logger.With(fields...)} +} diff --git a/app/logger/log.go b/app/logger/log.go index a9f30252..028c9988 100644 --- a/app/logger/log.go +++ b/app/logger/log.go @@ -9,7 +9,7 @@ var ( mu sync.Mutex defaultLogger *zap.Logger levels = make(map[string]zap.AtomicLevel) - loggers = make(map[string]*zap.Logger) + loggers = make(map[string]CtxLogger) ) func init() { @@ -22,7 +22,7 @@ func SetDefault(l *zap.Logger) { defer mu.Unlock() *defaultLogger = *l for name, l := range loggers { - *l = *defaultLogger.Named(name) + *l.Logger = *defaultLogger.Named(name) } } @@ -38,13 +38,14 @@ func Default() *zap.Logger { return defaultLogger } -func NewNamed(name string, fields ...zap.Field) *zap.Logger { +func NewNamed(name string, fields ...zap.Field) CtxLogger { mu.Lock() defer mu.Unlock() l := defaultLogger.Named(name) if len(fields) > 0 { l = l.With(fields...) } - loggers[name] = l - return l + ctxL := CtxLogger{l} + loggers[name] = ctxL + return ctxL } diff --git a/commonspace/commongetter.go b/commonspace/commongetter.go index d5f67538..ee7e4b03 100644 --- a/commonspace/commongetter.go +++ b/commonspace/commongetter.go @@ -22,6 +22,9 @@ func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter } func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) { + if object == nil { + panic("nil object") + } c.reservedObjects = append(c.reservedObjects, object) } diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 76a3cae9..6186c458 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/anytypeio/any-sync/app/ldiff" + "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/treegetter" @@ -32,7 +33,7 @@ func newDiffSyncer( storage spacestorage.SpaceStorage, clientFactory spacesyncproto.ClientFactory, syncStatus syncstatus.StatusUpdater, - log *zap.Logger) DiffSyncer { + log logger.CtxLogger) DiffSyncer { return &diffSyncer{ diff: diff, spaceId: spaceId, @@ -52,7 +53,7 @@ type diffSyncer struct { cache treegetter.TreeGetter storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory - log *zap.Logger + log logger.CtxLogger deletionState deletionstate.DeletionState syncStatus syncstatus.StatusUpdater } @@ -96,7 +97,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error { d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) } } - d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) + d.log.Info("diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) return nil } @@ -131,18 +132,23 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds)), - zap.Int("already deleted ids", totalLen-len(filteredIds))) + zap.Int("already deleted ids", totalLen-len(filteredIds)), + zap.String("peerId", p.Id()), + ) return } func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { + ctx = logger.CtxWithFields(ctx, zap.String("op", "pingTrees")) for _, tId := range trees { tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { + d.log.InfoCtx(ctx, "can't load tree", zap.Error(err)) continue } syncTree, ok := tree.(synctree.SyncTree) if !ok { + d.log.InfoCtx(ctx, "not a sync tree", zap.String("objectId", tId)) continue } // the idea why we call it directly is that if we try to get it from cache diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index f72f66db..0dc4fa89 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -4,6 +4,7 @@ package headsync import ( "context" "github.com/anytypeio/any-sync/app/ldiff" + "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/treegetter" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" @@ -38,7 +39,7 @@ type headSync struct { periodicSync periodicsync.PeriodicSync storage spacestorage.SpaceStorage diff ldiff.Diff - log *zap.Logger + log logger.CtxLogger syncer DiffSyncer syncPeriod int @@ -51,7 +52,7 @@ func NewHeadSync( confConnector confconnector.ConfConnector, cache treegetter.TreeGetter, syncStatus syncstatus.StatusUpdater, - log *zap.Logger) HeadSync { + log logger.CtxLogger) HeadSync { diff := ldiff.New(16, 16) l := log.With(zap.String("spaceId", spaceId)) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index b517397f..8302d2a5 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -49,7 +49,7 @@ type syncTree struct { isDeleted bool } -var log = logger.NewNamed("commonspace.synctree").Sugar() +var log = logger.NewNamed("commonspace.synctree") var buildObjectTree = objecttree.BuildObjectTree var createSyncClient = newSyncClient @@ -145,7 +145,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t } // basically building tree with in-memory storage and validating that it was without errors - log.With(zap.String("id", id)).Debug("validating tree") + log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree") err = objecttree.ValidateRawTree(payload, deps.AclList) if err != nil { return @@ -195,7 +195,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // send to everybody, because everybody should know that the node or client got new tree if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil { - log.Error("broadcast error", zap.Error(e)) + log.ErrorCtx(ctx, "broadcast error", zap.Error(e)) } } return @@ -261,7 +261,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree. } func (s *syncTree) Delete() (err error) { - log.With("id", s.Id()).Debug("deleting sync tree") + log.With(zap.String("id", s.Id())).Debug("deleting sync tree") s.Lock() defer s.Unlock() if err = s.checkAlive(); err != nil { @@ -276,7 +276,7 @@ func (s *syncTree) Delete() (err error) { } func (s *syncTree) Close() (err error) { - log.With("id", s.Id()).Debug("closing sync tree") + log.With(zap.String("id", s.Id())).Debug("closing sync tree") s.Lock() defer s.Unlock() if s.isClosed { diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 11dbb235..7f134381 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -82,29 +82,30 @@ func (s *syncTreeHandler) handleHeadUpdate( objTree = s.objTree ) - log := log.With("senderId", senderId). - With("heads", objTree.Heads()). - With("treeId", objTree.Id()) - log.Debug("received head update message") + log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id())) + log.DebugCtx(ctx, "received head update message") defer func() { if err != nil { log.With(zap.Error(err)).Debug("head update finished with error") } else if fullRequest != nil { - log.Debug("sending full sync request") + log.DebugCtx(ctx, "sending full sync request") } else { if !isEmptyUpdate { - log.Debug("head update finished correctly") + log.DebugCtx(ctx, "head update finished correctly") } } }() // isEmptyUpdate is sent when the tree is brought up from cache if isEmptyUpdate { - log.With("treeId", objTree.Id()).Debug("is empty update") - if slice.UnsortedEquals(objTree.Heads(), update.Heads) { + + headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads) + log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals)) + if headEquals { return } + // we need to sync in any case fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) if err != nil { @@ -149,20 +150,17 @@ func (s *syncTreeHandler) handleFullSyncRequest( objTree = s.objTree ) - log := log.With("senderId", senderId). - With("heads", request.Heads). - With("treeId", s.objTree.Id()). - With("replyId", replyId) - log.Debug("received full sync request message") + log := log.With(zap.String("senderId", senderId), zap.Strings("heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId)) + log.DebugCtx(ctx, "received full sync request message") defer func() { if err != nil { - log.With(zap.Error(err)).Debug("full sync request finished with error") + log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error") s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId) return } else if fullResponse != nil { - log.Debug("full sync response sent") + log.DebugCtx(ctx, "full sync response sent") } }() @@ -190,16 +188,14 @@ func (s *syncTreeHandler) handleFullSyncResponse( var ( objTree = s.objTree ) - log := log.With("senderId", senderId). - With("heads", response.Heads). - With("treeId", s.objTree.Id()) - log.Debug("received full sync response message") + log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id())) + log.DebugCtx(ctx, "received full sync response message") defer func() { if err != nil { - log.With(zap.Error(err)).Debug("full sync response failed") + log.With(zap.Error(err)).DebugCtx(ctx, "full sync response failed") } else { - log.Debug("full sync response succeeded") + log.DebugCtx(ctx, "full sync response succeeded") } }() diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 4e612b97..bca09e66 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -59,7 +59,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn defer cancel() newCounter := s.counter.Add(1) msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter) - log.Info("mpool sendSync", zap.String("replyId", msg.ReplyId)) + log.InfoCtx(ctx, "mpool sendSync", zap.String("replyId", msg.ReplyId)) s.waitersMx.Lock() waiter := responseWaiter{ ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), @@ -77,7 +77,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn delete(s.waiters, msg.ReplyId) s.waitersMx.Unlock() - log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting") + log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting") err = ctx.Err() case reply = <-waiter.ch: // success @@ -87,42 +87,27 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - select { - case <-ctx.Done(): - log.Warn("ctx.Done") - default: - } return s.StreamManager.SendPeer(ctx, peerId, msg) } func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - select { - case <-ctx.Done(): - log.Warn("ctx.Done") - default: - } return s.StreamManager.SendResponsible(ctx, msg) } func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() - select { - case <-ctx.Done(): - log.Warn("ctx.Done") - default: - } return s.StreamManager.Broadcast(ctx, msg) } func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() if msg.ReplyId != "" { - log.Info("mpool receive reply", zap.String("replyId", msg.ReplyId)) + log.InfoCtx(ctx, "mpool receive reply", zap.String("replyId", msg.ReplyId)) // we got reply, send it to waiter if s.stopWaiter(msg) { return } - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") + log.DebugCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId)) } return s.messageHandler(ctx, senderId, msg) } diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index e8ad33d3..20dd0869 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -81,7 +81,7 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message } func (s *objectSync) handleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).Debug("handling message") + log.With(zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).DebugCtx(ctx, "handling message") obj, err := s.objectGetter.GetObject(ctx, message.ObjectId) if err != nil { return diff --git a/commonspace/spacesyncproto/protos/spacesync.proto b/commonspace/spacesyncproto/protos/spacesync.proto index b11b182b..f2e4b9fc 100644 --- a/commonspace/spacesyncproto/protos/spacesync.proto +++ b/commonspace/spacesyncproto/protos/spacesync.proto @@ -59,8 +59,8 @@ message ObjectSyncMessage { string replyId = 2; bytes payload = 3; string objectId = 4; -// string identity = 5; -// string peerSignature = 6; + // string identity = 5; + // string peerSignature = 6; } // SpacePushRequest is a request to add space on a node containing only one acl record @@ -134,3 +134,13 @@ message SettingsData { SpaceSettingsSnapshot snapshot = 2; } +// SpaceSubscription contains in ObjectSyncMessage.Payload and indicates that we need to subscribe or unsubscribe the current stream to this space +enum SpaceSubscriptionAction { + Subscribe = 0; + Unsubscribe = 1; +} + +message SpaceSubscription { + repeated string spaceIds = 1; + SpaceSubscriptionAction action = 2; +} diff --git a/commonspace/spacesyncproto/spacesync.pb.go b/commonspace/spacesyncproto/spacesync.pb.go index a8ffc521..30dba04b 100644 --- a/commonspace/spacesyncproto/spacesync.pb.go +++ b/commonspace/spacesyncproto/spacesync.pb.go @@ -56,6 +56,32 @@ func (ErrCodes) EnumDescriptor() ([]byte, []int) { return fileDescriptor_80e49f1f4ac27799, []int{0} } +// SpaceSubscription contains in ObjectSyncMessage.Payload and indicates that we need to subscribe or unsubscribe the current stream to this space +type SpaceSubscriptionAction int32 + +const ( + SpaceSubscriptionAction_Subscribe SpaceSubscriptionAction = 0 + SpaceSubscriptionAction_Unsubscribe SpaceSubscriptionAction = 1 +) + +var SpaceSubscriptionAction_name = map[int32]string{ + 0: "Subscribe", + 1: "Unsubscribe", +} + +var SpaceSubscriptionAction_value = map[string]int32{ + "Subscribe": 0, + "Unsubscribe": 1, +} + +func (x SpaceSubscriptionAction) String() string { + return proto.EnumName(SpaceSubscriptionAction_name, int32(x)) +} + +func (SpaceSubscriptionAction) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_80e49f1f4ac27799, []int{1} +} + // HeadSyncRange presenting a request for one range type HeadSyncRange struct { From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"` @@ -1047,8 +1073,61 @@ func (m *SettingsData) GetSnapshot() *SpaceSettingsSnapshot { return nil } +type SpaceSubscription struct { + SpaceIds []string `protobuf:"bytes,1,rep,name=spaceIds,proto3" json:"spaceIds,omitempty"` + Action SpaceSubscriptionAction `protobuf:"varint,2,opt,name=action,proto3,enum=spacesync.SpaceSubscriptionAction" json:"action,omitempty"` +} + +func (m *SpaceSubscription) Reset() { *m = SpaceSubscription{} } +func (m *SpaceSubscription) String() string { return proto.CompactTextString(m) } +func (*SpaceSubscription) ProtoMessage() {} +func (*SpaceSubscription) Descriptor() ([]byte, []int) { + return fileDescriptor_80e49f1f4ac27799, []int{18} +} +func (m *SpaceSubscription) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SpaceSubscription) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SpaceSubscription.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SpaceSubscription) XXX_Merge(src proto.Message) { + xxx_messageInfo_SpaceSubscription.Merge(m, src) +} +func (m *SpaceSubscription) XXX_Size() int { + return m.Size() +} +func (m *SpaceSubscription) XXX_DiscardUnknown() { + xxx_messageInfo_SpaceSubscription.DiscardUnknown(m) +} + +var xxx_messageInfo_SpaceSubscription proto.InternalMessageInfo + +func (m *SpaceSubscription) GetSpaceIds() []string { + if m != nil { + return m.SpaceIds + } + return nil +} + +func (m *SpaceSubscription) GetAction() SpaceSubscriptionAction { + if m != nil { + return m.Action + } + return SpaceSubscriptionAction_Subscribe +} + func init() { proto.RegisterEnum("spacesync.ErrCodes", ErrCodes_name, ErrCodes_value) + proto.RegisterEnum("spacesync.SpaceSubscriptionAction", SpaceSubscriptionAction_name, SpaceSubscriptionAction_value) proto.RegisterType((*HeadSyncRange)(nil), "spacesync.HeadSyncRange") proto.RegisterType((*HeadSyncResult)(nil), "spacesync.HeadSyncResult") proto.RegisterType((*HeadSyncResultElement)(nil), "spacesync.HeadSyncResultElement") @@ -1067,6 +1146,7 @@ func init() { proto.RegisterType((*ObjectDelete)(nil), "spacesync.ObjectDelete") proto.RegisterType((*SpaceSettingsSnapshot)(nil), "spacesync.SpaceSettingsSnapshot") proto.RegisterType((*SettingsData)(nil), "spacesync.SettingsData") + proto.RegisterType((*SpaceSubscription)(nil), "spacesync.SpaceSubscription") } func init() { @@ -1074,64 +1154,68 @@ func init() { } var fileDescriptor_80e49f1f4ac27799 = []byte{ - // 903 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x6f, 0x1b, 0x45, - 0x14, 0xf7, 0x6e, 0x9c, 0x26, 0x7e, 0xd9, 0x3a, 0xdb, 0x69, 0x0a, 0x8b, 0x1b, 0xb9, 0xd6, 0x1e, - 0x50, 0xc4, 0xa1, 0x7f, 0x52, 0x04, 0x42, 0xc0, 0x81, 0x26, 0x2e, 0x5d, 0xa1, 0x92, 0x6a, 0x0c, - 0x42, 0x42, 0x02, 0x69, 0xba, 0xfb, 0x62, 0x2f, 0x5a, 0xcf, 0x2c, 0x3b, 0x63, 0x1a, 0x1f, 0x38, - 0x70, 0xe2, 0xca, 0x57, 0xe0, 0x3b, 0xf0, 0x21, 0x38, 0xf6, 0xc8, 0x11, 0x25, 0x5f, 0x04, 0xcd, - 0xec, 0x5f, 0xdb, 0x9b, 0x1c, 0xb8, 0x38, 0x33, 0xef, 0xcf, 0xef, 0xfd, 0xde, 0x9b, 0x99, 0xdf, - 0x06, 0x9e, 0x84, 0x62, 0x3e, 0x17, 0x5c, 0xa6, 0x2c, 0xc4, 0x47, 0xe6, 0x57, 0x2e, 0x79, 0x98, - 0x66, 0x42, 0x89, 0x47, 0xe6, 0x57, 0xd6, 0xd6, 0x87, 0xc6, 0x40, 0x7a, 0x95, 0xc1, 0x0f, 0xe0, - 0xf6, 0x0b, 0x64, 0xd1, 0x64, 0xc9, 0x43, 0xca, 0xf8, 0x14, 0x09, 0x81, 0xee, 0x79, 0x26, 0xe6, - 0x9e, 0x35, 0xb2, 0x8e, 0xba, 0xd4, 0xac, 0x49, 0x1f, 0x6c, 0x25, 0x3c, 0xdb, 0x58, 0x6c, 0x25, - 0xc8, 0x01, 0x6c, 0x27, 0xf1, 0x3c, 0x56, 0xde, 0xd6, 0xc8, 0x3a, 0xba, 0x4d, 0xf3, 0x8d, 0x7f, - 0x01, 0xfd, 0x0a, 0x0a, 0xe5, 0x22, 0x51, 0x1a, 0x6b, 0xc6, 0xe4, 0xcc, 0x60, 0x39, 0xd4, 0xac, - 0xc9, 0x67, 0xb0, 0x8b, 0x09, 0xce, 0x91, 0x2b, 0xe9, 0xd9, 0xa3, 0xad, 0xa3, 0xbd, 0xe3, 0xd1, - 0xc3, 0x9a, 0xdf, 0x2a, 0xc0, 0x38, 0x0f, 0xa4, 0x55, 0x86, 0xae, 0x1c, 0x8a, 0x05, 0xaf, 0x2a, - 0x9b, 0x8d, 0xff, 0x29, 0xdc, 0x6b, 0x4d, 0xd4, 0xc4, 0xe3, 0xc8, 0x94, 0xef, 0x51, 0x3b, 0x8e, - 0x0c, 0x21, 0x64, 0x91, 0x69, 0xa5, 0x47, 0xcd, 0xda, 0xff, 0x01, 0xf6, 0xeb, 0xe4, 0x9f, 0x17, - 0x28, 0x15, 0xf1, 0x60, 0xc7, 0x50, 0x0a, 0xca, 0xdc, 0x72, 0x4b, 0x1e, 0xc3, 0xad, 0x4c, 0x8f, - 0xa9, 0xe4, 0xee, 0xb5, 0x71, 0xd7, 0x01, 0xb4, 0x88, 0xf3, 0xbf, 0x04, 0xb7, 0xc1, 0x2d, 0x15, - 0x5c, 0x22, 0x79, 0x0a, 0x3b, 0x99, 0xe1, 0x29, 0x3d, 0xcb, 0xc0, 0xbc, 0x77, 0xed, 0x08, 0x68, - 0x19, 0xe9, 0xff, 0x0a, 0x77, 0xce, 0x5e, 0xff, 0x84, 0xa1, 0xd2, 0xce, 0x97, 0x28, 0x25, 0x9b, - 0xe2, 0x0d, 0x4c, 0x3d, 0x5d, 0x23, 0x4d, 0x96, 0x41, 0xd9, 0x6d, 0xb9, 0xd5, 0x9e, 0x94, 0x2d, - 0x13, 0xc1, 0x22, 0x33, 0x45, 0x87, 0x96, 0x5b, 0x32, 0x80, 0x5d, 0x61, 0x4a, 0x04, 0x91, 0xd7, - 0x35, 0x49, 0xd5, 0xde, 0x1f, 0x83, 0x3b, 0xd1, 0xd0, 0xaf, 0x16, 0x72, 0x56, 0xce, 0xe9, 0x49, - 0x8d, 0xa4, 0xab, 0xef, 0x1d, 0xbf, 0xdb, 0xe8, 0x23, 0x8f, 0xce, 0xdd, 0x55, 0x09, 0xff, 0x2e, - 0xdc, 0x69, 0xc0, 0xe4, 0xf3, 0xf0, 0xfd, 0x0a, 0x3b, 0x49, 0x4a, 0xec, 0xb5, 0xa3, 0xf3, 0x9f, - 0x57, 0x89, 0x3a, 0xa6, 0x18, 0xe4, 0xff, 0x20, 0xf0, 0x9b, 0x0d, 0x4e, 0xd3, 0x43, 0xbe, 0x80, - 0x3d, 0x93, 0xa3, 0xe7, 0x8e, 0x59, 0x81, 0xf3, 0xa0, 0x81, 0x43, 0xd9, 0x9b, 0x49, 0x1d, 0xf0, - 0x5d, 0xac, 0x66, 0x41, 0x44, 0x9b, 0x39, 0x64, 0x08, 0xc0, 0xc2, 0xa4, 0x00, 0x34, 0xe3, 0x76, - 0x68, 0xc3, 0x42, 0x7c, 0x70, 0xea, 0x5d, 0x90, 0x8f, 0xbd, 0x47, 0x57, 0x6c, 0xe4, 0x18, 0x0e, - 0x0c, 0xe4, 0x04, 0x95, 0x8a, 0xf9, 0x54, 0x96, 0x68, 0x5d, 0x83, 0xd6, 0xea, 0x23, 0x1f, 0xc1, - 0x3b, 0x6d, 0xf6, 0x20, 0xf2, 0xb6, 0x4d, 0x85, 0x6b, 0xbc, 0xfe, 0x9f, 0x16, 0xec, 0x35, 0x5a, - 0xd2, 0xe7, 0x1e, 0x47, 0xc8, 0x55, 0xac, 0x96, 0xc5, 0x5b, 0xad, 0xf6, 0xe4, 0x10, 0x7a, 0x2a, - 0x9e, 0xa3, 0x54, 0x6c, 0x9e, 0x9a, 0xd6, 0xb6, 0x68, 0x6d, 0xd0, 0x5e, 0x53, 0xe3, 0x9b, 0x65, - 0x8a, 0x45, 0x5b, 0xb5, 0x81, 0xbc, 0x0f, 0x7d, 0x7d, 0xe9, 0xe2, 0x90, 0xa9, 0x58, 0xf0, 0xaf, - 0x70, 0x69, 0xba, 0xe9, 0xd2, 0x35, 0xab, 0x7e, 0x96, 0x12, 0x31, 0x67, 0xed, 0x50, 0xb3, 0xf6, - 0x5f, 0x41, 0x7f, 0x75, 0xf0, 0x64, 0xb4, 0x79, 0x50, 0xce, 0xea, 0x39, 0x68, 0x36, 0xf1, 0x94, - 0x33, 0xb5, 0xc8, 0xb0, 0x38, 0x86, 0xda, 0xe0, 0x9f, 0xc2, 0x41, 0xdb, 0x51, 0xea, 0xac, 0x8c, - 0xbd, 0x59, 0x41, 0xad, 0x0d, 0xc5, 0x3d, 0xb4, 0xab, 0x7b, 0xf8, 0x23, 0x1c, 0x4c, 0x9a, 0x53, - 0x3d, 0x11, 0x5c, 0x69, 0xa9, 0xf9, 0x1c, 0x9c, 0xfc, 0xad, 0x9c, 0x62, 0x82, 0x0a, 0x5b, 0xee, - 0xe3, 0x59, 0xc3, 0xfd, 0xa2, 0x43, 0x57, 0xc2, 0x9f, 0xed, 0xc0, 0xf6, 0x2f, 0x2c, 0x59, 0xa0, - 0x3f, 0x04, 0xa7, 0x19, 0xb8, 0xf1, 0x0e, 0x3e, 0x86, 0x7b, 0x2b, 0xf5, 0x27, 0x9c, 0xa5, 0x72, - 0x26, 0x94, 0xbe, 0x84, 0x91, 0x49, 0x89, 0x82, 0x28, 0xd7, 0x95, 0x1e, 0x6d, 0x58, 0xfc, 0xdf, - 0x2d, 0x70, 0xca, 0xa4, 0x53, 0xa6, 0x18, 0xf9, 0x04, 0x76, 0xc2, 0x9c, 0x7c, 0xa1, 0x42, 0x0f, - 0xd6, 0x1f, 0xcf, 0x5a, 0x8f, 0xb4, 0x8c, 0xd7, 0x22, 0x2e, 0x8b, 0xba, 0x66, 0x34, 0xab, 0x22, - 0xde, 0xca, 0x8f, 0x56, 0x19, 0x1f, 0x84, 0xb0, 0x3b, 0xce, 0xb2, 0x13, 0x11, 0xa1, 0x24, 0x7d, - 0x80, 0x6f, 0x39, 0x5e, 0xa4, 0x18, 0x2a, 0x8c, 0xdc, 0x0e, 0x71, 0x8b, 0xd7, 0xf9, 0x32, 0x96, - 0x32, 0xe6, 0x53, 0xd7, 0x22, 0xfb, 0xc5, 0x5d, 0x1d, 0x5f, 0xc4, 0x52, 0x49, 0xd7, 0x26, 0x77, - 0x61, 0xdf, 0x18, 0xbe, 0x16, 0x2a, 0xe0, 0x27, 0x2c, 0x9c, 0xa1, 0xbb, 0xa5, 0xa3, 0xc6, 0x59, - 0x26, 0xb2, 0xb3, 0xf3, 0x73, 0x89, 0xca, 0x8d, 0x8e, 0xff, 0xb2, 0xa1, 0x97, 0x13, 0x59, 0xf2, - 0x90, 0x9c, 0xc0, 0x6e, 0xa9, 0xab, 0x64, 0xd0, 0x2a, 0xb6, 0x46, 0x75, 0x06, 0xf7, 0xdb, 0x85, - 0x38, 0x57, 0x9b, 0xe7, 0x05, 0xa2, 0xd6, 0x2e, 0x72, 0x7f, 0x43, 0x69, 0x6a, 0x61, 0x1c, 0x1c, - 0xb6, 0x3b, 0x37, 0x70, 0x92, 0xa4, 0x0d, 0xa7, 0x12, 0xc1, 0x36, 0x9c, 0x86, 0xfa, 0x51, 0x70, - 0xeb, 0x2f, 0xc2, 0x44, 0x65, 0xc8, 0xe6, 0xe4, 0x70, 0xe3, 0xc2, 0x35, 0x3e, 0x17, 0x83, 0x1b, - 0xbd, 0x47, 0xd6, 0x63, 0xeb, 0xd9, 0x87, 0x7f, 0x5f, 0x0e, 0xad, 0xb7, 0x97, 0x43, 0xeb, 0xdf, - 0xcb, 0xa1, 0xf5, 0xc7, 0xd5, 0xb0, 0xf3, 0xf6, 0x6a, 0xd8, 0xf9, 0xe7, 0x6a, 0xd8, 0xf9, 0x7e, - 0x70, 0xfd, 0xff, 0x19, 0xaf, 0x6f, 0x99, 0x3f, 0x4f, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x24, - 0x0c, 0x9c, 0xee, 0x8c, 0x08, 0x00, 0x00, + // 966 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x6e, 0xdb, 0x46, + 0x10, 0x16, 0x69, 0xf9, 0x47, 0x63, 0x5a, 0x66, 0x36, 0x4e, 0xc3, 0x2a, 0x86, 0x22, 0xec, 0xa1, + 0x30, 0x72, 0xc8, 0x8f, 0x52, 0xb4, 0x48, 0x7f, 0x0e, 0x89, 0xad, 0x34, 0x44, 0x91, 0x3a, 0x58, + 0x35, 0x28, 0x50, 0xa0, 0x05, 0xd6, 0xe4, 0x5a, 0x62, 0x4b, 0x2d, 0x59, 0xee, 0xaa, 0xb1, 0x0e, + 0x3d, 0xf4, 0xd4, 0x6b, 0x5f, 0xa1, 0xef, 0xd0, 0x87, 0xe8, 0x31, 0xc7, 0x1e, 0x0b, 0xfb, 0x45, + 0x8a, 0x5d, 0x2e, 0x7f, 0x64, 0x51, 0x39, 0xe4, 0x22, 0x73, 0xbe, 0x99, 0xf9, 0xe6, 0xdb, 0xd9, + 0xdd, 0x59, 0xc3, 0xa3, 0x20, 0x99, 0xcd, 0x12, 0x2e, 0x52, 0x1a, 0xb0, 0x07, 0xfa, 0x57, 0x2c, + 0x78, 0x90, 0x66, 0x89, 0x4c, 0x1e, 0xe8, 0x5f, 0x51, 0xa1, 0xf7, 0x35, 0x80, 0x3a, 0x25, 0x80, + 0x7d, 0xd8, 0x7b, 0xc1, 0x68, 0x38, 0x5e, 0xf0, 0x80, 0x50, 0x3e, 0x61, 0x08, 0x41, 0xfb, 0x3c, + 0x4b, 0x66, 0x9e, 0x35, 0xb0, 0x8e, 0xda, 0x44, 0x7f, 0xa3, 0x2e, 0xd8, 0x32, 0xf1, 0x6c, 0x8d, + 0xd8, 0x32, 0x41, 0x07, 0xb0, 0x19, 0x47, 0xb3, 0x48, 0x7a, 0x1b, 0x03, 0xeb, 0x68, 0x8f, 0xe4, + 0x06, 0xbe, 0x80, 0x6e, 0x49, 0xc5, 0xc4, 0x3c, 0x96, 0x8a, 0x6b, 0x4a, 0xc5, 0x54, 0x73, 0x39, + 0x44, 0x7f, 0xa3, 0x2f, 0x60, 0x87, 0xc5, 0x6c, 0xc6, 0xb8, 0x14, 0x9e, 0x3d, 0xd8, 0x38, 0xda, + 0x1d, 0x0e, 0xee, 0x57, 0xfa, 0x96, 0x09, 0x46, 0x79, 0x20, 0x29, 0x33, 0x54, 0xe5, 0x20, 0x99, + 0xf3, 0xb2, 0xb2, 0x36, 0xf0, 0xe7, 0x70, 0xab, 0x31, 0x51, 0x09, 0x8f, 0x42, 0x5d, 0xbe, 0x43, + 0xec, 0x28, 0xd4, 0x82, 0x18, 0x0d, 0xf5, 0x52, 0x3a, 0x44, 0x7f, 0xe3, 0x1f, 0x60, 0xbf, 0x4a, + 0xfe, 0x65, 0xce, 0x84, 0x44, 0x1e, 0x6c, 0x6b, 0x49, 0x7e, 0x91, 0x5b, 0x98, 0xe8, 0x21, 0x6c, + 0x65, 0xaa, 0x4d, 0x85, 0x76, 0xaf, 0x49, 0xbb, 0x0a, 0x20, 0x26, 0x0e, 0x7f, 0x05, 0x6e, 0x4d, + 0x5b, 0x9a, 0x70, 0xc1, 0xd0, 0x63, 0xd8, 0xce, 0xb4, 0x4e, 0xe1, 0x59, 0x9a, 0xe6, 0xc3, 0xb5, + 0x2d, 0x20, 0x45, 0x24, 0xfe, 0x0d, 0x6e, 0x9c, 0x9e, 0xfd, 0xc4, 0x02, 0xa9, 0x9c, 0x2f, 0x99, + 0x10, 0x74, 0xc2, 0xde, 0xa1, 0xd4, 0x53, 0x35, 0xd2, 0x78, 0xe1, 0x17, 0xab, 0x2d, 0x4c, 0xe5, + 0x49, 0xe9, 0x22, 0x4e, 0x68, 0xa8, 0xbb, 0xe8, 0x90, 0xc2, 0x44, 0x3d, 0xd8, 0x49, 0x74, 0x09, + 0x3f, 0xf4, 0xda, 0x3a, 0xa9, 0xb4, 0xf1, 0x08, 0xdc, 0xb1, 0xa2, 0x7e, 0x35, 0x17, 0xd3, 0xa2, + 0x4f, 0x8f, 0x2a, 0x26, 0x55, 0x7d, 0x77, 0x78, 0xbb, 0xb6, 0x8e, 0x3c, 0x3a, 0x77, 0x97, 0x25, + 0xf0, 0x4d, 0xb8, 0x51, 0xa3, 0xc9, 0xfb, 0x81, 0x71, 0xc9, 0x1d, 0xc7, 0x05, 0xf7, 0xb5, 0xad, + 0xc3, 0xcf, 0xcb, 0x44, 0x15, 0x63, 0x1a, 0xf9, 0x1e, 0x02, 0x7e, 0xb7, 0xc1, 0xa9, 0x7b, 0xd0, + 0x53, 0xd8, 0xd5, 0x39, 0xaa, 0xef, 0x2c, 0x33, 0x3c, 0x77, 0x6b, 0x3c, 0x84, 0xbe, 0x19, 0x57, + 0x01, 0xdf, 0x45, 0x72, 0xea, 0x87, 0xa4, 0x9e, 0x83, 0xfa, 0x00, 0x34, 0x88, 0x0d, 0xa1, 0x6e, + 0xb7, 0x43, 0x6a, 0x08, 0xc2, 0xe0, 0x54, 0x96, 0x9f, 0xb7, 0xbd, 0x43, 0x96, 0x30, 0x34, 0x84, + 0x03, 0x4d, 0x39, 0x66, 0x52, 0x46, 0x7c, 0x22, 0x0a, 0xb6, 0xb6, 0x66, 0x6b, 0xf4, 0xa1, 0x4f, + 0xe0, 0x83, 0x26, 0xdc, 0x0f, 0xbd, 0x4d, 0x5d, 0x61, 0x8d, 0x17, 0xff, 0x65, 0xc1, 0x6e, 0x6d, + 0x49, 0x6a, 0xdf, 0xa3, 0x90, 0x71, 0x19, 0xc9, 0x85, 0xb9, 0xab, 0xa5, 0x8d, 0x0e, 0xa1, 0x23, + 0xa3, 0x19, 0x13, 0x92, 0xce, 0x52, 0xbd, 0xb4, 0x0d, 0x52, 0x01, 0xca, 0xab, 0x6b, 0x7c, 0xbb, + 0x48, 0x99, 0x59, 0x56, 0x05, 0xa0, 0x8f, 0xa0, 0xab, 0x0e, 0x5d, 0x14, 0x50, 0x19, 0x25, 0xfc, + 0x6b, 0xb6, 0xd0, 0xab, 0x69, 0x93, 0x6b, 0xa8, 0xba, 0x96, 0x82, 0xb1, 0x5c, 0xb5, 0x43, 0xf4, + 0x37, 0x7e, 0x05, 0xdd, 0xe5, 0xc6, 0xa3, 0xc1, 0xea, 0x46, 0x39, 0xcb, 0xfb, 0xa0, 0xd4, 0x44, + 0x13, 0x4e, 0xe5, 0x3c, 0x63, 0x66, 0x1b, 0x2a, 0x00, 0x9f, 0xc0, 0x41, 0xd3, 0x56, 0xaa, 0xac, + 0x8c, 0xbe, 0x59, 0x62, 0xad, 0x00, 0x73, 0x0e, 0xed, 0xf2, 0x1c, 0xfe, 0x08, 0x07, 0xe3, 0x7a, + 0x57, 0x8f, 0x13, 0x2e, 0xd5, 0xa8, 0xf9, 0x12, 0x9c, 0xfc, 0xae, 0x9c, 0xb0, 0x98, 0x49, 0xd6, + 0x70, 0x1e, 0x4f, 0x6b, 0xee, 0x17, 0x2d, 0xb2, 0x14, 0xfe, 0x6c, 0x1b, 0x36, 0x7f, 0xa5, 0xf1, + 0x9c, 0xe1, 0x3e, 0x38, 0xf5, 0xc0, 0x95, 0x7b, 0xf0, 0x29, 0xdc, 0x5a, 0xaa, 0x3f, 0xe6, 0x34, + 0x15, 0xd3, 0x44, 0xaa, 0x43, 0x18, 0xea, 0x94, 0xd0, 0x0f, 0xf3, 0xb9, 0xd2, 0x21, 0x35, 0x04, + 0xff, 0x61, 0x81, 0x53, 0x24, 0x9d, 0x50, 0x49, 0xd1, 0x13, 0xd8, 0x0e, 0x72, 0xf1, 0x66, 0x0a, + 0xdd, 0xbd, 0x7e, 0x79, 0xae, 0xad, 0x91, 0x14, 0xf1, 0x6a, 0x88, 0x0b, 0x53, 0x57, 0xb7, 0x66, + 0x79, 0x88, 0x37, 0xea, 0x23, 0x65, 0x06, 0xfe, 0xd9, 0x5c, 0xe5, 0xf1, 0xfc, 0x4c, 0x04, 0x59, + 0x94, 0xaa, 0x63, 0xa0, 0xce, 0xa0, 0x19, 0x5d, 0x85, 0xf8, 0xd2, 0x46, 0x9f, 0xc1, 0x16, 0x0d, + 0x54, 0x94, 0x2e, 0xd6, 0x1d, 0xe2, 0x95, 0x62, 0x35, 0xa6, 0xa7, 0x3a, 0x92, 0x98, 0x8c, 0x7b, + 0x01, 0xec, 0x8c, 0xb2, 0xec, 0x38, 0x09, 0x99, 0x40, 0x5d, 0x80, 0xd7, 0x9c, 0x5d, 0xa4, 0x2c, + 0x90, 0x2c, 0x74, 0x5b, 0xc8, 0x35, 0xa3, 0xe0, 0x65, 0x24, 0x44, 0xc4, 0x27, 0xae, 0x85, 0xf6, + 0xcd, 0xc5, 0x18, 0x5d, 0x44, 0x42, 0x0a, 0xd7, 0x46, 0x37, 0x61, 0x5f, 0x03, 0xdf, 0x24, 0xd2, + 0xe7, 0xc7, 0x34, 0x98, 0x32, 0x77, 0x43, 0x45, 0x8d, 0xb2, 0x2c, 0xc9, 0x4e, 0xcf, 0xcf, 0x05, + 0x93, 0x6e, 0x78, 0xef, 0x09, 0xdc, 0x5e, 0xa3, 0x03, 0xed, 0x41, 0xc7, 0xa0, 0x67, 0xcc, 0x6d, + 0xa9, 0xd4, 0xd7, 0x5c, 0x94, 0x80, 0x35, 0xfc, 0xdb, 0x86, 0x4e, 0x9e, 0xbb, 0xe0, 0x01, 0x3a, + 0x86, 0x9d, 0x62, 0xfe, 0xa3, 0x5e, 0xe3, 0xa3, 0xa0, 0xa7, 0x63, 0xef, 0x4e, 0xf3, 0x83, 0x91, + 0x4f, 0xc5, 0xe7, 0x86, 0x51, 0xcd, 0x58, 0x74, 0x67, 0x65, 0x22, 0x56, 0x03, 0xbc, 0x77, 0xd8, + 0xec, 0x5c, 0xe1, 0x89, 0xe3, 0x26, 0x9e, 0x72, 0x58, 0x37, 0xf1, 0xd4, 0xa6, 0x34, 0x01, 0xb7, + 0x7a, 0xb9, 0xc6, 0x32, 0x63, 0x74, 0x86, 0x0e, 0x57, 0x2e, 0x46, 0xed, 0x59, 0xeb, 0xbd, 0xd3, + 0x7b, 0x64, 0x3d, 0xb4, 0x9e, 0x7d, 0xfc, 0xcf, 0x65, 0xdf, 0x7a, 0x7b, 0xd9, 0xb7, 0xfe, 0xbb, + 0xec, 0x5b, 0x7f, 0x5e, 0xf5, 0x5b, 0x6f, 0xaf, 0xfa, 0xad, 0x7f, 0xaf, 0xfa, 0xad, 0xef, 0x7b, + 0xeb, 0xff, 0x1f, 0x3a, 0xdb, 0xd2, 0x7f, 0x1e, 0xff, 0x1f, 0x00, 0x00, 0xff, 0xff, 0x00, 0xa7, + 0x75, 0x0a, 0x34, 0x09, 0x00, 0x00, } func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { @@ -1868,6 +1952,43 @@ func (m *SettingsData) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *SpaceSubscription) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SpaceSubscription) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SpaceSubscription) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Action != 0 { + i = encodeVarintSpacesync(dAtA, i, uint64(m.Action)) + i-- + dAtA[i] = 0x10 + } + if len(m.SpaceIds) > 0 { + for iNdEx := len(m.SpaceIds) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.SpaceIds[iNdEx]) + copy(dAtA[i:], m.SpaceIds[iNdEx]) + i = encodeVarintSpacesync(dAtA, i, uint64(len(m.SpaceIds[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int { offset -= sovSpacesync(v) base := offset @@ -2204,6 +2325,24 @@ func (m *SettingsData) Size() (n int) { return n } +func (m *SpaceSubscription) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.SpaceIds) > 0 { + for _, s := range m.SpaceIds { + l = len(s) + n += 1 + l + sovSpacesync(uint64(l)) + } + } + if m.Action != 0 { + n += 1 + sovSpacesync(uint64(m.Action)) + } + return n +} + func sovSpacesync(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -4261,6 +4400,107 @@ func (m *SettingsData) Unmarshal(dAtA []byte) error { } return nil } +func (m *SpaceSubscription) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SpaceSubscription: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SpaceSubscription: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SpaceIds", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpacesync + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSpacesync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SpaceIds = append(m.SpaceIds, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType) + } + m.Action = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Action |= SpaceSubscriptionAction(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipSpacesync(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthSpacesync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipSpacesync(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 82fff97a..24718286 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -74,11 +74,11 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro conn drpc.Conn sc sec.SecureConn ) - log.Warn("dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) + log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) for _, addr := range addrs { conn, sc, err = d.handshake(ctx, addr) if err != nil { - log.Info("can't connect to host", zap.String("addr", addr), zap.Error(err)) + log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err)) } else { break } diff --git a/net/streampool/context.go b/net/streampool/context.go new file mode 100644 index 00000000..72203db3 --- /dev/null +++ b/net/streampool/context.go @@ -0,0 +1,17 @@ +package streampool + +import ( + "context" + "github.com/anytypeio/any-sync/net/peer" +) + +type streamCtxKey uint + +const ( + streamCtxKeyStreamId streamCtxKey = iota +) + +func streamCtx(ctx context.Context, streamId uint32, peerId string) context.Context { + ctx = peer.CtxWithPeerId(ctx, peerId) + return context.WithValue(ctx, streamCtxKeyStreamId, streamId) +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 45416e34..56e221bd 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -1,6 +1,8 @@ package streampool import ( + "context" + "github.com/anytypeio/any-sync/app/logger" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -12,7 +14,7 @@ type stream struct { pool *streamPool streamId uint32 closed atomic.Bool - l *zap.Logger + l logger.CtxLogger tags []string } @@ -34,7 +36,9 @@ func (sr *stream) readLoop() error { sr.l.Info("msg receive error", zap.Error(err)) return err } - if err := sr.pool.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil { + ctx := streamCtx(context.Background(), sr.streamId, sr.peerId) + ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "streamMessage"), zap.String("peerId", sr.peerId)) + if err := sr.pool.HandleMessage(ctx, sr.peerId, msg); err != nil { sr.l.Info("msg handle error", zap.Error(err)) return err } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 9e6652f6..f346fb30 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -1,6 +1,7 @@ package streampool import ( + "fmt" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" "github.com/cheggaaa/mb/v3" @@ -33,6 +34,10 @@ type StreamPool interface { SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) // Broadcast sends a message to all peers with given tags. Works async. Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) + // AddTagsCtx adds tags to stream, stream will be extracted from ctx + AddTagsCtx(ctx context.Context, tags ...string) error + // RemoveTagsCtx removes tags from stream, stream will be extracted from ctx + RemoveTagsCtx(ctx context.Context, tags ...string) error // Close closes all streams Close() error } @@ -142,7 +147,7 @@ func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) } for _, st := range streams { if err = st.write(msg); err != nil { - st.l.Info("sendOne write error", zap.Error(err), zap.Int("streams", len(streams))) + st.l.InfoCtx(ctx, "sendOne write error", zap.Error(err), zap.Int("streams", len(streams))) // continue with next stream continue } else { @@ -224,7 +229,7 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st for _, st := range streams { funcs = append(funcs, func() { if e := st.write(msg); e != nil { - log.Debug("broadcast write error", zap.Error(e)) + log.DebugCtx(ctx, "broadcast write error", zap.Error(e)) } }) } @@ -234,6 +239,58 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st return s.exec.Add(ctx, funcs...) } +func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error { + streamId, ok := ctx.Value(streamCtxKeyStreamId).(uint32) + if !ok { + return fmt.Errorf("context without streamId") + } + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[streamId] + if !ok { + return fmt.Errorf("stream not found") + } + var newTags = make([]string, 0, len(tags)) + for _, newTag := range tags { + if !slices.Contains(st.tags, newTag) { + newTags = append(newTags, newTag) + } + } + st.tags = append(st.tags, newTags...) + for _, newTag := range tags { + s.streamIdsByTag[newTag] = append(s.streamIdsByTag[newTag], streamId) + } + return nil +} + +func (s *streamPool) RemoveTagsCtx(ctx context.Context, tags ...string) error { + streamId, ok := ctx.Value(streamCtxKeyStreamId).(uint32) + if !ok { + return fmt.Errorf("context without streamId") + } + s.mu.Lock() + defer s.mu.Unlock() + st, ok := s.streams[streamId] + if !ok { + return fmt.Errorf("stream not found") + } + + var filtered = st.tags[:0] + var toRemove = make([]string, 0, len(tags)) + for _, t := range st.tags { + if slices.Contains(tags, t) { + toRemove = append(toRemove, t) + } else { + filtered = append(filtered, t) + } + } + st.tags = filtered + for _, t := range toRemove { + removeStream(s.streamIdsByTag, t, streamId) + } + return nil +} + func (s *streamPool) removeStream(streamId uint32) { s.mu.Lock() defer s.mu.Unlock() @@ -242,23 +299,9 @@ func (s *streamPool) removeStream(streamId uint32) { log.Fatal("removeStream: stream does not exist", zap.Uint32("streamId", streamId)) } - var removeStream = func(m map[string][]uint32, key string) { - streamIds := m[key] - idx := slices.Index(streamIds, streamId) - if idx == -1 { - log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId)) - } - streamIds = slices.Delete(streamIds, idx, idx+1) - if len(streamIds) == 0 { - delete(m, key) - } else { - m[key] = streamIds - } - } - - removeStream(s.streamIdsByPeer, st.peerId) + removeStream(s.streamIdsByPeer, st.peerId, streamId) for _, tag := range st.tags { - removeStream(s.streamIdsByTag, tag) + removeStream(s.streamIdsByTag, tag, streamId) } delete(s.streams, streamId) @@ -279,8 +322,8 @@ func (s *streamPool) handleMessageLoop() { if err != nil { return } - if err = s.handler.HandleMessage(context.Background(), hm.peerId, hm.msg); err != nil { - log.Warn("handle message error", zap.Error(err)) + if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil { + log.WarnCtx(hm.ctx, "handle message error", zap.Error(err)) } } } @@ -288,3 +331,17 @@ func (s *streamPool) handleMessageLoop() { func (s *streamPool) Close() (err error) { return s.exec.Close() } + +func removeStream(m map[string][]uint32, key string, streamId uint32) { + streamIds := m[key] + idx := slices.Index(streamIds, streamId) + if idx == -1 { + log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId)) + } + streamIds = slices.Delete(streamIds, idx, idx+1) + if len(streamIds) == 0 { + delete(m, key) + } else { + m[key] = streamIds + } +} diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index f6e478c1..eb5eb50f 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -165,6 +165,28 @@ func TestStreamPool_SendById(t *testing.T) { assert.Equal(t, "test", msg.ReqData) } +func TestStreamPool_Tags(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + + s1, _ := newClientStream(t, fx, "p1") + defer s1.Close() + fx.AddStream("p1", s1, "t1") + + s2, _ := newClientStream(t, fx, "p2") + defer s1.Close() + fx.AddStream("p2", s2, "t2") + + err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3") + require.NoError(t, err) + assert.Equal(t, []uint32{1}, fx.StreamPool.(*streamPool).streamIdsByTag["t3"]) + + err = fx.RemoveTagsCtx(streamCtx(ctx, 2, "p2"), "t2") + require.NoError(t, err) + assert.Len(t, fx.StreamPool.(*streamPool).streamIdsByTag["t2"], 0) + +} + func newFixture(t *testing.T) *fixture { fx := &fixture{} ts := rpctest.NewTestServer() diff --git a/util/periodicsync/periodicsync.go b/util/periodicsync/periodicsync.go index cce08842..3ce74cdd 100644 --- a/util/periodicsync/periodicsync.go +++ b/util/periodicsync/periodicsync.go @@ -3,6 +3,7 @@ package periodicsync import ( "context" + "github.com/anytypeio/any-sync/app/logger" "go.uber.org/zap" "time" ) @@ -14,8 +15,9 @@ type PeriodicSync interface { type SyncerFunc func(ctx context.Context) error -func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l *zap.Logger) PeriodicSync { +func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l logger.CtxLogger) PeriodicSync { ctx, cancel := context.WithCancel(context.Background()) + ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "periodicSync")) return &periodicSync{ syncer: syncer, log: l, @@ -28,7 +30,7 @@ func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc } type periodicSync struct { - log *zap.Logger + log logger.CtxLogger syncer SyncerFunc syncCtx context.Context syncCancel context.CancelFunc