diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index f356d2f1e..23906baaa 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -126,8 +126,9 @@ suite "Waku Filter - End to End": asyncTest "Subscribing to an empty content topic": # When subscribing to an empty content topic - let subscribeResponse = - await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()) + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]() + ) # Then the subscription is not successful check: @@ -1838,8 +1839,9 @@ suite "Waku Filter - End to End": wakuFilter.subscriptions.isSubscribed(clientPeerId) # When unsubscribing from an empty content topic - let unsubscribeResponse = - await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()) + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]() + ) # Then the unsubscription is not successful check: @@ -2076,10 +2078,11 @@ suite "Waku Filter - End to End": contentTopic = contentTopic, payload = getByteSequence(100 * 1024) ) # 100KiB msg4 = fakeWakuMessage( - contentTopic = contentTopic, payload = getByteSequence(MaxPushSize - 1024) + contentTopic = contentTopic, + payload = getByteSequence(DefaultMaxPushSize - 1024), ) # Max Size (Inclusive Limit) msg5 = fakeWakuMessage( - contentTopic = contentTopic, payload = getByteSequence(MaxPushSize) + contentTopic = contentTopic, payload = getByteSequence(DefaultMaxPushSize) ) # Max Size (Exclusive Limit) # When sending the 1KiB message diff --git a/tests/waku_lightpush/test_client.nim b/tests/waku_lightpush/test_client.nim index 337e1e09e..a49361c98 100644 --- a/tests/waku_lightpush/test_client.nim +++ b/tests/waku_lightpush/test_client.nim @@ -43,6 +43,9 @@ suite "Waku Lightpush Client": handler = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async.} = + let msgLen = message.encode().buffer.len + if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024: + return err("length greater than maxMessageSize") handlerFuture.complete((pubsubTopic, message)) return ok() @@ -209,11 +212,11 @@ suite "Waku Lightpush Client": ) # 100KiB message4 = fakeWakuMessage( contentTopic = contentTopic, - payload = getByteSequence(MaxRpcSize - overheadBytes - 1), + payload = getByteSequence(DefaultMaxWakuMessageSize - overheadBytes - 1), ) # Inclusive Limit message5 = fakeWakuMessage( contentTopic = contentTopic, - payload = getByteSequence(MaxRpcSize - overheadBytes), + payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024), ) # Exclusive Limit # When publishing the 1KiB payload diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index db2f441f5..3daa4037e 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -354,7 +354,7 @@ suite "Waku Peer Exchange": var buffer: seq[byte] await conn.writeLP(rpc.encode().buffer) - buffer = await conn.readLp(MaxRpcSize.int) + buffer = await conn.readLp(DefaultMaxRpcSize.int) # Decode the response let decodedBuff = PeerExchangeRpc.decode(buffer) diff --git a/tests/waku_relay/test_protocol.nim b/tests/waku_relay/test_protocol.nim index 410528c2a..df9bcacb5 100644 --- a/tests/waku_relay/test_protocol.nim +++ b/tests/waku_relay/test_protocol.nim @@ -1042,14 +1042,15 @@ suite "Waku Relay": ) # 100KiB msg4 = fakeWakuMessage( contentTopic = contentTopic, - payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 38), + payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 38), ) # Max Size (Inclusive Limit) msg5 = fakeWakuMessage( contentTopic = contentTopic, - payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 37), + payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 37), ) # Max Size (Exclusive Limit) msg6 = fakeWakuMessage( - contentTopic = contentTopic, payload = getByteSequence(MaxWakuMessageSize) + contentTopic = contentTopic, + payload = getByteSequence(DefaultMaxWakuMessageSize), ) # MaxWakuMessageSize -> Out of Max Size # Notice that the message is wrapped with more data in https://github.com/status-im/nim-libp2p/blob/3011ba4326fa55220a758838835797ff322619fc/libp2p/protocols/pubsub/gossipsub.nim#L627-L632 @@ -1092,7 +1093,7 @@ suite "Waku Relay": (pubsubTopic, msg3) == handlerFuture.read() (pubsubTopic, msg3) == otherHandlerFuture.read() - # When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 38' message + # When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 38' message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg4) @@ -1104,7 +1105,7 @@ suite "Waku Relay": (pubsubTopic, msg4) == handlerFuture.read() (pubsubTopic, msg4) == otherHandlerFuture.read() - # When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 37' message + # When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 37' message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg5) @@ -1115,7 +1116,7 @@ suite "Waku Relay": not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg5) == handlerFuture.read() - # When sending the 'MaxWakuMessageSize' message + # When sending the 'DefaultMaxWakuMessageSize' message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg6) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index ab60be098..36bea003d 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -596,7 +596,7 @@ suite "Waku v2 Rest API - Relay": let response = await client.relayPostMessagesV1( DefaultPubsubTopic, RelayWakuMessage( - payload: base64.encode(getByteSequence(MaxWakuMessageSize)), + payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)), # Message will be bigger than the max size contentTopic: some(DefaultContentTopic), timestamp: some(int64(2022)), @@ -608,7 +608,7 @@ suite "Waku v2 Rest API - Relay": response.status == 400 $response.contentType == $MIMETYPE_TEXT response.data == - fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes" + fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes" await restServer.stop() await restServer.closeWait() @@ -657,7 +657,7 @@ suite "Waku v2 Rest API - Relay": # When let response = await client.relayPostAutoMessagesV1( RelayWakuMessage( - payload: base64.encode(getByteSequence(MaxWakuMessageSize)), + payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)), # Message will be bigger than the max size contentTopic: some(DefaultContentTopic), timestamp: some(int64(2022)), @@ -669,7 +669,7 @@ suite "Waku v2 Rest API - Relay": response.status == 400 $response.contentType == $MIMETYPE_TEXT response.data == - fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes" + fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes" await restServer.stop() await restServer.closeWait() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e132eb8de..7cb65ba22 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -388,7 +388,7 @@ proc mountRelay*( node: WakuNode, pubsubTopics: seq[string] = @[], peerExchangeHandler = none(RoutingRecordsHandler), - maxMessageSize = int(MaxWakuMessageSize), + maxMessageSize = int(DefaultMaxWakuMessageSize), ) {.async, gcsafe.} = if not node.wakuRelay.isNil(): error "wakuRelay already mounted, skipping" diff --git a/waku/waku_core/message/default_values.nim b/waku/waku_core/message/default_values.nim index 1f78d6a08..101e50c01 100644 --- a/waku/waku_core/message/default_values.nim +++ b/waku/waku_core/message/default_values.nim @@ -3,6 +3,6 @@ import ../../common/utils/parse_size_units const ## https://rfc.vac.dev/spec/64/#message-size DefaultMaxWakuMessageSizeStr* = "150KiB" # Remember that 1 MiB is the PubSub default - MaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr) + DefaultMaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr) DefaultSafetyBufferProtocolOverhead* = 64 * 1024 # overhead measured in bytes diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 7f1b390df..5a772b130 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -41,7 +41,7 @@ proc sendSubscribeRequest( # TODO: this can raise an exception await connection.writeLP(filterSubscribeRequest.encode().buffer) - let respBuf = await connection.readLp(MaxSubscribeResponseSize) + let respBuf = await connection.readLp(DefaultMaxSubscribeResponseSize) let respDecodeRes = FilterSubscribeResponse.decode(respBuf) if respDecodeRes.isErr(): trace "Failed to decode filter subscribe response", servicePeer @@ -79,7 +79,7 @@ proc subscribe*( wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, - contentTopics: ContentTopic|seq[ContentTopic], + contentTopics: ContentTopic | seq[ContentTopic], ): Future[FilterSubscribeResult] {.async.} = var contentTopicSeq: seq[ContentTopic] when contentTopics is seq[ContentTopic]: @@ -98,14 +98,14 @@ proc unsubscribe*( wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, - contentTopics: ContentTopic|seq[ContentTopic], + contentTopics: ContentTopic | seq[ContentTopic], ): Future[FilterSubscribeResult] {.async.} = var contentTopicSeq: seq[ContentTopic] when contentTopics is seq[ContentTopic]: contentTopicSeq = contentTopics else: contentTopicSeq = @[contentTopics] - + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe( requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq @@ -127,7 +127,7 @@ proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) = proc initProtocolHandler(wfc: WakuFilterClient) = proc handler(conn: Connection, proto: string) {.async.} = - let buf = await conn.readLp(int(MaxPushSize)) + let buf = await conn.readLp(int(DefaultMaxPushSize)) let decodeRes = MessagePush.decode(buf) if decodeRes.isErr(): @@ -152,4 +152,4 @@ proc new*( ): T = let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[]) wfc.initProtocolHandler() - wfc \ No newline at end of file + wfc diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 59418ec68..11dcca6c8 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -249,7 +249,7 @@ proc initProtocolHandler(wf: WakuFilter) = proc handler(conn: Connection, proto: string) {.async.} = trace "filter subscribe request handler triggered", peerId = conn.peerId - let buf = await conn.readLp(int(MaxSubscribeSize)) + let buf = await conn.readLp(int(DefaultMaxSubscribeSize)) let decodeRes = FilterSubscribeRequest.decode(buf) if decodeRes.isErr(): diff --git a/waku/waku_filter_v2/rpc_codec.nim b/waku/waku_filter_v2/rpc_codec.nim index 8890a99c3..0705e6bc8 100644 --- a/waku/waku_filter_v2/rpc_codec.nim +++ b/waku/waku_filter_v2/rpc_codec.nim @@ -7,10 +7,10 @@ import std/options import ../common/protobuf, ../waku_core, ./rpc const - MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64 * 1024 + DefaultMaxSubscribeSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead - MaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer. - MaxPushSize* = 10 * MaxWakuMessageSize + 64 * 1024 + DefaultMaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer. + DefaultMaxPushSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer = diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index e46822a23..45c09353f 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -39,7 +39,7 @@ proc sendPushRequest( var buffer: seq[byte] try: - buffer = await connection.readLp(MaxRpcSize.int) + buffer = await connection.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: return err("Exception reading: " & getCurrentExceptionMsg()) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 9cec6f7f4..16be3beb5 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -79,7 +79,7 @@ proc handleRequest*( proc initProtocolHandler(wl: WakuLightPush) = proc handle(conn: Connection, proto: string) {.async.} = - let buffer = await conn.readLp(MaxRpcSize.int) + let buffer = await conn.readLp(DefaultMaxRpcSize) let rpc = await handleRequest(wl, conn.peerId, buffer) await conn.writeLp(rpc.encode().buffer) diff --git a/waku/waku_lightpush/rpc_codec.nim b/waku/waku_lightpush/rpc_codec.nim index 3fa5b504d..d2e37e495 100644 --- a/waku/waku_lightpush/rpc_codec.nim +++ b/waku/waku_lightpush/rpc_codec.nim @@ -6,8 +6,7 @@ else: import std/options import ../common/protobuf, ../waku_core, ./rpc -const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 - # We add a 64kB safety buffer for protocol overhead +const DefaultMaxRpcSize* = -1 proc encode*(rpc: PushRequest): ProtoBuffer = var pb = initProtoBuffer() diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 3f26eb7a7..c7d750751 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -29,7 +29,7 @@ logScope: const # We add a 64kB safety buffer for protocol overhead. # 10x-multiplier also for safety - MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 + DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... MaxPeersCacheSize = 60 CacheRefreshInterval = 15.minutes @@ -61,7 +61,7 @@ proc request*( var error: string try: await conn.writeLP(rpc.encode().buffer) - buffer = await conn.readLp(MaxRpcSize.int) + buffer = await conn.readLp(DefaultMaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) error = $exc.msg @@ -153,7 +153,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = var buffer: seq[byte] try: - buffer = await conn.readLp(MaxRpcSize.int) + buffer = await conn.readLp(DefaultMaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) return diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index b2aa6bb5e..f2537c58a 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -151,7 +151,7 @@ proc initProtocolHandler(w: WakuRelay) = w.codec = WakuRelayCodec proc new*( - T: type WakuRelay, switch: Switch, maxMessageSize = int(MaxWakuMessageSize) + T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize) ): WakuRelayResult[T] = ## maxMessageSize: max num bytes that are allowed for the WakuMessage diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 3d04a7f99..b0fac6fd2 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -49,7 +49,9 @@ proc sendHistoryQueryRPC( let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC())) await connection.writeLP(reqRpc.encode().buffer) - let buf = await connection.readLp(MaxRpcSize.int) + #TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail. + # Need to find a workaround for this. + let buf = await connection.readLp(DefaultMaxRpcSize.int) let respDecodeRes = HistoryRPC.decode(buf) if respDecodeRes.isErr(): waku_store_errors.inc(labelValues = [decodeRpcFailure]) diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 7f7118dab..6818d4213 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -46,7 +46,7 @@ type WakuStore* = ref object of LPProtocol proc initProtocolHandler(ws: WakuStore) = proc handler(conn: Connection, proto: string) {.async.} = - let buf = await conn.readLp(MaxRpcSize.int) + let buf = await conn.readLp(DefaultMaxRpcSize.int) let decodeRes = HistoryRPC.decode(buf) if decodeRes.isErr(): diff --git a/waku/waku_store/rpc_codec.nim b/waku/waku_store/rpc_codec.nim index 9d0e281a4..2d5867e00 100644 --- a/waku/waku_store/rpc_codec.nim +++ b/waku/waku_store/rpc_codec.nim @@ -6,8 +6,7 @@ else: import std/options, nimcrypto/hash import ../common/[protobuf, paging], ../waku_core, ./common, ./rpc -const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64 * 1024 - # We add a 64kB safety buffer for protocol overhead +const DefaultMaxRpcSize* = -1 ## Pagination