From e4b310467c2d6917de14715db2f3d8c976ce411c Mon Sep 17 00:00:00 2001 From: Emil Ivanichkov Date: Sat, 10 Feb 2024 02:53:59 +0200 Subject: [PATCH] feat(waku examples): Add examples of pairing with `nwaku` instance --- build/.gitkeep | 0 build/data/.gitkeep | 0 libs/waku-utils/example/agentA.nim | 214 ----------------------- libs/waku-utils/example/agentB.nim | 203 --------------------- libs/waku-utils/example/nwaku/agentA.nim | 182 +++++++++++++++++++ libs/waku-utils/example/nwaku/agentB.nim | 190 ++++++++++++++++++++ 6 files changed, 372 insertions(+), 417 deletions(-) create mode 100644 build/.gitkeep create mode 100644 build/data/.gitkeep delete mode 100644 libs/waku-utils/example/agentA.nim delete mode 100644 libs/waku-utils/example/agentB.nim create mode 100644 libs/waku-utils/example/nwaku/agentA.nim create mode 100644 libs/waku-utils/example/nwaku/agentB.nim diff --git a/build/.gitkeep b/build/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/build/data/.gitkeep b/build/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/libs/waku-utils/example/agentA.nim b/libs/waku-utils/example/agentA.nim deleted file mode 100644 index 8162cfe..0000000 --- a/libs/waku-utils/example/agentA.nim +++ /dev/null @@ -1,214 +0,0 @@ -import - std/[tables,times,sequtils], - stew/byteutils, - stew/shims/net, - chronicles, - chronos, - confutils, - libp2p/crypto/crypto, - eth/keys, - eth/p2p/discoveryv5/enr, - testutils/unittests - - -import - waku/common/logging, - waku/node/peer_manager, - waku/waku_core, - waku/waku_node, - waku/waku_enr, - waku/waku_discv5, - waku/common/protobuf, - waku/utils/noise as waku_message_utils, - waku/waku_noise/noise_types, - waku/waku_noise/noise_utils, - waku/waku_noise/noise_handshake_processing, - waku/waku_core - -import ../waku_handshake_utils - -proc now*(): Timestamp = - getNanosecondTime(getTime().toUnixFloat()) - -# An accesible bootstrap node. See wakuv2.prod fleets.status.im - - -const bootstrapNode = "enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9D" & - "OGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgn" & - "Y0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY" & - "24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQG" & - "H0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvY" & - "j5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP" - -# careful if running pub and sub in the same machine -const wakuPort = 60000 -const discv5Port = 9000 - - - -proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = - var readyForFinalization = false - - ######################### - # Content Topic information - let contentTopicInfo = ContentTopicInfo( - applicationName: "waku-noise-sessions", - applicationVersion: "0.1", - shardId: "10",) - - ################################ - # Alice static/ephemeral key initialization and commitment - let aliceInfo = initAgentKeysAndCommitment(rng) - let s = aliceInfo.commitment - - let qr = readFile("qr.txt") - let qrMessageNameTag = cast[seq[byte]](readFile("qrMessageNametag.txt")) - echo qrMessageNameTag - - # We set the contentTopic from the content topic parameters exchanged in the QR - let contentTopic = initContentTopicFromQr(qr) - - var aliceHS = initHS(aliceInfo, qr, true) - - var - sentTransportMessage: seq[byte] - aliceStep: HandshakeStepResult - wakuMsg: Result[WakuMessage, cstring] - readPayloadV2: PayloadV2 - aliceMessageNametag: MessageNametag - aliceHSResult: HandshakeResult - - - # use notice to filter all waku messaging - setupLogLevel(logging.LogLevel.NOTICE) - notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port - let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get() - ip = parseIpAddress("0.0.0.0") - flags = CapabilitiesBitfield.init(lightpush = false, filter = false, store = false, relay = true) - - var enrBuilder = EnrBuilder.init(nodeKey) - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - error "failed to create enr record", error=recordRes.error - quit(QuitFailure) - else: recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withNodeKey(nodeKey) - builder.withRecord(record) - builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() - let node = builder.build().tryGet() - - var bootstrapNodeEnr: enr.Record - discard bootstrapNodeEnr.fromURI(bootstrapNode) - - let discv5Conf = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: ip, - port: Port(discv5Port), - privateKey: keys.PrivateKey(nodeKey.skkey), - bootstrapRecords: @[bootstrapNodeEnr], - autoupdateRecord: true, - ) - - # assumes behind a firewall, so not care about being discoverable - let wakuDiscv5 = WakuDiscoveryV5.new( - node.rng, - discv5Conf, - some(node.enr), - some(node.peerManager), - node.topicSubscriptionQueue, - ) - - await node.start() - await node.mountRelay() - node.peerManager.start() - - (await wakuDiscv5.start()).isOkOr: - error "failed to start discv5", error = error - quit(1) - - # wait for a minimum of peers to be connected, otherwise messages wont be gossiped - while true: - let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) - if numConnectedPeers >= 6: - notice "publisher is ready", connectedPeers=numConnectedPeers, required=6 - break - notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6 - await sleepAsync(5000) - - # Make sure it matches the publisher. Use default value - # see spec: https://rfc.vac.dev/spec/23/ - let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") - - ############################################### - # We prepare a Waku message from Alice's payload2 - echo "qrMessageNametag ", qrMessageNametag - echo "aliceMessageNametag ", aliceMessageNametag - - wakuMsg = prepareHandShakeInitiatorMsg(rng, contentTopic, aliceInfo, - qrMessageNameTag, aliceMessageNametag, - aliceHS, aliceStep) - echo "aliceMessageNametag ", aliceMessageNametag - - await publishHandShakeInitiatorMsg(node, pubSubTopic, contentTopic, wakuMsg.get()) - echo "aliceMessageNametag ", aliceMessageNametag - - # aliceMessageNametag = toMessageNametag(aliceHS) - let step2Nametag = aliceMessageNametag - echo "step2Nametag ", step2Nametag - proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - # let payloadStr = string.fromBytes(msg.payload) - if msg.contentTopic == contentTopic: - readPayloadV2 = decodePayloadV2(msg).get() - if readPayloadV2.messageNametag == step2Nametag: - echo "aliceMessageNametag ", aliceMessageNametag - - handleHandShakeMsg(rng, pubSubTopic, contentTopic,step = 2, readPayloadV2, - aliceStep, aliceHS, aliceMessageNametag) - echo "aliceMessageNametag ", aliceMessageNametag - - # await sleepAsync(5000) - let handShakeMsgStep3 = prepareHandShakeMsg(rng, contentTopic, aliceInfo, - aliceMessageNametag, aliceHS, - aliceStep, step = 3) - echo "aliceMessageNametag ", aliceMessageNametag - - await publishHandShakeMsg( node, pubSubTopic, contentTopic, handShakeMsgStep3.get(), 3) - readyForFinalization = true - echo "aliceMessageNametag ", aliceMessageNametag - - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) - - while true: - if readyForFinalization: - notice "Finalizing handshake" - aliceHSResult = finalizeHandshake(aliceHS) - await sleepAsync(5000) - break - await sleepAsync(5000) - - var - payload2: PayloadV2 - realMessage: seq[byte] - readMessage: seq[byte] - - # Bob writes to Alice - realMessage = @[(byte)42,42,42,42] - let realMessageContentTopic = "/" & contentTopicInfo.applicationName & "/" & contentTopicInfo.applicationVersion & "/wakunoise/1/sessions_shard-" & contentTopicInfo.shardId & "/real" & "/proto" - payload2 = writeMessage(aliceHSResult, realMessage, outboundMessageNametagBuffer = aliceHSResult.nametagsOutbound) - echo aliceHSResult.h - wakuMsg = encodePayloadV2( payload2, realMessageContentTopic) - await node.publish(some(pubSubTopic), wakuMsg.get) - notice "Sending real message", payload=payload2, - pubsubTopic=pubsubTopic, - contentTopic=realMessageContentTopic - - -when isMainModule: - let rng = crypto.newRng() - asyncSpawn setupAndPublish(rng) - runForever() diff --git a/libs/waku-utils/example/agentB.nim b/libs/waku-utils/example/agentB.nim deleted file mode 100644 index b6af5c9..0000000 --- a/libs/waku-utils/example/agentB.nim +++ /dev/null @@ -1,203 +0,0 @@ -import - std/[tables, sequtils], - stew/byteutils, - stew/shims/net, - chronicles, - chronos, - confutils, - libp2p/crypto/crypto, - eth/keys, - eth/p2p/discoveryv5/enr - -import - waku/common/logging, - waku/node/peer_manager, - waku/waku_core, - waku/waku_node, - waku/waku_enr, - waku/waku_discv5, - waku/common/protobuf, - waku/utils/noise as waku_message_utils, - waku/waku_noise/noise_types, - waku/waku_noise/noise_utils, - waku/waku_noise/noise_handshake_processing, - waku/waku_core - -import ../waku_handshake_utils - -# An accesible bootstrap node. See wakuv2.prod fleets.status.im -const bootstrapNode = "enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZl" & - "K0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS" & - "3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmct" & - "Yy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGh" & - "Ao0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1" & - "ZHCCIyiFd2FrdTIP" - -# careful if running pub and sub in the same machine -const wakuPort = 50000 -const discv5Port = 8000 - - -proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = - var readyForFinalization = false - - ################################ - # Bob static/ephemeral key initialization and commitment - let bobInfo = initAgentKeysAndCommitment(rng) - let r = bobInfo.commitment - - ######################### - # Content Topic information - let contentTopicInfo = ContentTopicInfo( - applicationName: "waku-noise-sessions", - applicationVersion: "0.1", - shardId: "10",) - - let (qr, qrMessageNametag) = initQr(rng, contentTopicInfo, bobInfo) - writeFile("qr.txt", qr) - writeFile("qrMessageNametag.txt", qrMessageNametag) - echo qrMessageNametag - - # We set the contentTopic from the content topic parameters exchanged in the QR - let contentTopic = initContentTopicFromQr(qr) - echo "contentTopic: ", contentTopic - var bobHS = initHS(bobInfo, qr) - - - var - bobStep: HandshakeStepResult - wakuMsg: Result[WakuMessage, cstring] - readPayloadV2: PayloadV2 - bobMessageNametag: MessageNametag - bobHSResult: HandshakeResult - - - # use notice to filter all waku messaging - setupLogLevel(logging.LogLevel.NOTICE) - notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port - let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - ip = parseIpAddress("0.0.0.0") - flags = CapabilitiesBitfield.init(lightpush = false, filter = false, store = false, relay = true) - - var enrBuilder = EnrBuilder.init(nodeKey) - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - error "failed to create enr record", error=recordRes.error - quit(QuitFailure) - else: recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withNodeKey(nodeKey) - builder.withRecord(record) - builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() - let node = builder.build().tryGet() - - var bootstrapNodeEnr: enr.Record - discard bootstrapNodeEnr.fromURI(bootstrapNode) - - let discv5Conf = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: ip, - port: Port(discv5Port), - privateKey: keys.PrivateKey(nodeKey.skkey), - bootstrapRecords: @[bootstrapNodeEnr], - autoupdateRecord: true, - ) - - # assumes behind a firewall, so not care about being discoverable - let wakuDiscv5 = WakuDiscoveryV5.new( - node.rng, - discv5Conf, - some(node.enr), - some(node.peerManager), - node.topicSubscriptionQueue, - ) - - await node.start() - await node.mountRelay() - node.peerManager.start() - - (await wakuDiscv5.start()).isOkOr: - error "failed to start discv5", error = error - quit(1) - - # wait for a minimum of peers to be connected, otherwise messages wont be gossiped - while true: - let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) - if numConnectedPeers >= 6: - notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6 - break - notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6 - await sleepAsync(5000) - - # Make sure it matches the publisher. Use default value - # see spec: https://rfc.vac.dev/spec/23/ - let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") - var step2Nameteag: MessageNametag - echo "qrMessageNametag ", qrMessageNametag - proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - # let payloadStr = string.fromBytes(msg.payload) - if msg.contentTopic == contentTopic: - readPayloadV2 = decodePayloadV2(msg).get() - if readPayloadV2.messageNametag == qrMessageNametag: - echo "bobMessageNametag ", bobMessageNametag - handleHandShakeInitiatorMsg(rng, pubSubTopic, contentTopic, readPayloadV2, - bobStep, bobHS, bobMessageNametag, - qrMessageNametag) - echo "bobMessageNametag ", bobMessageNametag - step2Nameteag = bobMessageNametag - wakuMsg = prepareHandShakeMsg(rng, contentTopic, bobInfo, - bobMessageNametag, bobHS, bobStep, - step = 2) - echo "bobMessageNametag ", bobMessageNametag - await publishHandShakeMsg(node, pubSubTopic, contentTopic, - wakuMsg.get(), step = 2) - - elif readPayloadV2.messageNametag != step2Nameteag: - # bobMessageNametag = toMessageNametag(bobHS) - handleHandShakeMsg(rng, pubSubTopic, contentTopic, step = 3, readPayloadV2, - bobStep, bobHS, bobMessageNametag) - # notice "step 3 message received", payload=readPayloadV2, - # pubsubTopic=pubsubTopic, - # contentTopic=msg.contentTopic, - # timestamp=msg.timestamp - # # Bob reads Alice's payloads, and returns the (decrypted) transport message Alice sent to him - # bobStep = stepHandshake(rng[], bobHS, readPayloadV2 = readPayloadV2, messageNametag = bobMessageNametag).get() - readyForFinalization = true - - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) - while true: - if readyForFinalization: - notice "Finalizing handshake" - bobHSResult = finalizeHandshake(bobHS) - - proc realMessageHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - let realMessageContentTopic = "/" & contentTopicInfo.applicationName & "/" & contentTopicInfo.applicationVersion & "/wakunoise/1/sessions_shard-" & contentTopicInfo.shardId & "/real" & "/proto" - - if msg.contentTopic == realMessageContentTopic: - readPayloadV2 = decodePayloadV2(msg).get() - notice "Received real message", payload=readPayloadV2, - pubsubTopic=pubsubTopic, - contentTopic=msg.contentTopic, - timestamp=msg.timestamp - let readMessage = readMessage(bobHSResult, readPayloadV2, inboundMessageNametagBuffer = bobHSResult.nametagsInbound).get() - echo readMessage - echo bobHSResult.h - - - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(realMessageHandler)) - break - await sleepAsync(5000) - - - - - - -when isMainModule: - let rng = crypto.newRng() - asyncSpawn setupAndSubscribe(rng) - runForever() diff --git a/libs/waku-utils/example/nwaku/agentA.nim b/libs/waku-utils/example/nwaku/agentA.nim new file mode 100644 index 0000000..f86f4fa --- /dev/null +++ b/libs/waku-utils/example/nwaku/agentA.nim @@ -0,0 +1,182 @@ +import + std/[tables, times, sequtils], + stew/shims/net, + chronicles, + chronos, + confutils, + libp2p/crypto/crypto, + eth/[keys, p2p/discoveryv5/enr], + nimcrypto/utils + +import + waku/[waku_core, waku_node, waku_enr, waku_discv5], + waku/waku_noise/[noise_types, noise_utils, noise_handshake_processing], + waku/utils/noise, + waku/node/peer_manager, + waku/common/[logging, protobuf] + +import ../../waku_handshake_utils + +const bootstrapNode = "enr:-P-4QGVNANzbhCI49du6Moyw98AjuMhKoOpE_Jges9JlCq-I" & + "CAVadktjfcNpuhQgT0g1cu86_S3nbM7eYkCsqDAQG7UBgmlkgnY0" & + "gmlwhI_G-a6KbXVsdGlhZGRyc7hgAC02KG5vZGUtMDEuZG8tYW1z" & + "My5zdGF0dXMucHJvZC5zdGF0dXNpbS5uZXQGdl8ALzYobm9kZS0w" & + "MS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1c2ltLm5ldAYBu94D" & + "iXNlY3AyNTZrMaECoVyonsTGEQvVioM562Q1fjzTb_vKD152PPId" & + "sV7sM6SDdGNwgnZfg3VkcIIjKIV3YWt1Mg8" +# careful if running pub and sub in the same machine +const + wakuPort = 60000 + discv5Port = 9000 + requiredConnectedPeers = 2 + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") + +proc exampleNwakuAgentA(rng: ref HmacDrbgContext) {.async.} = + setupLogLevel(logging.LogLevel.NOTICE) + + var readyForFinalization = false + + # agentA static/ephemeral key initialization and commitment + let agentAInfo = initAgentKeysAndCommitment(rng) + + # Read the QR + let + qr = readFile("build/data/qr.txt") + (_, _, _, readEphemeralKey, _) = fromQr(qr) + qrMessageNameTag = cast[seq[byte]](readFile("build/data/qrMessageNametag.txt")) + # We set the contentTopic from the content topic parameters exchanged in the QR + contentTopic = initContentTopicFromQr(qr) + + notice "Starting `nwaku`-`nwaku` pairing example. Agent A", + wakuPort = wakuPort, discv5Port = discv5Port + + notice "Initial information parsed from the QR", contentTopic = contentTopic, + qrMessageNameTag = qrMessageNameTag + + var + agentAHS = initHS(agentAInfo, qr, true) + agentAHSResult: HandshakeResult + + # Setup the Waku node + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get() + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(lightpush = false, filter = false, + store = false, relay = true) + + var enrBuilder = EnrBuilder.init(nodeKey) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + var bootstrapNodeEnr: enr.Record + discard bootstrapNodeEnr.fromURI(bootstrapNode) + + let discv5Conf = WakuDiscoveryV5Config(discv5Config: none(DiscoveryConfig), + address: ip, port: Port(discv5Port), + privateKey: keys.PrivateKey(nodeKey.skkey), + bootstrapRecords: @[bootstrapNodeEnr], + autoupdateRecord: true) + + # assumes behind a firewall, so not care about being discoverable + let wakuDiscv5 = WakuDiscoveryV5.new(node.rng, discv5Conf, some(node.enr), + some(node.peerManager), + node.topicSubscriptionQueue) + + await node.start() + await node.mountRelay() + node.peerManager.start() + + (await wakuDiscv5.start()).isOkOr: + error "failed to start discv5", error = error + quit(1) + + # Wait for a minimum of peers to be connected, otherwise messages wont be gossiped + while true: + let numConnectedPeers = node.peerManager.peerStore[ + ConnectionBook].book.values().countIt(it == Connected) + if numConnectedPeers >= requiredConnectedPeers: + notice "Node is ready", connectedPeers = numConnectedPeers, + required = requiredConnectedPeers + break + notice "Waiting for the node to be ready", + connectedPeers = numConnectedPeers, required = requiredConnectedPeers + await sleepAsync(5000) + + # Perform the handshake + agentAHSResult = await initiatorHandshake(rng, node, pubSubTopic, contentTopic, + qr, qrMessageNameTag, agentAInfo) + + await sleepAsync(1000) # Just in case there is some kind of delay on the other side + + ## Fake lost messages + let + message1 = @[(byte)1, 42, 42, 42] + payload1 = writeMessage(agentAHSResult, message1, + agentAHSResult.nametagsOutbound) + wakuMessage1 = encodePayloadV2(payload1, contentTopic) + notice "Sending first message" + await node.publish(some(pubSubTopic), wakuMessage1.get) + + let + lostMessage1 = @[(byte)1, 5, 5, 5] + payloadLost1 = writeMessage(agentAHSResult, lostMessage1, + agentAHSResult.nametagsOutbound) + wakuLostMessage1 = encodePayloadV2(payloadLost1, contentTopic) + + let + lostMessage2 = @[(byte)2, 5, 5, 5] + payloadLost2 = writeMessage(agentAHSResult, lostMessage2, + agentAHSResult.nametagsOutbound) + wakuLostMessage2 = encodePayloadV2(payloadLost2, contentTopic) + + let + message2 = @[(byte)2, 42, 42, 42] + payload2 = writeMessage(agentAHSResult, message2, + agentAHSResult.nametagsOutbound) + wakuMessage2 = encodePayloadV2(payload2, contentTopic) + notice "Sending second message" + await node.publish(some(pubSubTopic), wakuMessage2.get) + + let + lostMessage3 = @[(byte)3, 5, 5, 5] + payloadLost3 = writeMessage(agentAHSResult, lostMessage3, + agentAHSResult.nametagsOutbound) + wakuLostMessage3 = encodePayloadV2(payloadLost3, contentTopic) + + await sleepAsync(10000) + notice "Sending first lost message" + await node.publish(some(pubSubTopic), wakuLostMessage1.get) + + let + message3 = @[(byte)3, 42, 42, 42] + payload3 = writeMessage(agentAHSResult, message3, + agentAHSResult.nametagsOutbound) + wakuMessage3 = encodePayloadV2(payload3, contentTopic) + notice "Sending third message" + await node.publish(some(pubSubTopic), wakuMessage3.get) + + await sleepAsync(10000) + notice "Sending second lost message" + await node.publish(some(pubSubTopic), wakuLostMessage2.get) + + await sleepAsync(1000) + notice "Sending third lost message" + await node.publish(some(pubSubTopic), wakuLostMessage3.get) + +when isMainModule: + let rng = crypto.newRng() + asyncSpawn exampleNwakuAgentA(rng) + runForever() diff --git a/libs/waku-utils/example/nwaku/agentB.nim b/libs/waku-utils/example/nwaku/agentB.nim new file mode 100644 index 0000000..4af86e0 --- /dev/null +++ b/libs/waku-utils/example/nwaku/agentB.nim @@ -0,0 +1,190 @@ +import + std/[tables, times, sequtils], + stew/shims/net, + chronicles, + chronos, + confutils, + libp2p/crypto/crypto, + eth/[keys, p2p/discoveryv5/enr], + nimcrypto/utils + +import + waku/[waku_core, waku_node, waku_enr, waku_discv5], + waku/waku_noise/[noise_types, noise_utils, noise_handshake_processing], + waku/utils/noise, + waku/node/peer_manager, + waku/common/[logging, protobuf] + +import ../../waku_handshake_utils + +const bootstrapNode = "enr:-P-4QGVNANzbhCI49du6Moyw98AjuMhKoOpE_Jges9JlCq-I" & + "CAVadktjfcNpuhQgT0g1cu86_S3nbM7eYkCsqDAQG7UBgmlkgnY0" & + "gmlwhI_G-a6KbXVsdGlhZGRyc7hgAC02KG5vZGUtMDEuZG8tYW1z" & + "My5zdGF0dXMucHJvZC5zdGF0dXNpbS5uZXQGdl8ALzYobm9kZS0w" & + "MS5kby1hbXMzLnN0YXR1cy5wcm9kLnN0YXR1c2ltLm5ldAYBu94D" & + "iXNlY3AyNTZrMaECoVyonsTGEQvVioM562Q1fjzTb_vKD152PPId" & + "sV7sM6SDdGNwgnZfg3VkcIIjKIV3YWt1Mg8" +const + wakuPort = 50000 + discv5Port = 8000 + requiredConnectedPeers = 2 + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") + +proc exampleNwakuAgentB(rng: ref HmacDrbgContext) {.async.} = + setupLogLevel(logging.LogLevel.NOTICE) + + var readyForFinalization = false + + let agentBInfo = initAgentKeysAndCommitment(rng) + let r = agentBInfo.commitment + + ######################### + # Content Topic information + let contentTopicInfo = ContentTopicInfo( + applicationName: "waku-noise-sessions", + applicationVersion: "0.1", + shardId: "10", ) + + let (qr, qrMessageNametag) = initQr(rng, contentTopicInfo, agentBInfo) + writeFile("build/data/qr.txt", qr) + writeFile("build/data/qrMessageNametag.txt", qrMessageNametag) + + # We set the contentTopic from the content topic parameters exchanged in the QR + let contentTopic = initContentTopicFromQr(qr) + + notice "Starting `nwaku`-`nwaku` pairing example. Agent A", + wakuPort = wakuPort, discv5Port = discv5Port + + notice "Initial information parsed from the QR", contentTopic = contentTopic, + qrMessageNameTag = qrMessageNameTag + + var + agentBHS = initHS(agentBInfo, qr) + agentBStep: HandshakeStepResult + wakuMsg: Result[WakuMessage, cstring] + readPayloadV2: PayloadV2 + agentBMessageNametag: MessageNametag + agentBHSResult: HandshakeResult + + # Setup the Waku node + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(lightpush = false, filter = false, + store = false, relay = true) + + var enrBuilder = EnrBuilder.init(nodeKey) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + var bootstrapNodeEnr: enr.Record + discard bootstrapNodeEnr.fromURI(bootstrapNode) + + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: ip, + port: Port(discv5Port), + privateKey: keys.PrivateKey(nodeKey.skkey), + bootstrapRecords: @[bootstrapNodeEnr], + autoupdateRecord: true, + ) + + # assumes behind a firewall, so not care about being discoverable + let wakuDiscv5 = WakuDiscoveryV5.new( + node.rng, + discv5Conf, + some(node.enr), + some(node.peerManager), + node.topicSubscriptionQueue, + ) + + await node.start() + await node.mountRelay() + node.peerManager.start() + + (await wakuDiscv5.start()).isOkOr: + error "failed to start discv5", error = error + quit(1) + + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped + while true: + let numConnectedPeers = node.peerManager.peerStore[ + ConnectionBook].book.values().countIt(it == Connected) + if numConnectedPeers >= requiredConnectedPeers: + notice "subscriber is ready", connectedPeers = numConnectedPeers, + required = requiredConnectedPeers + break + notice "waiting to be ready", connectedPeers = numConnectedPeers, + required = requiredConnectedPeers + await sleepAsync(5000) + + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") + var step2Nametag: MessageNametag + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + # let payloadStr = string.fromBytes(msg.payload) + if msg.contentTopic == contentTopic: + readPayloadV2 = decodePayloadV2(msg).get() + if readPayloadV2.messageNametag == qrMessageNametag: + handleHandShakeInitiatorMsg(rng, pubSubTopic, contentTopic, readPayloadV2, + agentBStep, agentBHS, agentBMessageNametag, + qrMessageNametag) + step2Nametag = agentBMessageNametag + wakuMsg = prepareHandShakeMsg(rng, contentTopic, agentBInfo, + agentBMessageNametag, agentBHS, + agentBStep, + step = 2) + await publishHandShakeMsg(node, pubSubTopic, contentTopic, + wakuMsg.get(), step = 2) + + agentBMessageNametag = toMessageNametag(agentBHS) + elif readPayloadV2.messageNametag == agentBMessageNametag: + handleHandShakeMsg(rng, pubSubTopic, contentTopic, step = 3, readPayloadV2, + agentBStep, agentBHS, agentBMessageNametag) + readyForFinalization = true + + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) + + var handshakeFinalized = false + while true: + if readyForFinalization: + notice "Finalizing handshake" + agentBHSResult = finalizeHandshake(agentBHS) + notice "Handshake finalized successfully" + handshakeFinalized = true + break + await sleepAsync(5000) + + if handshakeFinalized: + proc realMessageHandler(topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async.} = + if msg.contentTopic == contentTopic: + readPayloadV2 = decodePayloadV2(msg).get() + notice "Received real message", payload = readPayloadV2, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp + let readMessage = readMessage(agentBHSResult, readPayloadV2, + agentBHSResult.nametagsInbound).get() + echo readMessage + + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(realMessageHandler)) + +when isMainModule: + let rng = crypto.newRng() + asyncSpawn exampleNwakuAgentB(rng) + runForever()