From 13c8a69c1ecf55877fa3f09d2069b157dc3e26b8 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Tue, 3 Nov 2020 21:20:40 +0100 Subject: [PATCH] Add encoding/decoding of payload for Waku v2 (#251) * Add encoding/decoding of payload for Waku v2 * Allow for Waku v1 payload in chat through compile time flag --- examples/v2/chat2.nim | 79 ++++++++++++++++++----- tests/all_tests_v2.nim | 5 +- tests/v2/test_waku_payload.nim | 111 +++++++++++++++++++++++++++++++++ waku/node/v2/waku_payload.nim | 73 ++++++++++++++++++++++ waku/node/v2/waku_types.nim | 2 + 5 files changed, 253 insertions(+), 17 deletions(-) create mode 100644 tests/v2/test_waku_payload.nim create mode 100644 waku/node/v2/waku_payload.nim diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index d13b0afb5..0c29749e5 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -3,7 +3,8 @@ when not(compileOption("threads")): import std/[tables, strformat, strutils] import confutils, chronicles, chronos, stew/shims/net as stewNet, - eth/keys, bearssl, stew/[byteutils, endians2] + eth/keys, bearssl, stew/[byteutils, endians2], + nimcrypto/pbkdf2 import libp2p/[switch, # manage transports, a single entry point for dialing and listening multistream, # tag stream with short header to identify it crypto/crypto, # cryptographic functions @@ -18,7 +19,7 @@ import libp2p/[switch, # manage transports, a single entry poi protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection muxers/mplex/mplex] # define some contants and message types for stream multiplexing -import ../../waku/node/v2/[config, wakunode2, waku_types], +import ../../waku/node/v2/[config, wakunode2, waku_types, waku_payload], ../../waku/protocol/v2/[waku_relay, waku_store, waku_filter], ../../waku/node/common @@ -30,10 +31,12 @@ const Help = """ exit: closes the chat """ -const DefaultTopic = "/waku/2/default-waku/proto" +const + PayloadV1* {.booldefine.} = false + DefaultTopic = "/waku/2/default-waku/proto" -const Dingpu = "dingpu".toBytes -const DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu)) + Dingpu = "dingpu".toBytes + DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu)) # XXX Connected is a bit annoying, because incoming connections don't trigger state change # Could poll connection pool or something here, I suppose @@ -49,6 +52,18 @@ type PrivateKey* = crypto.PrivateKey Topic* = waku_types.Topic + +# Similarly as Status public chats now. +proc generateSymKey(contentTopic: ContentTopic): SymKey = + var ctx: HMAC[sha256] + var symKey: SymKey + if pbkdf2(ctx, contentTopic.toBytes(), "", 65356, symKey) != sizeof(SymKey): + raise (ref Defect)(msg: "Should not occur as array is properly sized") + + symKey + +let DefaultSymKey = generateSymKey(DefaultContentTopic) + proc initAddress(T: type MultiAddress, str: string): T = let address = MultiAddress.init(str).tryGet() if IPFS.match(address) and matchPartial(multiaddress.TCP, address): @@ -68,9 +83,23 @@ proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} = c.connected = true proc publish(c: Chat, line: string) = - let payload = cast[seq[byte]](line) - let message = WakuMessage(payload: payload, contentTopic: DefaultContentTopic) - c.node.publish(DefaultTopic, message) + when PayloadV1: + # Use Waku v1 payload encoding/encryption + let + payload = Payload(payload: line.toBytes(), symKey: some(DefaultSymKey)) + version = 1'u32 + encodedPayload = payload.encode(version, c.node.rng[]) + if encodedPayload.isOk(): + let message = WakuMessage(payload: encodedPayload.get(), + contentTopic: DefaultContentTopic, version: version) + c.node.publish(DefaultTopic, message) + else: + warn "Payload encoding failed", error = encodedPayload.error + else: + # No payload encoding/encryption from Waku + let message = WakuMessage(payload: line.toBytes(), + contentTopic: DefaultContentTopic, version: 0) + c.node.publish(DefaultTopic, message) # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = @@ -156,7 +185,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = node = WakuNode.init(conf.nodeKey, conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) - # waitFor vs await await node.start() if conf.filternode != "": @@ -180,7 +208,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = proc storeHandler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: - let payload = cast[string](msg.payload) + let payload = string.fromBytes(msg.payload) echo &"{payload}" info "Hit store handler" @@ -192,7 +220,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = node.wakuFilter.setPeer(parsePeer(conf.filternode)) proc filterHandler(msg: WakuMessage) {.gcsafe.} = - let payload = cast[string](msg.payload) + let payload = string.fromBytes(msg.payload) echo &"{payload}" info "Hit filter handler" @@ -205,10 +233,31 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = # TODO To get end to end sender would require more information in payload # We could possibly indicate the relayer point with connection somehow probably (?) proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = - let message = WakuMessage.init(data).value - let payload = cast[string](message.payload) - echo &"{payload}" - info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic + let decoded = WakuMessage.init(data) + if decoded.isOk(): + let msg = decoded.get() + when PayloadV1: + # Use Waku v1 payload encoding/encryption + let + keyInfo = KeyInfo(kind: Symmetric, symKey: DefaultSymKey) + decodedPayload = decodePayload(decoded.get(), keyInfo) + + if decodedPayload.isOK(): + let payload = string.fromBytes(decodedPayload.get().payload) + echo &"{payload}" + info "Hit subscribe handler", topic, payload, + contentTopic = msg.contentTopic + else: + debug "Invalid encoded WakuMessage payload", + error = decodedPayload.error + else: + # No payload encoding/encryption from Waku + let payload = string.fromBytes(msg.payload) + echo &"{payload}" + info "Hit subscribe handler", topic, payload, + contentTopic = msg.contentTopic + else: + trace "Invalid encoded WakuMessage", error = decoded.error let topic = cast[Topic](DefaultTopic) await node.subscribe(topic, handler) diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 5102f8786..1d5b39d9f 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -5,5 +5,6 @@ import ./v2/test_wakunode, ./v2/test_waku_store, ./v2/test_waku_filter, - ./v2/test_rpc_waku, - ./v2/test_waku_pagination + ./v2/test_waku_pagination, + ./v2/test_waku_payload, + ./v2/test_rpc_waku diff --git a/tests/v2/test_waku_payload.nim b/tests/v2/test_waku_payload.nim new file mode 100644 index 000000000..a3e9b40b3 --- /dev/null +++ b/tests/v2/test_waku_payload.nim @@ -0,0 +1,111 @@ +{.used.} + +import + std/unittest, + ../../waku/node/v2/waku_payload, + ../test_helpers + +procSuite "Waku Payload": + let rng = newRng() + + test "Encode/Decode without encryption (version 0)": + ## This would be the usual way when no encryption is done or when it is done + ## on the application layer. + + # Encoding + let + version = 0'u32 + payload = @[byte 0, 1, 2] + msg = WakuMessage(payload: payload, version: version) + pb = msg.encode() + + # Decoding + let msgDecoded = WakuMessage.init(pb.buffer) + check msgDecoded.isOk() + + let + keyInfo = KeyInfo(kind:None) + decoded = decodePayload(msgDecoded.get(), keyInfo) + + check: + decoded.isOk() + decoded.get().payload == payload + + test "Encode/Decode without encryption (version 0) with encodePayload": + ## This is a bit silly and only there for completeness + + # Encoding + let + version = 0'u32 + payload = Payload(payload: @[byte 0, 1, 2]) + encodedPayload = payload.encode(version, rng[]) + + check encodedPayload.isOk() + let + msg = WakuMessage(payload: encodedPayload.get(), version: version) + pb = msg.encode() + + # Decoding + let msgDecoded = WakuMessage.init(pb.buffer) + check msgDecoded.isOk() + + let + keyInfo = KeyInfo(kind:None) + decoded = decodePayload(msgDecoded.get(), keyInfo) + + check: + decoded.isOk() + decoded.get().payload == payload.payload + + test "Encode/Decode with encryption (version 1)": + # Encoding + let + privKey = PrivateKey.random(rng[]) + version = 1'u32 + payload = Payload(payload: @[byte 0, 1, 2], + dst: some(privKey.toPublicKey())) + encodedPayload = payload.encode(version, rng[]) + + check encodedPayload.isOk() + let + msg = WakuMessage(payload: encodedPayload.get(), version: version) + pb = msg.encode() + + # Decoding + let msgDecoded = WakuMessage.init(pb.buffer) + check msgDecoded.isOk() + + let + keyInfo = KeyInfo(kind: Asymmetric, privKey: privKey) + decoded = decodePayload(msgDecoded.get(), keyInfo) + + check: + decoded.isOk() + decoded.get().payload == payload.payload + + test "Encode with unsupported version": + let + version = 2'u32 + payload = Payload(payload: @[byte 0, 1, 2]) + encodedPayload = payload.encode(version, rng[]) + + check encodedPayload.isErr() + + test "Decode with unsupported version": + # Encoding + let + version = 2'u32 + payload = @[byte 0, 1, 2] + msg = WakuMessage(payload: payload, version: version) + pb = msg.encode() + + # Decoding + let msgDecoded = WakuMessage.init(pb.buffer) + check msgDecoded.isOk() + + let + keyInfo = KeyInfo(kind:None) + decoded = decodePayload(msgDecoded.get(), keyInfo) + + check: + decoded.isErr() diff --git a/waku/node/v2/waku_payload.nim b/waku/node/v2/waku_payload.nim new file mode 100644 index 000000000..28527a4fd --- /dev/null +++ b/waku/node/v2/waku_payload.nim @@ -0,0 +1,73 @@ +import + std/options, + eth/keys, + eth/p2p/rlpx_protocols/whisper/whisper_types, + ./waku_types + +export whisper_types, waku_types, keys, options + +type + KeyKind* = enum + Symmetric + Asymmetric + None + + KeyInfo* = object + case kind*: KeyKind + of Symmetric: + symKey*: SymKey + of Asymmetric: + privKey*: PrivateKey + of None: + discard + +# TODO: +# - This is using `DecodedPayload` from Waku v1 / Whisper and could be altered +# by making that a case object also, e.g. useful for the version 0, but +# especially in the future if there would be yet another version. +# - Also reworking that API to use Result instead of Option could make this +# cleaner. +# - For now this `KeyInfo` is a bit silly also, but perhaps with v2 or +# adjustments to Waku v1 encoding, it can be better. +proc decodePayload*(message: WakuMessage, keyInfo: KeyInfo): + WakuResult[DecodedPayload] = + case message.version + of 0: + return ok(DecodedPayload(payload:message.payload)) + of 1: + case keyInfo.kind + of Symmetric: + let decoded = message.payload.decode(none[PrivateKey](), + some(keyInfo.symKey)) + if decoded.isSome(): + return ok(decoded.get()) + else: + return err("Couldn't decrypt using symmetric key") + of Asymmetric: + let decoded = message.payload.decode(some(keyInfo.privkey), + none[SymKey]()) + if decoded.isSome(): + return ok(decoded.get()) + else: + return err("Couldn't decrypt using asymmetric key") + of None: + discard + else: + return err("Unsupported WakuMessage version") + +# TODO: same story as for `decodedPayload`, but then regarding the `Payload` +# object. +proc encode*(payload: Payload, version: uint32, rng: var BrHmacDrbgContext): + WakuResult[seq[byte]] = + case version + of 0: + # This is rather silly + return ok(payload.payload) + of 1: + let encoded = encode(rng, payload) + if encoded.isSome(): + return ok(encoded.get()) + else: + return err("Couldn't encode the payload") + else: + return err("Unsupported WakuMessage version") diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 0af484955..663aa0dba 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -144,6 +144,8 @@ type listenStr*: string #multiaddrStrings*: seq[string] + WakuResult*[T] = Result[T, cstring] + # Encoding and decoding ------------------------------------------------------- proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =