ctx logger + streapool add/removeTags + space subscribe message
This commit is contained in:
parent
cc89d5919e
commit
81bdd3fefb
55
app/logger/ctxfiled.go
Normal file
55
app/logger/ctxfiled.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package logger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ctxKey uint
|
||||||
|
|
||||||
|
const (
|
||||||
|
ctxKeyFields ctxKey = iota
|
||||||
|
)
|
||||||
|
|
||||||
|
func WithCtx(ctx context.Context, l *zap.Logger) *zap.Logger {
|
||||||
|
return l.With(CtxGetFields(ctx)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func CtxWithFields(ctx context.Context, fields ...zap.Field) context.Context {
|
||||||
|
existingFields := CtxGetFields(ctx)
|
||||||
|
if existingFields != nil {
|
||||||
|
existingFields = append(existingFields, fields...)
|
||||||
|
}
|
||||||
|
return context.WithValue(ctx, ctxKeyFields, fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func CtxGetFields(ctx context.Context) (fields []zap.Field) {
|
||||||
|
if v := ctx.Value(ctxKeyFields); v != nil {
|
||||||
|
return v.([]zap.Field)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type CtxLogger struct {
|
||||||
|
*zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl CtxLogger) DebugCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
|
cl.Logger.Debug(msg, append(CtxGetFields(ctx), fields...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl CtxLogger) InfoCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
|
cl.Logger.Info(msg, append(CtxGetFields(ctx), fields...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl CtxLogger) WarnCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
|
cl.Logger.Warn(msg, append(CtxGetFields(ctx), fields...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl CtxLogger) ErrorCtx(ctx context.Context, msg string, fields ...zap.Field) {
|
||||||
|
cl.Logger.Error(msg, append(CtxGetFields(ctx), fields...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl CtxLogger) With(fields ...zap.Field) CtxLogger {
|
||||||
|
return CtxLogger{cl.Logger.With(fields...)}
|
||||||
|
}
|
||||||
@ -9,7 +9,7 @@ var (
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
defaultLogger *zap.Logger
|
defaultLogger *zap.Logger
|
||||||
levels = make(map[string]zap.AtomicLevel)
|
levels = make(map[string]zap.AtomicLevel)
|
||||||
loggers = make(map[string]*zap.Logger)
|
loggers = make(map[string]CtxLogger)
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -22,7 +22,7 @@ func SetDefault(l *zap.Logger) {
|
|||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
*defaultLogger = *l
|
*defaultLogger = *l
|
||||||
for name, l := range loggers {
|
for name, l := range loggers {
|
||||||
*l = *defaultLogger.Named(name)
|
*l.Logger = *defaultLogger.Named(name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,13 +38,14 @@ func Default() *zap.Logger {
|
|||||||
return defaultLogger
|
return defaultLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNamed(name string, fields ...zap.Field) *zap.Logger {
|
func NewNamed(name string, fields ...zap.Field) CtxLogger {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
l := defaultLogger.Named(name)
|
l := defaultLogger.Named(name)
|
||||||
if len(fields) > 0 {
|
if len(fields) > 0 {
|
||||||
l = l.With(fields...)
|
l = l.With(fields...)
|
||||||
}
|
}
|
||||||
loggers[name] = l
|
ctxL := CtxLogger{l}
|
||||||
return l
|
loggers[name] = ctxL
|
||||||
|
return ctxL
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,6 +22,9 @@ func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
|
func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
|
||||||
|
if object == nil {
|
||||||
|
panic("nil object")
|
||||||
|
}
|
||||||
c.reservedObjects = append(c.reservedObjects, object)
|
c.reservedObjects = append(c.reservedObjects, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anytypeio/any-sync/app/ldiff"
|
"github.com/anytypeio/any-sync/app/ldiff"
|
||||||
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
|
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||||
@ -32,7 +33,7 @@ func newDiffSyncer(
|
|||||||
storage spacestorage.SpaceStorage,
|
storage spacestorage.SpaceStorage,
|
||||||
clientFactory spacesyncproto.ClientFactory,
|
clientFactory spacesyncproto.ClientFactory,
|
||||||
syncStatus syncstatus.StatusUpdater,
|
syncStatus syncstatus.StatusUpdater,
|
||||||
log *zap.Logger) DiffSyncer {
|
log logger.CtxLogger) DiffSyncer {
|
||||||
return &diffSyncer{
|
return &diffSyncer{
|
||||||
diff: diff,
|
diff: diff,
|
||||||
spaceId: spaceId,
|
spaceId: spaceId,
|
||||||
@ -52,7 +53,7 @@ type diffSyncer struct {
|
|||||||
cache treegetter.TreeGetter
|
cache treegetter.TreeGetter
|
||||||
storage spacestorage.SpaceStorage
|
storage spacestorage.SpaceStorage
|
||||||
clientFactory spacesyncproto.ClientFactory
|
clientFactory spacesyncproto.ClientFactory
|
||||||
log *zap.Logger
|
log logger.CtxLogger
|
||||||
deletionState deletionstate.DeletionState
|
deletionState deletionstate.DeletionState
|
||||||
syncStatus syncstatus.StatusUpdater
|
syncStatus syncstatus.StatusUpdater
|
||||||
}
|
}
|
||||||
@ -96,7 +97,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
|||||||
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.log.Info("synced", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
d.log.Info("diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,18 +132,23 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||||
zap.Int("changedIds", len(changedIds)),
|
zap.Int("changedIds", len(changedIds)),
|
||||||
zap.Int("removedIds", len(removedIds)),
|
zap.Int("removedIds", len(removedIds)),
|
||||||
zap.Int("already deleted ids", totalLen-len(filteredIds)))
|
zap.Int("already deleted ids", totalLen-len(filteredIds)),
|
||||||
|
zap.String("peerId", p.Id()),
|
||||||
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
|
||||||
|
ctx = logger.CtxWithFields(ctx, zap.String("op", "pingTrees"))
|
||||||
for _, tId := range trees {
|
for _, tId := range trees {
|
||||||
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
|
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
d.log.InfoCtx(ctx, "can't load tree", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
syncTree, ok := tree.(synctree.SyncTree)
|
syncTree, ok := tree.(synctree.SyncTree)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
d.log.InfoCtx(ctx, "not a sync tree", zap.String("objectId", tId))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// the idea why we call it directly is that if we try to get it from cache
|
// the idea why we call it directly is that if we try to get it from cache
|
||||||
|
|||||||
@ -4,6 +4,7 @@ package headsync
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/anytypeio/any-sync/app/ldiff"
|
"github.com/anytypeio/any-sync/app/ldiff"
|
||||||
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||||
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
|
||||||
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
|
||||||
@ -38,7 +39,7 @@ type headSync struct {
|
|||||||
periodicSync periodicsync.PeriodicSync
|
periodicSync periodicsync.PeriodicSync
|
||||||
storage spacestorage.SpaceStorage
|
storage spacestorage.SpaceStorage
|
||||||
diff ldiff.Diff
|
diff ldiff.Diff
|
||||||
log *zap.Logger
|
log logger.CtxLogger
|
||||||
syncer DiffSyncer
|
syncer DiffSyncer
|
||||||
|
|
||||||
syncPeriod int
|
syncPeriod int
|
||||||
@ -51,7 +52,7 @@ func NewHeadSync(
|
|||||||
confConnector confconnector.ConfConnector,
|
confConnector confconnector.ConfConnector,
|
||||||
cache treegetter.TreeGetter,
|
cache treegetter.TreeGetter,
|
||||||
syncStatus syncstatus.StatusUpdater,
|
syncStatus syncstatus.StatusUpdater,
|
||||||
log *zap.Logger) HeadSync {
|
log logger.CtxLogger) HeadSync {
|
||||||
|
|
||||||
diff := ldiff.New(16, 16)
|
diff := ldiff.New(16, 16)
|
||||||
l := log.With(zap.String("spaceId", spaceId))
|
l := log.With(zap.String("spaceId", spaceId))
|
||||||
|
|||||||
@ -49,7 +49,7 @@ type syncTree struct {
|
|||||||
isDeleted bool
|
isDeleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var log = logger.NewNamed("commonspace.synctree").Sugar()
|
var log = logger.NewNamed("commonspace.synctree")
|
||||||
|
|
||||||
var buildObjectTree = objecttree.BuildObjectTree
|
var buildObjectTree = objecttree.BuildObjectTree
|
||||||
var createSyncClient = newSyncClient
|
var createSyncClient = newSyncClient
|
||||||
@ -145,7 +145,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// basically building tree with in-memory storage and validating that it was without errors
|
// basically building tree with in-memory storage and validating that it was without errors
|
||||||
log.With(zap.String("id", id)).Debug("validating tree")
|
log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree")
|
||||||
err = objecttree.ValidateRawTree(payload, deps.AclList)
|
err = objecttree.ValidateRawTree(payload, deps.AclList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -195,7 +195,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
|||||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
||||||
// send to everybody, because everybody should know that the node or client got new tree
|
// send to everybody, because everybody should know that the node or client got new tree
|
||||||
if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil {
|
if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil {
|
||||||
log.Error("broadcast error", zap.Error(e))
|
log.ErrorCtx(ctx, "broadcast error", zap.Error(e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -261,7 +261,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncTree) Delete() (err error) {
|
func (s *syncTree) Delete() (err error) {
|
||||||
log.With("id", s.Id()).Debug("deleting sync tree")
|
log.With(zap.String("id", s.Id())).Debug("deleting sync tree")
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
if err = s.checkAlive(); err != nil {
|
if err = s.checkAlive(); err != nil {
|
||||||
@ -276,7 +276,7 @@ func (s *syncTree) Delete() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncTree) Close() (err error) {
|
func (s *syncTree) Close() (err error) {
|
||||||
log.With("id", s.Id()).Debug("closing sync tree")
|
log.With(zap.String("id", s.Id())).Debug("closing sync tree")
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
if s.isClosed {
|
if s.isClosed {
|
||||||
|
|||||||
@ -82,29 +82,30 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
)
|
)
|
||||||
|
|
||||||
log := log.With("senderId", senderId).
|
log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()))
|
||||||
With("heads", objTree.Heads()).
|
log.DebugCtx(ctx, "received head update message")
|
||||||
With("treeId", objTree.Id())
|
|
||||||
log.Debug("received head update message")
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.With(zap.Error(err)).Debug("head update finished with error")
|
log.With(zap.Error(err)).Debug("head update finished with error")
|
||||||
} else if fullRequest != nil {
|
} else if fullRequest != nil {
|
||||||
log.Debug("sending full sync request")
|
log.DebugCtx(ctx, "sending full sync request")
|
||||||
} else {
|
} else {
|
||||||
if !isEmptyUpdate {
|
if !isEmptyUpdate {
|
||||||
log.Debug("head update finished correctly")
|
log.DebugCtx(ctx, "head update finished correctly")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// isEmptyUpdate is sent when the tree is brought up from cache
|
// isEmptyUpdate is sent when the tree is brought up from cache
|
||||||
if isEmptyUpdate {
|
if isEmptyUpdate {
|
||||||
log.With("treeId", objTree.Id()).Debug("is empty update")
|
|
||||||
if slice.UnsortedEquals(objTree.Heads(), update.Heads) {
|
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
|
||||||
|
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
|
||||||
|
if headEquals {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// we need to sync in any case
|
// we need to sync in any case
|
||||||
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
|
fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -149,20 +150,17 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
)
|
)
|
||||||
|
|
||||||
log := log.With("senderId", senderId).
|
log := log.With(zap.String("senderId", senderId), zap.Strings("heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId))
|
||||||
With("heads", request.Heads).
|
log.DebugCtx(ctx, "received full sync request message")
|
||||||
With("treeId", s.objTree.Id()).
|
|
||||||
With("replyId", replyId)
|
|
||||||
log.Debug("received full sync request message")
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.With(zap.Error(err)).Debug("full sync request finished with error")
|
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
|
||||||
|
|
||||||
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId)
|
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId)
|
||||||
return
|
return
|
||||||
} else if fullResponse != nil {
|
} else if fullResponse != nil {
|
||||||
log.Debug("full sync response sent")
|
log.DebugCtx(ctx, "full sync response sent")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -190,16 +188,14 @@ func (s *syncTreeHandler) handleFullSyncResponse(
|
|||||||
var (
|
var (
|
||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
)
|
)
|
||||||
log := log.With("senderId", senderId).
|
log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()))
|
||||||
With("heads", response.Heads).
|
log.DebugCtx(ctx, "received full sync response message")
|
||||||
With("treeId", s.objTree.Id())
|
|
||||||
log.Debug("received full sync response message")
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.With(zap.Error(err)).Debug("full sync response failed")
|
log.With(zap.Error(err)).DebugCtx(ctx, "full sync response failed")
|
||||||
} else {
|
} else {
|
||||||
log.Debug("full sync response succeeded")
|
log.DebugCtx(ctx, "full sync response succeeded")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
@ -59,7 +59,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
newCounter := s.counter.Add(1)
|
newCounter := s.counter.Add(1)
|
||||||
msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
||||||
log.Info("mpool sendSync", zap.String("replyId", msg.ReplyId))
|
log.InfoCtx(ctx, "mpool sendSync", zap.String("replyId", msg.ReplyId))
|
||||||
s.waitersMx.Lock()
|
s.waitersMx.Lock()
|
||||||
waiter := responseWaiter{
|
waiter := responseWaiter{
|
||||||
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
|
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
|
||||||
@ -77,7 +77,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
|||||||
delete(s.waiters, msg.ReplyId)
|
delete(s.waiters, msg.ReplyId)
|
||||||
s.waitersMx.Unlock()
|
s.waitersMx.Unlock()
|
||||||
|
|
||||||
log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting")
|
log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting")
|
||||||
err = ctx.Err()
|
err = ctx.Err()
|
||||||
case reply = <-waiter.ch:
|
case reply = <-waiter.ch:
|
||||||
// success
|
// success
|
||||||
@ -87,42 +87,27 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
|
|||||||
|
|
||||||
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Warn("ctx.Done")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return s.StreamManager.SendPeer(ctx, peerId, msg)
|
return s.StreamManager.SendPeer(ctx, peerId, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Warn("ctx.Done")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return s.StreamManager.SendResponsible(ctx, msg)
|
return s.StreamManager.SendResponsible(ctx, msg)
|
||||||
}
|
}
|
||||||
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Warn("ctx.Done")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return s.StreamManager.Broadcast(ctx, msg)
|
return s.StreamManager.Broadcast(ctx, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
if msg.ReplyId != "" {
|
if msg.ReplyId != "" {
|
||||||
log.Info("mpool receive reply", zap.String("replyId", msg.ReplyId))
|
log.InfoCtx(ctx, "mpool receive reply", zap.String("replyId", msg.ReplyId))
|
||||||
// we got reply, send it to waiter
|
// we got reply, send it to waiter
|
||||||
if s.stopWaiter(msg) {
|
if s.stopWaiter(msg) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
|
log.DebugCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId))
|
||||||
}
|
}
|
||||||
return s.messageHandler(ctx, senderId, msg)
|
return s.messageHandler(ctx, senderId, msg)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -81,7 +81,7 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSync) handleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
func (s *objectSync) handleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||||
log.With(zap.String("peerId", senderId), zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).Debug("handling message")
|
log.With(zap.String("objectId", message.ObjectId), zap.String("replyId", message.ReplyId)).DebugCtx(ctx, "handling message")
|
||||||
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
|
obj, err := s.objectGetter.GetObject(ctx, message.ObjectId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|||||||
@ -134,3 +134,13 @@ message SettingsData {
|
|||||||
SpaceSettingsSnapshot snapshot = 2;
|
SpaceSettingsSnapshot snapshot = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SpaceSubscription contains in ObjectSyncMessage.Payload and indicates that we need to subscribe or unsubscribe the current stream to this space
|
||||||
|
enum SpaceSubscriptionAction {
|
||||||
|
Subscribe = 0;
|
||||||
|
Unsubscribe = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SpaceSubscription {
|
||||||
|
repeated string spaceIds = 1;
|
||||||
|
SpaceSubscriptionAction action = 2;
|
||||||
|
}
|
||||||
|
|||||||
@ -56,6 +56,32 @@ func (ErrCodes) EnumDescriptor() ([]byte, []int) {
|
|||||||
return fileDescriptor_80e49f1f4ac27799, []int{0}
|
return fileDescriptor_80e49f1f4ac27799, []int{0}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SpaceSubscription contains in ObjectSyncMessage.Payload and indicates that we need to subscribe or unsubscribe the current stream to this space
|
||||||
|
type SpaceSubscriptionAction int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
SpaceSubscriptionAction_Subscribe SpaceSubscriptionAction = 0
|
||||||
|
SpaceSubscriptionAction_Unsubscribe SpaceSubscriptionAction = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
var SpaceSubscriptionAction_name = map[int32]string{
|
||||||
|
0: "Subscribe",
|
||||||
|
1: "Unsubscribe",
|
||||||
|
}
|
||||||
|
|
||||||
|
var SpaceSubscriptionAction_value = map[string]int32{
|
||||||
|
"Subscribe": 0,
|
||||||
|
"Unsubscribe": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x SpaceSubscriptionAction) String() string {
|
||||||
|
return proto.EnumName(SpaceSubscriptionAction_name, int32(x))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (SpaceSubscriptionAction) EnumDescriptor() ([]byte, []int) {
|
||||||
|
return fileDescriptor_80e49f1f4ac27799, []int{1}
|
||||||
|
}
|
||||||
|
|
||||||
// HeadSyncRange presenting a request for one range
|
// HeadSyncRange presenting a request for one range
|
||||||
type HeadSyncRange struct {
|
type HeadSyncRange struct {
|
||||||
From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"`
|
From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"`
|
||||||
@ -1047,8 +1073,61 @@ func (m *SettingsData) GetSnapshot() *SpaceSettingsSnapshot {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SpaceSubscription struct {
|
||||||
|
SpaceIds []string `protobuf:"bytes,1,rep,name=spaceIds,proto3" json:"spaceIds,omitempty"`
|
||||||
|
Action SpaceSubscriptionAction `protobuf:"varint,2,opt,name=action,proto3,enum=spacesync.SpaceSubscriptionAction" json:"action,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) Reset() { *m = SpaceSubscription{} }
|
||||||
|
func (m *SpaceSubscription) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*SpaceSubscription) ProtoMessage() {}
|
||||||
|
func (*SpaceSubscription) Descriptor() ([]byte, []int) {
|
||||||
|
return fileDescriptor_80e49f1f4ac27799, []int{18}
|
||||||
|
}
|
||||||
|
func (m *SpaceSubscription) XXX_Unmarshal(b []byte) error {
|
||||||
|
return m.Unmarshal(b)
|
||||||
|
}
|
||||||
|
func (m *SpaceSubscription) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||||
|
if deterministic {
|
||||||
|
return xxx_messageInfo_SpaceSubscription.Marshal(b, m, deterministic)
|
||||||
|
} else {
|
||||||
|
b = b[:cap(b)]
|
||||||
|
n, err := m.MarshalToSizedBuffer(b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return b[:n], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (m *SpaceSubscription) XXX_Merge(src proto.Message) {
|
||||||
|
xxx_messageInfo_SpaceSubscription.Merge(m, src)
|
||||||
|
}
|
||||||
|
func (m *SpaceSubscription) XXX_Size() int {
|
||||||
|
return m.Size()
|
||||||
|
}
|
||||||
|
func (m *SpaceSubscription) XXX_DiscardUnknown() {
|
||||||
|
xxx_messageInfo_SpaceSubscription.DiscardUnknown(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
var xxx_messageInfo_SpaceSubscription proto.InternalMessageInfo
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) GetSpaceIds() []string {
|
||||||
|
if m != nil {
|
||||||
|
return m.SpaceIds
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) GetAction() SpaceSubscriptionAction {
|
||||||
|
if m != nil {
|
||||||
|
return m.Action
|
||||||
|
}
|
||||||
|
return SpaceSubscriptionAction_Subscribe
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
proto.RegisterEnum("spacesync.ErrCodes", ErrCodes_name, ErrCodes_value)
|
proto.RegisterEnum("spacesync.ErrCodes", ErrCodes_name, ErrCodes_value)
|
||||||
|
proto.RegisterEnum("spacesync.SpaceSubscriptionAction", SpaceSubscriptionAction_name, SpaceSubscriptionAction_value)
|
||||||
proto.RegisterType((*HeadSyncRange)(nil), "spacesync.HeadSyncRange")
|
proto.RegisterType((*HeadSyncRange)(nil), "spacesync.HeadSyncRange")
|
||||||
proto.RegisterType((*HeadSyncResult)(nil), "spacesync.HeadSyncResult")
|
proto.RegisterType((*HeadSyncResult)(nil), "spacesync.HeadSyncResult")
|
||||||
proto.RegisterType((*HeadSyncResultElement)(nil), "spacesync.HeadSyncResultElement")
|
proto.RegisterType((*HeadSyncResultElement)(nil), "spacesync.HeadSyncResultElement")
|
||||||
@ -1067,6 +1146,7 @@ func init() {
|
|||||||
proto.RegisterType((*ObjectDelete)(nil), "spacesync.ObjectDelete")
|
proto.RegisterType((*ObjectDelete)(nil), "spacesync.ObjectDelete")
|
||||||
proto.RegisterType((*SpaceSettingsSnapshot)(nil), "spacesync.SpaceSettingsSnapshot")
|
proto.RegisterType((*SpaceSettingsSnapshot)(nil), "spacesync.SpaceSettingsSnapshot")
|
||||||
proto.RegisterType((*SettingsData)(nil), "spacesync.SettingsData")
|
proto.RegisterType((*SettingsData)(nil), "spacesync.SettingsData")
|
||||||
|
proto.RegisterType((*SpaceSubscription)(nil), "spacesync.SpaceSubscription")
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -1074,64 +1154,68 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var fileDescriptor_80e49f1f4ac27799 = []byte{
|
var fileDescriptor_80e49f1f4ac27799 = []byte{
|
||||||
// 903 bytes of a gzipped FileDescriptorProto
|
// 966 bytes of a gzipped FileDescriptorProto
|
||||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x6f, 0x1b, 0x45,
|
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x6e, 0xdb, 0x46,
|
||||||
0x14, 0xf7, 0x6e, 0x9c, 0x26, 0x7e, 0xd9, 0x3a, 0xdb, 0x69, 0x0a, 0x8b, 0x1b, 0xb9, 0xd6, 0x1e,
|
0x10, 0x16, 0x69, 0xf9, 0x47, 0x63, 0x5a, 0x66, 0x36, 0x4e, 0xc3, 0x2a, 0x86, 0x22, 0xec, 0xa1,
|
||||||
0x50, 0xc4, 0xa1, 0x7f, 0x52, 0x04, 0x42, 0xc0, 0x81, 0x26, 0x2e, 0x5d, 0xa1, 0x92, 0x6a, 0x0c,
|
0x30, 0x72, 0xc8, 0x8f, 0x52, 0xb4, 0x48, 0x7f, 0x0e, 0x89, 0xad, 0x34, 0x44, 0x91, 0x3a, 0x58,
|
||||||
0x42, 0x42, 0x02, 0x69, 0xba, 0xfb, 0x62, 0x2f, 0x5a, 0xcf, 0x2c, 0x3b, 0x63, 0x1a, 0x1f, 0x38,
|
0x35, 0x28, 0x50, 0xa0, 0x05, 0xd6, 0xe4, 0x5a, 0x62, 0x4b, 0x2d, 0x59, 0xee, 0xaa, 0xb1, 0x0e,
|
||||||
0x70, 0xe2, 0xca, 0x57, 0xe0, 0x3b, 0xf0, 0x21, 0x38, 0xf6, 0xc8, 0x11, 0x25, 0x5f, 0x04, 0xcd,
|
0x3d, 0xf4, 0xd4, 0x6b, 0x5f, 0xa1, 0xef, 0xd0, 0x87, 0xe8, 0x31, 0xc7, 0x1e, 0x0b, 0xfb, 0x45,
|
||||||
0xec, 0x5f, 0xdb, 0x9b, 0x1c, 0xb8, 0x38, 0x33, 0xef, 0xcf, 0xef, 0xfd, 0xde, 0x9b, 0x99, 0xdf,
|
0x8a, 0x5d, 0x2e, 0x7f, 0x64, 0x51, 0x39, 0xe4, 0x22, 0x73, 0xbe, 0x99, 0xf9, 0xe6, 0xdb, 0xd9,
|
||||||
0x06, 0x9e, 0x84, 0x62, 0x3e, 0x17, 0x5c, 0xa6, 0x2c, 0xc4, 0x47, 0xe6, 0x57, 0x2e, 0x79, 0x98,
|
0xdd, 0x59, 0xc3, 0xa3, 0x20, 0x99, 0xcd, 0x12, 0x2e, 0x52, 0x1a, 0xb0, 0x07, 0xfa, 0x57, 0x2c,
|
||||||
0x66, 0x42, 0x89, 0x47, 0xe6, 0x57, 0xd6, 0xd6, 0x87, 0xc6, 0x40, 0x7a, 0x95, 0xc1, 0x0f, 0xe0,
|
0x78, 0x90, 0x66, 0x89, 0x4c, 0x1e, 0xe8, 0x5f, 0x51, 0xa1, 0xf7, 0x35, 0x80, 0x3a, 0x25, 0x80,
|
||||||
0xf6, 0x0b, 0x64, 0xd1, 0x64, 0xc9, 0x43, 0xca, 0xf8, 0x14, 0x09, 0x81, 0xee, 0x79, 0x26, 0xe6,
|
0x7d, 0xd8, 0x7b, 0xc1, 0x68, 0x38, 0x5e, 0xf0, 0x80, 0x50, 0x3e, 0x61, 0x08, 0x41, 0xfb, 0x3c,
|
||||||
0x9e, 0x35, 0xb2, 0x8e, 0xba, 0xd4, 0xac, 0x49, 0x1f, 0x6c, 0x25, 0x3c, 0xdb, 0x58, 0x6c, 0x25,
|
0x4b, 0x66, 0x9e, 0x35, 0xb0, 0x8e, 0xda, 0x44, 0x7f, 0xa3, 0x2e, 0xd8, 0x32, 0xf1, 0x6c, 0x8d,
|
||||||
0xc8, 0x01, 0x6c, 0x27, 0xf1, 0x3c, 0x56, 0xde, 0xd6, 0xc8, 0x3a, 0xba, 0x4d, 0xf3, 0x8d, 0x7f,
|
0xd8, 0x32, 0x41, 0x07, 0xb0, 0x19, 0x47, 0xb3, 0x48, 0x7a, 0x1b, 0x03, 0xeb, 0x68, 0x8f, 0xe4,
|
||||||
0x01, 0xfd, 0x0a, 0x0a, 0xe5, 0x22, 0x51, 0x1a, 0x6b, 0xc6, 0xe4, 0xcc, 0x60, 0x39, 0xd4, 0xac,
|
0x06, 0xbe, 0x80, 0x6e, 0x49, 0xc5, 0xc4, 0x3c, 0x96, 0x8a, 0x6b, 0x4a, 0xc5, 0x54, 0x73, 0x39,
|
||||||
0xc9, 0x67, 0xb0, 0x8b, 0x09, 0xce, 0x91, 0x2b, 0xe9, 0xd9, 0xa3, 0xad, 0xa3, 0xbd, 0xe3, 0xd1,
|
0x44, 0x7f, 0xa3, 0x2f, 0x60, 0x87, 0xc5, 0x6c, 0xc6, 0xb8, 0x14, 0x9e, 0x3d, 0xd8, 0x38, 0xda,
|
||||||
0xc3, 0x9a, 0xdf, 0x2a, 0xc0, 0x38, 0x0f, 0xa4, 0x55, 0x86, 0xae, 0x1c, 0x8a, 0x05, 0xaf, 0x2a,
|
0x1d, 0x0e, 0xee, 0x57, 0xfa, 0x96, 0x09, 0x46, 0x79, 0x20, 0x29, 0x33, 0x54, 0xe5, 0x20, 0x99,
|
||||||
0x9b, 0x8d, 0xff, 0x29, 0xdc, 0x6b, 0x4d, 0xd4, 0xc4, 0xe3, 0xc8, 0x94, 0xef, 0x51, 0x3b, 0x8e,
|
0xf3, 0xb2, 0xb2, 0x36, 0xf0, 0xe7, 0x70, 0xab, 0x31, 0x51, 0x09, 0x8f, 0x42, 0x5d, 0xbe, 0x43,
|
||||||
0x0c, 0x21, 0x64, 0x91, 0x69, 0xa5, 0x47, 0xcd, 0xda, 0xff, 0x01, 0xf6, 0xeb, 0xe4, 0x9f, 0x17,
|
0xec, 0x28, 0xd4, 0x82, 0x18, 0x0d, 0xf5, 0x52, 0x3a, 0x44, 0x7f, 0xe3, 0x1f, 0x60, 0xbf, 0x4a,
|
||||||
0x28, 0x15, 0xf1, 0x60, 0xc7, 0x50, 0x0a, 0xca, 0xdc, 0x72, 0x4b, 0x1e, 0xc3, 0xad, 0x4c, 0x8f,
|
0xfe, 0x65, 0xce, 0x84, 0x44, 0x1e, 0x6c, 0x6b, 0x49, 0x7e, 0x91, 0x5b, 0x98, 0xe8, 0x21, 0x6c,
|
||||||
0xa9, 0xe4, 0xee, 0xb5, 0x71, 0xd7, 0x01, 0xb4, 0x88, 0xf3, 0xbf, 0x04, 0xb7, 0xc1, 0x2d, 0x15,
|
0x65, 0xaa, 0x4d, 0x85, 0x76, 0xaf, 0x49, 0xbb, 0x0a, 0x20, 0x26, 0x0e, 0x7f, 0x05, 0x6e, 0x4d,
|
||||||
0x5c, 0x22, 0x79, 0x0a, 0x3b, 0x99, 0xe1, 0x29, 0x3d, 0xcb, 0xc0, 0xbc, 0x77, 0xed, 0x08, 0x68,
|
0x5b, 0x9a, 0x70, 0xc1, 0xd0, 0x63, 0xd8, 0xce, 0xb4, 0x4e, 0xe1, 0x59, 0x9a, 0xe6, 0xc3, 0xb5,
|
||||||
0x19, 0xe9, 0xff, 0x0a, 0x77, 0xce, 0x5e, 0xff, 0x84, 0xa1, 0xd2, 0xce, 0x97, 0x28, 0x25, 0x9b,
|
0x2d, 0x20, 0x45, 0x24, 0xfe, 0x0d, 0x6e, 0x9c, 0x9e, 0xfd, 0xc4, 0x02, 0xa9, 0x9c, 0x2f, 0x99,
|
||||||
0xe2, 0x0d, 0x4c, 0x3d, 0x5d, 0x23, 0x4d, 0x96, 0x41, 0xd9, 0x6d, 0xb9, 0xd5, 0x9e, 0x94, 0x2d,
|
0x10, 0x74, 0xc2, 0xde, 0xa1, 0xd4, 0x53, 0x35, 0xd2, 0x78, 0xe1, 0x17, 0xab, 0x2d, 0x4c, 0xe5,
|
||||||
0x13, 0xc1, 0x22, 0x33, 0x45, 0x87, 0x96, 0x5b, 0x32, 0x80, 0x5d, 0x61, 0x4a, 0x04, 0x91, 0xd7,
|
0x49, 0xe9, 0x22, 0x4e, 0x68, 0xa8, 0xbb, 0xe8, 0x90, 0xc2, 0x44, 0x3d, 0xd8, 0x49, 0x74, 0x09,
|
||||||
0x35, 0x49, 0xd5, 0xde, 0x1f, 0x83, 0x3b, 0xd1, 0xd0, 0xaf, 0x16, 0x72, 0x56, 0xce, 0xe9, 0x49,
|
0x3f, 0xf4, 0xda, 0x3a, 0xa9, 0xb4, 0xf1, 0x08, 0xdc, 0xb1, 0xa2, 0x7e, 0x35, 0x17, 0xd3, 0xa2,
|
||||||
0x8d, 0xa4, 0xab, 0xef, 0x1d, 0xbf, 0xdb, 0xe8, 0x23, 0x8f, 0xce, 0xdd, 0x55, 0x09, 0xff, 0x2e,
|
0x4f, 0x8f, 0x2a, 0x26, 0x55, 0x7d, 0x77, 0x78, 0xbb, 0xb6, 0x8e, 0x3c, 0x3a, 0x77, 0x97, 0x25,
|
||||||
0xdc, 0x69, 0xc0, 0xe4, 0xf3, 0xf0, 0xfd, 0x0a, 0x3b, 0x49, 0x4a, 0xec, 0xb5, 0xa3, 0xf3, 0x9f,
|
0xf0, 0x4d, 0xb8, 0x51, 0xa3, 0xc9, 0xfb, 0x81, 0x71, 0xc9, 0x1d, 0xc7, 0x05, 0xf7, 0xb5, 0xad,
|
||||||
0x57, 0x89, 0x3a, 0xa6, 0x18, 0xe4, 0xff, 0x20, 0xf0, 0x9b, 0x0d, 0x4e, 0xd3, 0x43, 0xbe, 0x80,
|
0xc3, 0xcf, 0xcb, 0x44, 0x15, 0x63, 0x1a, 0xf9, 0x1e, 0x02, 0x7e, 0xb7, 0xc1, 0xa9, 0x7b, 0xd0,
|
||||||
0x3d, 0x93, 0xa3, 0xe7, 0x8e, 0x59, 0x81, 0xf3, 0xa0, 0x81, 0x43, 0xd9, 0x9b, 0x49, 0x1d, 0xf0,
|
0x53, 0xd8, 0xd5, 0x39, 0xaa, 0xef, 0x2c, 0x33, 0x3c, 0x77, 0x6b, 0x3c, 0x84, 0xbe, 0x19, 0x57,
|
||||||
0x5d, 0xac, 0x66, 0x41, 0x44, 0x9b, 0x39, 0x64, 0x08, 0xc0, 0xc2, 0xa4, 0x00, 0x34, 0xe3, 0x76,
|
0x01, 0xdf, 0x45, 0x72, 0xea, 0x87, 0xa4, 0x9e, 0x83, 0xfa, 0x00, 0x34, 0x88, 0x0d, 0xa1, 0x6e,
|
||||||
0x68, 0xc3, 0x42, 0x7c, 0x70, 0xea, 0x5d, 0x90, 0x8f, 0xbd, 0x47, 0x57, 0x6c, 0xe4, 0x18, 0x0e,
|
0xb7, 0x43, 0x6a, 0x08, 0xc2, 0xe0, 0x54, 0x96, 0x9f, 0xb7, 0xbd, 0x43, 0x96, 0x30, 0x34, 0x84,
|
||||||
0x0c, 0xe4, 0x04, 0x95, 0x8a, 0xf9, 0x54, 0x96, 0x68, 0x5d, 0x83, 0xd6, 0xea, 0x23, 0x1f, 0xc1,
|
0x03, 0x4d, 0x39, 0x66, 0x52, 0x46, 0x7c, 0x22, 0x0a, 0xb6, 0xb6, 0x66, 0x6b, 0xf4, 0xa1, 0x4f,
|
||||||
0x3b, 0x6d, 0xf6, 0x20, 0xf2, 0xb6, 0x4d, 0x85, 0x6b, 0xbc, 0xfe, 0x9f, 0x16, 0xec, 0x35, 0x5a,
|
0xe0, 0x83, 0x26, 0xdc, 0x0f, 0xbd, 0x4d, 0x5d, 0x61, 0x8d, 0x17, 0xff, 0x65, 0xc1, 0x6e, 0x6d,
|
||||||
0xd2, 0xe7, 0x1e, 0x47, 0xc8, 0x55, 0xac, 0x96, 0xc5, 0x5b, 0xad, 0xf6, 0xe4, 0x10, 0x7a, 0x2a,
|
0x49, 0x6a, 0xdf, 0xa3, 0x90, 0x71, 0x19, 0xc9, 0x85, 0xb9, 0xab, 0xa5, 0x8d, 0x0e, 0xa1, 0x23,
|
||||||
0x9e, 0xa3, 0x54, 0x6c, 0x9e, 0x9a, 0xd6, 0xb6, 0x68, 0x6d, 0xd0, 0x5e, 0x53, 0xe3, 0x9b, 0x65,
|
0xa3, 0x19, 0x13, 0x92, 0xce, 0x52, 0xbd, 0xb4, 0x0d, 0x52, 0x01, 0xca, 0xab, 0x6b, 0x7c, 0xbb,
|
||||||
0x8a, 0x45, 0x5b, 0xb5, 0x81, 0xbc, 0x0f, 0x7d, 0x7d, 0xe9, 0xe2, 0x90, 0xa9, 0x58, 0xf0, 0xaf,
|
0x48, 0x99, 0x59, 0x56, 0x05, 0xa0, 0x8f, 0xa0, 0xab, 0x0e, 0x5d, 0x14, 0x50, 0x19, 0x25, 0xfc,
|
||||||
0x70, 0x69, 0xba, 0xe9, 0xd2, 0x35, 0xab, 0x7e, 0x96, 0x12, 0x31, 0x67, 0xed, 0x50, 0xb3, 0xf6,
|
0x6b, 0xb6, 0xd0, 0xab, 0x69, 0x93, 0x6b, 0xa8, 0xba, 0x96, 0x82, 0xb1, 0x5c, 0xb5, 0x43, 0xf4,
|
||||||
0x5f, 0x41, 0x7f, 0x75, 0xf0, 0x64, 0xb4, 0x79, 0x50, 0xce, 0xea, 0x39, 0x68, 0x36, 0xf1, 0x94,
|
0x37, 0x7e, 0x05, 0xdd, 0xe5, 0xc6, 0xa3, 0xc1, 0xea, 0x46, 0x39, 0xcb, 0xfb, 0xa0, 0xd4, 0x44,
|
||||||
0x33, 0xb5, 0xc8, 0xb0, 0x38, 0x86, 0xda, 0xe0, 0x9f, 0xc2, 0x41, 0xdb, 0x51, 0xea, 0xac, 0x8c,
|
0x13, 0x4e, 0xe5, 0x3c, 0x63, 0x66, 0x1b, 0x2a, 0x00, 0x9f, 0xc0, 0x41, 0xd3, 0x56, 0xaa, 0xac,
|
||||||
0xbd, 0x59, 0x41, 0xad, 0x0d, 0xc5, 0x3d, 0xb4, 0xab, 0x7b, 0xf8, 0x23, 0x1c, 0x4c, 0x9a, 0x53,
|
0x8c, 0xbe, 0x59, 0x62, 0xad, 0x00, 0x73, 0x0e, 0xed, 0xf2, 0x1c, 0xfe, 0x08, 0x07, 0xe3, 0x7a,
|
||||||
0x3d, 0x11, 0x5c, 0x69, 0xa9, 0xf9, 0x1c, 0x9c, 0xfc, 0xad, 0x9c, 0x62, 0x82, 0x0a, 0x5b, 0xee,
|
0x57, 0x8f, 0x13, 0x2e, 0xd5, 0xa8, 0xf9, 0x12, 0x9c, 0xfc, 0xae, 0x9c, 0xb0, 0x98, 0x49, 0xd6,
|
||||||
0xe3, 0x59, 0xc3, 0xfd, 0xa2, 0x43, 0x57, 0xc2, 0x9f, 0xed, 0xc0, 0xf6, 0x2f, 0x2c, 0x59, 0xa0,
|
0x70, 0x1e, 0x4f, 0x6b, 0xee, 0x17, 0x2d, 0xb2, 0x14, 0xfe, 0x6c, 0x1b, 0x36, 0x7f, 0xa5, 0xf1,
|
||||||
0x3f, 0x04, 0xa7, 0x19, 0xb8, 0xf1, 0x0e, 0x3e, 0x86, 0x7b, 0x2b, 0xf5, 0x27, 0x9c, 0xa5, 0x72,
|
0x9c, 0xe1, 0x3e, 0x38, 0xf5, 0xc0, 0x95, 0x7b, 0xf0, 0x29, 0xdc, 0x5a, 0xaa, 0x3f, 0xe6, 0x34,
|
||||||
0x26, 0x94, 0xbe, 0x84, 0x91, 0x49, 0x89, 0x82, 0x28, 0xd7, 0x95, 0x1e, 0x6d, 0x58, 0xfc, 0xdf,
|
0x15, 0xd3, 0x44, 0xaa, 0x43, 0x18, 0xea, 0x94, 0xd0, 0x0f, 0xf3, 0xb9, 0xd2, 0x21, 0x35, 0x04,
|
||||||
0x2d, 0x70, 0xca, 0xa4, 0x53, 0xa6, 0x18, 0xf9, 0x04, 0x76, 0xc2, 0x9c, 0x7c, 0xa1, 0x42, 0x0f,
|
0xff, 0x61, 0x81, 0x53, 0x24, 0x9d, 0x50, 0x49, 0xd1, 0x13, 0xd8, 0x0e, 0x72, 0xf1, 0x66, 0x0a,
|
||||||
0xd6, 0x1f, 0xcf, 0x5a, 0x8f, 0xb4, 0x8c, 0xd7, 0x22, 0x2e, 0x8b, 0xba, 0x66, 0x34, 0xab, 0x22,
|
0xdd, 0xbd, 0x7e, 0x79, 0xae, 0xad, 0x91, 0x14, 0xf1, 0x6a, 0x88, 0x0b, 0x53, 0x57, 0xb7, 0x66,
|
||||||
0xde, 0xca, 0x8f, 0x56, 0x19, 0x1f, 0x84, 0xb0, 0x3b, 0xce, 0xb2, 0x13, 0x11, 0xa1, 0x24, 0x7d,
|
0x79, 0x88, 0x37, 0xea, 0x23, 0x65, 0x06, 0xfe, 0xd9, 0x5c, 0xe5, 0xf1, 0xfc, 0x4c, 0x04, 0x59,
|
||||||
0x80, 0x6f, 0x39, 0x5e, 0xa4, 0x18, 0x2a, 0x8c, 0xdc, 0x0e, 0x71, 0x8b, 0xd7, 0xf9, 0x32, 0x96,
|
0x94, 0xaa, 0x63, 0xa0, 0xce, 0xa0, 0x19, 0x5d, 0x85, 0xf8, 0xd2, 0x46, 0x9f, 0xc1, 0x16, 0x0d,
|
||||||
0x32, 0xe6, 0x53, 0xd7, 0x22, 0xfb, 0xc5, 0x5d, 0x1d, 0x5f, 0xc4, 0x52, 0x49, 0xd7, 0x26, 0x77,
|
0x54, 0x94, 0x2e, 0xd6, 0x1d, 0xe2, 0x95, 0x62, 0x35, 0xa6, 0xa7, 0x3a, 0x92, 0x98, 0x8c, 0x7b,
|
||||||
0x61, 0xdf, 0x18, 0xbe, 0x16, 0x2a, 0xe0, 0x27, 0x2c, 0x9c, 0xa1, 0xbb, 0xa5, 0xa3, 0xc6, 0x59,
|
0x01, 0xec, 0x8c, 0xb2, 0xec, 0x38, 0x09, 0x99, 0x40, 0x5d, 0x80, 0xd7, 0x9c, 0x5d, 0xa4, 0x2c,
|
||||||
0x26, 0xb2, 0xb3, 0xf3, 0x73, 0x89, 0xca, 0x8d, 0x8e, 0xff, 0xb2, 0xa1, 0x97, 0x13, 0x59, 0xf2,
|
0x90, 0x2c, 0x74, 0x5b, 0xc8, 0x35, 0xa3, 0xe0, 0x65, 0x24, 0x44, 0xc4, 0x27, 0xae, 0x85, 0xf6,
|
||||||
0x90, 0x9c, 0xc0, 0x6e, 0xa9, 0xab, 0x64, 0xd0, 0x2a, 0xb6, 0x46, 0x75, 0x06, 0xf7, 0xdb, 0x85,
|
0xcd, 0xc5, 0x18, 0x5d, 0x44, 0x42, 0x0a, 0xd7, 0x46, 0x37, 0x61, 0x5f, 0x03, 0xdf, 0x24, 0xd2,
|
||||||
0x38, 0x57, 0x9b, 0xe7, 0x05, 0xa2, 0xd6, 0x2e, 0x72, 0x7f, 0x43, 0x69, 0x6a, 0x61, 0x1c, 0x1c,
|
0xe7, 0xc7, 0x34, 0x98, 0x32, 0x77, 0x43, 0x45, 0x8d, 0xb2, 0x2c, 0xc9, 0x4e, 0xcf, 0xcf, 0x05,
|
||||||
0xb6, 0x3b, 0x37, 0x70, 0x92, 0xa4, 0x0d, 0xa7, 0x12, 0xc1, 0x36, 0x9c, 0x86, 0xfa, 0x51, 0x70,
|
0x93, 0x6e, 0x78, 0xef, 0x09, 0xdc, 0x5e, 0xa3, 0x03, 0xed, 0x41, 0xc7, 0xa0, 0x67, 0xcc, 0x6d,
|
||||||
0xeb, 0x2f, 0xc2, 0x44, 0x65, 0xc8, 0xe6, 0xe4, 0x70, 0xe3, 0xc2, 0x35, 0x3e, 0x17, 0x83, 0x1b,
|
0xa9, 0xd4, 0xd7, 0x5c, 0x94, 0x80, 0x35, 0xfc, 0xdb, 0x86, 0x4e, 0x9e, 0xbb, 0xe0, 0x01, 0x3a,
|
||||||
0xbd, 0x47, 0xd6, 0x63, 0xeb, 0xd9, 0x87, 0x7f, 0x5f, 0x0e, 0xad, 0xb7, 0x97, 0x43, 0xeb, 0xdf,
|
0x86, 0x9d, 0x62, 0xfe, 0xa3, 0x5e, 0xe3, 0xa3, 0xa0, 0xa7, 0x63, 0xef, 0x4e, 0xf3, 0x83, 0x91,
|
||||||
0xcb, 0xa1, 0xf5, 0xc7, 0xd5, 0xb0, 0xf3, 0xf6, 0x6a, 0xd8, 0xf9, 0xe7, 0x6a, 0xd8, 0xf9, 0x7e,
|
0x4f, 0xc5, 0xe7, 0x86, 0x51, 0xcd, 0x58, 0x74, 0x67, 0x65, 0x22, 0x56, 0x03, 0xbc, 0x77, 0xd8,
|
||||||
0x70, 0xfd, 0xff, 0x19, 0xaf, 0x6f, 0x99, 0x3f, 0x4f, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x24,
|
0xec, 0x5c, 0xe1, 0x89, 0xe3, 0x26, 0x9e, 0x72, 0x58, 0x37, 0xf1, 0xd4, 0xa6, 0x34, 0x01, 0xb7,
|
||||||
0x0c, 0x9c, 0xee, 0x8c, 0x08, 0x00, 0x00,
|
0x7a, 0xb9, 0xc6, 0x32, 0x63, 0x74, 0x86, 0x0e, 0x57, 0x2e, 0x46, 0xed, 0x59, 0xeb, 0xbd, 0xd3,
|
||||||
|
0x7b, 0x64, 0x3d, 0xb4, 0x9e, 0x7d, 0xfc, 0xcf, 0x65, 0xdf, 0x7a, 0x7b, 0xd9, 0xb7, 0xfe, 0xbb,
|
||||||
|
0xec, 0x5b, 0x7f, 0x5e, 0xf5, 0x5b, 0x6f, 0xaf, 0xfa, 0xad, 0x7f, 0xaf, 0xfa, 0xad, 0xef, 0x7b,
|
||||||
|
0xeb, 0xff, 0x1f, 0x3a, 0xdb, 0xd2, 0x7f, 0x1e, 0xff, 0x1f, 0x00, 0x00, 0xff, 0xff, 0x00, 0xa7,
|
||||||
|
0x75, 0x0a, 0x34, 0x09, 0x00, 0x00,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {
|
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {
|
||||||
@ -1868,6 +1952,43 @@ func (m *SettingsData) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|||||||
return len(dAtA) - i, nil
|
return len(dAtA) - i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) Marshal() (dAtA []byte, err error) {
|
||||||
|
size := m.Size()
|
||||||
|
dAtA = make([]byte, size)
|
||||||
|
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return dAtA[:n], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) MarshalTo(dAtA []byte) (int, error) {
|
||||||
|
size := m.Size()
|
||||||
|
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||||
|
i := len(dAtA)
|
||||||
|
_ = i
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
if m.Action != 0 {
|
||||||
|
i = encodeVarintSpacesync(dAtA, i, uint64(m.Action))
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0x10
|
||||||
|
}
|
||||||
|
if len(m.SpaceIds) > 0 {
|
||||||
|
for iNdEx := len(m.SpaceIds) - 1; iNdEx >= 0; iNdEx-- {
|
||||||
|
i -= len(m.SpaceIds[iNdEx])
|
||||||
|
copy(dAtA[i:], m.SpaceIds[iNdEx])
|
||||||
|
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.SpaceIds[iNdEx])))
|
||||||
|
i--
|
||||||
|
dAtA[i] = 0xa
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return len(dAtA) - i, nil
|
||||||
|
}
|
||||||
|
|
||||||
func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int {
|
func encodeVarintSpacesync(dAtA []byte, offset int, v uint64) int {
|
||||||
offset -= sovSpacesync(v)
|
offset -= sovSpacesync(v)
|
||||||
base := offset
|
base := offset
|
||||||
@ -2204,6 +2325,24 @@ func (m *SettingsData) Size() (n int) {
|
|||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *SpaceSubscription) Size() (n int) {
|
||||||
|
if m == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
if len(m.SpaceIds) > 0 {
|
||||||
|
for _, s := range m.SpaceIds {
|
||||||
|
l = len(s)
|
||||||
|
n += 1 + l + sovSpacesync(uint64(l))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if m.Action != 0 {
|
||||||
|
n += 1 + sovSpacesync(uint64(m.Action))
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
func sovSpacesync(x uint64) (n int) {
|
func sovSpacesync(x uint64) (n int) {
|
||||||
return (math_bits.Len64(x|1) + 6) / 7
|
return (math_bits.Len64(x|1) + 6) / 7
|
||||||
}
|
}
|
||||||
@ -4261,6 +4400,107 @@ func (m *SettingsData) Unmarshal(dAtA []byte) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (m *SpaceSubscription) Unmarshal(dAtA []byte) error {
|
||||||
|
l := len(dAtA)
|
||||||
|
iNdEx := 0
|
||||||
|
for iNdEx < l {
|
||||||
|
preIndex := iNdEx
|
||||||
|
var wire uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowSpacesync
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
wire |= uint64(b&0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fieldNum := int32(wire >> 3)
|
||||||
|
wireType := int(wire & 0x7)
|
||||||
|
if wireType == 4 {
|
||||||
|
return fmt.Errorf("proto: SpaceSubscription: wiretype end group for non-group")
|
||||||
|
}
|
||||||
|
if fieldNum <= 0 {
|
||||||
|
return fmt.Errorf("proto: SpaceSubscription: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||||
|
}
|
||||||
|
switch fieldNum {
|
||||||
|
case 1:
|
||||||
|
if wireType != 2 {
|
||||||
|
return fmt.Errorf("proto: wrong wireType = %d for field SpaceIds", wireType)
|
||||||
|
}
|
||||||
|
var stringLen uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowSpacesync
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
stringLen |= uint64(b&0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
intStringLen := int(stringLen)
|
||||||
|
if intStringLen < 0 {
|
||||||
|
return ErrInvalidLengthSpacesync
|
||||||
|
}
|
||||||
|
postIndex := iNdEx + intStringLen
|
||||||
|
if postIndex < 0 {
|
||||||
|
return ErrInvalidLengthSpacesync
|
||||||
|
}
|
||||||
|
if postIndex > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
m.SpaceIds = append(m.SpaceIds, string(dAtA[iNdEx:postIndex]))
|
||||||
|
iNdEx = postIndex
|
||||||
|
case 2:
|
||||||
|
if wireType != 0 {
|
||||||
|
return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType)
|
||||||
|
}
|
||||||
|
m.Action = 0
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowSpacesync
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
m.Action |= SpaceSubscriptionAction(b&0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
iNdEx = preIndex
|
||||||
|
skippy, err := skipSpacesync(dAtA[iNdEx:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||||
|
return ErrInvalidLengthSpacesync
|
||||||
|
}
|
||||||
|
if (iNdEx + skippy) > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
iNdEx += skippy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if iNdEx > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func skipSpacesync(dAtA []byte) (n int, err error) {
|
func skipSpacesync(dAtA []byte) (n int, err error) {
|
||||||
l := len(dAtA)
|
l := len(dAtA)
|
||||||
iNdEx := 0
|
iNdEx := 0
|
||||||
|
|||||||
@ -74,11 +74,11 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro
|
|||||||
conn drpc.Conn
|
conn drpc.Conn
|
||||||
sc sec.SecureConn
|
sc sec.SecureConn
|
||||||
)
|
)
|
||||||
log.Warn("dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
|
log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
|
||||||
for _, addr := range addrs {
|
for _, addr := range addrs {
|
||||||
conn, sc, err = d.handshake(ctx, addr)
|
conn, sc, err = d.handshake(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("can't connect to host", zap.String("addr", addr), zap.Error(err))
|
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|||||||
17
net/streampool/context.go
Normal file
17
net/streampool/context.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
package streampool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type streamCtxKey uint
|
||||||
|
|
||||||
|
const (
|
||||||
|
streamCtxKeyStreamId streamCtxKey = iota
|
||||||
|
)
|
||||||
|
|
||||||
|
func streamCtx(ctx context.Context, streamId uint32, peerId string) context.Context {
|
||||||
|
ctx = peer.CtxWithPeerId(ctx, peerId)
|
||||||
|
return context.WithValue(ctx, streamCtxKeyStreamId, streamId)
|
||||||
|
}
|
||||||
@ -1,6 +1,8 @@
|
|||||||
package streampool
|
package streampool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"storj.io/drpc"
|
"storj.io/drpc"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -12,7 +14,7 @@ type stream struct {
|
|||||||
pool *streamPool
|
pool *streamPool
|
||||||
streamId uint32
|
streamId uint32
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
l *zap.Logger
|
l logger.CtxLogger
|
||||||
tags []string
|
tags []string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,7 +36,9 @@ func (sr *stream) readLoop() error {
|
|||||||
sr.l.Info("msg receive error", zap.Error(err))
|
sr.l.Info("msg receive error", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := sr.pool.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil {
|
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
|
||||||
|
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "streamMessage"), zap.String("peerId", sr.peerId))
|
||||||
|
if err := sr.pool.HandleMessage(ctx, sr.peerId, msg); err != nil {
|
||||||
sr.l.Info("msg handle error", zap.Error(err))
|
sr.l.Info("msg handle error", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package streampool
|
package streampool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/anytypeio/any-sync/net/peer"
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
"github.com/anytypeio/any-sync/net/pool"
|
"github.com/anytypeio/any-sync/net/pool"
|
||||||
"github.com/cheggaaa/mb/v3"
|
"github.com/cheggaaa/mb/v3"
|
||||||
@ -33,6 +34,10 @@ type StreamPool interface {
|
|||||||
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
|
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
|
||||||
// Broadcast sends a message to all peers with given tags. Works async.
|
// Broadcast sends a message to all peers with given tags. Works async.
|
||||||
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
|
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
|
||||||
|
// AddTagsCtx adds tags to stream, stream will be extracted from ctx
|
||||||
|
AddTagsCtx(ctx context.Context, tags ...string) error
|
||||||
|
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
|
||||||
|
RemoveTagsCtx(ctx context.Context, tags ...string) error
|
||||||
// Close closes all streams
|
// Close closes all streams
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
@ -142,7 +147,7 @@ func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message)
|
|||||||
}
|
}
|
||||||
for _, st := range streams {
|
for _, st := range streams {
|
||||||
if err = st.write(msg); err != nil {
|
if err = st.write(msg); err != nil {
|
||||||
st.l.Info("sendOne write error", zap.Error(err), zap.Int("streams", len(streams)))
|
st.l.InfoCtx(ctx, "sendOne write error", zap.Error(err), zap.Int("streams", len(streams)))
|
||||||
// continue with next stream
|
// continue with next stream
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
@ -224,7 +229,7 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
|||||||
for _, st := range streams {
|
for _, st := range streams {
|
||||||
funcs = append(funcs, func() {
|
funcs = append(funcs, func() {
|
||||||
if e := st.write(msg); e != nil {
|
if e := st.write(msg); e != nil {
|
||||||
log.Debug("broadcast write error", zap.Error(e))
|
log.DebugCtx(ctx, "broadcast write error", zap.Error(e))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -234,6 +239,58 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
|||||||
return s.exec.Add(ctx, funcs...)
|
return s.exec.Add(ctx, funcs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *streamPool) AddTagsCtx(ctx context.Context, tags ...string) error {
|
||||||
|
streamId, ok := ctx.Value(streamCtxKeyStreamId).(uint32)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("context without streamId")
|
||||||
|
}
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
st, ok := s.streams[streamId]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("stream not found")
|
||||||
|
}
|
||||||
|
var newTags = make([]string, 0, len(tags))
|
||||||
|
for _, newTag := range tags {
|
||||||
|
if !slices.Contains(st.tags, newTag) {
|
||||||
|
newTags = append(newTags, newTag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
st.tags = append(st.tags, newTags...)
|
||||||
|
for _, newTag := range tags {
|
||||||
|
s.streamIdsByTag[newTag] = append(s.streamIdsByTag[newTag], streamId)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamPool) RemoveTagsCtx(ctx context.Context, tags ...string) error {
|
||||||
|
streamId, ok := ctx.Value(streamCtxKeyStreamId).(uint32)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("context without streamId")
|
||||||
|
}
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
st, ok := s.streams[streamId]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("stream not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
var filtered = st.tags[:0]
|
||||||
|
var toRemove = make([]string, 0, len(tags))
|
||||||
|
for _, t := range st.tags {
|
||||||
|
if slices.Contains(tags, t) {
|
||||||
|
toRemove = append(toRemove, t)
|
||||||
|
} else {
|
||||||
|
filtered = append(filtered, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
st.tags = filtered
|
||||||
|
for _, t := range toRemove {
|
||||||
|
removeStream(s.streamIdsByTag, t, streamId)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *streamPool) removeStream(streamId uint32) {
|
func (s *streamPool) removeStream(streamId uint32) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
@ -242,23 +299,9 @@ func (s *streamPool) removeStream(streamId uint32) {
|
|||||||
log.Fatal("removeStream: stream does not exist", zap.Uint32("streamId", streamId))
|
log.Fatal("removeStream: stream does not exist", zap.Uint32("streamId", streamId))
|
||||||
}
|
}
|
||||||
|
|
||||||
var removeStream = func(m map[string][]uint32, key string) {
|
removeStream(s.streamIdsByPeer, st.peerId, streamId)
|
||||||
streamIds := m[key]
|
|
||||||
idx := slices.Index(streamIds, streamId)
|
|
||||||
if idx == -1 {
|
|
||||||
log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId))
|
|
||||||
}
|
|
||||||
streamIds = slices.Delete(streamIds, idx, idx+1)
|
|
||||||
if len(streamIds) == 0 {
|
|
||||||
delete(m, key)
|
|
||||||
} else {
|
|
||||||
m[key] = streamIds
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
removeStream(s.streamIdsByPeer, st.peerId)
|
|
||||||
for _, tag := range st.tags {
|
for _, tag := range st.tags {
|
||||||
removeStream(s.streamIdsByTag, tag)
|
removeStream(s.streamIdsByTag, tag, streamId)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(s.streams, streamId)
|
delete(s.streams, streamId)
|
||||||
@ -279,8 +322,8 @@ func (s *streamPool) handleMessageLoop() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = s.handler.HandleMessage(context.Background(), hm.peerId, hm.msg); err != nil {
|
if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil {
|
||||||
log.Warn("handle message error", zap.Error(err))
|
log.WarnCtx(hm.ctx, "handle message error", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -288,3 +331,17 @@ func (s *streamPool) handleMessageLoop() {
|
|||||||
func (s *streamPool) Close() (err error) {
|
func (s *streamPool) Close() (err error) {
|
||||||
return s.exec.Close()
|
return s.exec.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func removeStream(m map[string][]uint32, key string, streamId uint32) {
|
||||||
|
streamIds := m[key]
|
||||||
|
idx := slices.Index(streamIds, streamId)
|
||||||
|
if idx == -1 {
|
||||||
|
log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId))
|
||||||
|
}
|
||||||
|
streamIds = slices.Delete(streamIds, idx, idx+1)
|
||||||
|
if len(streamIds) == 0 {
|
||||||
|
delete(m, key)
|
||||||
|
} else {
|
||||||
|
m[key] = streamIds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -165,6 +165,28 @@ func TestStreamPool_SendById(t *testing.T) {
|
|||||||
assert.Equal(t, "test", msg.ReqData)
|
assert.Equal(t, "test", msg.ReqData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStreamPool_Tags(t *testing.T) {
|
||||||
|
fx := newFixture(t)
|
||||||
|
defer fx.Finish(t)
|
||||||
|
|
||||||
|
s1, _ := newClientStream(t, fx, "p1")
|
||||||
|
defer s1.Close()
|
||||||
|
fx.AddStream("p1", s1, "t1")
|
||||||
|
|
||||||
|
s2, _ := newClientStream(t, fx, "p2")
|
||||||
|
defer s1.Close()
|
||||||
|
fx.AddStream("p2", s2, "t2")
|
||||||
|
|
||||||
|
err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, []uint32{1}, fx.StreamPool.(*streamPool).streamIdsByTag["t3"])
|
||||||
|
|
||||||
|
err = fx.RemoveTagsCtx(streamCtx(ctx, 2, "p2"), "t2")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, fx.StreamPool.(*streamPool).streamIdsByTag["t2"], 0)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func newFixture(t *testing.T) *fixture {
|
func newFixture(t *testing.T) *fixture {
|
||||||
fx := &fixture{}
|
fx := &fixture{}
|
||||||
ts := rpctest.NewTestServer()
|
ts := rpctest.NewTestServer()
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package periodicsync
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -14,8 +15,9 @@ type PeriodicSync interface {
|
|||||||
|
|
||||||
type SyncerFunc func(ctx context.Context) error
|
type SyncerFunc func(ctx context.Context) error
|
||||||
|
|
||||||
func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l *zap.Logger) PeriodicSync {
|
func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc, l logger.CtxLogger) PeriodicSync {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "periodicSync"))
|
||||||
return &periodicSync{
|
return &periodicSync{
|
||||||
syncer: syncer,
|
syncer: syncer,
|
||||||
log: l,
|
log: l,
|
||||||
@ -28,7 +30,7 @@ func NewPeriodicSync(periodSeconds int, timeout time.Duration, syncer SyncerFunc
|
|||||||
}
|
}
|
||||||
|
|
||||||
type periodicSync struct {
|
type periodicSync struct {
|
||||||
log *zap.Logger
|
log logger.CtxLogger
|
||||||
syncer SyncerFunc
|
syncer SyncerFunc
|
||||||
syncCtx context.Context
|
syncCtx context.Context
|
||||||
syncCancel context.CancelFunc
|
syncCancel context.CancelFunc
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user