mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 00:13:06 +00:00
fix: don't use WakuMessageSize in req/resp protocols (#2601)
* fix: don't use WakuMessageSize in req/resp protocols
This commit is contained in:
parent
1ba9df4be0
commit
e61e4ff90a
@ -126,8 +126,9 @@ suite "Waku Filter - End to End":
|
||||
|
||||
asyncTest "Subscribing to an empty content topic":
|
||||
# When subscribing to an empty content topic
|
||||
let subscribeResponse =
|
||||
await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
|
||||
let subscribeResponse = await wakuFilterClient.subscribe(
|
||||
serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()
|
||||
)
|
||||
|
||||
# Then the subscription is not successful
|
||||
check:
|
||||
@ -1838,8 +1839,9 @@ suite "Waku Filter - End to End":
|
||||
wakuFilter.subscriptions.isSubscribed(clientPeerId)
|
||||
|
||||
# When unsubscribing from an empty content topic
|
||||
let unsubscribeResponse =
|
||||
await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
|
||||
let unsubscribeResponse = await wakuFilterClient.unsubscribe(
|
||||
serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()
|
||||
)
|
||||
|
||||
# Then the unsubscription is not successful
|
||||
check:
|
||||
@ -2076,10 +2078,11 @@ suite "Waku Filter - End to End":
|
||||
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
|
||||
) # 100KiB
|
||||
msg4 = fakeWakuMessage(
|
||||
contentTopic = contentTopic, payload = getByteSequence(MaxPushSize - 1024)
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(DefaultMaxPushSize - 1024),
|
||||
) # Max Size (Inclusive Limit)
|
||||
msg5 = fakeWakuMessage(
|
||||
contentTopic = contentTopic, payload = getByteSequence(MaxPushSize)
|
||||
contentTopic = contentTopic, payload = getByteSequence(DefaultMaxPushSize)
|
||||
) # Max Size (Exclusive Limit)
|
||||
|
||||
# When sending the 1KiB message
|
||||
|
||||
@ -43,6 +43,9 @@ suite "Waku Lightpush Client":
|
||||
handler = proc(
|
||||
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
let msgLen = message.encode().buffer.len
|
||||
if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024:
|
||||
return err("length greater than maxMessageSize")
|
||||
handlerFuture.complete((pubsubTopic, message))
|
||||
return ok()
|
||||
|
||||
@ -209,11 +212,11 @@ suite "Waku Lightpush Client":
|
||||
) # 100KiB
|
||||
message4 = fakeWakuMessage(
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(MaxRpcSize - overheadBytes - 1),
|
||||
payload = getByteSequence(DefaultMaxWakuMessageSize - overheadBytes - 1),
|
||||
) # Inclusive Limit
|
||||
message5 = fakeWakuMessage(
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(MaxRpcSize - overheadBytes),
|
||||
payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024),
|
||||
) # Exclusive Limit
|
||||
|
||||
# When publishing the 1KiB payload
|
||||
|
||||
@ -354,7 +354,7 @@ suite "Waku Peer Exchange":
|
||||
|
||||
var buffer: seq[byte]
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
buffer = await conn.readLp(MaxRpcSize.int)
|
||||
buffer = await conn.readLp(DefaultMaxRpcSize.int)
|
||||
|
||||
# Decode the response
|
||||
let decodedBuff = PeerExchangeRpc.decode(buffer)
|
||||
|
||||
@ -1042,14 +1042,15 @@ suite "Waku Relay":
|
||||
) # 100KiB
|
||||
msg4 = fakeWakuMessage(
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 38),
|
||||
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 38),
|
||||
) # Max Size (Inclusive Limit)
|
||||
msg5 = fakeWakuMessage(
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 37),
|
||||
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 37),
|
||||
) # Max Size (Exclusive Limit)
|
||||
msg6 = fakeWakuMessage(
|
||||
contentTopic = contentTopic, payload = getByteSequence(MaxWakuMessageSize)
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(DefaultMaxWakuMessageSize),
|
||||
) # MaxWakuMessageSize -> Out of Max Size
|
||||
|
||||
# Notice that the message is wrapped with more data in https://github.com/status-im/nim-libp2p/blob/3011ba4326fa55220a758838835797ff322619fc/libp2p/protocols/pubsub/gossipsub.nim#L627-L632
|
||||
@ -1092,7 +1093,7 @@ suite "Waku Relay":
|
||||
(pubsubTopic, msg3) == handlerFuture.read()
|
||||
(pubsubTopic, msg3) == otherHandlerFuture.read()
|
||||
|
||||
# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 38' message
|
||||
# When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 38' message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg4)
|
||||
@ -1104,7 +1105,7 @@ suite "Waku Relay":
|
||||
(pubsubTopic, msg4) == handlerFuture.read()
|
||||
(pubsubTopic, msg4) == otherHandlerFuture.read()
|
||||
|
||||
# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 37' message
|
||||
# When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 37' message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg5)
|
||||
@ -1115,7 +1116,7 @@ suite "Waku Relay":
|
||||
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg5) == handlerFuture.read()
|
||||
|
||||
# When sending the 'MaxWakuMessageSize' message
|
||||
# When sending the 'DefaultMaxWakuMessageSize' message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg6)
|
||||
|
||||
@ -596,7 +596,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
let response = await client.relayPostMessagesV1(
|
||||
DefaultPubsubTopic,
|
||||
RelayWakuMessage(
|
||||
payload: base64.encode(getByteSequence(MaxWakuMessageSize)),
|
||||
payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)),
|
||||
# Message will be bigger than the max size
|
||||
contentTopic: some(DefaultContentTopic),
|
||||
timestamp: some(int64(2022)),
|
||||
@ -608,7 +608,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
response.status == 400
|
||||
$response.contentType == $MIMETYPE_TEXT
|
||||
response.data ==
|
||||
fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes"
|
||||
fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
@ -657,7 +657,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
# When
|
||||
let response = await client.relayPostAutoMessagesV1(
|
||||
RelayWakuMessage(
|
||||
payload: base64.encode(getByteSequence(MaxWakuMessageSize)),
|
||||
payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)),
|
||||
# Message will be bigger than the max size
|
||||
contentTopic: some(DefaultContentTopic),
|
||||
timestamp: some(int64(2022)),
|
||||
@ -669,7 +669,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
response.status == 400
|
||||
$response.contentType == $MIMETYPE_TEXT
|
||||
response.data ==
|
||||
fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes"
|
||||
fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
|
||||
@ -388,7 +388,7 @@ proc mountRelay*(
|
||||
node: WakuNode,
|
||||
pubsubTopics: seq[string] = @[],
|
||||
peerExchangeHandler = none(RoutingRecordsHandler),
|
||||
maxMessageSize = int(MaxWakuMessageSize),
|
||||
maxMessageSize = int(DefaultMaxWakuMessageSize),
|
||||
) {.async, gcsafe.} =
|
||||
if not node.wakuRelay.isNil():
|
||||
error "wakuRelay already mounted, skipping"
|
||||
|
||||
@ -3,6 +3,6 @@ import ../../common/utils/parse_size_units
|
||||
const
|
||||
## https://rfc.vac.dev/spec/64/#message-size
|
||||
DefaultMaxWakuMessageSizeStr* = "150KiB" # Remember that 1 MiB is the PubSub default
|
||||
MaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)
|
||||
DefaultMaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)
|
||||
|
||||
DefaultSafetyBufferProtocolOverhead* = 64 * 1024 # overhead measured in bytes
|
||||
|
||||
@ -41,7 +41,7 @@ proc sendSubscribeRequest(
|
||||
# TODO: this can raise an exception
|
||||
await connection.writeLP(filterSubscribeRequest.encode().buffer)
|
||||
|
||||
let respBuf = await connection.readLp(MaxSubscribeResponseSize)
|
||||
let respBuf = await connection.readLp(DefaultMaxSubscribeResponseSize)
|
||||
let respDecodeRes = FilterSubscribeResponse.decode(respBuf)
|
||||
if respDecodeRes.isErr():
|
||||
trace "Failed to decode filter subscribe response", servicePeer
|
||||
@ -79,7 +79,7 @@ proc subscribe*(
|
||||
wfc: WakuFilterClient,
|
||||
servicePeer: RemotePeerInfo,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopics: ContentTopic|seq[ContentTopic],
|
||||
contentTopics: ContentTopic | seq[ContentTopic],
|
||||
): Future[FilterSubscribeResult] {.async.} =
|
||||
var contentTopicSeq: seq[ContentTopic]
|
||||
when contentTopics is seq[ContentTopic]:
|
||||
@ -98,14 +98,14 @@ proc unsubscribe*(
|
||||
wfc: WakuFilterClient,
|
||||
servicePeer: RemotePeerInfo,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopics: ContentTopic|seq[ContentTopic],
|
||||
contentTopics: ContentTopic | seq[ContentTopic],
|
||||
): Future[FilterSubscribeResult] {.async.} =
|
||||
var contentTopicSeq: seq[ContentTopic]
|
||||
when contentTopics is seq[ContentTopic]:
|
||||
contentTopicSeq = contentTopics
|
||||
else:
|
||||
contentTopicSeq = @[contentTopics]
|
||||
|
||||
|
||||
let requestId = generateRequestId(wfc.rng)
|
||||
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
|
||||
requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
|
||||
@ -127,7 +127,7 @@ proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =
|
||||
|
||||
proc initProtocolHandler(wfc: WakuFilterClient) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
let buf = await conn.readLp(int(MaxPushSize))
|
||||
let buf = await conn.readLp(int(DefaultMaxPushSize))
|
||||
|
||||
let decodeRes = MessagePush.decode(buf)
|
||||
if decodeRes.isErr():
|
||||
@ -152,4 +152,4 @@ proc new*(
|
||||
): T =
|
||||
let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[])
|
||||
wfc.initProtocolHandler()
|
||||
wfc
|
||||
wfc
|
||||
|
||||
@ -249,7 +249,7 @@ proc initProtocolHandler(wf: WakuFilter) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
trace "filter subscribe request handler triggered", peerId = conn.peerId
|
||||
|
||||
let buf = await conn.readLp(int(MaxSubscribeSize))
|
||||
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
|
||||
|
||||
let decodeRes = FilterSubscribeRequest.decode(buf)
|
||||
if decodeRes.isErr():
|
||||
|
||||
@ -7,10 +7,10 @@ import std/options
|
||||
import ../common/protobuf, ../waku_core, ./rpc
|
||||
|
||||
const
|
||||
MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64 * 1024
|
||||
DefaultMaxSubscribeSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
|
||||
# We add a 64kB safety buffer for protocol overhead
|
||||
MaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer.
|
||||
MaxPushSize* = 10 * MaxWakuMessageSize + 64 * 1024
|
||||
DefaultMaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer.
|
||||
DefaultMaxPushSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
|
||||
# We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer =
|
||||
|
||||
@ -39,7 +39,7 @@ proc sendPushRequest(
|
||||
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await connection.readLp(MaxRpcSize.int)
|
||||
buffer = await connection.readLp(DefaultMaxRpcSize.int)
|
||||
except LPStreamRemoteClosedError:
|
||||
return err("Exception reading: " & getCurrentExceptionMsg())
|
||||
|
||||
|
||||
@ -79,7 +79,7 @@ proc handleRequest*(
|
||||
|
||||
proc initProtocolHandler(wl: WakuLightPush) =
|
||||
proc handle(conn: Connection, proto: string) {.async.} =
|
||||
let buffer = await conn.readLp(MaxRpcSize.int)
|
||||
let buffer = await conn.readLp(DefaultMaxRpcSize)
|
||||
let rpc = await handleRequest(wl, conn.peerId, buffer)
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
|
||||
@ -6,8 +6,7 @@ else:
|
||||
import std/options
|
||||
import ../common/protobuf, ../waku_core, ./rpc
|
||||
|
||||
const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024
|
||||
# We add a 64kB safety buffer for protocol overhead
|
||||
const DefaultMaxRpcSize* = -1
|
||||
|
||||
proc encode*(rpc: PushRequest): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
|
||||
@ -29,7 +29,7 @@ logScope:
|
||||
const
|
||||
# We add a 64kB safety buffer for protocol overhead.
|
||||
# 10x-multiplier also for safety
|
||||
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
|
||||
DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
|
||||
# TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
|
||||
MaxPeersCacheSize = 60
|
||||
CacheRefreshInterval = 15.minutes
|
||||
@ -61,7 +61,7 @@ proc request*(
|
||||
var error: string
|
||||
try:
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
buffer = await conn.readLp(MaxRpcSize.int)
|
||||
buffer = await conn.readLp(DefaultMaxRpcSize.int)
|
||||
except CatchableError as exc:
|
||||
waku_px_errors.inc(labelValues = [exc.msg])
|
||||
error = $exc.msg
|
||||
@ -153,7 +153,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
|
||||
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await conn.readLp(MaxRpcSize.int)
|
||||
buffer = await conn.readLp(DefaultMaxRpcSize.int)
|
||||
except CatchableError as exc:
|
||||
waku_px_errors.inc(labelValues = [exc.msg])
|
||||
return
|
||||
|
||||
@ -151,7 +151,7 @@ proc initProtocolHandler(w: WakuRelay) =
|
||||
w.codec = WakuRelayCodec
|
||||
|
||||
proc new*(
|
||||
T: type WakuRelay, switch: Switch, maxMessageSize = int(MaxWakuMessageSize)
|
||||
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
|
||||
): WakuRelayResult[T] =
|
||||
## maxMessageSize: max num bytes that are allowed for the WakuMessage
|
||||
|
||||
|
||||
@ -49,7 +49,9 @@ proc sendHistoryQueryRPC(
|
||||
let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC()))
|
||||
await connection.writeLP(reqRpc.encode().buffer)
|
||||
|
||||
let buf = await connection.readLp(MaxRpcSize.int)
|
||||
#TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail.
|
||||
# Need to find a workaround for this.
|
||||
let buf = await connection.readLp(DefaultMaxRpcSize.int)
|
||||
let respDecodeRes = HistoryRPC.decode(buf)
|
||||
if respDecodeRes.isErr():
|
||||
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
||||
|
||||
@ -46,7 +46,7 @@ type WakuStore* = ref object of LPProtocol
|
||||
|
||||
proc initProtocolHandler(ws: WakuStore) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
let buf = await conn.readLp(MaxRpcSize.int)
|
||||
let buf = await conn.readLp(DefaultMaxRpcSize.int)
|
||||
|
||||
let decodeRes = HistoryRPC.decode(buf)
|
||||
if decodeRes.isErr():
|
||||
|
||||
@ -6,8 +6,7 @@ else:
|
||||
import std/options, nimcrypto/hash
|
||||
import ../common/[protobuf, paging], ../waku_core, ./common, ./rpc
|
||||
|
||||
const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64 * 1024
|
||||
# We add a 64kB safety buffer for protocol overhead
|
||||
const DefaultMaxRpcSize* = -1
|
||||
|
||||
## Pagination
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user