feat: relay msg size (#963)

Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2024-01-03 07:06:41 +05:30 committed by GitHub
parent 4f8ed170fe
commit b5068b4357
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 88 additions and 12 deletions

View File

@ -286,6 +286,13 @@ var (
Destination: &options.Relay.MinRelayPeersToPublish, Destination: &options.Relay.MinRelayPeersToPublish,
EnvVars: []string{"WAKUNODE2_MIN_RELAY_PEERS_TO_PUBLISH"}, EnvVars: []string{"WAKUNODE2_MIN_RELAY_PEERS_TO_PUBLISH"},
}) })
MaxRelayMsgSize = altsrc.NewStringFlag(&cli.StringFlag{
Name: "max-msg-size",
Value: "150KB",
Usage: "Maximum message size. Supported formats are B, KiB, KB, MiB. If no suffix, default is bytes",
Destination: &options.Relay.MaxMsgSize,
EnvVars: []string{"WAKUNODE2_MAX_RELAY_MSG_SIZE"},
})
StoreNodeFlag = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ StoreNodeFlag = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "storenode", Name: "storenode",
Usage: "Multiaddr of a peer that supports store protocol. Option may be repeated", Usage: "Multiaddr of a peer that supports store protocol. Option may be repeated",

View File

@ -59,6 +59,7 @@ func main() {
ProtectedTopics, ProtectedTopics,
RelayPeerExchange, RelayPeerExchange,
MinRelayPeersToPublish, MinRelayPeersToPublish,
MaxRelayMsgSize,
StoreNodeFlag, StoreNodeFlag,
StoreFlag, StoreFlag,
StoreMessageDBURL, StoreMessageDBURL,

View File

@ -53,6 +53,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
humanize "github.com/dustin/go-humanize"
) )
func requiresDB(options NodeOptions) bool { func requiresDB(options NodeOptions) bool {
@ -234,10 +236,15 @@ func Execute(options NodeOptions) error {
nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...)) nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...))
nodeOpts = append(nodeOpts, node.WithNTP()) nodeOpts = append(nodeOpts, node.WithNTP())
maxMsgSize := parseMsgSizeConfig(options.Relay.MaxMsgSize)
if options.Relay.Enable { if options.Relay.Enable {
var wakurelayopts []pubsub.Option var wakurelayopts []pubsub.Option
wakurelayopts = append(wakurelayopts, pubsub.WithPeerExchange(options.Relay.PeerExchange)) wakurelayopts = append(wakurelayopts, pubsub.WithPeerExchange(options.Relay.PeerExchange))
wakurelayopts = append(wakurelayopts, pubsub.WithMaxMessageSize(maxMsgSize))
nodeOpts = append(nodeOpts, node.WithWakuRelayAndMinPeers(options.Relay.MinRelayPeersToPublish, wakurelayopts...)) nodeOpts = append(nodeOpts, node.WithWakuRelayAndMinPeers(options.Relay.MinRelayPeersToPublish, wakurelayopts...))
nodeOpts = append(nodeOpts, node.WithMaxMsgSize(maxMsgSize))
} }
nodeOpts = append(nodeOpts, node.WithWakuFilterLightNode()) nodeOpts = append(nodeOpts, node.WithWakuFilterLightNode())
@ -577,3 +584,12 @@ func printListeningAddresses(ctx context.Context, nodeOpts []node.WakuNodeOption
} }
} }
func parseMsgSizeConfig(msgSizeConfig string) int {
msgSize, err := humanize.ParseBytes(msgSizeConfig)
if err != nil {
msgSize = 0
}
return int(msgSize)
}

View File

@ -30,6 +30,7 @@ type RelayOptions struct {
ContentTopics cli.StringSlice ContentTopics cli.StringSlice
PeerExchange bool PeerExchange bool
MinRelayPeersToPublish int MinRelayPeersToPublish int
MaxMsgSize string
} }
// RLNRelayOptions are settings used to enable RLN Relay. This is a protocol // RLNRelayOptions are settings used to enable RLN Relay. This is a protocol

View File

@ -29,7 +29,7 @@
]; ];
doCheck = false; doCheck = false;
# FIXME: This needs to be manually changed when updating modules. # FIXME: This needs to be manually changed when updating modules.
vendorSha256 = "sha256-kW9xdZ1JWUWpFVJzSFL3F349pVbtIOQ0Qz1OlbhBzn8="; vendorSha256 = "sha256-+g64LjpSraujzau3d63XMWKed/CnRzL/nxcu0PhyvFw=";
# Fix for 'nix run' trying to execute 'go-waku'. # Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; }; meta = { mainProgram = "waku"; };
}; };

1
go.mod
View File

