fix(lightpush): return non-empty reqId and add LP opts to builder (#1103)

This commit is contained in:
Vaclav Pavlin 2024-05-17 00:06:39 +02:00 committed by GitHub
parent 07d9fc9770
commit 879bc08426
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 43 additions and 6 deletions

View File

@ -18,6 +18,7 @@ import (
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
@ -64,6 +65,7 @@ func main() {
node.WithHostAddress(hostAddr1), node.WithHostAddress(hostAddr1),
node.WithWakuRelay(), node.WithWakuRelay(),
node.WithWakuFilterFullNode(), node.WithWakuFilterFullNode(),
node.WithLightPush(lightpush.WithRateLimiter(1, 1)),
) )
if err != nil { if err != nil {
panic(err) panic(err)
@ -118,6 +120,7 @@ func main() {
go writeLoop(ctx, fullNode) go writeLoop(ctx, fullNode)
go readLoop(ctx, fullNode) go readLoop(ctx, fullNode)
go writeLightpushLoop(ctx, lightNode)
go func() { go func() {
// Unsubscribe filter after 5 seconds // Unsubscribe filter after 5 seconds
@ -144,6 +147,24 @@ func randomHex(n int) (string, error) {
} }
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
msg := createMessage(wakuNode, msgContent)
_, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String()))
if err != nil {
log.Error("Error sending a message: ", err)
}
}
func writeLightpush(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
msg := createMessage(wakuNode, msgContent)
_, err := wakuNode.Lightpush().Publish(ctx, msg, lightpush.WithPubSubTopic(pubSubTopic.String()))
if err != nil {
log.Error("Error sending a LP message: ", err)
}
}
func createMessage(wakuNode *node.WakuNode, msgContent string) *pb.WakuMessage {
var version uint32 = 0 var version uint32 = 0
p := new(payload.Payload) p := new(payload.Payload)
@ -159,10 +180,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
Timestamp: utils.GetUnixEpoch(wakuNode.Timesource()), Timestamp: utils.GetUnixEpoch(wakuNode.Timesource()),
} }
_, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String())) return msg
if err != nil {
log.Error("Error sending a message: ", err)
}
} }
func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
@ -172,6 +190,14 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
} }
} }
func writeLightpushLoop(ctx context.Context, wakuNode *node.WakuNode) {
for {
//Change this sleep value to lower than 1 second to get Lightpush rate limiting kick in
time.Sleep(1000 * time.Millisecond)
writeLightpush(ctx, wakuNode, "Hello World via Lightpush!")
}
}
func readLoop(ctx context.Context, wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
pubsubTopic := pubSubTopic.String() pubsubTopic := pubSubTopic.String()
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic)) sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))

View File

@ -291,7 +291,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)

View File

@ -28,6 +28,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/rendezvous"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
@ -77,6 +78,7 @@ type WakuNodeParameters struct {
enableFilterFullNode bool enableFilterFullNode bool
filterOpts []filter.Option filterOpts []filter.Option
pubsubOpts []pubsub.Option pubsubOpts []pubsub.Option
lightpushOpts []lightpush.Option
minRelayPeersToPublish int minRelayPeersToPublish int
maxMsgSizeBytes int maxMsgSizeBytes int
@ -458,9 +460,10 @@ func WithMessageProvider(s legacy_store.MessageProvider) WakuNodeOption {
} }
// WithLightPush is a WakuNodeOption that enables the lightpush protocol // WithLightPush is a WakuNodeOption that enables the lightpush protocol
func WithLightPush() WakuNodeOption { func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableLightPush = true params.enableLightPush = true
params.lightpushOpts = lightpushOpts
return nil return nil
} }
} }

View File

@ -2,6 +2,10 @@ package pb
import "errors" import "errors"
// This special value for requestId indicates that the message was rate limited
// and we did not retreive the requestId to avoid a potential attack vector.
const REQUESTID_RATE_LIMITED = "N/A"
var ( var (
errMissingRequestID = errors.New("missing RequestId field") errMissingRequestID = errors.New("missing RequestId field")
errMissingQuery = errors.New("missing Query field") errMissingQuery = errors.New("missing Query field")
@ -32,6 +36,9 @@ func (x *PushRpc) ValidateRequest() error {
} }
func (x *PushRpc) ValidateResponse(requestID string) error { func (x *PushRpc) ValidateResponse(requestID string) error {
if x.RequestId == REQUESTID_RATE_LIMITED {
return nil
}
if x.RequestId == "" { if x.RequestId == "" {
return errMissingRequestID return errMissingRequestID
} }

View File

@ -108,6 +108,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
wakuLP.metrics.RecordError(rateLimitFailure) wakuLP.metrics.RecordError(rateLimitFailure)
responseMsg := "exceeds the rate limit" responseMsg := "exceeds the rate limit"
responsePushRPC.Response.Info = &responseMsg responsePushRPC.Response.Info = &responseMsg
responsePushRPC.RequestId = pb.REQUESTID_RATE_LIMITED
wakuLP.reply(stream, responsePushRPC, logger) wakuLP.reply(stream, responsePushRPC, logger)
return return
} }