Compare commits

...

4 Commits

Author SHA1 Message Date
mcrakhman
959b1307c6
Remove from config 2023-07-17 14:48:05 +02:00
mcrakhman
a35d94a20a
Add sync logger to reduce number of logs 2023-07-17 13:50:49 +02:00
Mikhail Rakhmanov
7049a884a7
Merge pull request #48 from anyproto/fix-open-requests
Fix openingWaitCount logic
2023-07-14 16:25:31 +02:00
mcrakhman
21a9ce6035
Fix openingWaitCount logic 2023-07-14 16:19:38 +02:00
4 changed files with 51 additions and 13 deletions

View File

@ -26,6 +26,8 @@ type DiffSyncer interface {
Close() error
}
const logPeriodSecs = 200
func newDiffSyncer(hs *headSync) DiffSyncer {
return &diffSyncer{
diff: hs.diff,
@ -35,7 +37,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
peerManager: hs.peerManager,
clientFactory: spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient),
credentialProvider: hs.credentialProvider,
log: log,
log: newSyncLogger(hs.log, logPeriodSecs),
syncStatus: hs.syncStatus,
deletionState: hs.deletionState,
}
@ -48,7 +50,7 @@ type diffSyncer struct {
treeManager treemanager.TreeManager
storage spacestorage.SpaceStorage
clientFactory spacesyncproto.ClientFactory
log logger.CtxLogger
log syncLogger
deletionState deletionstate.ObjectDeletionState
credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusUpdater
@ -100,7 +102,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
}
}
d.log.InfoCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
d.log.DebugCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
return nil
}
@ -145,12 +147,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
if err != nil {
return err
}
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)),
zap.Int("removedIds", len(removedIds)),
zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)),
zap.String("peerId", p.Id()),
)
d.log.logSyncDone(p.Id(), len(newIds), len(changedIds), len(removedIds), totalLen-len(existingIds)-len(missingIds))
return
}

View File

@ -6,7 +6,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger"
config2 "github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
@ -70,7 +70,7 @@ var createDiffSyncer = newDiffSyncer
func (h *headSync) Init(a *app.App) (err error) {
shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
cfg := a.MustComponent("config").(config2.ConfigGetter)
cfg := a.MustComponent("config").(config.ConfigGetter)
h.spaceId = shared.SpaceId
h.spaceIsDeleted = shared.SpaceIsDeleted
h.syncPeriod = cfg.GetSpace().SyncPeriod

View File

@ -0,0 +1,41 @@
package headsync
import (
"time"
"github.com/anyproto/any-sync/app/logger"
"go.uber.org/zap"
)
type syncLogger struct {
lastLogged map[string]time.Time
logInterval time.Duration
logger.CtxLogger
}
func newSyncLogger(log logger.CtxLogger, syncLogPeriodSecs int) syncLogger {
return syncLogger{
lastLogged: map[string]time.Time{},
logInterval: time.Duration(syncLogPeriodSecs) * time.Second,
CtxLogger: log,
}
}
func (s syncLogger) logSyncDone(peerId string, newIds, changedIds, removedIds, deltedIds int) {
now := time.Now()
differentIds := newIds + changedIds + removedIds + deltedIds
// always logging if some ids are different or there are no log interval
if differentIds == 0 && s.logInterval > 0 {
lastLogged := s.lastLogged[peerId]
if now.Before(lastLogged.Add(s.logInterval)) {
return
}
}
s.lastLogged[peerId] = now
s.Info("sync done:", zap.Int("newIds", newIds),
zap.Int("changedIds", changedIds),
zap.Int("removedIds", removedIds),
zap.Int("already deleted ids", deltedIds),
zap.String("peerId", peerId),
)
}

View File

@ -106,10 +106,10 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
p.mu.Lock()
if len(p.inactive) == 0 {
wait := p.limiter.wait(len(p.active) + int(p.openingWaitCount.Load()))
p.openingWaitCount.Add(1)
defer p.openingWaitCount.Add(-1)
p.mu.Unlock()
if wait != nil {
p.openingWaitCount.Add(1)
defer p.openingWaitCount.Add(-1)
// throttle new connection opening
select {
case <-ctx.Done():