Remove error in broadcast

This commit is contained in:
mcrakhman 2023-04-26 18:12:03 +02:00
parent 4cb88b2bfb
commit 75a44c4281
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 15 additions and 15 deletions

View File

@ -116,9 +116,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
if isFirstBuild { if isFirstBuild {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// send to everybody, because everybody should know that the node or client got new tree // send to everybody, because everybody should know that the node or client got new tree
if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil { syncTree.syncClient.Broadcast(ctx, headUpdate)
log.ErrorCtx(ctx, "broadcast error", zap.Error(e))
}
} }
return return
} }
@ -155,7 +153,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
} }
s.syncStatus.HeadsChange(s.Id(), res.Heads) s.syncStatus.HeadsChange(s.Id(), res.Heads)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.Broadcast(ctx, headUpdate) s.syncClient.Broadcast(ctx, headUpdate)
return return
} }
@ -182,7 +180,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
s.notifiable.UpdateHeads(s.Id(), res.Heads) s.notifiable.UpdateHeads(s.Id(), res.Heads)
} }
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.Broadcast(ctx, headUpdate) s.syncClient.Broadcast(ctx, headUpdate)
} }
return return
} }

View File

@ -73,7 +73,7 @@ func Test_BuildSyncTree(t *testing.T) {
updateListenerMock.EXPECT().Update(tr) updateListenerMock.EXPECT().Update(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddRawChanges(ctx, payload) res, err := tr.AddRawChanges(ctx, payload)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedRes, res) require.Equal(t, expectedRes, res)
@ -95,7 +95,7 @@ func Test_BuildSyncTree(t *testing.T) {
updateListenerMock.EXPECT().Rebuild(tr) updateListenerMock.EXPECT().Rebuild(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddRawChanges(ctx, payload) res, err := tr.AddRawChanges(ctx, payload)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedRes, res) require.Equal(t, expectedRes, res)
@ -133,7 +133,7 @@ func Test_BuildSyncTree(t *testing.T) {
Return(expectedRes, nil) Return(expectedRes, nil)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate) syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate)).Return(nil) syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddContent(ctx, content) res, err := tr.AddContent(ctx, content)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedRes, res) require.Equal(t, expectedRes, res)

View File

@ -39,11 +39,9 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
} }
// Broadcast mocks base method. // Broadcast mocks base method.
func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) m.ctrl.Call(m, "Broadcast", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
} }
// Broadcast indicates an expected call of Broadcast. // Broadcast indicates an expected call of Broadcast.

View File

@ -4,11 +4,12 @@ import (
"context" "context"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap"
) )
type SyncClient interface { type SyncClient interface {
RequestFactory RequestFactory
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage)
SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
MessagePool() MessagePool MessagePool() MessagePool
@ -31,12 +32,15 @@ func NewSyncClient(
} }
} }
func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
if err != nil { if err != nil {
return return
} }
return s.messagePool.Broadcast(ctx, objMsg) err = s.messagePool.Broadcast(ctx, objMsg)
if err != nil {
log.DebugCtx(ctx, "broadcast error", zap.Error(err))
}
} }
func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {