94 lines
2.2 KiB
Go
94 lines
2.2 KiB
Go
package acl
|
|
|
|
import (
|
|
"context"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/acl/aclrecordproto"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectsync/synchandler"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
|
|
"go.uber.org/zap"
|
|
"sync"
|
|
)
|
|
|
|
func newWatcher(spaceId, aclId string, h synchandler.SyncHandler) (w *watcher, err error) {
|
|
w = &watcher{
|
|
aclId: aclId,
|
|
spaceId: spaceId,
|
|
handler: h,
|
|
ready: make(chan struct{}),
|
|
}
|
|
if w.logId, err = cidToByte(aclId); err != nil {
|
|
return nil, err
|
|
}
|
|
return
|
|
}
|
|
|
|
type watcher struct {
|
|
spaceId string
|
|
aclId string
|
|
logId []byte
|
|
handler synchandler.SyncHandler
|
|
ready chan struct{}
|
|
isReady sync.Once
|
|
err error
|
|
}
|
|
|
|
func (w *watcher) AddConsensusRecords(recs []*consensusproto.Record) {
|
|
w.isReady.Do(func() {
|
|
close(w.ready)
|
|
})
|
|
records := make([]*aclrecordproto.RawAclRecordWithId, 0, len(recs))
|
|
|
|
for _, rec := range recs {
|
|
recId, err := cidToString(rec.Id)
|
|
if err != nil {
|
|
log.Error("received invalid id from consensus node", zap.Error(err))
|
|
continue
|
|
}
|
|
records = append(records, &aclrecordproto.RawAclRecordWithId{
|
|
Payload: rec.Payload,
|
|
Id: recId,
|
|
})
|
|
}
|
|
|
|
aclReq := &aclrecordproto.AclSyncMessage{
|
|
Content: &aclrecordproto.AclSyncContentValue{
|
|
Value: &aclrecordproto.AclSyncContentValue_AddRecords{
|
|
AddRecords: &aclrecordproto.AclAddRecords{
|
|
Records: records,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
payload, err := aclReq.Marshal()
|
|
if err != nil {
|
|
log.Error("acl payload marshal error", zap.Error(err))
|
|
return
|
|
}
|
|
req := &spacesyncproto.ObjectSyncMessage{
|
|
SpaceId: w.spaceId,
|
|
Payload: payload,
|
|
ObjectId: w.aclId,
|
|
}
|
|
|
|
if err = w.handler.HandleMessage(context.TODO(), "", req); err != nil {
|
|
log.Warn("handle message error", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (w *watcher) AddConsensusError(err error) {
|
|
w.isReady.Do(func() {
|
|
w.err = err
|
|
close(w.ready)
|
|
})
|
|
}
|
|
|
|
func (w *watcher) Ready(ctx context.Context) (err error) {
|
|
select {
|
|
case <-w.ready:
|
|
return w.err
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|