Fix putTree close and handshake params; add logs for treesUsed debugging

mcrakhman 2023-02-21 16:36:58 +01:00 committed by Mikhail Iudin
parent d6737cc547
commit aa6c2e7ce4
GPG Key ID: FAAAA8BAABDFF1C0
8 changed files with 33 additions and 17 deletions

View File

@@ -178,6 +178,7 @@ Load:
 	if closing {
 		select {
 		case <-ctx.Done():
+			log.DebugCtx(ctx, "ctx done while waiting on object close", zap.String("id", id))
 			return nil, ctx.Err()
 		case <-e.close:
 			goto Load
@@ -196,6 +197,7 @@ Load:
 	}
 	select {
 	case <-ctx.Done():
+		log.DebugCtx(ctx, "ctx done while waiting on object load", zap.String("id", id))
 		return nil, ctx.Err()
 	case <-e.load:
 	}

View File

@@ -85,7 +85,7 @@ func (tb *treeBuilder) build(heads []string, theirHeads []string, newChanges []*
 	}
 	proposedHeads = append(proposedHeads, heads...)
-	log.With(zap.Strings("heads", proposedHeads)).Debug("building tree")
+	log.With(zap.Strings("heads", proposedHeads), zap.String("id", tb.treeStorage.Id())).Debug("building tree")
 	if err = tb.buildTree(proposedHeads, breakpoint); err != nil {
 		return nil, fmt.Errorf("buildTree error: %v", err)
 	}

View File

@@ -109,7 +109,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
 		listener:   deps.Listener,
 		syncStatus: deps.SyncStatus,
 	}
-	syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.SyncStatus)
+	syncHandler := newSyncTreeHandler(deps.SpaceId, syncTree, syncClient, deps.SyncStatus)
 	syncTree.SyncHandler = syncHandler
 	t = syncTree
 	syncTree.Lock()
@@ -191,7 +191,10 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
 }

 func (s *syncTree) Delete() (err error) {
-	log.With(zap.String("id", s.Id())).Debug("deleting sync tree")
+	log.Debug("deleting sync tree", zap.String("id", s.Id()))
+	defer func() {
+		log.Debug("deleted sync tree", zap.Error(err), zap.String("id", s.Id()))
+	}()
 	s.Lock()
 	defer s.Unlock()
 	if err = s.checkAlive(); err != nil {
@@ -206,7 +209,10 @@ func (s *syncTree) Delete() (err error) {
 }

 func (s *syncTree) Close() (err error) {
-	log.With(zap.String("id", s.Id())).Debug("closing sync tree")
+	log.Debug("closing sync tree", zap.String("id", s.Id()))
+	defer func() {
+		log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id()))
+	}()
 	s.Lock()
 	defer s.Unlock()
 	if s.isClosed {

View File

@@ -18,16 +18,18 @@ type syncTreeHandler struct {
 	syncClient  SyncClient
 	syncStatus  syncstatus.StatusUpdater
 	handlerLock sync.Mutex
+	spaceId     string
 	queue       ReceiveQueue
 }

 const maxQueueSize = 5

-func newSyncTreeHandler(objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
+func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
 	return &syncTreeHandler{
 		objTree:    objTree,
 		syncClient: syncClient,
 		syncStatus: syncStatus,
+		spaceId:    spaceId,
 		queue:      newReceiveQueue(maxQueueSize),
 	}
 }
@@ -38,7 +40,6 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
 	if err != nil {
 		return
 	}
 	s.syncStatus.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
 	queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.RequestId)
@@ -82,7 +83,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
 		objTree = s.objTree
 	)
-	log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()))
+	log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()), zap.String("spaceId", s.spaceId))
 	log.DebugCtx(ctx, "received head update message")
 	defer func() {
@@ -99,7 +100,6 @@ func (s *syncTreeHandler) handleHeadUpdate(
 	// isEmptyUpdate is sent when the tree is brought up from cache
 	if isEmptyUpdate {
 		headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
 		log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
 		if headEquals {
@@ -150,7 +150,11 @@ func (s *syncTreeHandler) handleFullSyncRequest(
 		objTree = s.objTree
 	)
-	log := log.With(zap.String("senderId", senderId), zap.Strings("heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId))
+	log := log.With(zap.String("senderId", senderId),
+		zap.Strings("heads", request.Heads),
+		zap.String("treeId", s.objTree.Id()),
+		zap.String("replyId", replyId),
+		zap.String("spaceId", s.spaceId))
 	log.DebugCtx(ctx, "received full sync request message")
 	defer func() {
@@ -188,7 +192,7 @@ func (s *syncTreeHandler) handleFullSyncResponse(
 	var (
 		objTree = s.objTree
 	)
-	log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()))
+	log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()), zap.String("spaceId", s.spaceId))
 	log.DebugCtx(ctx, "received full sync response message")
 	defer func() {

View File

@@ -73,7 +73,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
 		delete(s.waiters, msg.RequestId)
 		s.waitersMx.Unlock()
-		log.With(zap.String("requestId", msg.RequestId)).WarnCtx(ctx, "time elapsed when waiting")
+		log.With(zap.String("requestId", msg.RequestId)).DebugCtx(ctx, "time elapsed when waiting")
 		err = fmt.Errorf("sendSync context error: %v", ctx.Err())
 	case reply = <-waiter.ch:
 		// success

View File

@@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
 		Listener:     listener,
 		AclList:      s.aclList,
 		SpaceStorage: s.storage,
-		OnClose:      func(id string) {},
+		OnClose:      s.onObjectClose,
 		SyncStatus:   s.syncStatus,
 		PeerGetter:   s.peerManager,
 	}
@@ -334,6 +334,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
 	if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil {
 		return nil, err
 	}
+	log.Debug("incrementing counter", zap.String("id", id), zap.String("spaceId", s.id))
 	s.treesUsed.Add(1)
 	return
 }
@@ -401,6 +402,7 @@ func (s *space) handleMessage(msg HandleMessage) {
 }

 func (s *space) onObjectClose(id string) {
+	log.Debug("decrementing counter", zap.String("id", id), zap.String("spaceId", s.id))
 	s.treesUsed.Add(-1)
 	_ = s.handleQueue.CloseThread(id)
 }

View File

@@ -32,6 +32,7 @@ type Params struct {
 	ListenAddrs   []string
 	Wrapper       DRPCHandlerWrapper
 	TimeoutMillis int
+	Handshake     func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error)
 }

 func NewBaseDrpcServer() *BaseDrpcServer {
@@ -42,6 +43,7 @@ func (s *BaseDrpcServer) Run(ctx context.Context, params Params) (err error) {
 	s.drpcServer = drpcserver.NewWithOptions(params.Wrapper(s.Mux), drpcserver.Options{Manager: drpcmanager.Options{
 		Reader: drpcwire.ReaderOptions{MaximumBufferSize: params.BufferSizeMb * (1 << 20)},
 	}})
+	s.handshake = params.Handshake
 	ctx, s.cancel = context.WithCancel(ctx)
 	for _, addr := range params.ListenAddrs {
 		list, err := net.Listen("tcp", addr)
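
For context on the new Handshake parameter: below is a minimal sketch (not part of this commit) of how a base drpc server might consume the injected handshake when accepting connections. The sketchServer type, serveConns function, and the exact wiring are assumptions for illustration only.

package rpcsketch

import (
	"context"
	"net"

	"github.com/libp2p/go-libp2p/core/sec" // assumed import path for sec.SecureConn
	"storj.io/drpc/drpcserver"
)

// handshakeFunc mirrors the shape of the new Params.Handshake field.
type handshakeFunc func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error)

// sketchServer is a hypothetical stand-in for BaseDrpcServer.
type sketchServer struct {
	handshake  handshakeFunc
	drpcServer *drpcserver.Server
}

// serveConns shows one plausible way the injected handshake could be used:
// secure each accepted connection, then hand it to drpc.
func (s *sketchServer) serveConns(list net.Listener) {
	for {
		conn, err := list.Accept()
		if err != nil {
			return
		}
		go func(conn net.Conn) {
			// Run the configured handshake on the raw TCP connection first.
			cCtx, sc, err := s.handshake(conn)
			if err != nil {
				_ = conn.Close()
				return
			}
			// sec.SecureConn satisfies drpc.Transport (Read/Write/Close).
			_ = s.drpcServer.ServeOne(cCtx, sc)
		}(conn)
	}
}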

View File

@@ -70,11 +70,11 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
 				SummaryVec: histVec,
 			}
 		},
-	}
-	s.handshake = func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) {
-		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
-		defer cancel()
-		return s.transport.SecureInbound(ctx, conn)
+		Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+			return s.transport.SecureInbound(ctx, conn)
+		},
 	}
 	return s.BaseDrpcServer.Run(ctx, params)
 }