diff --git a/.gitmodules b/.gitmodules index bde56a76e..4829fdb3c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -194,3 +194,7 @@ url = https://github.com/waku-org/waku-rlnv2-contract.git ignore = untracked branch = master +[submodule "vendor/mix"] + path = vendor/mix + url = https://github.com/vacp2p/mix/ + branch = mix-waku-integ diff --git a/examples/lightpush_publisher_mix.nim b/examples/lightpush_publisher_mix.nim new file mode 100644 index 000000000..607c78e12 --- /dev/null +++ b/examples/lightpush_publisher_mix.nim @@ -0,0 +1,127 @@ +import + std/[tables, times, sequtils], + stew/byteutils, + stew/shims/net, + chronicles, + results, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import ../vendor/mix/src/entry_connection, + ../vendor/mix/src/protocol + +import + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_core/codecs, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + waku_lightpush/client + ] + +proc now*(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +# careful if running pub and sub in the same machine +const wakuPort = 60000 + +const clusterId = 2 +const shardId = @[0'u16] + +const + LightpushPeer = + "/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu" + LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0") + LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto") + +proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = + # use notice to filter all waku messaging + setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + + notice "starting publisher", wakuPort = wakuPort + + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get() + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) + + let relayShards = RelayShards.init(clusterId, shardId).valueOr: + error "Relay shards initialization failed", error = error + quit(QuitFailure) + + var enrBuilder = EnrBuilder.init(nodeKey) + enrBuilder.withWakuRelaySharding(relayShards).expect( + "Building ENR with relay sharding failed" + ) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") + node.mountLightPushClient() + ( + await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a") + ).isOkOr: + error "failed to mount waku mix protocol: ", error = $error + return + + let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr: + error "Failed to initialize PeerId", err = error + return + + let conn = MixEntryConnection.newConn( + "/ip4/127.0.0.1/tcp/60001", + destPeerId, + ProtocolType.fromString(WakuLightPushCodec), + node.mix) + + await node.start() + node.peerManager.start() + + notice "publisher service started" + while true: + let text = "hi there i'm a publisher using mix" + let message = WakuMessage( + payload: toBytes(text), # content of the message + contentTopic: LightpushContentTopic, # content topic to publish to + ephemeral: true, # tell store nodes to not store it + timestamp: now(), + ) # current timestamp + + let res = + await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn) + + if res.isOk: + notice "published message", + text = text, + timestamp = message.timestamp, + psTopic = LightpushPubsubTopic, + contentTopic = LightpushContentTopic + else: + error "failed to publish message", error = res.error + + await sleepAsync(5000) + break + +when isMainModule: + let rng = crypto.newRng() + asyncSpawn setupAndPublish(rng) + runForever() diff --git a/waku.nimble b/waku.nimble index 6cf804098..accef59e5 100644 --- a/waku.nimble +++ b/waku.nimble @@ -132,6 +132,7 @@ task example2, "Build Waku examples": buildBinary "subscriber", "examples/" buildBinary "filter_subscriber", "examples/" buildBinary "lightpush_publisher", "examples/" + buildBinary "lightpush_publisher_mix", "examples/" task chat2, "Build example Waku chat usage": # NOTE For debugging, set debug level. For chat usage we want minimal log diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 9bc073426..abdff75c7 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -664,6 +664,16 @@ with the drawback of consuming some more bandwidth.""", name: "rendezvous" .}: bool + #Mix config + mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}: + Option[string] + #TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs +#[ mixBootstrapNodes* {. + desc: + "Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.", + name: "mix-bootstrap-node" + .}: seq[string] ]# + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d2d6b1d99..76f91ad38 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -426,6 +426,17 @@ proc setupProtocols( return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + #mount mix + let mixPrivKey:string = + if conf.mixkey.isSome(): + conf.mixkey.get() + else: + error "missing mix key" + return err("missing mix key") + ( + await node.mountMix(mixPrivKey) + ).isOkOr: + return err("failed to mount waku mix protocol: " & $error) return ok() ## Start node diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e10d705ff..d1b8b7332 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -9,9 +9,11 @@ import stew/byteutils, eth/keys, nimcrypto, + nimcrypto/utils as ncrutils, bearssl/rand, eth/p2p/discoveryv5/enr, libp2p/crypto/crypto, + libp2p/crypto/curve25519, libp2p/protocols/ping, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, @@ -21,7 +23,13 @@ import libp2p/transports/transport, libp2p/transports/tcptransport, libp2p/transports/wstransport, - libp2p/utility + libp2p/utility, + ../../vendor/mix/src/mix_node, + ../../vendor/mix/src/mix_protocol, + ../../vendor/mix/src/curve25519, + ../../vendor/mix/src/protocol + + import ../waku_core, ../waku_core/topics/sharding, @@ -121,6 +129,8 @@ type topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] contentTopicHandlers: Table[ContentTopic, TopicHandler] rateLimitSettings*: ProtocolRateLimitSettings + mix*: MixProtocol + mixbootNodes*: Table[PeerId, MixPubInfo] proc new*( T: type WakuNode, @@ -206,6 +216,65 @@ proc mountSharding*( node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) return ok() +proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] = + var mixNodes = initTable[PeerId, MixPubInfo]() + # MixNode Multiaddrs and PublicKeys: + let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", + "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", + "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", + "/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", + "/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", + ] + let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c", + "9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a", + "275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c", + "e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18", + "8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" + ] + for index, mixNodeMultiaddr in bootNodesMultiaddrs: + let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr) + if peerIdRes.isErr: + error "Failed to get peer id from multiaddress: " , error = peerIdRes.error + let peerId = peerIdRes.get() + if peerID == exceptPeerID: + continue + let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index]))) + + mixNodes[peerId] = mixNodePubInfo + info "using mix bootstrap nodes ", bootNodes = mixNodes + return mixNodes + + + # Mix Protocol +proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} = + info "mounting mix protocol", nodeId = node.info #TODO log the config used + info "mixPrivKey", mixPrivKey = mixPrivKey + + let mixKey = intoCurve25519Key(ncrutils.fromHex(mixPrivKey)) + let mixPubKey = public(mixKey) + + let localaddrStr = node.announcedAddresses[0].toString().valueOr: + return err("Failed to convert multiaddress to string.") + info "local addr", localaddr = localaddrStr + + let localMixNodeInfo = initMixNodeInfo( + localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey, + node.switch.peerInfo.privateKey.skkey, + ) + + let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId)) + if protoRes.isErr: + error "Mix protocol initialization failed", err = protoRes.error + return + node.mix = protoRes.value + + let catchRes = catch: + node.switch.mount(node.mix) + if catchRes.isErr(): + return err(catchRes.error.msg) + + return ok() + ## Waku Sync proc mountStoreSync*( diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 7aa2d9fa9..d97973c71 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -1,7 +1,7 @@ {.push raises: [].} import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils -import libp2p/peerid +import libp2p/peerid, libp2p/stream/connection import ../waku_core/peers, ../node/peer_manager, @@ -110,3 +110,21 @@ proc publishToAny*( obs.onMessagePublished(pubSubTopic, message) return lightpushSuccessResult(publishedCount) + + +proc publishWithConn*( + wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, conn: Connection +): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + ## This proc is similar to the publish one but in this case + ## we use existing connection to publish. + + info "publishWithConn", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex + + let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) + let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(pushRequest)) + await conn.writeLP(rpc.encode().buffer) + + for obs in wl.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + + return lightpushSuccessResult(1) \ No newline at end of file