Compare commits
4 Commits
main
...
release-fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
959b1307c6 | ||
|
|
a35d94a20a | ||
|
|
7049a884a7 | ||
|
|
21a9ce6035 |
@ -26,6 +26,8 @@ type DiffSyncer interface {
|
|||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const logPeriodSecs = 200
|
||||||
|
|
||||||
func newDiffSyncer(hs *headSync) DiffSyncer {
|
func newDiffSyncer(hs *headSync) DiffSyncer {
|
||||||
return &diffSyncer{
|
return &diffSyncer{
|
||||||
diff: hs.diff,
|
diff: hs.diff,
|
||||||
@ -35,7 +37,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
|
|||||||
peerManager: hs.peerManager,
|
peerManager: hs.peerManager,
|
||||||
clientFactory: spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient),
|
clientFactory: spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient),
|
||||||
credentialProvider: hs.credentialProvider,
|
credentialProvider: hs.credentialProvider,
|
||||||
log: log,
|
log: newSyncLogger(hs.log, logPeriodSecs),
|
||||||
syncStatus: hs.syncStatus,
|
syncStatus: hs.syncStatus,
|
||||||
deletionState: hs.deletionState,
|
deletionState: hs.deletionState,
|
||||||
}
|
}
|
||||||
@ -48,7 +50,7 @@ type diffSyncer struct {
|
|||||||
treeManager treemanager.TreeManager
|
treeManager treemanager.TreeManager
|
||||||
storage spacestorage.SpaceStorage
|
storage spacestorage.SpaceStorage
|
||||||
clientFactory spacesyncproto.ClientFactory
|
clientFactory spacesyncproto.ClientFactory
|
||||||
log logger.CtxLogger
|
log syncLogger
|
||||||
deletionState deletionstate.ObjectDeletionState
|
deletionState deletionstate.ObjectDeletionState
|
||||||
credentialProvider credentialprovider.CredentialProvider
|
credentialProvider credentialprovider.CredentialProvider
|
||||||
syncStatus syncstatus.StatusUpdater
|
syncStatus syncstatus.StatusUpdater
|
||||||
@ -100,7 +102,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
|||||||
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.log.InfoCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
d.log.DebugCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,12 +147,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
d.log.logSyncDone(p.Id(), len(newIds), len(changedIds), len(removedIds), totalLen-len(existingIds)-len(missingIds))
|
||||||
zap.Int("changedIds", len(changedIds)),
|
|
||||||
zap.Int("removedIds", len(removedIds)),
|
|
||||||
zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)),
|
|
||||||
zap.String("peerId", p.Id()),
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -6,7 +6,7 @@ import (
|
|||||||
"github.com/anyproto/any-sync/app"
|
"github.com/anyproto/any-sync/app"
|
||||||
"github.com/anyproto/any-sync/app/ldiff"
|
"github.com/anyproto/any-sync/app/ldiff"
|
||||||
"github.com/anyproto/any-sync/app/logger"
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
config2 "github.com/anyproto/any-sync/commonspace/config"
|
"github.com/anyproto/any-sync/commonspace/config"
|
||||||
"github.com/anyproto/any-sync/commonspace/credentialprovider"
|
"github.com/anyproto/any-sync/commonspace/credentialprovider"
|
||||||
"github.com/anyproto/any-sync/commonspace/deletionstate"
|
"github.com/anyproto/any-sync/commonspace/deletionstate"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/treemanager"
|
"github.com/anyproto/any-sync/commonspace/object/treemanager"
|
||||||
@ -70,7 +70,7 @@ var createDiffSyncer = newDiffSyncer
|
|||||||
|
|
||||||
func (h *headSync) Init(a *app.App) (err error) {
|
func (h *headSync) Init(a *app.App) (err error) {
|
||||||
shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
||||||
cfg := a.MustComponent("config").(config2.ConfigGetter)
|
cfg := a.MustComponent("config").(config.ConfigGetter)
|
||||||
h.spaceId = shared.SpaceId
|
h.spaceId = shared.SpaceId
|
||||||
h.spaceIsDeleted = shared.SpaceIsDeleted
|
h.spaceIsDeleted = shared.SpaceIsDeleted
|
||||||
h.syncPeriod = cfg.GetSpace().SyncPeriod
|
h.syncPeriod = cfg.GetSpace().SyncPeriod
|
||||||
|
|||||||
41
commonspace/headsync/synclogger.go
Normal file
41
commonspace/headsync/synclogger.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package headsync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type syncLogger struct {
|
||||||
|
lastLogged map[string]time.Time
|
||||||
|
logInterval time.Duration
|
||||||
|
logger.CtxLogger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncLogger(log logger.CtxLogger, syncLogPeriodSecs int) syncLogger {
|
||||||
|
return syncLogger{
|
||||||
|
lastLogged: map[string]time.Time{},
|
||||||
|
logInterval: time.Duration(syncLogPeriodSecs) * time.Second,
|
||||||
|
CtxLogger: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s syncLogger) logSyncDone(peerId string, newIds, changedIds, removedIds, deltedIds int) {
|
||||||
|
now := time.Now()
|
||||||
|
differentIds := newIds + changedIds + removedIds + deltedIds
|
||||||
|
// always logging if some ids are different or there are no log interval
|
||||||
|
if differentIds == 0 && s.logInterval > 0 {
|
||||||
|
lastLogged := s.lastLogged[peerId]
|
||||||
|
if now.Before(lastLogged.Add(s.logInterval)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.lastLogged[peerId] = now
|
||||||
|
s.Info("sync done:", zap.Int("newIds", newIds),
|
||||||
|
zap.Int("changedIds", changedIds),
|
||||||
|
zap.Int("removedIds", removedIds),
|
||||||
|
zap.Int("already deleted ids", deltedIds),
|
||||||
|
zap.String("peerId", peerId),
|
||||||
|
)
|
||||||
|
}
|
||||||
@ -106,10 +106,10 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
|||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
if len(p.inactive) == 0 {
|
if len(p.inactive) == 0 {
|
||||||
wait := p.limiter.wait(len(p.active) + int(p.openingWaitCount.Load()))
|
wait := p.limiter.wait(len(p.active) + int(p.openingWaitCount.Load()))
|
||||||
|
p.openingWaitCount.Add(1)
|
||||||
|
defer p.openingWaitCount.Add(-1)
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
if wait != nil {
|
if wait != nil {
|
||||||
p.openingWaitCount.Add(1)
|
|
||||||
defer p.openingWaitCount.Add(-1)
|
|
||||||
// throttle new connection opening
|
// throttle new connection opening
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user