From 5885e03680ced0e240e7a15eb983734192eff537 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 25 Oct 2021 12:58:38 +0200 Subject: [PATCH] Add maxMessageSize option to pubsub (#634) * Add maxMessageSize option to pubsub * Switched default to 1mb --- libp2p/protocols/pubsub/pubsub.nim | 13 ++++++- libp2p/protocols/pubsub/pubsubpeer.nim | 11 +++++- tests/pubsub/testfloodsub.nim | 54 ++++++++++++++++++++++++++ tests/pubsub/testgossipinternal.nim | 2 +- tests/pubsub/utils.nim | 5 ++- 5 files changed, 80 insertions(+), 5 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 53894acc3..ecfec828e 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -107,6 +107,14 @@ type anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions topicsHigh*: int # the maximum number of topics a peer is allowed to subscribe to + maxMessageSize*: int ##\ + ## the maximum raw message size we'll globally allow + ## for finer tuning, check message size on topic validator + ## + ## sending a big message to a peer with a lower size limit can + ## lead to issues, from descoring to connection drops + ## + ## defaults to 1mB rng*: ref BrHmacDrbgContext knownTopics*: HashSet[string] @@ -285,7 +293,7 @@ proc getOrCreatePeer*( p.onPubSubPeerEvent(peer, event) # create new pubsub peer - let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0]) + let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxMessageSize) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer @@ -540,6 +548,7 @@ proc init*[PubParams: object | bool]( sign: bool = true, msgIdProvider: MsgIdProvider = defaultMsgIdProvider, subscriptionValidator: SubscriptionValidator = nil, + maxMessageSize: int = 1024 * 1024, rng: ref BrHmacDrbgContext = newRng(), parameters: PubParams = false): P {.raises: [Defect, InitializationError].} = @@ -553,6 +562,7 @@ proc init*[PubParams: object | bool]( sign: sign, msgIdProvider: msgIdProvider, subscriptionValidator: subscriptionValidator, + maxMessageSize: maxMessageSize, rng: rng, topicsHigh: int.high) else: @@ -565,6 +575,7 @@ proc init*[PubParams: object | bool]( msgIdProvider: msgIdProvider, subscriptionValidator: subscriptionValidator, parameters: parameters, + maxMessageSize: maxMessageSize, rng: rng, topicsHigh: int.high) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index a10cd159e..af301f09b 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -60,6 +60,7 @@ type score*: float64 iWantBudget*: int iHaveBudget*: int + maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score @@ -119,7 +120,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = while not conn.atEof: trace "waiting for data", conn, peer = p, closed = conn.closed - var data = await conn.readLp(64 * 1024) + var data = await conn.readLp(p.maxMessageSize) trace "read data from peer", conn, peer = p, closed = conn.closed, data = data.shortLog @@ -243,6 +244,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} = debug "empty message, skipping", p, msg = shortLog(msg) return + if msg.len > p.maxMessageSize: + info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len + return + let conn = p.sendConn if conn == nil or conn.closed(): trace "No send connection, skipping message", p, msg = shortLog(msg) @@ -280,7 +285,8 @@ proc new*( getConn: GetConn, dropConn: DropConn, onEvent: OnEvent, - codec: string): T = + codec: string, + maxMessageSize: int): T = T( getConn: getConn, @@ -288,4 +294,5 @@ proc new*( onEvent: onEvent, codec: codec, peerId: peerId, + maxMessageSize: maxMessageSize ) diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 5e167be17..12bb288bb 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -382,3 +382,57 @@ suite "FloodSub": it.switch.stop()))) await allFuturesThrowing(nodesFut) + + asyncTest "FloodSub message size validation": + var messageReceived = 0 + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check data.len < 50 + inc(messageReceived) + + let + bigNode = generateNodes(1) + smallNode = generateNodes(1, maxMessageSize = 200) + + # start switches + nodesFut = await allFinished( + bigNode[0].switch.start(), + smallNode[0].switch.start(), + ) + + # start pubsubcon + await allFuturesThrowing( + allFinished( + bigNode[0].start(), + smallNode[0].start(), + )) + + await subscribeNodes(bigNode & smallNode) + bigNode[0].subscribe("foo", handler) + smallNode[0].subscribe("foo", handler) + await waitSub(bigNode[0], smallNode[0], "foo") + + let + bigMessage = newSeq[byte](1000) + smallMessage1 = @[1.byte] + smallMessage2 = @[3.byte] + + # Need two different messages, otherwise they are the same when anonymized + check (await smallNode[0].publish("foo", smallMessage1)) > 0 + check (await bigNode[0].publish("foo", smallMessage2)) > 0 + + check (await checkExpiring(messageReceived == 2)) == true + + check (await smallNode[0].publish("foo", bigMessage)) > 0 + check (await bigNode[0].publish("foo", bigMessage)) > 0 + + await allFuturesThrowing( + smallNode[0].switch.stop(), + bigNode[0].switch.stop() + ) + + await allFuturesThrowing( + smallNode[0].stop(), + bigNode[0].stop() + ) + + await allFuturesThrowing(nodesFut) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index d8ee5c583..450496aef 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -25,7 +25,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer = proc dropConn(peer: PubSubPeer) = discard # we don't care about it here yet - let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec) + let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec, 1024 * 1024) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 4d5914963..452edd412 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -26,7 +26,8 @@ proc generateNodes*( triggerSelf: bool = false, verifySignature: bool = libp2p_pubsub_verify, anonymize: bool = libp2p_pubsub_anonymize, - sign: bool = libp2p_pubsub_sign): seq[PubSub] = + sign: bool = libp2p_pubsub_sign, + maxMessageSize: int = 1024 * 1024): seq[PubSub] = for i in 0..