Add maxMessageSize option to pubsub (#634)

* Add maxMessageSize option to pubsub
* Switched default to 1mb
This commit is contained in:
Tanguy 2021-10-25 12:58:38 +02:00 committed by GitHub
parent 846baf3853
commit 5885e03680
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 5 deletions

View File

@ -107,6 +107,14 @@ type
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
topicsHigh*: int # the maximum number of topics a peer is allowed to subscribe to
maxMessageSize*: int ##\
## the maximum raw message size we'll globally allow
## for finer tuning, check message size on topic validator
##
## sending a big message to a peer with a lower size limit can
## lead to issues, from descoring to connection drops
##
## defaults to 1mB
rng*: ref BrHmacDrbgContext
knownTopics*: HashSet[string]
@ -285,7 +293,7 @@ proc getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event)
# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0])
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxMessageSize)
debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer
@ -540,6 +548,7 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024,
rng: ref BrHmacDrbgContext = newRng(),
parameters: PubParams = false): P
{.raises: [Defect, InitializationError].} =
@ -553,6 +562,7 @@ proc init*[PubParams: object | bool](
sign: sign,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
maxMessageSize: maxMessageSize,
rng: rng,
topicsHigh: int.high)
else:
@ -565,6 +575,7 @@ proc init*[PubParams: object | bool](
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters,
maxMessageSize: maxMessageSize,
rng: rng,
topicsHigh: int.high)

View File

@ -60,6 +60,7 @@ type
score*: float64
iWantBudget*: int
iHaveBudget*: int
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
@ -119,7 +120,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
while not conn.atEof:
trace "waiting for data", conn, peer = p, closed = conn.closed
var data = await conn.readLp(64 * 1024)
var data = await conn.readLp(p.maxMessageSize)
trace "read data from peer",
conn, peer = p, closed = conn.closed,
data = data.shortLog
@ -243,6 +244,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
debug "empty message, skipping", p, msg = shortLog(msg)
return
if msg.len > p.maxMessageSize:
info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return
let conn = p.sendConn
if conn == nil or conn.closed():
trace "No send connection, skipping message", p, msg = shortLog(msg)
@ -280,7 +285,8 @@ proc new*(
getConn: GetConn,
dropConn: DropConn,
onEvent: OnEvent,
codec: string): T =
codec: string,
maxMessageSize: int): T =
T(
getConn: getConn,
@ -288,4 +294,5 @@ proc new*(
onEvent: onEvent,
codec: codec,
peerId: peerId,
maxMessageSize: maxMessageSize
)

View File

@ -382,3 +382,57 @@ suite "FloodSub":
it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub message size validation":
var messageReceived = 0
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check data.len < 50
inc(messageReceived)
let
bigNode = generateNodes(1)
smallNode = generateNodes(1, maxMessageSize = 200)
# start switches
nodesFut = await allFinished(
bigNode[0].switch.start(),
smallNode[0].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
bigNode[0].start(),
smallNode[0].start(),
))
await subscribeNodes(bigNode & smallNode)
bigNode[0].subscribe("foo", handler)
smallNode[0].subscribe("foo", handler)
await waitSub(bigNode[0], smallNode[0], "foo")
let
bigMessage = newSeq[byte](1000)
smallMessage1 = @[1.byte]
smallMessage2 = @[3.byte]
# Need two different messages, otherwise they are the same when anonymized
check (await smallNode[0].publish("foo", smallMessage1)) > 0
check (await bigNode[0].publish("foo", smallMessage2)) > 0
check (await checkExpiring(messageReceived == 2)) == true
check (await smallNode[0].publish("foo", bigMessage)) > 0
check (await bigNode[0].publish("foo", bigMessage)) > 0
await allFuturesThrowing(
smallNode[0].switch.stop(),
bigNode[0].switch.stop()
)
await allFuturesThrowing(
smallNode[0].stop(),
bigNode[0].stop()
)
await allFuturesThrowing(nodesFut)

View File

@ -25,7 +25,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
proc dropConn(peer: PubSubPeer) =
discard # we don't care about it here yet
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec)
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec, 1024 * 1024)
debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer

View File

@ -26,7 +26,8 @@ proc generateNodes*(
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
sign: bool = libp2p_pubsub_sign,
maxMessageSize: int = 1024 * 1024): seq[PubSub] =
for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
@ -38,6 +39,7 @@ proc generateNodes*(
sign = sign,
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p))
# set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
@ -51,6 +53,7 @@ proc generateNodes*(
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider,
maxMessageSize = maxMessageSize,
anonymize = anonymize).PubSub
switch.mount(pubsub)