diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index f87cb31f..e5fd8588 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -6,6 +6,7 @@ import ( cli "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" "github.com/waku-org/go-waku/waku/cliutils" + "github.com/waku-org/go-waku/waku/v2/node" ) var ( @@ -234,11 +235,18 @@ var ( }) AgentString = altsrc.NewStringFlag(&cli.StringFlag{ Name: "agent-string", - Value: "go-waku", + Value: node.UserAgent, Usage: "client id to advertise", Destination: &options.UserAgent, EnvVars: []string{"WAKUNODE2_AGENT_STRING"}, }) + IPColocationLimit = altsrc.NewIntFlag(&cli.IntFlag{ + Name: "ip-colocation-limit", + Value: node.DefaultMaxConnectionsPerIP, + Usage: "max number of allowed peers from the same IP. Set it to 0 to remove the limitation.", + Destination: &options.IPColocationLimit, + EnvVars: []string{"WAKUNODE2_IP_COLOCATION_LIMIT"}, + }) Relay = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "relay", Value: true, diff --git a/cmd/waku/main.go b/cmd/waku/main.go index fc733196..7f4bbd23 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -48,6 +48,7 @@ func main() { ForceReachability, ResourceScalingMemoryPercent, ResourceScalingFDPercent, + IPColocationLimit, LogLevel, LogEncoding, LogOutput, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index a56a616c..c11b5f35 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -140,6 +140,7 @@ func Execute(options NodeOptions) error { node.WithMaxPeerConnections(options.MaxPeerConnections), node.WithPrometheusRegisterer(prometheus.DefaultRegisterer), node.WithPeerStoreCapacity(options.PeerStoreCapacity), + node.WithMaxConnectionsPerIP(options.IPColocationLimit), node.WithClusterID(uint16(options.ClusterID)), } if len(options.AdvertiseAddresses) != 0 { diff --git a/cmd/waku/options.go b/cmd/waku/options.go index 4031a2f5..bda8dc3a 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -172,6 +172,7 @@ type NodeOptions struct { PProf bool MaxPeerConnections int PeerStoreCapacity int + IPColocationLimit int PeerExchange PeerExchangeOptions Websocket WSOptions diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index fdc8de67..e4c6950f 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -344,7 +344,7 @@ func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) { // Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start(ctx context.Context) error { - connGater := peermanager.NewConnectionGater(w.log) + connGater := peermanager.NewConnectionGater(w.opts.maxConnectionsPerIP, w.log) ctx, cancel := context.WithCancel(ctx) w.cancel = cancel diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index dd6d9958..80783969 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -37,23 +37,26 @@ import ( "go.uber.org/zap/zapcore" ) -// Default userAgent -const userAgent string = "go-waku" +// Default UserAgent +const UserAgent string = "go-waku" // Default minRelayPeersToPublish const defaultMinRelayPeersToPublish = 0 +const DefaultMaxConnectionsPerIP = 5 + type WakuNodeParameters struct { - hostAddr *net.TCPAddr - clusterID uint16 - dns4Domain string - advertiseAddrs []multiaddr.Multiaddr - multiAddr []multiaddr.Multiaddr - addressFactory basichost.AddrsFactory - privKey *ecdsa.PrivateKey - libP2POpts []libp2p.Option - peerstore peerstore.Peerstore - prometheusReg prometheus.Registerer + hostAddr *net.TCPAddr + maxConnectionsPerIP int + clusterID uint16 + dns4Domain string + advertiseAddrs []multiaddr.Multiaddr + multiAddr []multiaddr.Multiaddr + addressFactory basichost.AddrsFactory + privKey *ecdsa.PrivateKey + libP2POpts []libp2p.Option + peerstore peerstore.Peerstore + prometheusReg prometheus.Registerer circuitRelayMinInterval time.Duration circuitRelayBootDelay time.Duration @@ -124,6 +127,7 @@ type WakuNodeOption func(*WakuNodeParameters) error var DefaultWakuNodeOptions = []WakuNodeOption{ WithPrometheusRegisterer(prometheus.NewRegistry()), WithMaxPeerConnections(50), + WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP), WithCircuitRelayParams(2*time.Second, 3*time.Minute), } @@ -304,6 +308,14 @@ func WithClusterID(clusterID uint16) WakuNodeOption { } } +// WithMaxConnectionsPerIP sets the max number of allowed peers from the same IP +func WithMaxConnectionsPerIP(limit int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.maxConnectionsPerIP = limit + return nil + } +} + // WithNTP is used to use ntp for any operation that requires obtaining time // A list of ntp servers can be passed but if none is specified, some defaults // will be used @@ -560,7 +572,7 @@ var DefaultLibP2POptions = []libp2p.Option{ libp2p.Transport(quic.NewTransport), libp2p.Transport(libp2pwebtransport.New), ), - libp2p.UserAgent(userAgent), + libp2p.UserAgent(UserAgent), libp2p.ChainOptions( libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), diff --git a/waku/v2/peermanager/connection_gater.go b/waku/v2/peermanager/connection_gater.go index 5b9b761b..d0800813 100644 --- a/waku/v2/peermanager/connection_gater.go +++ b/waku/v2/peermanager/connection_gater.go @@ -16,19 +16,21 @@ import ( // the number of connections per IP address type ConnectionGater struct { sync.Mutex - logger *zap.Logger - limiter map[string]int + logger *zap.Logger + limiter map[string]int + maxConnsPerIP int } -const maxConnsPerIP = 10 - // NewConnectionGater creates a new instance of ConnectionGater -func NewConnectionGater(logger *zap.Logger) *ConnectionGater { +func NewConnectionGater(maxConnsPerIP int, logger *zap.Logger) *ConnectionGater { c := &ConnectionGater{ - logger: logger.Named("connection-gater"), - limiter: make(map[string]int), + logger: logger.Named("connection-gater"), + maxConnsPerIP: maxConnsPerIP, + limiter: make(map[string]int), } + c.logger.Info("configured settings", zap.Int("maxConnsPerIP", maxConnsPerIP)) + return c } @@ -103,7 +105,7 @@ func (c *ConnectionGater) validateInboundConn(addr multiaddr.Multiaddr) bool { c.Lock() defer c.Unlock() - if currConnections := c.limiter[ip.String()]; currConnections+1 > maxConnsPerIP { + if currConnections := c.limiter[ip.String()]; c.maxConnsPerIP > 0 && currConnections+1 > c.maxConnsPerIP { return false }