diff --git a/CHANGELOG.md b/CHANGELOG.md index 399373dc4..8bbabd01e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ This release contains the following: which is a sequence of string. - Metrics: added counters for protocol messages +### Fixes +- Increased maximum length for reading from a libp2p input stream to allow largest possible protocol messages, including `HistoryResponse` messages at max size. + ## 2021-11-05 v0.6 Some useful features and fixes in this release, include: diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 67817cde0..d8457031c 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -640,7 +640,8 @@ proc mountRelay*(node: WakuNode, #msgIdProvider = msgIdProvider, triggerSelf = triggerSelf, sign = false, - verifySignature = false + verifySignature = false, + maxMessageSize = MaxWakuMessageSize ) info "mounting relay", relayMessages=relayMessages diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 4ce13a982..c266cc39b 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -167,7 +167,7 @@ proc encode*(rpc: FilterRPC): ProtoBuffer = method init*(wf: WakuFilter) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var message = await conn.readLp(64*1024) + var message = await conn.readLp(MaxRpcSize.int) var res = FilterRPC.init(message) if res.isErr: error "failed to decode rpc" diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index e19bdb57e..e63c165dd 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -8,6 +8,12 @@ import export waku_message +const + # We add a 64kB safety buffer for protocol overhead. + # 10x-multiplier also for safety: currently we never + # push more than 1 message at a time. + MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024 + type ContentFilter* = object contentTopic*: ContentTopic diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim index 91ac4ef7d..cef1d91a8 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim @@ -125,7 +125,7 @@ proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = method init*(wlp: WakuLightPush) = debug "init" proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var message = await conn.readLp(64*1024) + var message = await conn.readLp(MaxRpcSize.int) var res = PushRPC.init(message) if res.isErr: error "failed to decode rpc" diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim index 1862b7fb6..2ae01f5cb 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim @@ -8,6 +8,9 @@ import export waku_message +const + MaxRpcSize* = MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + type PushRequest* = object pubSubTopic*: string diff --git a/waku/v2/protocol/waku_message.nim b/waku/v2/protocol/waku_message.nim index 2e12bc949..c98f10564 100644 --- a/waku/v2/protocol/waku_message.nim +++ b/waku/v2/protocol/waku_message.nim @@ -13,6 +13,9 @@ import when defined(rln): import waku_rln_relay/waku_rln_relay_types +const + MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default + type ContentTopic* = string diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 94af0ac13..4c65e1bdc 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -414,7 +414,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = proc handler(conn: Connection, proto: string) {.async.} = - var message = await conn.readLp(64*1024) + var message = await conn.readLp(MaxRpcSize.int) var res = HistoryRPC.init(message) if res.isErr: error "failed to decode rpc" @@ -469,8 +469,8 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, - store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, - capacity = DefaultStoreCapacity): T = + store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, + capacity = DefaultStoreCapacity): T = debug "init" var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages) output.init(capacity) @@ -526,7 +526,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer) - var message = await connOpt.get().readLp(64*1024) + var message = await connOpt.get().readLp(MaxRpcSize.int) let response = HistoryRPC.init(message) if response.isErr: @@ -551,7 +551,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer) debug "query is sent", query=query - var message = await connOpt.get().readLp(64*1024) + var message = await connOpt.get().readLp(MaxRpcSize.int) let response = HistoryRPC.init(message) debug "response is received" @@ -735,7 +735,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(ws.rng), query: query).encode().buffer) - var message = await connOpt.get().readLp(64*1024) + var message = await connOpt.get().readLp(MaxRpcSize.int) let response = HistoryRPC.init(message) if response.isErr: diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index f204c60be..8184d1a1e 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -26,12 +26,15 @@ export waku_message, pagination -# Constants required for pagination ------------------------------------------- -const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page -# TODO the DefaultPageSize can be changed, it's current value is random -const DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page +const + # Constants required for pagination ------------------------------------------- + MaxPageSize* = uint64(100) # Maximum number of waku messages in each page + # TODO the DefaultPageSize can be changed, it's current value is random + DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page -const DefaultTopic* = "/waku/2/default-waku/proto" + MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + + DefaultTopic* = "/waku/2/default-waku/proto" type diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index a1fe392a9..54bf7780d 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -216,7 +216,7 @@ proc init*(wakuSwap: WakuSwap) = info "wakuSwap init 1" proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = info "swap handle incoming connection" - var message = await conn.readLp(64*1024) + var message = await conn.readLp(MaxChequeSize.int) # XXX This can be handshake, etc var res = Cheque.init(message) if res.isErr: diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index 8ebdf9825..4325f62f8 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -4,7 +4,10 @@ import std/tables, bearssl, libp2p/protocols/protocol, - ../../node/peer_manager/peer_manager + ../../node/peer_manager/peer_manager + +const + MaxChequeSize* = 64*1024 # Used for read buffers. 64kB should be more than enough for swap cheque type # The Swap Mode determines the functionality available in the swap protocol.