@ -48,6 +48,7 @@ require (
github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
github.com/deckarep/golang-set v1.8.0 // indirect github.com/deckarep/golang-set v1.8.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect

3
go.sum
View File

@ -485,6 +485,8 @@ github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw=
@ -563,6 +565,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=

View File

@ -281,8 +281,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
} }
w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log,
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, w.opts.pubsubOpts...) relay.WithPubSubOptions(w.opts.pubsubOpts),
relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes))
if w.opts.enableRelay { if w.opts.enableRelay {
err = w.setupRLNRelay() err = w.setupRLNRelay()

View File

@ -80,6 +80,7 @@ type WakuNodeParameters struct {
pubsubOpts []pubsub.Option pubsubOpts []pubsub.Option
minRelayPeersToPublish int minRelayPeersToPublish int
maxMsgSizeBytes int
enableStore bool enableStore bool
messageProvider store.MessageProvider messageProvider store.MessageProvider
@ -358,6 +359,13 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)
} }
} }
func WithMaxMsgSize(maxMsgSizeBytes int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.maxMsgSizeBytes = maxMsgSizeBytes
return nil
}
}
func WithMaxPeerConnections(maxPeers int) WakuNodeOption { func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.maxPeerConnections = maxPeers params.maxPeerConnections = maxPeers

View File

@ -1,5 +1,7 @@
package relay package relay
import pubsub "github.com/libp2p/go-libp2p-pubsub"
type publishParameters struct { type publishParameters struct {
pubsubTopic string pubsubTopic string
} }
@ -20,3 +22,31 @@ func WithDefaultPubsubTopic() PublishOption {
params.pubsubTopic = DefaultWakuTopic params.pubsubTopic = DefaultWakuTopic
} }
} }
type relayParameters struct {
pubsubOpts []pubsub.Option
maxMsgSizeBytes int
}
type RelayOption func(*relayParameters)
func WithPubSubOptions(opts []pubsub.Option) RelayOption {
return func(params *relayParameters) {
params.pubsubOpts = append(params.pubsubOpts, opts...)
}
}
func WithMaxMsgSize(maxMsgSizeBytes int) RelayOption {
return func(params *relayParameters) {
if maxMsgSizeBytes == 0 {
maxMsgSizeBytes = defaultMaxMsgSizeBytes
}
params.maxMsgSizeBytes = maxMsgSizeBytes
}
}
func defaultOptions() []RelayOption {
return []RelayOption{
WithMaxMsgSize(defaultMaxMsgSizeBytes),
}
}

View File

@ -27,13 +27,15 @@ import (
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
const WakuRelayENRField = uint8(1 << 0) const WakuRelayENRField = uint8(1 << 0)
const defaultMaxMsgSizeBytes = 150 * 1024
// DefaultWakuTopic is the default pubsub topic used across all Waku protocols // DefaultWakuTopic is the default pubsub topic used across all Waku protocols
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String() var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String()
// WakuRelay is the implementation of the Waku Relay protocol // WakuRelay is the implementation of the Waku Relay protocol
type WakuRelay struct { type WakuRelay struct {
host host.Host host host.Host
opts []pubsub.Option relayParams *relayParameters
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
params pubsub.GossipSubParams params pubsub.GossipSubParams
peerScoreParams *pubsub.PeerScoreParams peerScoreParams *pubsub.PeerScoreParams
@ -41,9 +43,8 @@ type WakuRelay struct {
topicParams *pubsub.TopicScoreParams topicParams *pubsub.TopicScoreParams
timesource timesource.Timesource timesource timesource.Timesource
metrics Metrics metrics Metrics
log *zap.Logger
log *zap.Logger logMessages *zap.Logger
logMessages *zap.Logger
bcaster Broadcaster bcaster Broadcaster
@ -75,7 +76,7 @@ type pubsubTopicSubscriptionDetails struct {
// NewWakuRelay returns a new instance of a WakuRelay struct // NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource,
reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { reg prometheus.Registerer, log *zap.Logger, opts ...RelayOption) *WakuRelay {
w := new(WakuRelay) w := new(WakuRelay)
w.timesource = timesource w.timesource = timesource
w.topics = make(map[string]*pubsubTopicSubscriptionDetails) w.topics = make(map[string]*pubsubTopicSubscriptionDetails)
@ -87,9 +88,16 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.logMessages = utils.MessagesLogger("relay") w.logMessages = utils.MessagesLogger("relay")
w.events = eventbus.NewBus() w.events = eventbus.NewBus()
w.metrics = newMetrics(reg, w.logMessages) w.metrics = newMetrics(reg, w.logMessages)
w.relayParams = new(relayParameters)
w.relayParams.pubsubOpts = w.defaultPubsubOptions()
// default options required by WakuRelay options := defaultOptions()
w.opts = append(w.defaultPubsubOptions(), opts...) options = append(options, opts...)
for _, opt := range options {
opt(w.relayParams)
}
w.log.Info("relay config", zap.Int("max-msg-size-bytes", w.relayParams.maxMsgSizeBytes),
zap.Int("min-peers-to-publish", w.minPeersToPublish))
return w return w
} }
@ -123,7 +131,7 @@ func (w *WakuRelay) start() error {
if w.bcaster == nil { if w.bcaster == nil {
return errors.New("broadcaster not specified for relay") return errors.New("broadcaster not specified for relay")
} }
ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.opts...) ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.relayParams.pubsubOpts...)
if err != nil { if err != nil {
return err return err
} }
@ -289,7 +297,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
return nil, err return nil, err
} }
if len(out) > pubsub.DefaultMaxMessageSize { if len(out) > w.relayParams.maxMsgSizeBytes {
return nil, errors.New("message size exceeds gossipsub max message size") return nil, errors.New("message size exceeds gossipsub max message size")
} }