Fix bugs with storage
This commit is contained in:
parent
df4f1d709d
commit
1af3f1b290
@ -71,7 +71,8 @@ func (t *textDocument) AddText(text string) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
t.objTree.Lock()
|
||||
defer t.objTree.Unlock()
|
||||
_, err = t.objTree.AddContent(context.Background(), tree.SignableChangeContent{
|
||||
Data: res,
|
||||
Key: t.account.Account().SignKey,
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
package spacesyncproto
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||
"storj.io/drpc"
|
||||
)
|
||||
@ -25,6 +26,7 @@ func WrapHeadUpdate(update *ObjectHeadUpdate, rootChange *treechangeproto.RawTre
|
||||
},
|
||||
RootChange: rootChange,
|
||||
TreeId: treeId,
|
||||
TrackingId: trackingId,
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,6 +37,7 @@ func WrapFullRequest(request *ObjectFullSyncRequest, rootChange *treechangeproto
|
||||
},
|
||||
RootChange: rootChange,
|
||||
TreeId: treeId,
|
||||
TrackingId: trackingId,
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,6 +48,7 @@ func WrapFullResponse(response *ObjectFullSyncResponse, rootChange *treechangepr
|
||||
},
|
||||
RootChange: rootChange,
|
||||
TreeId: treeId,
|
||||
TrackingId: trackingId,
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,5 +59,21 @@ func WrapError(err error, rootChange *treechangeproto.RawTreeChangeWithId, treeI
|
||||
},
|
||||
RootChange: rootChange,
|
||||
TreeId: treeId,
|
||||
TrackingId: trackingId,
|
||||
}
|
||||
}
|
||||
|
||||
func MessageDescription(msg *ObjectSyncMessage) string {
|
||||
content := msg.GetContent()
|
||||
switch {
|
||||
case content.GetHeadUpdate() != nil:
|
||||
return fmt.Sprintf("head update/%v", content.GetHeadUpdate().Heads)
|
||||
case content.GetFullSyncRequest() != nil:
|
||||
return fmt.Sprintf("fullsync request/%v", content.GetFullSyncRequest().Heads)
|
||||
case content.GetFullSyncResponse() != nil:
|
||||
return fmt.Sprintf("fullsync response/%v", content.GetFullSyncResponse().Heads)
|
||||
case content.GetErrorResponse() != nil:
|
||||
return fmt.Sprintf("error response/%v", content.GetErrorResponse().Error)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
@ -90,7 +90,7 @@ func (s *streamPool) SendSync(
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: limit wait time here and remove the waiter
|
||||
reply = <-waiter.ch
|
||||
return
|
||||
}
|
||||
@ -112,6 +112,9 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn
|
||||
streams := getStreams()
|
||||
s.Unlock()
|
||||
|
||||
log.With("description", spacesyncproto.MessageDescription(message)).
|
||||
With("treeId", message.TreeId).
|
||||
Debugf("sending message to %d peers", len(streams))
|
||||
for _, s := range streams {
|
||||
err = s.Send(message)
|
||||
}
|
||||
@ -158,6 +161,9 @@ Loop:
|
||||
|
||||
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
streams := s.getAllStreams()
|
||||
log.With("description", spacesyncproto.MessageDescription(message)).
|
||||
With("treeId", message.TreeId).
|
||||
Debugf("broadcasting message to %d peers", len(streams))
|
||||
for _, stream := range streams {
|
||||
if err = stream.Send(message); err != nil {
|
||||
// TODO: add logging
|
||||
|
||||
@ -44,7 +44,10 @@ func (s *syncHandler) handleHeadUpdate(
|
||||
senderId string,
|
||||
update *spacesyncproto.ObjectHeadUpdate,
|
||||
msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
|
||||
log.With("senderId", senderId).
|
||||
With("heads", update.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
Debug("received head update message")
|
||||
var (
|
||||
fullRequest *spacesyncproto.ObjectSyncMessage
|
||||
isEmptyUpdate = len(update.Changes) == 0
|
||||
@ -86,8 +89,16 @@ func (s *syncHandler) handleHeadUpdate(
|
||||
}()
|
||||
|
||||
if fullRequest != nil {
|
||||
log.With("senderId", senderId).
|
||||
With("heads", update.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
Debug("sending full sync request")
|
||||
return s.syncClient.SendAsync([]string{senderId}, fullRequest)
|
||||
}
|
||||
log.With("senderId", senderId).
|
||||
With("heads", update.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
Debug("head update finished correctly")
|
||||
return
|
||||
}
|
||||
|
||||
@ -96,6 +107,11 @@ func (s *syncHandler) handleFullSyncRequest(
|
||||
senderId string,
|
||||
request *spacesyncproto.ObjectFullSyncRequest,
|
||||
msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log.With("senderId", senderId).
|
||||
With("heads", request.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
With("trackingId", msg.TrackingId).
|
||||
Debug("received full sync request message")
|
||||
var (
|
||||
fullResponse *spacesyncproto.ObjectSyncMessage
|
||||
header = msg.RootChange
|
||||
@ -141,11 +157,18 @@ func (s *syncHandler) handleFullSyncResponse(
|
||||
senderId string,
|
||||
response *spacesyncproto.ObjectFullSyncResponse,
|
||||
msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log.With("senderId", senderId).
|
||||
With("heads", response.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
Debug("received full sync response message")
|
||||
objTree, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId)
|
||||
if err != nil {
|
||||
log.With("senderId", senderId).
|
||||
With("heads", response.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
Debug("failed to find the tree in full sync response")
|
||||
return
|
||||
}
|
||||
|
||||
err = func() error {
|
||||
objTree.Lock()
|
||||
defer objTree.Unlock()
|
||||
@ -157,6 +180,10 @@ func (s *syncHandler) handleFullSyncResponse(
|
||||
_, err = objTree.AddRawChanges(ctx, response.Changes...)
|
||||
return err
|
||||
}()
|
||||
log.With("error", err != nil).
|
||||
With("heads", response.Heads).
|
||||
With("treeId", msg.TreeId).
|
||||
Debug("finished full sync response")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package synctree
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
|
||||
@ -21,6 +22,8 @@ type SyncTree struct {
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
var log = logger.NewNamed("commonspace.synctree").Sugar()
|
||||
|
||||
var createDerivedObjectTree = tree2.CreateDerivedObjectTree
|
||||
var createObjectTree = tree2.CreateObjectTree
|
||||
var buildObjectTree = tree2.BuildObjectTree
|
||||
@ -138,8 +141,10 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot
|
||||
}
|
||||
|
||||
func (s *SyncTree) Close() (err error) {
|
||||
log.With("id", s.ID()).Debug("closing sync tree")
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
log.With("id", s.ID()).Debug("taken lock on sync tree")
|
||||
if s.isClosed {
|
||||
err = ErrSyncTreeClosed
|
||||
return
|
||||
|
||||
@ -99,7 +99,7 @@ func (t *treeStorage) Heads() (heads []string, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if heads == nil {
|
||||
if headsBytes == nil {
|
||||
err = storage2.ErrUnknownTreeId
|
||||
return
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user