chore: message.nim - set max message size to 150KiB according to spec (#2298)

* message.nim: set max message size to 150KiB according to spec

Using KiB instead of KB because that seems more aligned with
the actual default defined in nim-libp2p (1024 * 1024)

Spec details: https://rfc.vac.dev/spec/64/#message-size

* test_protocol.nim: align test to current WakuMessage limit
* test_waku_client.nim: adapt test to MaxWakuMessageSize change
* make maxMessageSize configurable for wakunode2
* wakunode2 app now accepts max-num-bytes-msg-size with KiB, KB, or B units
* testlib/wakunode.nim: set maxMessageSize: "1024 KiB"
* test_waku_client.nim: remove duplicate check in "Valid Payload Sizes"
* set DefaultMaxWakuMessageSizeStr as the only source of truth
* external_config.nim: rename max-num-bytes-msg-size -> max-msg-size
This commit is contained in:
Ivan FB 2024-01-03 13:11:50 +01:00 committed by GitHub
parent ebad0385ef
commit ed09074cc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 228 additions and 35 deletions

View File

@ -21,6 +21,7 @@ import
metrics/chronos_httpserver
import
../../waku/common/utils/nat,
../../waku/common/utils/parse_size_units,
../../waku/common/databases/db_sqlite,
../../waku/waku_archive/driver/builder,
../../waku/waku_archive/retention_policy/builder,
@ -441,8 +442,14 @@ proc setupProtocols(node: WakuNode,
else:
conf.topics
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
return err("failed to parse 'max-num-bytes-msg-size' param: " & $error)
debug "Setting max message size", num_bytes=parsedMaxMsgSize
try:
await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler)
await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler,
int(parsedMaxMsgSize))
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())

View File

@ -20,6 +20,9 @@ import
../../waku/waku_enr,
../../waku/node/peer_manager
include
../../waku/waku_core/message/default_values
export
confTomlDefs,
confTomlNet,
@ -77,6 +80,11 @@ type
defaultValue: "",
name: "rln-relay-eth-private-key" }: string
maxMessageSize* {.
desc: "Maximum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc."
defaultValue: DefaultMaxWakuMessageSizeStr
name: "max-msg-size" }: string
case cmd* {.
command
defaultValue: noCommand }: StartUpCommand
@ -86,7 +94,6 @@ type
desc: "Runs the registration function on-chain. By default, a dry-run will occur",
defaultValue: false,
name: "execute" .}: bool
of noCommand:
## Application-level configuration

View File

@ -4,4 +4,5 @@ import
./test_enr_builder,
./test_envvar_serialization,
./test_protobuf_validation,
./test_sqlite_migrations
./test_sqlite_migrations,
./test_parse_size

View File

@ -0,0 +1,107 @@
{.used.}
import
testutils/unittests,
stew/results
import
../../waku/common/utils/parse_size_units
suite "Size serialization test":
test "parse normal sizes":
var sizeInBytesRes = parseMsgSize("15 KiB")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 15360
sizeInBytesRes = parseMsgSize(" 1048576 B")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1048576
sizeInBytesRes = parseMsgSize("150 B")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 150
sizeInBytesRes = parseMsgSize("150 b")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 150
sizeInBytesRes = parseMsgSize("150b")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 150
sizeInBytesRes = parseMsgSize("1024kib")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1048576
sizeInBytesRes = parseMsgSize("1024KiB")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1048576
sizeInBytesRes = parseMsgSize("1024KB")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1024000
sizeInBytesRes = parseMsgSize("1024kb")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1024000
sizeInBytesRes = parseMsgSize("1.5 kib")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1536
sizeInBytesRes = parseMsgSize("1,5 kb")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1500
sizeInBytesRes = parseMsgSize("0,5 kb")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 500
sizeInBytesRes = parseMsgSize("1.5 kb")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1500
sizeInBytesRes = parseMsgSize("0.5 kb")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 500
sizeInBytesRes = parseMsgSize(" 1.5 KB")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 1500
sizeInBytesRes = parseMsgSize(" 0.5 kb")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == 500
sizeInBytesRes = parseMsgSize(" 1024 kib")
assert sizeInBytesRes.isOk(), sizeInBytesRes.error
check sizeInBytesRes.get() == uint64(1024 * 1024)
test "parse wrong sizes":
var sizeInBytesRes = parseMsgSize("150K")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("150 iB")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("150 ib")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("150 MB")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
## notice that we don't allow MB units explicitly. If someone want to set 1MiB, the
## s/he should use 1024 KiB
sizeInBytesRes = parseMsgSize("150 MiB")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("150MiB")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("150K")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("150 K")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"
sizeInBytesRes = parseMsgSize("15..0 KiB")
assert sizeInBytesRes.isErr(), "The size should be considered incorrect"

