mirror of https://github.com/waku-org/nwaku.git
Add encoding/decoding of payload for Waku v2 (#251)
* Add encoding/decoding of payload for Waku v2 * Allow for Waku v1 payload in chat through compile time flag
This commit is contained in:
parent
57c3cd44a9
commit
0b70fe61a4
|
@ -3,7 +3,8 @@ when not(compileOption("threads")):
|
||||||
|
|
||||||
import std/[tables, strformat, strutils]
|
import std/[tables, strformat, strutils]
|
||||||
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
||||||
eth/keys, bearssl, stew/[byteutils, endians2]
|
eth/keys, bearssl, stew/[byteutils, endians2],
|
||||||
|
nimcrypto/pbkdf2
|
||||||
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
||||||
multistream, # tag stream with short header to identify it
|
multistream, # tag stream with short header to identify it
|
||||||
crypto/crypto, # cryptographic functions
|
crypto/crypto, # cryptographic functions
|
||||||
|
@ -18,7 +19,7 @@ import libp2p/[switch, # manage transports, a single entry poi
|
||||||
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
|
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
|
||||||
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
||||||
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
|
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
|
||||||
import ../../waku/node/v2/[config, wakunode2, waku_types],
|
import ../../waku/node/v2/[config, wakunode2, waku_types, waku_payload],
|
||||||
../../waku/protocol/v2/[waku_relay, waku_store, waku_filter],
|
../../waku/protocol/v2/[waku_relay, waku_store, waku_filter],
|
||||||
../../waku/node/common
|
../../waku/node/common
|
||||||
|
|
||||||
|
@ -30,10 +31,12 @@ const Help = """
|
||||||
exit: closes the chat
|
exit: closes the chat
|
||||||
"""
|
"""
|
||||||
|
|
||||||
const DefaultTopic = "/waku/2/default-waku/proto"
|
const
|
||||||
|
PayloadV1* {.booldefine.} = false
|
||||||
|
DefaultTopic = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
const Dingpu = "dingpu".toBytes
|
Dingpu = "dingpu".toBytes
|
||||||
const DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu))
|
DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu))
|
||||||
|
|
||||||
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
||||||
# Could poll connection pool or something here, I suppose
|
# Could poll connection pool or something here, I suppose
|
||||||
|
@ -49,6 +52,18 @@ type
|
||||||
PrivateKey* = crypto.PrivateKey
|
PrivateKey* = crypto.PrivateKey
|
||||||
Topic* = waku_types.Topic
|
Topic* = waku_types.Topic
|
||||||
|
|
||||||
|
|
||||||
|
# Similarly as Status public chats now.
|
||||||
|
proc generateSymKey(contentTopic: ContentTopic): SymKey =
|
||||||
|
var ctx: HMAC[sha256]
|
||||||
|
var symKey: SymKey
|
||||||
|
if pbkdf2(ctx, contentTopic.toBytes(), "", 65356, symKey) != sizeof(SymKey):
|
||||||
|
raise (ref Defect)(msg: "Should not occur as array is properly sized")
|
||||||
|
|
||||||
|
symKey
|
||||||
|
|
||||||
|
let DefaultSymKey = generateSymKey(DefaultContentTopic)
|
||||||
|
|
||||||
proc initAddress(T: type MultiAddress, str: string): T =
|
proc initAddress(T: type MultiAddress, str: string): T =
|
||||||
let address = MultiAddress.init(str).tryGet()
|
let address = MultiAddress.init(str).tryGet()
|
||||||
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
|
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
|
||||||
|
@ -68,9 +83,23 @@ proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} =
|
||||||
c.connected = true
|
c.connected = true
|
||||||
|
|
||||||
proc publish(c: Chat, line: string) =
|
proc publish(c: Chat, line: string) =
|
||||||
let payload = cast[seq[byte]](line)
|
when PayloadV1:
|
||||||
let message = WakuMessage(payload: payload, contentTopic: DefaultContentTopic)
|
# Use Waku v1 payload encoding/encryption
|
||||||
c.node.publish(DefaultTopic, message)
|
let
|
||||||
|
payload = Payload(payload: line.toBytes(), symKey: some(DefaultSymKey))
|
||||||
|
version = 1'u32
|
||||||
|
encodedPayload = payload.encode(version, c.node.rng[])
|
||||||
|
if encodedPayload.isOk():
|
||||||
|
let message = WakuMessage(payload: encodedPayload.get(),
|
||||||
|
contentTopic: DefaultContentTopic, version: version)
|
||||||
|
c.node.publish(DefaultTopic, message)
|
||||||
|
else:
|
||||||
|
warn "Payload encoding failed", error = encodedPayload.error
|
||||||
|
else:
|
||||||
|
# No payload encoding/encryption from Waku
|
||||||
|
let message = WakuMessage(payload: line.toBytes(),
|
||||||
|
contentTopic: DefaultContentTopic, version: 0)
|
||||||
|
c.node.publish(DefaultTopic, message)
|
||||||
|
|
||||||
# TODO This should read or be subscribe handler subscribe
|
# TODO This should read or be subscribe handler subscribe
|
||||||
proc readAndPrint(c: Chat) {.async.} =
|
proc readAndPrint(c: Chat) {.async.} =
|
||||||
|
@ -156,7 +185,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||||
node = WakuNode.init(conf.nodeKey, conf.listenAddress,
|
node = WakuNode.init(conf.nodeKey, conf.listenAddress,
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
|
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
|
||||||
|
|
||||||
# waitFor vs await
|
|
||||||
await node.start()
|
await node.start()
|
||||||
|
|
||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
|
@ -180,7 +208,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||||
|
|
||||||
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
|
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
|
||||||
for msg in response.messages:
|
for msg in response.messages:
|
||||||
let payload = cast[string](msg.payload)
|
let payload = string.fromBytes(msg.payload)
|
||||||
echo &"{payload}"
|
echo &"{payload}"
|
||||||
info "Hit store handler"
|
info "Hit store handler"
|
||||||
|
|
||||||
|
@ -192,7 +220,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||||
node.wakuFilter.setPeer(parsePeer(conf.filternode))
|
node.wakuFilter.setPeer(parsePeer(conf.filternode))
|
||||||
|
|
||||||
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
|
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
|
||||||
let payload = cast[string](msg.payload)
|
let payload = string.fromBytes(msg.payload)
|
||||||
echo &"{payload}"
|
echo &"{payload}"
|
||||||
info "Hit filter handler"
|
info "Hit filter handler"
|
||||||
|
|
||||||
|
@ -205,10 +233,31 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||||
# TODO To get end to end sender would require more information in payload
|
# TODO To get end to end sender would require more information in payload
|
||||||
# We could possibly indicate the relayer point with connection somehow probably (?)
|
# We could possibly indicate the relayer point with connection somehow probably (?)
|
||||||
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
|
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
|
||||||
let message = WakuMessage.init(data).value
|
let decoded = WakuMessage.init(data)
|
||||||
let payload = cast[string](message.payload)
|
if decoded.isOk():
|
||||||
echo &"{payload}"
|
let msg = decoded.get()
|
||||||
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
when PayloadV1:
|
||||||
|
# Use Waku v1 payload encoding/encryption
|
||||||
|
let
|
||||||
|
keyInfo = KeyInfo(kind: Symmetric, symKey: DefaultSymKey)
|
||||||
|
decodedPayload = decodePayload(decoded.get(), keyInfo)
|
||||||
|
|
||||||
|
if decodedPayload.isOK():
|
||||||
|
let payload = string.fromBytes(decodedPayload.get().payload)
|
||||||
|
echo &"{payload}"
|
||||||
|
info "Hit subscribe handler", topic, payload,
|
||||||
|
contentTopic = msg.contentTopic
|
||||||
|
else:
|
||||||
|
debug "Invalid encoded WakuMessage payload",
|
||||||
|
error = decodedPayload.error
|
||||||
|
else:
|
||||||
|
# No payload encoding/encryption from Waku
|
||||||
|
let payload = string.fromBytes(msg.payload)
|
||||||
|
echo &"{payload}"
|
||||||
|
info "Hit subscribe handler", topic, payload,
|
||||||
|
contentTopic = msg.contentTopic
|
||||||
|
else:
|
||||||
|
trace "Invalid encoded WakuMessage", error = decoded.error
|
||||||
|
|
||||||
let topic = cast[Topic](DefaultTopic)
|
let topic = cast[Topic](DefaultTopic)
|
||||||
await node.subscribe(topic, handler)
|
await node.subscribe(topic, handler)
|
||||||
|
|
|
@ -5,5 +5,6 @@ import
|
||||||
./v2/test_wakunode,
|
./v2/test_wakunode,
|
||||||
./v2/test_waku_store,
|
./v2/test_waku_store,
|
||||||
./v2/test_waku_filter,
|
./v2/test_waku_filter,
|
||||||
./v2/test_rpc_waku,
|
./v2/test_waku_pagination,
|
||||||
./v2/test_waku_pagination
|
./v2/test_waku_payload,
|
||||||
|
./v2/test_rpc_waku
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/unittest,
|
||||||
|
../../waku/node/v2/waku_payload,
|
||||||
|
../test_helpers
|
||||||
|
|
||||||
|
procSuite "Waku Payload":
|
||||||
|
let rng = newRng()
|
||||||
|
|
||||||
|
test "Encode/Decode without encryption (version 0)":
|
||||||
|
## This would be the usual way when no encryption is done or when it is done
|
||||||
|
## on the application layer.
|
||||||
|
|
||||||
|
# Encoding
|
||||||
|
let
|
||||||
|
version = 0'u32
|
||||||
|
payload = @[byte 0, 1, 2]
|
||||||
|
msg = WakuMessage(payload: payload, version: version)
|
||||||
|
pb = msg.encode()
|
||||||
|
|
||||||
|
# Decoding
|
||||||
|
let msgDecoded = WakuMessage.init(pb.buffer)
|
||||||
|
check msgDecoded.isOk()
|
||||||
|
|
||||||
|
let
|
||||||
|
keyInfo = KeyInfo(kind:None)
|
||||||
|
decoded = decodePayload(msgDecoded.get(), keyInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
decoded.isOk()
|
||||||
|
decoded.get().payload == payload
|
||||||
|
|
||||||
|
test "Encode/Decode without encryption (version 0) with encodePayload":
|
||||||
|
## This is a bit silly and only there for completeness
|
||||||
|
|
||||||
|
# Encoding
|
||||||
|
let
|
||||||
|
version = 0'u32
|
||||||
|
payload = Payload(payload: @[byte 0, 1, 2])
|
||||||
|
encodedPayload = payload.encode(version, rng[])
|
||||||
|
|
||||||
|
check encodedPayload.isOk()
|
||||||
|
let
|
||||||
|
msg = WakuMessage(payload: encodedPayload.get(), version: version)
|
||||||
|
pb = msg.encode()
|
||||||
|
|
||||||
|
# Decoding
|
||||||
|
let msgDecoded = WakuMessage.init(pb.buffer)
|
||||||
|
check msgDecoded.isOk()
|
||||||
|
|
||||||
|
let
|
||||||
|
keyInfo = KeyInfo(kind:None)
|
||||||
|
decoded = decodePayload(msgDecoded.get(), keyInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
decoded.isOk()
|
||||||
|
decoded.get().payload == payload.payload
|
||||||
|
|
||||||
|
test "Encode/Decode with encryption (version 1)":
|
||||||
|
# Encoding
|
||||||
|
let
|
||||||
|
privKey = PrivateKey.random(rng[])
|
||||||
|
version = 1'u32
|
||||||
|
payload = Payload(payload: @[byte 0, 1, 2],
|
||||||
|
dst: some(privKey.toPublicKey()))
|
||||||
|
encodedPayload = payload.encode(version, rng[])
|
||||||
|
|
||||||
|
check encodedPayload.isOk()
|
||||||
|
let
|
||||||
|
msg = WakuMessage(payload: encodedPayload.get(), version: version)
|
||||||
|
pb = msg.encode()
|
||||||
|
|
||||||
|
# Decoding
|
||||||
|
let msgDecoded = WakuMessage.init(pb.buffer)
|
||||||
|
check msgDecoded.isOk()
|
||||||
|
|
||||||
|
let
|
||||||
|
keyInfo = KeyInfo(kind: Asymmetric, privKey: privKey)
|
||||||
|
decoded = decodePayload(msgDecoded.get(), keyInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
decoded.isOk()
|
||||||
|
decoded.get().payload == payload.payload
|
||||||
|
|
||||||
|
test "Encode with unsupported version":
|
||||||
|
let
|
||||||
|
version = 2'u32
|
||||||
|
payload = Payload(payload: @[byte 0, 1, 2])
|
||||||
|
encodedPayload = payload.encode(version, rng[])
|
||||||
|
|
||||||
|
check encodedPayload.isErr()
|
||||||
|
|
||||||
|
test "Decode with unsupported version":
|
||||||
|
# Encoding
|
||||||
|
let
|
||||||
|
version = 2'u32
|
||||||
|
payload = @[byte 0, 1, 2]
|
||||||
|
msg = WakuMessage(payload: payload, version: version)
|
||||||
|
pb = msg.encode()
|
||||||
|
|
||||||
|
# Decoding
|
||||||
|
let msgDecoded = WakuMessage.init(pb.buffer)
|
||||||
|
check msgDecoded.isOk()
|
||||||
|
|
||||||
|
let
|
||||||
|
keyInfo = KeyInfo(kind:None)
|
||||||
|
decoded = decodePayload(msgDecoded.get(), keyInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
decoded.isErr()
|
|
@ -0,0 +1,73 @@
|
||||||
|
import
|
||||||
|
std/options,
|
||||||
|
eth/keys,
|
||||||
|
eth/p2p/rlpx_protocols/whisper/whisper_types,
|
||||||
|
./waku_types
|
||||||
|
|
||||||
|
export whisper_types, waku_types, keys, options
|
||||||
|
|
||||||
|
type
|
||||||
|
KeyKind* = enum
|
||||||
|
Symmetric
|
||||||
|
Asymmetric
|
||||||
|
None
|
||||||
|
|
||||||
|
KeyInfo* = object
|
||||||
|
case kind*: KeyKind
|
||||||
|
of Symmetric:
|
||||||
|
symKey*: SymKey
|
||||||
|
of Asymmetric:
|
||||||
|
privKey*: PrivateKey
|
||||||
|
of None:
|
||||||
|
discard
|
||||||
|
|
||||||
|
# TODO:
|
||||||
|
# - This is using `DecodedPayload` from Waku v1 / Whisper and could be altered
|
||||||
|
# by making that a case object also, e.g. useful for the version 0, but
|
||||||
|
# especially in the future if there would be yet another version.
|
||||||
|
# - Also reworking that API to use Result instead of Option could make this
|
||||||
|
# cleaner.
|
||||||
|
# - For now this `KeyInfo` is a bit silly also, but perhaps with v2 or
|
||||||
|
# adjustments to Waku v1 encoding, it can be better.
|
||||||
|
proc decodePayload*(message: WakuMessage, keyInfo: KeyInfo):
|
||||||
|
WakuResult[DecodedPayload] =
|
||||||
|
case message.version
|
||||||
|
of 0:
|
||||||
|
return ok(DecodedPayload(payload:message.payload))
|
||||||
|
of 1:
|
||||||
|
case keyInfo.kind
|
||||||
|
of Symmetric:
|
||||||
|
let decoded = message.payload.decode(none[PrivateKey](),
|
||||||
|
some(keyInfo.symKey))
|
||||||
|
if decoded.isSome():
|
||||||
|
return ok(decoded.get())
|
||||||
|
else:
|
||||||
|
return err("Couldn't decrypt using symmetric key")
|
||||||
|
of Asymmetric:
|
||||||
|
let decoded = message.payload.decode(some(keyInfo.privkey),
|
||||||
|
none[SymKey]())
|
||||||
|
if decoded.isSome():
|
||||||
|
return ok(decoded.get())
|
||||||
|
else:
|
||||||
|
return err("Couldn't decrypt using asymmetric key")
|
||||||
|
of None:
|
||||||
|
discard
|
||||||
|
else:
|
||||||
|
return err("Unsupported WakuMessage version")
|
||||||
|
|
||||||
|
# TODO: same story as for `decodedPayload`, but then regarding the `Payload`
|
||||||
|
# object.
|
||||||
|
proc encode*(payload: Payload, version: uint32, rng: var BrHmacDrbgContext):
|
||||||
|
WakuResult[seq[byte]] =
|
||||||
|
case version
|
||||||
|
of 0:
|
||||||
|
# This is rather silly
|
||||||
|
return ok(payload.payload)
|
||||||
|
of 1:
|
||||||
|
let encoded = encode(rng, payload)
|
||||||
|
if encoded.isSome():
|
||||||
|
return ok(encoded.get())
|
||||||
|
else:
|
||||||
|
return err("Couldn't encode the payload")
|
||||||
|
else:
|
||||||
|
return err("Unsupported WakuMessage version")
|
|
@ -144,6 +144,8 @@ type
|
||||||
listenStr*: string
|
listenStr*: string
|
||||||
#multiaddrStrings*: seq[string]
|
#multiaddrStrings*: seq[string]
|
||||||
|
|
||||||
|
WakuResult*[T] = Result[T, cstring]
|
||||||
|
|
||||||
# Encoding and decoding -------------------------------------------------------
|
# Encoding and decoding -------------------------------------------------------
|
||||||
|
|
||||||
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
|
Loading…
Reference in New Issue