diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index a77971889..af5a7d38f 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -251,6 +251,11 @@ type desc: "Peer multiaddr to request lightpush of published messages.", defaultValue: "" name: "lightpushnode" }: string + + dandelion* {. + desc: "Enable dandelion stem relaying: true|false", + defaultValue: false + name: "dandelion" }: bool ## JSON-RPC config diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index f2073baa2..daef6f5d0 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -567,7 +567,7 @@ proc mountRelay*(node: WakuNode, info "relay mounted successfully" -proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} = +proc mountLightPush*(node: WakuNode, dandelion: bool = false) {.async, raises: [Defect, LPError].} = info "mounting light push" if node.wakuRelay.isNil: @@ -575,7 +575,9 @@ proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil) else: debug "mounting lightpush with relay" - node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay) + if dandelion: + debug "activate lightpush dandelion relay" + node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay, dandelion) if node.started: # Node has started already. Let's start lightpush too. @@ -1095,8 +1097,8 @@ when isMainModule: setStorePeer(node, conf.storenode) # NOTE Must be mounted after relay - if (conf.lightpushnode != "") or (conf.lightpush): - waitFor mountLightPush(node) + if (conf.lightpushnode != "") or (conf.lightpush) or (conf.dandelion): + waitFor mountLightPush(node, conf.dandelion) if conf.lightpushnode != "": setLightPushPeer(node, conf.lightpushnode) diff --git a/waku/v2/protocol/waku_lightpush/protocol.nim b/waku/v2/protocol/waku_lightpush/protocol.nim index 557f7badb..1770f52ea 100644 --- a/waku/v2/protocol/waku_lightpush/protocol.nim +++ b/waku/v2/protocol/waku_lightpush/protocol.nim @@ -1,7 +1,7 @@ {.push raises: [Defect].} import - std/options, + std/[options, times, random], stew/results, chronicles, chronos, @@ -32,6 +32,10 @@ const const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead +const + DandelionQ = 0.2 # Dandelion q paramter + EpochDuration = chronos.minutes(10) + # Error types (metric label values) const dialFailure = "dial_failure" @@ -45,75 +49,16 @@ type WakuLightPushResult*[T] = Result[T, string] + WakuDandelionStem = ref object + isStemState: bool + dandelionRelay: RemotePeerInfo + WakuLightPush* = ref object of LPProtocol rng*: ref rand.HmacDrbgContext peerManager*: PeerManager requestHandler*: PushRequestHandler relayReference*: WakuRelay - - -proc init*(wl: WakuLightPush) = - - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let message = await conn.readLp(MaxRpcSize.int) - let res = PushRPC.init(message) - if res.isErr(): - error "failed to decode rpc" - waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) - return - - let rpc = res.get() - - if rpc.request != PushRequest(): - info "lightpush push request" - waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - - let - pubSubTopic = rpc.request.pubSubTopic - message = rpc.request.message - debug "PushRequest", pubSubTopic=pubSubTopic, msg=message - - var response: PushResponse - if not wl.relayReference.isNil(): - let data = message.encode().buffer - - # Assumimng success, should probably be extended to check for network, peers, etc - discard wl.relayReference.publish(pubSubTopic, data) - response = PushResponse(is_success: true, info: "Totally.") - else: - debug "No relay protocol present, unsuccesssful push" - response = PushResponse(is_success: false, info: "No relay protocol") - - - let rpc = PushRPC(requestId: rpc.requestId, response: response) - await conn.writeLp(rpc.encode().buffer) - - if rpc.response != PushResponse(): - waku_lightpush_messages.inc(labelValues = ["PushResponse"]) - if rpc.response.isSuccess: - info "lightpush message success" - else: - info "lightpush message failure", info=rpc.response.info - - wl.handler = handle - wl.codec = WakuLightPushCodec - -proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T = - debug "init" - let rng = crypto.newRng() - let wl = WakuLightPush(rng: rng, - peerManager: peerManager, - requestHandler: handler, - relayReference: relay) - wl.init() - - return wl - - -proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = - wlp.peerManager.addPeer(peer, WakuLightPushCodec) - waku_lightpush_peers.inc() - + wakuDandelionStem: Option[WakuDandelionStem] proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} = let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec) @@ -146,3 +91,110 @@ proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[P return err(dialFailure) return await wl.request(req, peerOpt.get()) + +proc updateEpoch(wl: WakuLightPush) = + randomize() + let wd = wl.wakuDandelionStem.get() + if rand(1.0) < DandelionQ: + wd.isStemState = false + else: + wd.isStemState = true + let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec) # TODO: select random peer from pubSubTopic mesh; all Dandelion supporting nodes have to support the WakuLightPushCodec + # todo: if peerOpt.isNone: ; retry until we get working peer + wd.dandelionRelay = peerOpt.get() + +proc startDandelionEpochLoop(wl: WakuLightPush) = + updateEpoch(wl) + + let currentTime = getTime().toUnix() + let timeToNextEpochBoundry = chronos.seconds( + EpochDuration.seconds - (currentTime mod EpochDuration.seconds)) + + # https://github.com/nim-lang/Nim/issues/17369 + var executeUpdateEpoch: proc(data: pointer) {.gcsafe, raises: [Defect].} + executeUpdateEpoch = proc(udata: pointer) {.gcsafe.} = + updateEpoch(wl) + discard setTimer(Moment.fromNow(EpochDuration), executeUpdateEpoch) + + discard setTimer(Moment.fromNow(timeToNextEpochBoundry), executeUpdateEpoch) + + +proc initProtocolHandler*(wl: WakuLightPush) = + + proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let message = await conn.readLp(MaxRpcSize.int) + let res = PushRPC.init(message) + if res.isErr(): + error "failed to decode rpc" + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return + + let rpc = res.get() + + if rpc.request != PushRequest(): + info "lightpush push request" + waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + + let + pubSubTopic = rpc.request.pubSubTopic + message = rpc.request.message + debug "PushRequest", pubSubTopic=pubSubTopic, msg=message + + var response: PushResponse + + if wl.wakuDandelionStem.isSome and wl.wakuDandelionStem.get().isStemState: # Node is in Dandelion Stem State + let rpc = PushRequest(pubSubTopic: pubsubTopic, message: message) + discard wl.request(rpc, wl.wakuDandelionStem.get().dandelionRelay) + response = PushResponse(is_success: true, info: "Totally.") # do not tell that we are in stem state + + if not wl.relayReference.isNil(): + let data = message.encode().buffer + + # Assumimng success, should probably be extended to check for network, peers, etc + discard wl.relayReference.publish(pubSubTopic, data) + response = PushResponse(is_success: true, info: "Totally.") + else: + debug "No relay protocol present, unsuccesssful push" + response = PushResponse(is_success: false, info: "No relay protocol") + + let rpc = PushRPC(requestId: rpc.requestId, response: response) + await conn.writeLp(rpc.encode().buffer) + + if rpc.response != PushResponse(): + waku_lightpush_messages.inc(labelValues = ["PushResponse"]) + if rpc.response.isSuccess: + info "lightpush message success" + else: + info "lightpush message failure", info=rpc.response.info + + wl.handler = handler + wl.codec = WakuLightPushCodec + +proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil, dandelion: bool = false): T = + debug "init" + let rng = crypto.newRng() + + var wdOption: Option[WakuDandelionStem] + if dandelion: + let wd = WakuDandelionStem() + wdOption = some(wd) + + let wl = WakuLightPush(rng: rng, + peerManager: peerManager, + requestHandler: handler, + relayReference: relay, + wakuDandelionStem: wdOption) + wl.initProtocolHandler() + + if dandelion: + wl.startDandelionEpochLoop() + + return wl + + +proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = + wlp.peerManager.addPeer(peer, WakuLightPushCodec) + waku_lightpush_peers.inc() + + +