Fix: big messages in pubsub (#808)
This commit is contained in:
parent
c43aacdc81
commit
1711c204ea
|
@ -304,14 +304,15 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =
|
||||||
if ? pb.getRepeatedField(2, msgpbs):
|
if ? pb.getRepeatedField(2, msgpbs):
|
||||||
trace "decodeMessages: read messages", count = len(msgpbs)
|
trace "decodeMessages: read messages", count = len(msgpbs)
|
||||||
for item in msgpbs:
|
for item in msgpbs:
|
||||||
msgs.add(? decodeMessage(initProtoBuffer(item)))
|
# size is constrained at the network level
|
||||||
|
msgs.add(? decodeMessage(initProtoBuffer(item, maxSize = uint.high)))
|
||||||
else:
|
else:
|
||||||
trace "decodeMessages: no messages found"
|
trace "decodeMessages: no messages found"
|
||||||
ok(msgs)
|
ok(msgs)
|
||||||
|
|
||||||
proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
|
proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
|
||||||
trace "encodeRpcMsg: encoding message", msg = msg.shortLog()
|
trace "encodeRpcMsg: encoding message", msg = msg.shortLog()
|
||||||
var pb = initProtoBuffer()
|
var pb = initProtoBuffer(maxSize = uint.high)
|
||||||
for item in msg.subscriptions:
|
for item in msg.subscriptions:
|
||||||
pb.write(1, item)
|
pb.write(1, item)
|
||||||
for item in msg.messages:
|
for item in msg.messages:
|
||||||
|
@ -324,7 +325,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
|
||||||
|
|
||||||
proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
|
proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
|
||||||
trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
|
trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
|
||||||
var pb = initProtoBuffer(msg)
|
var pb = initProtoBuffer(msg, maxSize = uint.high)
|
||||||
var rpcMsg = ok(RPCMsg())
|
var rpcMsg = ok(RPCMsg())
|
||||||
assign(rpcMsg.get().messages, ? pb.decodeMessages())
|
assign(rpcMsg.get().messages, ? pb.decodeMessages())
|
||||||
assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions())
|
assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions())
|
||||||
|
|
|
@ -362,3 +362,35 @@ suite "FloodSub":
|
||||||
)
|
)
|
||||||
|
|
||||||
await allFuturesThrowing(nodesFut)
|
await allFuturesThrowing(nodesFut)
|
||||||
|
|
||||||
|
asyncTest "FloodSub message size validation 2":
|
||||||
|
var messageReceived = 0
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
inc(messageReceived)
|
||||||
|
|
||||||
|
let
|
||||||
|
bigNode1 = generateNodes(1, maxMessageSize = 20000000)
|
||||||
|
bigNode2 = generateNodes(1, maxMessageSize = 20000000)
|
||||||
|
|
||||||
|
# start switches
|
||||||
|
nodesFut = await allFinished(
|
||||||
|
bigNode1[0].switch.start(),
|
||||||
|
bigNode2[0].switch.start(),
|
||||||
|
)
|
||||||
|
|
||||||
|
await subscribeNodes(bigNode1 & bigNode2)
|
||||||
|
bigNode2[0].subscribe("foo", handler)
|
||||||
|
await waitSub(bigNode1[0], bigNode2[0], "foo")
|
||||||
|
|
||||||
|
let bigMessage = newSeq[byte](19000000)
|
||||||
|
|
||||||
|
check (await bigNode1[0].publish("foo", bigMessage)) > 0
|
||||||
|
|
||||||
|
checkExpiring: messageReceived == 1
|
||||||
|
|
||||||
|
await allFuturesThrowing(
|
||||||
|
bigNode1[0].switch.stop(),
|
||||||
|
bigNode2[0].switch.stop()
|
||||||
|
)
|
||||||
|
|
||||||
|
await allFuturesThrowing(nodesFut)
|
||||||
|
|
Loading…
Reference in New Issue