Fix incorrect store read buffer limit (#800)

* Fix store read buffer limit

* Changelog
This commit is contained in:
Hanno Cornelius 2022-01-06 13:42:37 +01:00 committed by GitHub
parent eb5dcadd45
commit 4e6960ad26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 38 additions and 16 deletions

View File

@ -11,6 +11,9 @@ This release contains the following:
which is a sequence of string.
- Metrics: added counters for protocol messages
### Fixes
- Increased maximum length for reading from a libp2p input stream to allow largest possible protocol messages, including `HistoryResponse` messages at max size.
## 2021-11-05 v0.6
Some useful features and fixes in this release, include:

View File

@ -640,7 +640,8 @@ proc mountRelay*(node: WakuNode,
#msgIdProvider = msgIdProvider,
triggerSelf = triggerSelf,
sign = false,
verifySignature = false
verifySignature = false,
maxMessageSize = MaxWakuMessageSize
)
info "mounting relay", relayMessages=relayMessages

View File

@ -167,7 +167,7 @@ proc encode*(rpc: FilterRPC): ProtoBuffer =
method init*(wf: WakuFilter) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var message = await conn.readLp(MaxRpcSize.int)
var res = FilterRPC.init(message)
if res.isErr:
error "failed to decode rpc"

View File

@ -8,6 +8,12 @@ import
export waku_message
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety: currently we never
# push more than 1 message at a time.
MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024
type
ContentFilter* = object
contentTopic*: ContentTopic

View File

@ -125,7 +125,7 @@ proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
method init*(wlp: WakuLightPush) =
debug "init"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var message = await conn.readLp(MaxRpcSize.int)
var res = PushRPC.init(message)
if res.isErr:
error "failed to decode rpc"

View File

@ -8,6 +8,9 @@ import
export waku_message
const
MaxRpcSize* = MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
type
PushRequest* = object
pubSubTopic*: string

View File

@ -13,6 +13,9 @@ import
when defined(rln):
import waku_rln_relay/waku_rln_relay_types
const
MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default
type
ContentTopic* = string

View File

@ -414,7 +414,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc handler(conn: Connection, proto: string) {.async.} =
var message = await conn.readLp(64*1024)
var message = await conn.readLp(MaxRpcSize.int)
var res = HistoryRPC.init(message)
if res.isErr:
error "failed to decode rpc"
@ -469,8 +469,8 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = DefaultStoreCapacity): T =
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = DefaultStoreCapacity): T =
debug "init"
var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages)
output.init(capacity)
@ -526,7 +526,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
var message = await connOpt.get().readLp(64*1024)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr:
@ -551,7 +551,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
debug "query is sent", query=query
var message = await connOpt.get().readLp(64*1024)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
debug "response is received"
@ -735,7 +735,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(ws.rng),
query: query).encode().buffer)
var message = await connOpt.get().readLp(64*1024)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr:

View File

@ -26,12 +26,15 @@ export
waku_message,
pagination
# Constants required for pagination -------------------------------------------
const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
# TODO the DefaultPageSize can be changed, it's current value is random
const DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
const
# Constants required for pagination -------------------------------------------
MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
# TODO the DefaultPageSize can be changed, it's current value is random
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
const DefaultTopic* = "/waku/2/default-waku/proto"
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
DefaultTopic* = "/waku/2/default-waku/proto"
type

View File

@ -216,7 +216,7 @@ proc init*(wakuSwap: WakuSwap) =
info "wakuSwap init 1"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
info "swap handle incoming connection"
var message = await conn.readLp(64*1024)
var message = await conn.readLp(MaxChequeSize.int)
# XXX This can be handshake, etc
var res = Cheque.init(message)
if res.isErr:

View File

@ -4,7 +4,10 @@ import
std/tables,
bearssl,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager
../../node/peer_manager/peer_manager
const
MaxChequeSize* = 64*1024 # Used for read buffers. 64kB should be more than enough for swap cheque
type
# The Swap Mode determines the functionality available in the swap protocol.