Compare commits
4 Commits
main
...
release-fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
959b1307c6 | ||
|
|
a35d94a20a | ||
|
|
7049a884a7 | ||
|
|
21a9ce6035 |
@ -26,6 +26,8 @@ type DiffSyncer interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
const logPeriodSecs = 200
|
||||
|
||||
func newDiffSyncer(hs *headSync) DiffSyncer {
|
||||
return &diffSyncer{
|
||||
diff: hs.diff,
|
||||
@ -35,7 +37,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
|
||||
peerManager: hs.peerManager,
|
||||
clientFactory: spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient),
|
||||
credentialProvider: hs.credentialProvider,
|
||||
log: log,
|
||||
log: newSyncLogger(hs.log, logPeriodSecs),
|
||||
syncStatus: hs.syncStatus,
|
||||
deletionState: hs.deletionState,
|
||||
}
|
||||
@ -48,7 +50,7 @@ type diffSyncer struct {
|
||||
treeManager treemanager.TreeManager
|
||||
storage spacestorage.SpaceStorage
|
||||
clientFactory spacesyncproto.ClientFactory
|
||||
log logger.CtxLogger
|
||||
log syncLogger
|
||||
deletionState deletionstate.ObjectDeletionState
|
||||
credentialProvider credentialprovider.CredentialProvider
|
||||
syncStatus syncstatus.StatusUpdater
|
||||
@ -100,7 +102,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
||||
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
d.log.InfoCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||
d.log.DebugCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -145,12 +147,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||
zap.Int("changedIds", len(changedIds)),
|
||||
zap.Int("removedIds", len(removedIds)),
|
||||
zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)),
|
||||
zap.String("peerId", p.Id()),
|
||||
)
|
||||
d.log.logSyncDone(p.Id(), len(newIds), len(changedIds), len(removedIds), totalLen-len(existingIds)-len(missingIds))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@ import (
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/ldiff"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
config2 "github.com/anyproto/any-sync/commonspace/config"
|
||||
"github.com/anyproto/any-sync/commonspace/config"
|
||||
"github.com/anyproto/any-sync/commonspace/credentialprovider"
|
||||
"github.com/anyproto/any-sync/commonspace/deletionstate"
|
||||
"github.com/anyproto/any-sync/commonspace/object/treemanager"
|
||||
@ -70,7 +70,7 @@ var createDiffSyncer = newDiffSyncer
|
||||
|
||||
func (h *headSync) Init(a *app.App) (err error) {
|
||||
shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
||||
cfg := a.MustComponent("config").(config2.ConfigGetter)
|
||||
cfg := a.MustComponent("config").(config.ConfigGetter)
|
||||
h.spaceId = shared.SpaceId
|
||||
h.spaceIsDeleted = shared.SpaceIsDeleted
|
||||
h.syncPeriod = cfg.GetSpace().SyncPeriod
|
||||
|
||||
41
commonspace/headsync/synclogger.go
Normal file
41
commonspace/headsync/synclogger.go
Normal file
@ -0,0 +1,41 @@
|
||||
package headsync
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type syncLogger struct {
|
||||
lastLogged map[string]time.Time
|
||||
logInterval time.Duration
|
||||
logger.CtxLogger
|
||||
}
|
||||
|
||||
func newSyncLogger(log logger.CtxLogger, syncLogPeriodSecs int) syncLogger {
|
||||
return syncLogger{
|
||||
lastLogged: map[string]time.Time{},
|
||||
logInterval: time.Duration(syncLogPeriodSecs) * time.Second,
|
||||
CtxLogger: log,
|
||||
}
|
||||
}
|
||||
|
||||
func (s syncLogger) logSyncDone(peerId string, newIds, changedIds, removedIds, deltedIds int) {
|
||||
now := time.Now()
|
||||
differentIds := newIds + changedIds + removedIds + deltedIds
|
||||
// always logging if some ids are different or there are no log interval
|
||||
if differentIds == 0 && s.logInterval > 0 {
|
||||
lastLogged := s.lastLogged[peerId]
|
||||
if now.Before(lastLogged.Add(s.logInterval)) {
|
||||
return
|
||||
}
|
||||
}
|
||||
s.lastLogged[peerId] = now
|
||||
s.Info("sync done:", zap.Int("newIds", newIds),
|
||||
zap.Int("changedIds", changedIds),
|
||||
zap.Int("removedIds", removedIds),
|
||||
zap.Int("already deleted ids", deltedIds),
|
||||
zap.String("peerId", peerId),
|
||||
)
|
||||
}
|
||||
@ -106,10 +106,10 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
||||
p.mu.Lock()
|
||||
if len(p.inactive) == 0 {
|
||||
wait := p.limiter.wait(len(p.active) + int(p.openingWaitCount.Load()))
|
||||
p.openingWaitCount.Add(1)
|
||||
defer p.openingWaitCount.Add(-1)
|
||||
p.mu.Unlock()
|
||||
if wait != nil {
|
||||
p.openingWaitCount.Add(1)
|
||||
defer p.openingWaitCount.Add(-1)
|
||||
// throttle new connection opening
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user