go-waku/waku/v2/protocol/lightpush/waku_lightpush.go

220 lines
5.9 KiB
Go
Raw Normal View History

package lightpush
import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
2021-06-16 10:14:22 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
2021-09-30 15:59:51 +00:00
// LightPushID_v20beta1 is the libp2p protocol identifier under which the
// lightpush stream handler is registered and lightpush streams are opened.
const LightPushID_v20beta1 libp2pProtocol.ID = "/vac/waku/lightpush/2.0.0-beta1"
// Sentinel errors returned when a push request cannot be sent.
var (
	// ErrNoPeersAvailable is returned when no peer has been selected to
	// receive the push request.
	ErrNoPeersAvailable = errors.New("no suitable remote peers")

	// ErrInvalidId is returned when the push request has an empty request id.
	ErrInvalidId = errors.New("invalid request id")
)
// WakuLightPush holds the state of the lightpush protocol: it can serve push
// requests by publishing them through a relay, and it can send push requests
// to remote peers.
type WakuLightPush struct {
	// h is the libp2p host on which the stream handler is registered and
	// from which outgoing streams are opened.
	h host.Host

	// relay publishes the messages received in push requests. When it is nil
	// the instance is client-only (see IsClientOnly).
	relay *relay.WakuRelay

	// ctx is the context passed to relay publishing and metrics recording.
	ctx context.Context

	// log is the logger scoped with the name "lightpush".
	log *zap.SugaredLogger

	// started is set by Start and cleared by Stop.
	started bool
}
func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.SugaredLogger) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.ctx = ctx
wakuLP.h = h
wakuLP.log = log.Named("lightpush")
2021-11-01 12:38:03 +00:00
return wakuLP
}
func (wakuLP *WakuLightPush) Start() error {
if wakuLP.IsClientOnly() {
return errors.New("relay is required, without it, it is only a client and cannot be started")
}
2021-11-01 12:38:03 +00:00
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
wakuLP.log.Info("Light Push protocol started")
wakuLP.started = true
2021-11-01 12:38:03 +00:00
return nil
}
// IsClientOnly reports whether this instance was created without a relay.
// Such an instance cannot publish incoming push requests and cannot be
// started; it can only send push requests to other peers.
func (wakuLP *WakuLightPush) IsClientOnly() bool {
	return wakuLP.relay == nil
}
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
defer s.Close()
requestPushRPC := &pb.PushRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(requestPushRPC)
if err != nil {
wakuLP.log.Error("error reading request", err)
metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure")
return
}
wakuLP.log.Info(fmt.Sprintf("%s: lightpush message received from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
if requestPushRPC.Query != nil {
wakuLP.log.Info("lightpush push request")
response := new(pb.PushResponse)
if !wakuLP.IsClientOnly() {
2021-11-19 16:19:48 +00:00
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
2021-11-01 12:38:03 +00:00
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
2021-11-20 00:03:05 +00:00
_, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic)
if err != nil {
response.IsSuccess = false
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
2021-06-28 14:14:28 +00:00
response.Info = "Totally" // TODO: ask about this
}
} else {
wakuLP.log.Debug("no relay protocol present, unsuccessful push")
response.IsSuccess = false
response.Info = "No relay protocol"
}
responsePushRPC := &pb.PushRPC{}
responsePushRPC.RequestId = requestPushRPC.RequestId
responsePushRPC.Response = response
err = writer.WriteMsg(responsePushRPC)
if err != nil {
wakuLP.log.Error("error writing response", err)
_ = s.Reset()
} else {
wakuLP.log.Info(fmt.Sprintf("%s: response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()))
}
}
if requestPushRPC.Response != nil {
if requestPushRPC.Response.IsSuccess {
wakuLP.log.Info("lightpush message success")
} else {
wakuLP.log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info))
}
}
}
2021-11-01 12:38:03 +00:00
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
params.host = wakuLP.h
params.log = wakuLP.log
optList := DefaultOptions(wakuLP.h)
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.selectedPeer == "" {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, ErrNoPeersAvailable
}
if len(params.requestId) == 0 {
return nil, ErrInvalidId
}
2021-09-30 15:59:51 +00:00
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {
wakuLP.log.Info("failed to connect to remote peer", err)
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, err
}
defer connOpt.Close()
2021-08-13 11:56:09 +00:00
defer func() {
err := connOpt.Reset()
if err != nil {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
wakuLP.log.Error("failed to reset connection", err)
2021-08-13 11:56:09 +00:00
}
}()
pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32)
err = writer.WriteMsg(pushRequestRPC)
if err != nil {
wakuLP.log.Error("could not write request", err)
return nil, err
}
pushResponseRPC := &pb.PushRPC{}
err = reader.ReadMsg(pushResponseRPC)
if err != nil {
wakuLP.log.Error("could not read response", err)
metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure")
return nil, err
}
return pushResponseRPC.Response, nil
}
// IsStarted reports whether Start has succeeded and Stop has not been called
// since.
func (wakuLP *WakuLightPush) IsStarted() bool {
	return wakuLP.started
}
// Stop removes the lightpush stream handler from the host and marks the
// protocol as not started.
func (wakuLP *WakuLightPush) Stop() {
	wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
	wakuLP.started = false
}
2021-11-01 12:38:03 +00:00
2021-11-20 00:03:05 +00:00
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) {
2021-11-01 12:38:03 +00:00
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = topic
2021-11-01 12:38:03 +00:00
response, err := wakuLP.request(ctx, req, opts...)
2021-11-01 12:38:03 +00:00
if err != nil {
return nil, err
}
if response.IsSuccess {
hash, _ := message.Hash()
return hash, nil
} else {
return nil, errors.New(response.Info)
}
}
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, opts ...LightPushOption) ([]byte, error) {
2021-11-20 00:03:05 +00:00
return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...)
}