Sergey Cherepanov 7f0960937e
consensus node
2022-10-04 15:39:29 +03:00

182 lines
4.5 KiB
Go

package db
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserrs"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
const CName = "consensus.db"
var log = logger.NewNamed(CName)
func New() Service {
return &service{}
}
type ChangeReceiver func(logId []byte, records []consensus.Record)
type Service interface {
AddLog(ctx context.Context, log consensus.Log) (err error)
AddRecord(ctx context.Context, logId []byte, record consensus.Record) (err error)
FetchLog(ctx context.Context, logId []byte) (log consensus.Log, err error)
SetChangeReceiver(receiver ChangeReceiver) (err error)
app.ComponentRunnable
}
type service struct {
conf config.Mongo
logColl *mongo.Collection
running bool
changeReceiver ChangeReceiver
streamCtx context.Context
streamCancel context.CancelFunc
listenerDone chan struct{}
}
func (s *service) Init(a *app.App) (err error) {
s.conf = a.MustComponent(config.CName).(*config.Config).Mongo
return nil
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(ctx context.Context) (err error) {
client, err := mongo.Connect(ctx, options.Client().ApplyURI(s.conf.Connect))
if err != nil {
return err
}
s.logColl = client.Database(s.conf.Database).Collection(s.conf.LogCollection)
s.running = true
if s.changeReceiver != nil {
if err = s.runStreamListener(ctx); err != nil {
return err
}
}
return
}
func (s *service) AddLog(ctx context.Context, l consensus.Log) (err error) {
_, err = s.logColl.InsertOne(ctx, l)
if mongo.IsDuplicateKeyError(err) {
return consensuserrs.ErrLogExists
}
return
}
type findLogQuery struct {
Id []byte `bson:"_id"`
}
type findRecordQuery struct {
Id []byte `bson:"_id"`
LastRecordId []byte `bson:"records.0.id"`
}
type updateOp struct {
Push struct {
Records struct {
Each []consensus.Record `bson:"$each""`
Pos int `bson:"$position"`
} `bson:"records"`
} `bson:"$push"`
}
func (s *service) AddRecord(ctx context.Context, logId []byte, record consensus.Record) (err error) {
var upd updateOp
upd.Push.Records.Each = []consensus.Record{record}
result, err := s.logColl.UpdateOne(ctx, findRecordQuery{
Id: logId,
LastRecordId: record.PrevId,
}, upd)
if err != nil {
log.Error("addRecord update error", zap.Error(err))
return consensuserrs.ErrUnexpected
}
if result.ModifiedCount == 0 {
return consensuserrs.ErrConflict
}
return
}
func (s *service) FetchLog(ctx context.Context, logId []byte) (l consensus.Log, err error) {
if err = s.logColl.FindOne(ctx, findLogQuery{Id: logId}).Decode(&l); err != nil {
if err == mongo.ErrNoDocuments {
err = consensuserrs.ErrLogNotFound
}
return
}
return
}
func (s *service) SetChangeReceiver(receiver ChangeReceiver) (err error) {
if s.running {
return fmt.Errorf("set receiver must be called before Run")
}
s.changeReceiver = receiver
return
}
type matchPipeline struct {
Match struct {
OT string `bson:"operationType"`
} `bson:"$match"`
}
func (s *service) runStreamListener(ctx context.Context) (err error) {
var mp matchPipeline
mp.Match.OT = "update"
stream, err := s.logColl.Watch(ctx, []matchPipeline{mp})
if err != nil {
return
}
s.listenerDone = make(chan struct{})
s.streamCtx, s.streamCancel = context.WithCancel(context.Background())
go s.streamListener(stream)
return
}
type streamResult struct {
DocumentKey struct {
Id []byte `bson:"_id"`
} `bson:"documentKey"`
UpdateDescription struct {
UpdateFields struct {
Records []consensus.Record `bson:"records"`
} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
func (s *service) streamListener(stream *mongo.ChangeStream) {
defer close(s.listenerDone)
for stream.Next(s.streamCtx) {
var res streamResult
if err := stream.Decode(&res); err != nil {
log.Error("stream decode error:", zap.Error(err))
}
s.changeReceiver(res.DocumentKey.Id, res.UpdateDescription.UpdateFields.Records)
}
}
func (s *service) Close(ctx context.Context) (err error) {
if s.logColl != nil {
err = s.logColl.Database().Client().Disconnect(ctx)
s.logColl = nil
}
if s.listenerDone != nil {
s.streamCancel()
<-s.listenerDone
}
return
}