debug/fixes

This commit is contained in:
Sergey Cherepanov 2023-01-25 23:35:20 +03:00
parent ddd20ae5b5
commit 6cb696d508
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
8 changed files with 18 additions and 50 deletions

View File

@ -5,7 +5,6 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
"golang.org/x/exp/slices"
)
type commonGetter struct {
@ -22,9 +21,6 @@ func newCommonGetter(spaceId string, getter treegetter.TreeGetter) *commonGetter
}
func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) {
if object == nil {
panic("nil object")
}
c.reservedObjects = append(c.reservedObjects, object)
}
@ -36,13 +32,12 @@ func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (obj
}
func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject {
pos := slices.IndexFunc(c.reservedObjects, func(object syncobjectgetter.SyncObject) bool {
return object.Id() == id
})
if pos == -1 {
return nil
for _, obj := range c.reservedObjects {
if obj != nil && obj.Id() == id {
return obj
}
return c.reservedObjects[pos]
}
return nil
}
func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) {

View File

@ -58,7 +58,7 @@ func NewHeadSync(
l := log.With(zap.String("spaceId", spaceId))
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
syncer := newDiffSyncer(spaceId, diff, confConnector, cache, storage, factory, syncStatus, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, syncer.Sync, l)
periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute*10, syncer.Sync, l)
return &headSync{
spaceId: spaceId,

View File

@ -3,6 +3,7 @@ package synctree
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/mock_synctree"
@ -70,7 +71,7 @@ func (fx *syncHandlerFixture) stop() {
func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
ctx := context.Background()
log = zap.NewNop().Sugar()
log = logger.CtxLogger{Logger: zap.NewNop()}
t.Run("head update non empty all heads added", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
@ -207,7 +208,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
ctx := context.Background()
log = zap.NewNop().Sugar()
log = logger.CtxLogger{Logger: zap.NewNop()}
t.Run("full sync request with change", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
@ -338,7 +339,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
func TestSyncHandler_HandleFullSyncResponse(t *testing.T) {
ctx := context.Background()
log = zap.NewNop().Sugar()
log = logger.CtxLogger{Logger: zap.NewNop()}
t.Run("full sync response with change", func(t *testing.T) {
fx := newSyncHandlerFixture(t)

View File

@ -2,6 +2,7 @@ package objectsync
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -78,7 +79,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
s.waitersMx.Unlock()
log.With(zap.String("replyId", msg.ReplyId)).InfoCtx(ctx, "time elapsed when waiting")
err = ctx.Err()
err = fmt.Errorf("sendSync context error: %v", ctx.Err())
case reply = <-waiter.ch:
// success
}

View File

@ -192,13 +192,13 @@ func (s *space) Init(ctx context.Context) (err error) {
DeletionState: deletionState,
}
s.settingsObject = settings.NewSettingsObject(deps, s.id)
s.cache.AddObject(s.settingsObject)
s.objectSync.Init()
s.headSync.Init(initialIds, deletionState)
err = s.settingsObject.Init(ctx)
if err != nil {
return
}
s.cache.AddObject(s.settingsObject)
s.syncStatus.Run()
return nil

View File

@ -2,12 +2,15 @@ package streampool
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/app/logger"
"go.uber.org/zap"
"storj.io/drpc"
"sync/atomic"
)
var msgCounter atomic.Uint32
type stream struct {
peerId string
stream drpc.Stream
@ -37,8 +40,8 @@ func (sr *stream) readLoop() error {
return err
}
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId)
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "streamMessage"), zap.String("peerId", sr.peerId))
if err := sr.pool.HandleMessage(ctx, sr.peerId, msg); err != nil {
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", fmt.Sprintf("streamMsg.%d", msgCounter.Add(1))), zap.String("peerId", sr.peerId))
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
sr.l.Info("msg handle error", zap.Error(err))
return err
}

View File

@ -4,7 +4,6 @@ import (
"fmt"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/net/context"
@ -49,7 +48,6 @@ type streamPool struct {
streams map[uint32]*stream
opening map[string]*openingProcess
exec *sendPool
handleQueue *mb.MB[handleMessage]
mu sync.RWMutex
lastStreamId uint32
}
@ -64,13 +62,6 @@ type handleMessage struct {
peerId string
}
func (s *streamPool) init() {
// TODO: to config
for i := 0; i < 10; i++ {
go s.handleMessageLoop()
}
}
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
st := s.addStream(peerId, drpcStream, tags...)
return st.readLoop()
@ -308,26 +299,6 @@ func (s *streamPool) removeStream(streamId uint32) {
st.l.Debug("stream removed", zap.Strings("tags", st.tags))
}
func (s *streamPool) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) {
return s.handleQueue.Add(ctx, handleMessage{
ctx: ctx,
msg: msg,
peerId: peerId,
})
}
func (s *streamPool) handleMessageLoop() {
for {
hm, err := s.handleQueue.WaitOne(context.Background())
if err != nil {
return
}
if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil {
log.WarnCtx(hm.ctx, "handle message error", zap.Error(err))
}
}
}
func (s *streamPool) Close() (err error) {
return s.exec.Close()
}

View File

@ -3,7 +3,6 @@ package streampool
import (
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger"
"github.com/cheggaaa/mb/v3"
)
const CName = "common.net.streampool"
@ -30,9 +29,7 @@ func (s *service) NewStreamPool(h StreamHandler) StreamPool {
streams: map[uint32]*stream{},
opening: map[string]*openingProcess{},
exec: newStreamSender(10, 100),
handleQueue: mb.New[handleMessage](100),
}
sp.init()
return sp
}