View File

@ -70,7 +70,7 @@ proc getSampleJsonList*(): JsonNode =
]
proc getByteSequence*(bytesNumber: int): seq[byte] =
proc getByteSequence*(bytesNumber: uint64): seq[byte] =
result = newSeq[byte](bytesNumber)
for i in 0 ..< bytesNumber:
result[i] = cast[byte](i mod 256)

View File

@ -33,7 +33,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
nat: "any",
maxConnections: 50,
topics: @[],
relay: true
relay: true,
maxMessageSize: "1024 KiB"
)
proc newTestWakuNode*(nodeKey: crypto.PrivateKey,

View File

@ -21,7 +21,8 @@ import
import
../../../waku/[
node/peer_manager,
waku_core
waku_core,
waku_filter/rpc_codec
],
../../../waku/waku_filter_v2/[
common,
@ -2009,9 +2010,8 @@ suite "Waku Filter - End to End":
msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024)) # 1KiB
msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(10*1024)) # 10KiB
msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(100*1024)) # 100KiB
msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(3*1024*1024 + 1023*1024 + 968)) # 4MiB - 56B -> Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(3*1024*1024 + 1023*1024 + 969)) # 4MiB - 55B -> Max Size (Exclusive Limit)
msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(3*1024*1024 + 1023*1024 + 970)) # 4MiB - 54B -> Out of Max Size
msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(MaxRpcSize - 1024)) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(MaxRpcSize)) # Max Size (Exclusive Limit)
# When sending the 1KiB message
await wakuFilter.handleMessage(pubsubTopic, msg1)
@ -2045,7 +2045,7 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic3 == pubsubTopic
pushedMsg3 == msg3
# When sending the 4MiB - 56B message
# When sending the MaxRpcSize - 1024B message
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
await wakuFilter.handleMessage(pubsubTopic, msg4)
@ -2056,20 +2056,13 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic4 == pubsubTopic
pushedMsg4 == msg4
# When sending the 4MiB - 55B message
# When sending the MaxRpcSize message
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
await wakuFilter.handleMessage(pubsubTopic, msg5)
# Then the message is not pushed to the client
check not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
# When sending the 4MiB - 54B message
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
await wakuFilter.handleMessage(pubsubTopic, msg6)
# Then the message is not pushed to the client
check not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
suite "Security and Privacy":
asyncTest "Filter Client can receive messages after Client and Server reboot":
# Given a clean client and server

View File

@ -23,7 +23,8 @@ import
node/peer_manager,
waku_relay/protocol,
waku_relay,
waku_core
waku_core,
waku_core/message/codec
],
../testlib/[
wakucore,
@ -1003,13 +1004,22 @@ suite "Waku Relay":
await sleepAsync(500.millis)
# Given some valid payloads
let
msgWithoutPayload =
fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(0))
sizeEmptyMsg = uint64(msgWithoutPayload.encode().buffer.len)
let
msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024)) # 1KiB
msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(10*1024)) # 10KiB
msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(100*1024)) # 100KiB
msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1023*1024 + 933)) # 1MiB - 91B -> Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1023*1024 + 934)) # 1MiB - 90B -> Max Size (Exclusive Limit)
msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024)) # 1MiB -> Out of Max Size
msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 38)) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 37)) # Max Size (Exclusive Limit)
msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(MaxWakuMessageSize)) # MaxWakuMessageSize -> Out of Max Size
# Notice that the message is wrapped with more data in https://github.com/status-im/nim-libp2p/blob/3011ba4326fa55220a758838835797ff322619fc/libp2p/protocols/pubsub/gossipsub.nim#L627-L632
# And therefore, we need to substract a hard-coded values above (for msg4 & msg5), obtained empirically,
# running the tests with 'TRACE' level: nim c -r -d:chronicles_log_level=DEBUG -d:release -d:postgres -d:rln --passL:librln_v0.3.4.a --passL:-lm -d:nimDebugDlOpen tests/waku_relay/test_protocol.nim test "Valid Payload Sizes"
# When sending the 1KiB message
handlerFuture = newPushHandlerFuture()
@ -1047,7 +1057,7 @@ suite "Waku Relay":
(pubsubTopic, msg3) == handlerFuture.read()
(pubsubTopic, msg3) == otherHandlerFuture.read()
# When sending the 1023KiB + 933B message
# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 38' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg4)
@ -1059,7 +1069,7 @@ suite "Waku Relay":
(pubsubTopic, msg4) == handlerFuture.read()
(pubsubTopic, msg4) == otherHandlerFuture.read()
# When sending the 1023KiB + 934B message
# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 37' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg5)
@ -1070,7 +1080,7 @@ suite "Waku Relay":
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
(pubsubTopic, msg5) == handlerFuture.read()
# When sending the 1MiB
# When sending the 'MaxWakuMessageSize' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg6)

