chore: rename protocols rpc_codec procs from init to decode

Lorenzo Delgado 2022-11-07 16:24:16 +01:00 committed by GitHub
parent 2c2ce20c4e
commit bcc6c32287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 251 additions and 250 deletions
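The change is mechanical: every protobuf RPC codec proc previously declared as init*(T: type X, buffer: seq[byte]) is now declared as decode*, while the matching encode* procs keep their names, so call sites change from X.init(buffer) to X.decode(buffer). The commit also moves the PubsubTopic alias from waku_node into waku_message. A minimal sketch of a handler written against the renamed API is shown below; the import paths are assumptions and depend on where the modules live relative to your file.

    # Sketch only: both import paths are assumed, adjust them to your tree.
    import stew/results                            # Result helpers (isOk/get)
    import ../../waku/v2/protocol/waku_message     # WakuMessage codec

    proc handleRelayedData(pubsubTopic: string, data: seq[byte]) =
      # WakuMessage.decode(...) is the proc formerly named WakuMessage.init(...)
      let msg = WakuMessage.decode(data)
      if msg.isOk():
        let wakuMsg = msg.get()
        echo "pubsubTopic=", pubsubTopic, " contentTopic=", wakuMsg.contentTopic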

View File

@@ -68,7 +68,7 @@ type Chat = ref object
 type
   PrivateKey* = crypto.PrivateKey
-  Topic* = waku_node.PubsubTopic
+  Topic* = waku_message.PubsubTopic
 #####################
 ## chat2 protobufs ##
@@ -514,7 +514,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
   proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
     trace "Hit subscribe handler", topic
-    let decoded = WakuMessage.init(data)
+    let decoded = WakuMessage.decode(data)
     if decoded.isOk():
       if decoded.get().contentTopic == chat.contentTopic:

View File

@@ -196,7 +196,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
   # Bridging
   # Handle messages on Waku v2 and bridge to Matterbridge
   proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
-    let msg = WakuMessage.init(data)
+    let msg = WakuMessage.decode(data)
     if msg.isOk():
       trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
       cmb.toMatterbridge(msg[])

View File

@@ -54,7 +54,7 @@
   WakuBridge* = ref object of RootObj
     nodev1*: EthereumNode
     nodev2*: WakuNode
-    nodev2PubsubTopic: waku_node.PubsubTopic # Pubsub topic to bridge to/from
+    nodev2PubsubTopic: waku_message.PubsubTopic # Pubsub topic to bridge to/from
     seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication.
     rng: ref HmacDrbgContext
     v1Pool: seq[Node] # Pool of v1 nodes for possible connections
@@ -231,7 +231,7 @@ proc new*(T: type WakuBridge,
           nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
           nameResolver: NameResolver = nil,
           # Bridge configuration
-          nodev2PubsubTopic: waku_node.PubsubTopic,
+          nodev2PubsubTopic: waku_message.PubsubTopic,
           v1Pool: seq[Node] = @[],
           targetV1Peers = 0): T
           {.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} =
@@ -303,7 +303,7 @@ proc start*(bridge: WakuBridge) {.async.} =
   # Handle messages on Waku v2 and bridge to Waku v1
   proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
-    let msg = WakuMessage.init(data)
+    let msg = WakuMessage.decode(data)
     if msg.isOk() and msg.get().isBridgeable():
       try:
         trace "Bridging message from V2 to V1", msg=msg.tryGet()

View File

@@ -32,7 +32,7 @@ proc runBackground() {.async.} =
   # Subscribe to a topic
   let topic = PubsubTopic("foobar")
   proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
-    let message = WakuMessage.init(data).value
+    let message = WakuMessage.decode(data).value
     let payload = cast[string](message.payload)
     info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
   node.subscribe(topic, handler)

View File

@@ -70,7 +70,7 @@ proc setupAndSubscribe() {.async.} =
   let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")
   proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
-    let message = WakuMessage.init(data).value
+    let message = WakuMessage.decode(data).value
     let payloadStr = string.fromBytes(message.payload)
     if message.contentTopic == contentTopic:
       notice "message received", payload=payloadStr,

View File

@@ -128,7 +128,7 @@ procSuite "WakuBridge":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk() and msg.value().version == 1:
         check:

View File

@@ -101,7 +101,7 @@ procSuite "Waku Discovery v5":
     # Let's see if we can deliver a message end-to-end
     # var completionFut = newFuture[bool]()
     # proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-    #   let msg = WakuMessage.init(data)
+    #   let msg = WakuMessage.decode(data)
     #   if msg.isOk():
     #     let val = msg.value()
     #     check:

View File

@@ -172,7 +172,7 @@ procSuite "Waku Noise":
       let pb = msg.get().encode()
       # We decode the WakuMessage from the ProtoBuffer
-      let msgFromPb = WakuMessage.init(pb.buffer)
+      let msgFromPb = WakuMessage.decode(pb.buffer)
       check:
         msgFromPb.isOk()

View File

@@ -128,7 +128,7 @@ procSuite "Waku Noise Sessions":
       pb = wakuMsg.get().encode()
       # We decode the WakuMessage from the ProtoBuffer
-      msgFromPb = WakuMessage.init(pb.buffer)
+      msgFromPb = WakuMessage.decode(pb.buffer)
     check:
       msgFromPb.isOk()
@@ -185,7 +185,7 @@ procSuite "Waku Noise Sessions":
       pb = wakuMsg.get().encode()
       # We decode the WakuMessage from the ProtoBuffer
-      msgFromPb = WakuMessage.init(pb.buffer)
+      msgFromPb = WakuMessage.decode(pb.buffer)
     check:
       msgFromPb.isOk()
@@ -238,7 +238,7 @@ procSuite "Waku Noise Sessions":
       pb = wakuMsg.get().encode()
       # We decode the WakuMessage from the ProtoBuffer
-      msgFromPb = WakuMessage.init(pb.buffer)
+      msgFromPb = WakuMessage.decode(pb.buffer)
     check:
       msgFromPb.isOk()

View File

@@ -21,7 +21,7 @@ procSuite "Waku Payload":
       pb = msg.encode()
     # Decoding
-    let msgDecoded = WakuMessage.init(pb.buffer)
+    let msgDecoded = WakuMessage.decode(pb.buffer)
     check msgDecoded.isOk()
     let
@@ -47,7 +47,7 @@ procSuite "Waku Payload":
       pb = msg.encode()
     # Decoding
-    let msgDecoded = WakuMessage.init(pb.buffer)
+    let msgDecoded = WakuMessage.decode(pb.buffer)
     check msgDecoded.isOk()
     let
@@ -73,7 +73,7 @@ procSuite "Waku Payload":
       pb = msg.encode()
     # Decoding
-    let msgDecoded = WakuMessage.init(pb.buffer)
+    let msgDecoded = WakuMessage.decode(pb.buffer)
     check msgDecoded.isOk()
     let
@@ -101,7 +101,7 @@ procSuite "Waku Payload":
       pb = msg.encode()
     # Decoding
-    let msgDecoded = WakuMessage.init(pb.buffer)
+    let msgDecoded = WakuMessage.decode(pb.buffer)
     check msgDecoded.isOk()
     let
@@ -123,7 +123,7 @@ procSuite "Waku Payload":
     ## When
     let pb = msg.encode()
-    let msgDecoded = WakuMessage.init(pb.buffer)
+    let msgDecoded = WakuMessage.decode(pb.buffer)
     ## Then
     check:
@@ -144,7 +144,7 @@ procSuite "Waku Payload":
     ## When
     let pb = msg.encode()
-    let msgDecoded = WakuMessage.init(pb.buffer)
+    let msgDecoded = WakuMessage.decode(pb.buffer)
     ## Then
     check:

View File

@@ -45,7 +45,7 @@ procSuite "Waku Peer Exchange":
     ## When
     let
       rpcBuffer: seq[byte] = rpc.encode().buffer
-      res = PeerExchangeRpc.init(rpcBuffer)
+      res = PeerExchangeRpc.decode(rpcBuffer)
     ## Then
     check:

View File

@@ -18,7 +18,7 @@ procSuite "Waku Store - RPC codec":
     ## When
     let encodedIndex = index.encode()
-    let decodedIndexRes = PagingIndex.init(encodedIndex.buffer)
+    let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer)
     ## Then
     check:
@@ -34,7 +34,7 @@ procSuite "Waku Store - RPC codec":
     let emptyIndex = PagingIndex()
     let encodedIndex = emptyIndex.encode()
-    let decodedIndexRes = PagingIndex.init(encodedIndex.buffer)
+    let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer)
     ## Then
     check:
@@ -53,7 +53,7 @@ procSuite "Waku Store - RPC codec":
     ## When
     let pb = pagingInfo.encode()
-    let decodedPagingInfo = PagingInfo.init(pb.buffer)
+    let decodedPagingInfo = PagingInfo.decode(pb.buffer)
     ## Then
     check:
@@ -69,8 +69,8 @@ procSuite "Waku Store - RPC codec":
     let emptyPagingInfo = PagingInfo()
     ## When
-    let epb = emptyPagingInfo.encode()
-    let decodedEmptyPagingInfo = PagingInfo.init(epb.buffer)
+    let pb = emptyPagingInfo.encode()
+    let decodedEmptyPagingInfo = PagingInfo.decode(pb.buffer)
     ## Then
     check:
@@ -89,7 +89,7 @@ procSuite "Waku Store - RPC codec":
     ## When
     let pb = query.encode()
-    let decodedQuery = HistoryQuery.init(pb.buffer)
+    let decodedQuery = HistoryQuery.decode(pb.buffer)
     ## Then
     check:
@@ -104,8 +104,8 @@ procSuite "Waku Store - RPC codec":
     let emptyQuery = HistoryQuery()
     ## When
-    let epb = emptyQuery.encode()
-    let decodedEmptyQuery = HistoryQuery.init(epb.buffer)
+    let pb = emptyQuery.encode()
+    let decodedEmptyQuery = HistoryQuery.decode(pb.buffer)
     ## Then
     check:
@@ -125,7 +125,7 @@ procSuite "Waku Store - RPC codec":
     ## When
     let pb = res.encode()
-    let decodedRes = HistoryResponse.init(pb.buffer)
+    let decodedRes = HistoryResponse.decode(pb.buffer)
     ## Then
     check:
@@ -140,8 +140,8 @@ procSuite "Waku Store - RPC codec":
     let emptyRes = HistoryResponse()
     ## When
-    let epb = emptyRes.encode()
-    let decodedEmptyRes = HistoryResponse.init(epb.buffer)
+    let pb = emptyRes.encode()
+    let decodedEmptyRes = HistoryResponse.decode(pb.buffer)
     ## Then
     check:

View File

@@ -58,7 +58,7 @@ procSuite "WakuNode":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
      if msg.isOk():
        let val = msg.value()
        check:

View File

@@ -44,7 +44,7 @@ procSuite "WakuNode - Lightpush":
     var completionFutRelay = newFuture[bool]()
     proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data).get()
+      let msg = WakuMessage.decode(data).get()
       check:
         pubsubTopic == DefaultPubsubTopic
         msg == message

View File

@@ -89,7 +89,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:
@@ -158,7 +158,7 @@ procSuite "WakuNode - Relay":
       ## the validator that only allows messages with contentTopic1 to be relayed
       check:
         topic == pubSubTopic
-      let msg = WakuMessage.init(message.data)
+      let msg = WakuMessage.decode(message.data)
       if msg.isOk():
         # only relay messages with contentTopic1
         if msg.value().contentTopic == contentTopic1:
@@ -175,7 +175,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
       debug "relayed pubsub topic:", topic
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:
@@ -228,7 +228,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:
@@ -272,7 +272,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:
@@ -320,7 +320,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:
@@ -363,7 +363,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:
@@ -406,7 +406,7 @@ procSuite "WakuNode - Relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let val = msg.value()
         check:

View File

@@ -107,7 +107,7 @@ procSuite "WakuNode - RLN relay":
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         debug "The received topic:", topic
         if topic == rlnRelayPubSubTopic:
@@ -212,7 +212,7 @@ procSuite "WakuNode - RLN relay":
     # define a custom relay handler
     var completionFut = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         debug "The received topic:", topic
         if topic == rlnRelayPubSubTopic:
@@ -361,7 +361,7 @@ procSuite "WakuNode - RLN relay":
     var completionFut3 = newFuture[bool]()
     var completionFut4 = newFuture[bool]()
     proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isOk():
         let wm = msg.value()
         debug "The received topic:", topic

View File

@@ -25,7 +25,7 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
   proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} =
     trace "Topic handler triggered", topic=topic
-    let msg = WakuMessage.init(data)
+    let msg = WakuMessage.decode(data)
     if msg.isOk():
       # Add message to current cache
       trace "WakuMessage received", msg=msg, topic=topic

View File

@@ -37,7 +37,7 @@ proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
     trace "Topic handler triggered", topic=topic
     # Add message to current cache
-    let msg = WakuMessage.init(data)
+    let msg = WakuMessage.decode(data)
     if msg.isErr():
       debug "WakuMessage received but failed to decode", msg=msg, topic=topic
       # TODO: handle message decode failure

View File

@@ -62,9 +62,6 @@ const git_version* {.strdefine.} = "n/a"
 # Default clientId
 const clientId* = "Nimbus Waku v2 node"
-# TODO: Unify pubusub topic type and default value
-type PubsubTopic* = string
 const defaultTopic*: PubsubTopic = "/waku/2/default-waku/proto"
 # Default Waku Filter Timeout
@@ -270,7 +267,7 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]
       # A default handler should be registered for all topics
       trace "Hit default handler", topic=topic, data=data
-      let msg = WakuMessage.init(data)
+      let msg = WakuMessage.decode(data)
       if msg.isErr():
         # TODO: Add metric to track waku message decode errors
         return

View File

@@ -90,7 +90,7 @@ proc initProtocolHandler(wf: WakuFilterClient) =
   proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
     let buffer = await conn.readLp(MaxRpcSize.int)
-    let decodeReqRes = FilterRPC.init(buffer)
+    let decodeReqRes = FilterRPC.decode(buffer)
     if decodeReqRes.isErr():
       waku_filter_errors.inc(labelValues = [decodeRpcFailure])
       return

View File

@@ -90,7 +90,7 @@ proc initProtocolHandler(wf: WakuFilter) =
   proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
     let buffer = await conn.readLp(MaxRpcSize.int)
-    let decodeRpcRes = FilterRPC.init(buffer)
+    let decodeRpcRes = FilterRPC.decode(buffer)
     if decodeRpcRes.isErr():
       waku_filter_errors.inc(labelValues = [decodeRpcFailure])
       return

View File

@@ -7,8 +7,8 @@ import
   libp2p/protobuf/minprotobuf,
   libp2p/varint
 import
-  ../waku_message,
   ../../utils/protobuf,
+  ../waku_message,
   ./rpc
@@ -18,36 +18,37 @@ const MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
 proc encode*(filter: ContentFilter): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, filter.contentTopic)
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, filter.contentTopic)
+  pb.finish3()
+  pb

-proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var contentTopic: ContentTopic
   discard ?pb.getField(1, contentTopic)
-  return ok(ContentFilter(contentTopic: contentTopic))
+  ok(ContentFilter(contentTopic: contentTopic))

 proc encode*(rpc: FilterRequest): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, uint64(rpc.subscribe))
-  output.write3(2, rpc.pubSubTopic)
+  var pb = initProtoBuffer()
+  pb.write3(1, uint64(rpc.subscribe))
+  pb.write3(2, rpc.pubSubTopic)
   for filter in rpc.contentFilters:
-    output.write3(3, filter.encode())
-  output.finish3()
-  return output
+    pb.write3(3, filter.encode())
+  pb.finish3()
+  pb

-proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
   var subflag: uint64
@@ -61,45 +62,46 @@ proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
   var buffs: seq[seq[byte]]
   discard ?pb.getRepeatedField(3, buffs)
   for buf in buffs:
-    rpc.contentFilters.add(?ContentFilter.init(buf))
-  return ok(rpc)
+    rpc.contentFilters.add(?ContentFilter.decode(buf))
+  ok(rpc)

 proc encode*(push: MessagePush): ProtoBuffer =
-  var output = initProtoBuffer()
+  var pb = initProtoBuffer()
   for push in push.messages:
-    output.write3(1, push.encode())
-  output.finish3()
-  return output
+    pb.write3(1, push.encode())
+  pb.finish3()
+  pb

-proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var push = MessagePush()
   var messages: seq[seq[byte]]
   discard ?pb.getRepeatedField(1, messages)
   for buf in messages:
-    push.messages.add(?WakuMessage.init(buf))
-  return ok(push)
+    push.messages.add(?WakuMessage.decode(buf))
+  ok(push)

 proc encode*(rpc: FilterRPC): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.requestId)
-  output.write3(2, rpc.request.encode())
-  output.write3(3, rpc.push.encode())
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.requestId)
+  pb.write3(2, rpc.request.encode())
+  pb.write3(3, rpc.push.encode())
+  pb.finish3()
+  pb

-proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = FilterRPC()
   var requestId: string
@@ -108,10 +110,10 @@ proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
   var requestBuffer: seq[byte]
   discard ?pb.getField(2, requestBuffer)
-  rpc.request = ?FilterRequest.init(requestBuffer)
+  rpc.request = ?FilterRequest.decode(requestBuffer)
   var pushBuffer: seq[byte]
   discard ?pb.getField(3, pushBuffer)
-  rpc.push = ?MessagePush.init(pushBuffer)
-  return ok(rpc)
+  rpc.push = ?MessagePush.decode(pushBuffer)
+  ok(rpc)
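As the test suites earlier in this commit show, the renamed codecs keep the same round-trip shape: encode() still returns a ProtoBuffer whose buffer field feeds the new decode proc. A hedged sketch for the filter RPC types; field names are taken from the diff above, the subscribe flag is assumed to be a bool (it is serialized as uint64), and the content topic value is a placeholder.

    # Sketch only: assumes FilterRequest/ContentFilter from ./rpc and the Result helpers are in scope.
    let request = FilterRequest(
      pubSubTopic: "/waku/2/default-waku/proto",
      subscribe: true,                                         # assumed bool field
      contentFilters: @[ContentFilter(contentTopic: "/toy/1/example/proto")])  # placeholder topic

    let encoded = request.encode()                       # ProtoBuffer
    let decoded = FilterRequest.decode(encoded.buffer)   # formerly FilterRequest.init
    assert decoded.isOk() and decoded.get().pubSubTopic == request.pubSubTopic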

View File

@@ -46,7 +46,7 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
   await connection.writeLP(rpc.encode().buffer)
   var buffer = await connection.readLp(MaxRpcSize.int)
-  let decodeRespRes = PushRPC.init(buffer)
+  let decodeRespRes = PushRPC.decode(buffer)
   if decodeRespRes.isErr():
     error "failed to decode response"
     waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])

View File

@@ -38,7 +38,7 @@ type
 proc initProtocolHandler*(wl: WakuLightPush) =
   proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
     let buffer = await conn.readLp(MaxRpcSize.int)
-    let reqDecodeRes = PushRPC.init(buffer)
+    let reqDecodeRes = PushRPC.decode(buffer)
     if reqDecodeRes.isErr():
       error "failed to decode rpc"
       waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])

View File

@@ -7,8 +7,8 @@ else:
 import
   libp2p/protobuf/minprotobuf
 import
-  ../waku_message,
   ../../utils/protobuf,
+  ../waku_message,
   ./rpc
@@ -16,16 +16,16 @@ const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer
 proc encode*(rpc: PushRequest): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.pubSubTopic)
-  output.write3(2, rpc.message.encode())
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.pubSubTopic)
+  pb.write3(2, rpc.message.encode())
+  pb.finish3()
+  pb

-proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PushRequest()
   var pubSubTopic: string
@@ -34,22 +34,22 @@ proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
   var buf: seq[byte]
   discard ?pb.getField(2, buf)
-  rpc.message = ?WakuMessage.init(buf)
-  return ok(rpc)
+  rpc.message = ?WakuMessage.decode(buf)
+  ok(rpc)

 proc encode*(rpc: PushResponse): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, uint64(rpc.isSuccess))
-  output.write3(2, rpc.info)
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, uint64(rpc.isSuccess))
+  pb.write3(2, rpc.info)
+  pb.finish3()
+  pb

-proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PushResponse(isSuccess: false, info: "")
   var isSuccess: uint64
@@ -60,21 +60,21 @@ proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
   discard ?pb.getField(2, info)
   rpc.info = info
-  return ok(rpc)
+  ok(rpc)

 proc encode*(rpc: PushRPC): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.requestId)
-  output.write3(2, rpc.request.encode())
-  output.write3(3, rpc.response.encode())
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.requestId)
+  pb.write3(2, rpc.request.encode())
+  pb.write3(3, rpc.response.encode())
+  pb.finish3()
+  pb

-proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PushRPC()
   var requestId: string
@@ -83,10 +83,10 @@ proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
   var requestBuffer: seq[byte]
   discard ?pb.getField(2, requestBuffer)
-  rpc.request = ?PushRequest.init(requestBuffer)
+  rpc.request = ?PushRequest.decode(requestBuffer)
   var pushBuffer: seq[byte]
   discard ?pb.getField(3, pushBuffer)
-  rpc.response = ?PushResponse.init(pushBuffer)
-  return ok(rpc)
+  rpc.response = ?PushResponse.decode(pushBuffer)
+  ok(rpc)
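The lightpush codec follows the same round-trip pattern. A hedged sketch, with placeholder payload and topic values and assuming the PushRequest and WakuMessage types are in scope:

    # Sketch only: assumes PushRequest from ./rpc, WakuMessage, and the Result helpers are in scope.
    let req = PushRequest(
      pubSubTopic: "/waku/2/default-waku/proto",
      message: WakuMessage(payload: @[byte 1, 2, 3],
                           contentTopic: "/toy/1/example/proto"))  # placeholder values

    let pb = req.encode()                       # ProtoBuffer
    let decoded = PushRequest.decode(pb.buffer) # formerly PushRequest.init
    assert decoded.isOk()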

View File

@@ -5,26 +5,29 @@
 ##
 ## For payload content and encryption, see waku/v2/node/waku_payload.nim
 when (NimMajor, NimMinor) < (1, 4):
   {.push raises: [Defect].}
 else:
   {.push raises: [].}
 import
   libp2p/protobuf/minprotobuf,
-  libp2p/varint,
+  libp2p/varint
+import
   ../utils/protobuf,
   ../utils/time,
-  waku_rln_relay/waku_rln_relay_types
+  ./waku_rln_relay/waku_rln_relay_types

-const
-  MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default
+const MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default

 type
+  PubsubTopic* = string
   ContentTopic* = string
-  WakuMessage* = object
+
+type WakuMessage* = object
   payload*: seq[byte]
   contentTopic*: ContentTopic
   version*: uint32
@@ -38,45 +41,42 @@ type
                # be stored. bools and uints are
                # equivalent in serialization of the protobuf
   ephemeral*: bool

-# Encoding and decoding -------------------------------------------------------
-proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
+## Encoding and decoding
+
+proc encode*(message: WakuMessage): ProtoBuffer =
+  var buf = initProtoBuffer()
+  buf.write3(1, message.payload)
+  buf.write3(2, message.contentTopic)
+  buf.write3(3, message.version)
+  buf.write3(10, zint64(message.timestamp))
+  buf.write3(21, message.proof.encode())
+  buf.write3(31, uint64(message.ephemeral))
+  buf.finish3()
+  buf
+
+proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
   var msg = WakuMessage(ephemeral: false)
   let pb = initProtoBuffer(buffer)
-  discard ? pb.getField(1, msg.payload)
-  discard ? pb.getField(2, msg.contentTopic)
-  discard ? pb.getField(3, msg.version)
+  discard ?pb.getField(1, msg.payload)
+  discard ?pb.getField(2, msg.contentTopic)
+  discard ?pb.getField(3, msg.version)
   var timestamp: zint64
-  discard ? pb.getField(10, timestamp)
+  discard ?pb.getField(10, timestamp)
   msg.timestamp = Timestamp(timestamp)
   # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec
   var proofBytes: seq[byte]
-  discard ? pb.getField(21, proofBytes)
-  msg.proof = ? RateLimitProof.init(proofBytes)
+  discard ?pb.getField(21, proofBytes)
+  msg.proof = ?RateLimitProof.init(proofBytes)
-  # Behaviour of ephemeral with storeTTL to be defined,
-  # If a message is marked ephemeral, it should not have a storeTTL.
-  # If a message is not marked ephemeral, it should have a storeTTL.
-  # How would we handle messages that should be stored permanently?
   var ephemeral: uint
-  if ? pb.getField(31, ephemeral):
+  if ?pb.getField(31, ephemeral):
     msg.ephemeral = bool(ephemeral)
   ok(msg)

-proc encode*(message: WakuMessage): ProtoBuffer =
-  result = initProtoBuffer()
-  result.write3(1, message.payload)
-  result.write3(2, message.contentTopic)
-  result.write3(3, message.version)
-  result.write3(10, zint64(message.timestamp))
-  result.write3(21, message.proof.encode())
-  result.write3(31, uint64(message.ephemeral))
-  result.finish3()
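The test hunks earlier in this commit (test_waku_payload, test_waku_noise) already show the intended usage of this pair: build a WakuMessage, encode() it, then hand the resulting buffer to WakuMessage.decode. A minimal sketch, with a placeholder content topic and an assumed import path:

    # Sketch only: both import paths are assumed, adjust them to your tree.
    import stew/results        # Result helpers (isOk/get)
    import ./waku_message

    let msg = WakuMessage(
      payload: @[byte 0x01, 0x02, 0x03],
      contentTopic: ContentTopic("/toy/1/example/proto"),  # placeholder topic
      version: 2)

    let pb = msg.encode()                        # ProtoBuffer
    let decoded = WakuMessage.decode(pb.buffer)  # formerly WakuMessage.init
    assert decoded.isOk()
    assert decoded.get().contentTopic == msg.contentTopic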

View File

@@ -153,7 +153,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
   proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
     let buff = await conn.readLp(MaxRpcSize.int)
-    let res = PeerExchangeRpc.init(buff)
+    let res = PeerExchangeRpc.decode(buff)
     if res.isErr():
       waku_px_errors.inc(labelValues = [decodeRpcFailure])
       return
return return

View File

@@ -3,6 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}
 import
   libp2p/protobuf/minprotobuf,
   libp2p/varint
@@ -10,85 +11,86 @@ import
   ../../utils/protobuf,
   ./rpc

 proc encode*(rpc: PeerExchangeRequest): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.numPeers)
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.numPeers)
+  pb.finish3()
+  pb

-proc init*(T: type PeerExchangeRequest, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PeerExchangeRequest, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PeerExchangeRequest(numPeers: 0)
   var numPeers: uint64
   if ?pb.getField(1, numPeers):
     rpc.numPeers = numPeers
-  return ok(rpc)
+  ok(rpc)

 proc encode*(rpc: PeerExchangePeerInfo): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.enr)
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.enr)
+  pb.finish3()
+  pb

-proc init*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PeerExchangePeerInfo(enr: @[])
   var peerInfoBuffer: seq[byte]
   if ?pb.getField(1, peerInfoBuffer):
     rpc.enr = peerInfoBuffer
-  return ok(rpc)
+  ok(rpc)

 proc encode*(rpc: PeerExchangeResponse): ProtoBuffer =
-  var output = initProtoBuffer()
+  var pb = initProtoBuffer()
   for pi in rpc.peerInfos:
-    output.write3(1, pi.encode())
-  output.finish3()
-  return output
+    pb.write3(1, pi.encode())
+  pb.finish3()
+  pb

-proc init*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PeerExchangeResponse(peerInfos: @[])
   var peerInfoBuffers: seq[seq[byte]]
   if ?pb.getRepeatedField(1, peerInfoBuffers):
     for pib in peerInfoBuffers:
-      rpc.peerInfos.add(?PeerExchangePeerInfo.init(pib))
-  return ok(rpc)
+      rpc.peerInfos.add(?PeerExchangePeerInfo.decode(pib))
+  ok(rpc)

 proc encode*(rpc: PeerExchangeRpc): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.request.encode())
-  output.write3(2, rpc.response.encode())
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.request.encode())
+  pb.write3(2, rpc.response.encode())
+  pb.finish3()
+  pb

-proc init*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var rpc = PeerExchangeRpc()
   var requestBuffer: seq[byte]
   discard ?pb.getField(1, requestBuffer)
-  rpc.request = ?PeerExchangeRequest.init(requestBuffer)
+  rpc.request = ?PeerExchangeRequest.decode(requestBuffer)
   var responseBuffer: seq[byte]
   discard ?pb.getField(2, responseBuffer)
-  rpc.response = ?PeerExchangeResponse.init(responseBuffer)
-  return ok(rpc)
+  rpc.response = ?PeerExchangeResponse.decode(responseBuffer)
+  ok(rpc)
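And the peer-exchange codec, where a response wraps a list of peer records carried as raw ENR bytes. A hedged sketch with placeholder bytes:

    # Sketch only: assumes PeerExchangeResponse/PeerExchangePeerInfo from ./rpc and the Result helpers are in scope.
    let response = PeerExchangeResponse(
      peerInfos: @[PeerExchangePeerInfo(enr: @[byte 0xde, 0xad, 0xbe, 0xef])])  # placeholder ENR bytes

    let pb = response.encode()                             # ProtoBuffer
    let decoded = PeerExchangeResponse.decode(pb.buffer)   # formerly PeerExchangeResponse.init
    assert decoded.isOk() and decoded.get().peerInfos.len == 1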

View File

@@ -1062,7 +1062,7 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: Co
   ## the message validation logic is according to https://rfc.vac.dev/spec/17/
   proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
     trace "rln-relay topic validator is called"
-    let msg = WakuMessage.init(message.data)
+    let msg = WakuMessage.decode(message.data)
     if msg.isOk():
       let
         wakumessage = msg.value()

View File

@@ -52,7 +52,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
   await connection.writeLP(rpc.encode().buffer)
   var message = await connection.readLp(MaxRpcSize.int)
-  let response = HistoryRPC.init(message)
+  let response = HistoryRPC.decode(message)
   if response.isErr():
     error "failed to decode response"

View File

@@ -171,7 +171,7 @@ proc initProtocolHandler*(ws: WakuStore) =
   proc handler(conn: Connection, proto: string) {.async.} =
     let buf = await conn.readLp(MaxRpcSize.int)
-    let resReq = HistoryRPC.init(buf)
+    let resReq = HistoryRPC.decode(buf)
     if resReq.isErr():
       error "failed to decode rpc", peerId=conn.peerId
       waku_store_errors.inc(labelValues = [decodeRpcFailure])

View File

@@ -21,17 +21,17 @@ const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB s
 proc encode*(index: PagingIndex): ProtoBuffer =
   ## Encode an Index object into a ProtoBuffer
   ## returns the resultant ProtoBuffer
-  var output = initProtoBuffer()
-  output.write3(1, index.digest.data)
-  output.write3(2, zint64(index.receiverTime))
-  output.write3(3, zint64(index.senderTime))
-  output.write3(4, index.pubsubTopic)
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, index.digest.data)
+  pb.write3(2, zint64(index.receiverTime))
+  pb.write3(3, zint64(index.senderTime))
+  pb.write3(4, index.pubsubTopic)
+  pb.finish3()
+  pb

-proc init*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] =
   ## creates and returns an Index object out of buffer
   var index = PagingIndex()
   let pb = initProtoBuffer(buffer)
@@ -57,22 +57,22 @@ proc init*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] =
   # read the pubsubTopic
   discard ?pb.getField(4, index.pubsubTopic)
-  return ok(index)
+  ok(index)

 proc encode*(pinfo: PagingInfo): ProtoBuffer =
   ## Encodes a PagingInfo object into a ProtoBuffer
   ## returns the resultant ProtoBuffer
-  var output = initProtoBuffer()
-  output.write3(1, pinfo.pageSize)
-  output.write3(2, pinfo.cursor.encode())
-  output.write3(3, uint32(ord(pinfo.direction)))
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, pinfo.pageSize)
+  pb.write3(2, pinfo.cursor.encode())
+  pb.write3(3, uint32(ord(pinfo.direction)))
+  pb.finish3()
+  pb

-proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
   ## creates and returns a PagingInfo object out of buffer
   var pagingInfo = PagingInfo()
   let pb = initProtoBuffer(buffer)
@@ -83,22 +83,24 @@ proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
   var cursorBuffer: seq[byte]
   discard ?pb.getField(2, cursorBuffer)
-  pagingInfo.cursor = ?PagingIndex.init(cursorBuffer)
+  pagingInfo.cursor = ?PagingIndex.decode(cursorBuffer)
   var direction: uint32
   discard ?pb.getField(3, direction)
   pagingInfo.direction = PagingDirection(direction)
-  return ok(pagingInfo)
+  ok(pagingInfo)

 proc encode*(filter: HistoryContentFilter): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, filter.contentTopic)
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, filter.contentTopic)
+  pb.finish3()
+  pb

-proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
   let pb = initProtoBuffer(buffer)
   var contentTopic: ContentTopic
   discard ?pb.getField(1, contentTopic)
@@ -107,20 +109,20 @@ proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
 proc encode*(query: HistoryQuery): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(2, query.pubsubTopic)
+  var pb = initProtoBuffer()
+  pb.write3(2, query.pubsubTopic)
   for filter in query.contentFilters:
-    output.write3(3, filter.encode())
-  output.write3(4, query.pagingInfo.encode())
-  output.write3(5, zint64(query.startTime))
-  output.write3(6, zint64(query.endTime))
-  output.finish3()
-  return output
+    pb.write3(3, filter.encode())
+  pb.write3(4, query.pagingInfo.encode())
+  pb.write3(5, zint64(query.startTime))
+  pb.write3(6, zint64(query.endTime))
+  pb.finish3()
+  pb

-proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
   var msg = HistoryQuery()
   let pb = initProtoBuffer(buffer)
@@ -129,13 +131,13 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
   var buffs: seq[seq[byte]]
   discard ?pb.getRepeatedField(3, buffs)
-  for buf in buffs:
-    msg.contentFilters.add(? HistoryContentFilter.init(buf))
+  for pb in buffs:
+    msg.contentFilters.add(? HistoryContentFilter.decode(pb))
   var pagingInfoBuffer: seq[byte]
   discard ?pb.getField(4, pagingInfoBuffer)
-  msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer)
+  msg.pagingInfo = ?PagingInfo.decode(pagingInfoBuffer)
   var startTime: zint64
   discard ?pb.getField(5, startTime)
@@ -145,66 +147,64 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
   discard ?pb.getField(6, endTime)
   msg.endTime = Timestamp(endTime)
-  return ok(msg)
+  ok(msg)

 proc encode*(response: HistoryResponse): ProtoBuffer =
-  var output = initProtoBuffer()
+  var pb = initProtoBuffer()
   for msg in response.messages:
-    output.write3(2, msg.encode())
-  output.write3(3, response.pagingInfo.encode())
-  output.write3(4, uint32(ord(response.error)))
-  output.finish3()
-  return output
+    pb.write3(2, msg.encode())
+  pb.write3(3, response.pagingInfo.encode())
+  pb.write3(4, uint32(ord(response.error)))
+  pb.finish3()
+  pb

-proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
   var msg = HistoryResponse()
   let pb = initProtoBuffer(buffer)
   var messages: seq[seq[byte]]
   discard ?pb.getRepeatedField(2, messages)
-  for buf in messages:
-    let message = ?WakuMessage.init(buf)
+  for pb in messages:
+    let message = ?WakuMessage.decode(pb)
     msg.messages.add(message)
   var pagingInfoBuffer: seq[byte]
   discard ?pb.getField(3, pagingInfoBuffer)
-  msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer)
+  msg.pagingInfo = ?PagingInfo.decode(pagingInfoBuffer)
   var error: uint32
   discard ?pb.getField(4, error)
   msg.error = HistoryResponseError(error)
-  return ok(msg)
+  ok(msg)

 proc encode*(rpc: HistoryRPC): ProtoBuffer =
-  var output = initProtoBuffer()
-  output.write3(1, rpc.requestId)
-  output.write3(2, rpc.query.encode())
-  output.write3(3, rpc.response.encode())
-  output.finish3()
-  return output
+  var pb = initProtoBuffer()
+  pb.write3(1, rpc.requestId)
+  pb.write3(2, rpc.query.encode())
+  pb.write3(3, rpc.response.encode())
+  pb.finish3()
+  pb

-proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
+proc decode*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
   var rpc = HistoryRPC()
   let pb = initProtoBuffer(buffer)
   discard ?pb.getField(1, rpc.requestId)
   var queryBuffer: seq[byte]
   discard ?pb.getField(2, queryBuffer)
-  rpc.query = ?HistoryQuery.init(queryBuffer)
+  rpc.query = ?HistoryQuery.decode(queryBuffer)
   var responseBuffer: seq[byte]
   discard ?pb.getField(3, responseBuffer)
-  rpc.response = ?HistoryResponse.init(responseBuffer)
-  return ok(rpc)
+  rpc.response = ?HistoryResponse.decode(responseBuffer)
+  ok(rpc)
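The store codec nests the same pattern one level deeper: a HistoryRPC wraps a HistoryQuery and a HistoryResponse, each with its own encode/decode pair, exactly as exercised by the "Waku Store - RPC codec" tests above. A hedged sketch of the outer round trip, with a placeholder request id and assuming the types from ./rpc and the Result helpers are in scope:

    # Sketch only: assumes HistoryRPC/HistoryQuery/HistoryResponse from ./rpc and the Result helpers are in scope.
    let rpc = HistoryRPC(
      requestId: "example-request-id",        # placeholder
      query: HistoryQuery(pubsubTopic: "/waku/2/default-waku/proto"),
      response: HistoryResponse())

    let encoded = rpc.encode()                        # ProtoBuffer
    let decoded = HistoryRPC.decode(encoded.buffer)   # formerly HistoryRPC.init
    assert decoded.isOk()
    assert decoded.get().requestId == rpc.requestId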