go-waku/waku/v2/protocol/lightpush/waku_lightpush.go

282 lines
8.4 KiB
Go
Raw Normal View History

package lightpush
import (
"context"
"encoding/hex"
"errors"
"math"
2022-10-19 19:39:32 +00:00
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
// LightPushID_v20beta1 is the current Waku LightPush protocol identifier
2021-09-30 15:59:51 +00:00
const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1")
// Sentinel errors returned by the lightpush client code paths.
var (
	// ErrNoPeersAvailable is returned when no remote peer could be selected
	// to serve a lightpush request.
	ErrNoPeersAvailable = errors.New("no suitable remote peers")

	// ErrInvalidID is returned when a request is attempted with an empty
	// request id.
	ErrInvalidID = errors.New("invalid request id")
)
// WakuLightPush implements the Waku LightPush protocol: it can serve push
// requests from other peers (when a relay is available) and send push
// requests to remote peers.
type WakuLightPush struct {
	// h is the libp2p host used to mount the stream handler and to open
	// outgoing streams. It is assigned through SetHost.
	h host.Host
	// relay publishes messages received from lightpush clients. When nil,
	// Start refuses to run and the instance can only act as a client.
	relay *relay.WakuRelay
	// cancel stops the context created by Start; nil until Start succeeds.
	cancel context.CancelFunc
	// pm is the optional peer manager consulted when no peer was chosen
	// explicitly for a request.
	pm *peermanager.PeerManager
	// metrics records message and error counters for the protocol.
	metrics Metrics
	// log is the logger named "lightpush".
	log *zap.Logger
}
// NewWakuLightPush returns a new instance of Waku Lightpush struct
// Takes an optional peermanager if WakuLightPush is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.log = log.Named("lightpush")
wakuLP.pm = pm
wakuLP.metrics = newMetrics(reg)
2021-11-01 12:38:03 +00:00
return wakuLP
}
2023-04-17 00:04:12 +00:00
// SetHost stores the libp2p host that the protocol uses to mount its stream
// handler and to open streams towards remote peers. It has to be called
// before Start or any publish call, since both dereference the host.
func (wakuLP *WakuLightPush) SetHost(h host.Host) {
	wakuLP.h = h
}
2022-05-04 21:08:24 +00:00
// Start inits the lighpush protocol
2023-01-06 22:37:57 +00:00
func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
if wakuLP.relayIsNotAvailable() {
return errors.New("relay is required, without it, it is only a client and cannot be started")
}
2023-01-06 22:37:57 +00:00
ctx, cancel := context.WithCancel(ctx)
wakuLP.cancel = cancel
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx))
wakuLP.log.Info("Light Push protocol started")
2021-11-01 12:38:03 +00:00
return nil
}
// relayIsNotAvailable reports whether this instance was created without a
// relay, in which case it cannot publish messages on behalf of lightpush
// clients.
func (wakuLP *WakuLightPush) relayIsNotAvailable() bool {
	hasRelay := wakuLP.relay != nil
	return !hasRelay
}
2023-01-06 22:37:57 +00:00
func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wakuLP.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
requestPushRPC := &pb.PushRPC{}
writer := pbio.NewDelimitedWriter(s)
reader := pbio.NewDelimitedReader(s, math.MaxInt32)
2023-01-06 22:37:57 +00:00
err := reader.ReadMsg(requestPushRPC)
if err != nil {
logger.Error("reading request", zap.Error(err))
wakuLP.metrics.RecordError(decodeRPCFailure)
2023-01-06 22:37:57 +00:00
return
}
2023-01-06 22:37:57 +00:00
logger.Info("request received")
if requestPushRPC.Query != nil {
logger.Info("push request")
response := new(pb.PushResponse)
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
wakuLP.metrics.RecordMessage()
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err := wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic)
if err != nil {
logger.Error("publishing message", zap.Error(err))
wakuLP.metrics.RecordError(messagePushFailure)
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
response.Info = "Totally" // TODO: ask about this
}
2023-01-06 22:37:57 +00:00
responsePushRPC := &pb.PushRPC{}
responsePushRPC.RequestId = requestPushRPC.RequestId
responsePushRPC.Response = response
2023-01-06 22:37:57 +00:00
err = writer.WriteMsg(responsePushRPC)
if err != nil {
wakuLP.metrics.RecordError(writeResponseFailure)
2023-01-06 22:37:57 +00:00
logger.Error("writing response", zap.Error(err))
_ = s.Reset()
} else {
logger.Info("response sent")
}
} else {
wakuLP.metrics.RecordError(emptyRequestBodyFailure)
}
2023-01-06 22:37:57 +00:00
if requestPushRPC.Response != nil {
if requestPushRPC.Response.IsSuccess {
logger.Info("request success")
} else {
logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info))
}
} else {
wakuLP.metrics.RecordError(emptyResponseBodyFailure)
}
}
}
// request sends a message via lightPush protocol to either a specified peer or peer that is selected.
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushParameters) (*pb.PushResponse, error) {
if params == nil {
return nil, errors.New("lightpush params are mandatory")
}
if len(params.requestID) == 0 {
return nil, ErrInvalidID
}
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
2022-03-03 16:04:03 +00:00
2021-09-30 15:59:51 +00:00
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure)
return nil, err
}
defer connOpt.Close()
2021-08-13 11:56:09 +00:00
defer func() {
err := connOpt.Reset()
if err != nil {
wakuLP.metrics.RecordError(dialFailure)
logger.Error("resetting connection", zap.Error(err))
2021-08-13 11:56:09 +00:00
}
}()
pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestID), Query: req}
writer := pbio.NewDelimitedWriter(connOpt)
reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32)
err = writer.WriteMsg(pushRequestRPC)
if err != nil {
wakuLP.metrics.RecordError(writeRequestFailure)
logger.Error("writing request", zap.Error(err))
return nil, err
}
pushResponseRPC := &pb.PushRPC{}
err = reader.ReadMsg(pushResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
wakuLP.metrics.RecordError(decodeRPCFailure)
return nil, err
}
return pushResponseRPC.Response, nil
}
2022-05-04 21:08:24 +00:00
// Stop unmounts the lightpush protocol
func (wakuLP *WakuLightPush) Stop() {
2023-01-06 22:37:57 +00:00
if wakuLP.cancel == nil {
return
}
2023-01-06 22:37:57 +00:00
wakuLP.cancel()
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}
2021-11-01 12:38:03 +00:00
func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) {
params := new(lightPushParameters)
params.host = wakuLP.h
params.log = wakuLP.log
params.pm = wakuLP.pm
var err error
optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {
opt(params)
}
2021-11-01 12:38:03 +00:00
if params.pubsubTopic == "" {
params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
}
if params.pm != nil && params.selectedPeer == "" {
params.selectedPeer, err = wakuLP.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1,
PubsubTopic: params.pubsubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
}
if params.selectedPeer == "" {
if err != nil {
params.log.Error("selecting peer", zap.Error(err))
wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}
}
return params, nil
}
// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
params, err := wakuLP.handleOpts(ctx, message, opts...)
if err != nil {
return nil, err
}
2021-11-01 12:38:03 +00:00
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = params.pubsubTopic
2021-11-01 12:38:03 +00:00
response, err := wakuLP.request(ctx, req, params)
2021-11-01 12:38:03 +00:00
if err != nil {
return nil, err
}
if response.IsSuccess {
hash := message.Hash(params.pubsubTopic)
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
2021-11-01 12:38:03 +00:00
return hash, nil
}
return nil, errors.New(response.Info)
2021-11-01 12:38:03 +00:00
}
// Publish broadcasts a WakuMessage via the lightpush protocol by forwarding
// ctx, message and opts unchanged to PublishToTopic; the pubsub topic is
// derived from the content topic unless opts set one.
// If auto-sharding is not to be used, then PublishToTopic API should be used
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
	hash, err := wakuLP.PublishToTopic(ctx, message, opts...)
	return hash, err
}