From b5068b43574263820651dd97645cbf42523c74d0 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 3 Jan 2024 07:06:41 +0530 Subject: [PATCH] feat: relay msg size (#963) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: richΛrd --- cmd/waku/flags.go | 7 +++++++ cmd/waku/main.go | 1 + cmd/waku/node.go | 16 +++++++++++++++ cmd/waku/options.go | 1 + flake.nix | 2 +- go.mod | 1 + go.sum | 3 +++ waku/v2/node/wakunode2.go | 5 +++-- waku/v2/node/wakuoptions.go | 8 ++++++++ waku/v2/protocol/relay/options.go | 30 ++++++++++++++++++++++++++++ waku/v2/protocol/relay/waku_relay.go | 26 +++++++++++++++--------- 11 files changed, 88 insertions(+), 12 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 91a82d5c..f87cb31f 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -286,6 +286,13 @@ var ( Destination: &options.Relay.MinRelayPeersToPublish, EnvVars: []string{"WAKUNODE2_MIN_RELAY_PEERS_TO_PUBLISH"}, }) + MaxRelayMsgSize = altsrc.NewStringFlag(&cli.StringFlag{ + Name: "max-msg-size", + Value: "150KB", + Usage: "Maximum message size. Supported formats are B, KiB, KB, MiB. If no suffix, default is bytes", + Destination: &options.Relay.MaxMsgSize, + EnvVars: []string{"WAKUNODE2_MAX_RELAY_MSG_SIZE"}, + }) StoreNodeFlag = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "storenode", Usage: "Multiaddr of a peer that supports store protocol. Option may be repeated", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index f868505f..fc733196 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -59,6 +59,7 @@ func main() { ProtectedTopics, RelayPeerExchange, MinRelayPeersToPublish, + MaxRelayMsgSize, StoreNodeFlag, StoreFlag, StoreMessageDBURL, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index e9d38f16..ef725a1c 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -53,6 +53,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" + + humanize "github.com/dustin/go-humanize" ) func requiresDB(options NodeOptions) bool { @@ -234,10 +236,15 @@ func Execute(options NodeOptions) error { nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...)) nodeOpts = append(nodeOpts, node.WithNTP()) + maxMsgSize := parseMsgSizeConfig(options.Relay.MaxMsgSize) + if options.Relay.Enable { var wakurelayopts []pubsub.Option wakurelayopts = append(wakurelayopts, pubsub.WithPeerExchange(options.Relay.PeerExchange)) + wakurelayopts = append(wakurelayopts, pubsub.WithMaxMessageSize(maxMsgSize)) + nodeOpts = append(nodeOpts, node.WithWakuRelayAndMinPeers(options.Relay.MinRelayPeersToPublish, wakurelayopts...)) + nodeOpts = append(nodeOpts, node.WithMaxMsgSize(maxMsgSize)) } nodeOpts = append(nodeOpts, node.WithWakuFilterLightNode()) @@ -577,3 +584,12 @@ func printListeningAddresses(ctx context.Context, nodeOpts []node.WakuNodeOption } } + +func parseMsgSizeConfig(msgSizeConfig string) int { + + msgSize, err := humanize.ParseBytes(msgSizeConfig) + if err != nil { + msgSize = 0 + } + return int(msgSize) +} diff --git a/cmd/waku/options.go b/cmd/waku/options.go index ee32bb2d..4031a2f5 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -30,6 +30,7 @@ type RelayOptions struct { ContentTopics cli.StringSlice PeerExchange bool MinRelayPeersToPublish int + MaxMsgSize string } // RLNRelayOptions are settings used to enable RLN Relay. This is a protocol diff --git a/flake.nix b/flake.nix index dc02d66e..fd85c4ba 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-kW9xdZ1JWUWpFVJzSFL3F349pVbtIOQ0Qz1OlbhBzn8="; + vendorSha256 = "sha256-+g64LjpSraujzau3d63XMWKed/CnRzL/nxcu0PhyvFw="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index b7cd18c1..18f9d78f 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect github.com/deckarep/golang-set v1.8.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect diff --git a/go.sum b/go.sum index f66b5b37..7f25e1e7 100644 --- a/go.sum +++ b/go.sum @@ -485,6 +485,8 @@ github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= @@ -563,6 +565,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5faf5435..395c6444 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -281,8 +281,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) - - w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, w.opts.pubsubOpts...) + w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, + relay.WithPubSubOptions(w.opts.pubsubOpts), + relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes)) if w.opts.enableRelay { err = w.setupRLNRelay() diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 39f04fe3..dd6d9958 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -80,6 +80,7 @@ type WakuNodeParameters struct { pubsubOpts []pubsub.Option minRelayPeersToPublish int + maxMsgSizeBytes int enableStore bool messageProvider store.MessageProvider @@ -358,6 +359,13 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option) } } +func WithMaxMsgSize(maxMsgSizeBytes int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.maxMsgSizeBytes = maxMsgSizeBytes + return nil + } +} + func WithMaxPeerConnections(maxPeers int) WakuNodeOption { return func(params *WakuNodeParameters) error { params.maxPeerConnections = maxPeers diff --git a/waku/v2/protocol/relay/options.go b/waku/v2/protocol/relay/options.go index 253a9fde..7d21f927 100644 --- a/waku/v2/protocol/relay/options.go +++ b/waku/v2/protocol/relay/options.go @@ -1,5 +1,7 @@ package relay +import pubsub "github.com/libp2p/go-libp2p-pubsub" + type publishParameters struct { pubsubTopic string } @@ -20,3 +22,31 @@ func WithDefaultPubsubTopic() PublishOption { params.pubsubTopic = DefaultWakuTopic } } + +type relayParameters struct { + pubsubOpts []pubsub.Option + maxMsgSizeBytes int +} + +type RelayOption func(*relayParameters) + +func WithPubSubOptions(opts []pubsub.Option) RelayOption { + return func(params *relayParameters) { + params.pubsubOpts = append(params.pubsubOpts, opts...) + } +} + +func WithMaxMsgSize(maxMsgSizeBytes int) RelayOption { + return func(params *relayParameters) { + if maxMsgSizeBytes == 0 { + maxMsgSizeBytes = defaultMaxMsgSizeBytes + } + params.maxMsgSizeBytes = maxMsgSizeBytes + } +} + +func defaultOptions() []RelayOption { + return []RelayOption{ + WithMaxMsgSize(defaultMaxMsgSizeBytes), + } +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index a1d279df..f2f35801 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -27,13 +27,15 @@ import ( const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") const WakuRelayENRField = uint8(1 << 0) +const defaultMaxMsgSizeBytes = 150 * 1024 + // DefaultWakuTopic is the default pubsub topic used across all Waku protocols var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String() // WakuRelay is the implementation of the Waku Relay protocol type WakuRelay struct { host host.Host - opts []pubsub.Option + relayParams *relayParameters pubsub *pubsub.PubSub params pubsub.GossipSubParams peerScoreParams *pubsub.PeerScoreParams @@ -41,9 +43,8 @@ type WakuRelay struct { topicParams *pubsub.TopicScoreParams timesource timesource.Timesource metrics Metrics - - log *zap.Logger - logMessages *zap.Logger + log *zap.Logger + logMessages *zap.Logger bcaster Broadcaster @@ -75,7 +76,7 @@ type pubsubTopicSubscriptionDetails struct { // NewWakuRelay returns a new instance of a WakuRelay struct func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, - reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { + reg prometheus.Registerer, log *zap.Logger, opts ...RelayOption) *WakuRelay { w := new(WakuRelay) w.timesource = timesource w.topics = make(map[string]*pubsubTopicSubscriptionDetails) @@ -87,9 +88,16 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.logMessages = utils.MessagesLogger("relay") w.events = eventbus.NewBus() w.metrics = newMetrics(reg, w.logMessages) + w.relayParams = new(relayParameters) + w.relayParams.pubsubOpts = w.defaultPubsubOptions() - // default options required by WakuRelay - w.opts = append(w.defaultPubsubOptions(), opts...) + options := defaultOptions() + options = append(options, opts...) + for _, opt := range options { + opt(w.relayParams) + } + w.log.Info("relay config", zap.Int("max-msg-size-bytes", w.relayParams.maxMsgSizeBytes), + zap.Int("min-peers-to-publish", w.minPeersToPublish)) return w } @@ -123,7 +131,7 @@ func (w *WakuRelay) start() error { if w.bcaster == nil { return errors.New("broadcaster not specified for relay") } - ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.opts...) + ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.relayParams.pubsubOpts...) if err != nil { return err } @@ -289,7 +297,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . return nil, err } - if len(out) > pubsub.DefaultMaxMessageSize { + if len(out) > w.relayParams.maxMsgSizeBytes { return nil, errors.New("message size exceeds gossipsub max message size") }