View File

@ -0,0 +1,53 @@
import
std/strutils,
stew/results,
regex
proc parseMsgSize*(input: string): Result[uint64, string] =
## Parses size strings such as "1.2 KiB" or "3Kb" and returns the equivalent number of bytes
## if the parse task goes well. If not, it returns an error describing the problem.
const RegexDef = """\s*(\d+([\,\.]\d*)?)\s*([Kk]{0,1}[i]?[Bb]{1})"""
const RegexParseSize = re2(RegexDef)
var m: RegexMatch2
if input.match(RegexParseSize, m) == false:
return err("error in parseSize. regex is not matching: " & RegexDef)
var value: float
try:
value = parseFloat(input[m.captures[0]].replace(",", "."))
except ValueError:
return err("invalid size in parseSize: " & getCurrentExceptionMsg() &
" error parsing: " & input[m.captures[0]] & " KKK : " & $m)
let units = input[m.captures[2]].toLowerAscii() # units is "kib", or "kb", or "b".
var multiplier: float
case units:
of "kb":
multiplier = 1000
of "kib":
multiplier = 1024
of "ib":
return err("wrong units. ib or iB aren't allowed.")
else: ## bytes
multiplier = 1
value = value * multiplier
return ok(uint64(value))
proc parseCorrectMsgSize*(input: string): uint64 =
## This proc always returns an int and wraps the following proc:
##
## proc parseMsgSize*(input: string): Result[int, string] = ...
##
## in case of error, it just returns 0, and this is expected to
## be called only from a controlled and well-known inputs
let ret = parseMsgSize(input).valueOr:
return 0
return ret

View File

@ -369,7 +369,8 @@ proc startRelay*(node: WakuNode) {.async.} =
proc mountRelay*(node: WakuNode,
pubsubTopics: seq[string] = @[],
peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
peerExchangeHandler = none(RoutingRecordsHandler),
maxMessageSize = int(MaxWakuMessageSize)) {.async, gcsafe.} =
if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping"
return
@ -377,7 +378,7 @@ proc mountRelay*(node: WakuNode,
## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
info "mounting relay protocol"
let initRes = WakuRelay.new(node.switch)
let initRes = WakuRelay.new(node.switch, maxMessageSize)
if initRes.isErr():
error "failed mounting relay protocol", error=initRes.error
return

View File

@ -1,9 +1,11 @@
import
./message/message,
./message/default_values,
./message/codec,
./message/digest
export
message,
default_values,
codec,
digest

View File

@ -0,0 +1,10 @@
import
../../common/utils/parse_size_units
const
## https://rfc.vac.dev/spec/64/#message-size
DefaultMaxWakuMessageSizeStr* = "150KiB" # Remember that 1 MiB is the PubSub default
MaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)
DefaultSafetyBufferProtocolOverhead* = 64 * 1024 # overhead measured in bytes

View File

@ -11,14 +11,12 @@ else:
import
../topics,
../time
../time,
./default_values
const
MaxMetaAttrLength* = 64 # 64 bytes
MaxWakuMessageSize* = 1024 * 1024 # 1 Mibytes. Corresponds to PubSub default
type WakuMessage* = object
# Data payload transmitted.
payload*: seq[byte]

View File

@ -128,7 +128,7 @@ proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =
proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxPushSize)
let buf = await conn.readLp(int(MaxPushSize))
let decodeRes = MessagePush.decode(buf)
if decodeRes.isErr():

View File

@ -226,7 +226,7 @@ proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peerId=conn.peerId
let buf = await conn.readLp(MaxSubscribeSize)
let buf = await conn.readLp(int(MaxSubscribeSize))
let decodeRes = FilterSubscribeRequest.decode(buf)
if decodeRes.isErr():

View File

@ -151,7 +151,10 @@ proc initProtocolHandler(w: WakuRelay) =
w.handler = handler
w.codec = WakuRelayCodec
proc new*(T: type WakuRelay, switch: Switch): WakuRelayResult[T] =
proc new*(T: type WakuRelay,
switch: Switch,
maxMessageSize = int(MaxWakuMessageSize)): WakuRelayResult[T] =
## maxMessageSize: max num bytes that are allowed for the WakuMessage
var w: WakuRelay
try:
@ -162,7 +165,7 @@ proc new*(T: type WakuRelay, switch: Switch): WakuRelayResult[T] =
sign = false,
triggerSelf = true,
msgIdProvider = defaultMessageIdProvider,
maxMessageSize = MaxWakuMessageSize,
maxMessageSize = maxMessageSize,
parameters = GossipsubParameters
)