mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-11 23:35:08 +00:00
feat(lightpush): dandelion support
- first draft implementation
This commit is contained in:
parent
5e90085242
commit
5266c7061e
@ -251,6 +251,11 @@ type
|
||||
desc: "Peer multiaddr to request lightpush of published messages.",
|
||||
defaultValue: ""
|
||||
name: "lightpushnode" }: string
|
||||
|
||||
dandelion* {.
|
||||
desc: "Enable dandelion stem relaying: true|false",
|
||||
defaultValue: false
|
||||
name: "dandelion" }: bool
|
||||
|
||||
## JSON-RPC config
|
||||
|
||||
|
@ -567,7 +567,7 @@ proc mountRelay*(node: WakuNode,
|
||||
|
||||
info "relay mounted successfully"
|
||||
|
||||
proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
proc mountLightPush*(node: WakuNode, dandelion: bool = false) {.async, raises: [Defect, LPError].} =
|
||||
info "mounting light push"
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
@ -575,7 +575,9 @@ proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
|
||||
else:
|
||||
debug "mounting lightpush with relay"
|
||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
|
||||
if dandelion:
|
||||
debug "activate lightpush dandelion relay"
|
||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay, dandelion)
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start lightpush too.
|
||||
@ -1095,8 +1097,8 @@ when isMainModule:
|
||||
setStorePeer(node, conf.storenode)
|
||||
|
||||
# NOTE Must be mounted after relay
|
||||
if (conf.lightpushnode != "") or (conf.lightpush):
|
||||
waitFor mountLightPush(node)
|
||||
if (conf.lightpushnode != "") or (conf.lightpush) or (conf.dandelion):
|
||||
waitFor mountLightPush(node, conf.dandelion)
|
||||
|
||||
if conf.lightpushnode != "":
|
||||
setLightPushPeer(node, conf.lightpushnode)
|
||||
|
@ -1,7 +1,7 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
std/[options, times, random],
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
@ -32,6 +32,10 @@ const
|
||||
const
|
||||
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
const
|
||||
DandelionQ = 0.2 # Dandelion q paramter
|
||||
EpochDuration = chronos.minutes(10)
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
@ -45,75 +49,16 @@ type
|
||||
|
||||
WakuLightPushResult*[T] = Result[T, string]
|
||||
|
||||
WakuDandelionStem = ref object
|
||||
isStemState: bool
|
||||
dandelionRelay: RemotePeerInfo
|
||||
|
||||
WakuLightPush* = ref object of LPProtocol
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
requestHandler*: PushRequestHandler
|
||||
relayReference*: WakuRelay
|
||||
|
||||
|
||||
proc init*(wl: WakuLightPush) =
|
||||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
let message = await conn.readLp(MaxRpcSize.int)
|
||||
let res = PushRPC.init(message)
|
||||
if res.isErr():
|
||||
error "failed to decode rpc"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
let rpc = res.get()
|
||||
|
||||
if rpc.request != PushRequest():
|
||||
info "lightpush push request"
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
|
||||
let
|
||||
pubSubTopic = rpc.request.pubSubTopic
|
||||
message = rpc.request.message
|
||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
||||
|
||||
var response: PushResponse
|
||||
if not wl.relayReference.isNil():
|
||||
let data = message.encode().buffer
|
||||
|
||||
# Assumimng success, should probably be extended to check for network, peers, etc
|
||||
discard wl.relayReference.publish(pubSubTopic, data)
|
||||
response = PushResponse(is_success: true, info: "Totally.")
|
||||
else:
|
||||
debug "No relay protocol present, unsuccesssful push"
|
||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
||||
|
||||
|
||||
let rpc = PushRPC(requestId: rpc.requestId, response: response)
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
if rpc.response != PushResponse():
|
||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||
if rpc.response.isSuccess:
|
||||
info "lightpush message success"
|
||||
else:
|
||||
info "lightpush message failure", info=rpc.response.info
|
||||
|
||||
wl.handler = handle
|
||||
wl.codec = WakuLightPushCodec
|
||||
|
||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
||||
debug "init"
|
||||
let rng = crypto.newRng()
|
||||
let wl = WakuLightPush(rng: rng,
|
||||
peerManager: peerManager,
|
||||
requestHandler: handler,
|
||||
relayReference: relay)
|
||||
wl.init()
|
||||
|
||||
return wl
|
||||
|
||||
|
||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||
waku_lightpush_peers.inc()
|
||||
|
||||
wakuDandelionStem: Option[WakuDandelionStem]
|
||||
|
||||
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||
@ -146,3 +91,110 @@ proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[P
|
||||
return err(dialFailure)
|
||||
|
||||
return await wl.request(req, peerOpt.get())
|
||||
|
||||
proc updateEpoch(wl: WakuLightPush) =
|
||||
randomize()
|
||||
let wd = wl.wakuDandelionStem.get()
|
||||
if rand(1.0) < DandelionQ:
|
||||
wd.isStemState = false
|
||||
else:
|
||||
wd.isStemState = true
|
||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec) # TODO: select random peer from pubSubTopic mesh; all Dandelion supporting nodes have to support the WakuLightPushCodec
|
||||
# todo: if peerOpt.isNone: ; retry until we get working peer
|
||||
wd.dandelionRelay = peerOpt.get()
|
||||
|
||||
proc startDandelionEpochLoop(wl: WakuLightPush) =
|
||||
updateEpoch(wl)
|
||||
|
||||
let currentTime = getTime().toUnix()
|
||||
let timeToNextEpochBoundry = chronos.seconds(
|
||||
EpochDuration.seconds - (currentTime mod EpochDuration.seconds))
|
||||
|
||||
# https://github.com/nim-lang/Nim/issues/17369
|
||||
var executeUpdateEpoch: proc(data: pointer) {.gcsafe, raises: [Defect].}
|
||||
executeUpdateEpoch = proc(udata: pointer) {.gcsafe.} =
|
||||
updateEpoch(wl)
|
||||
discard setTimer(Moment.fromNow(EpochDuration), executeUpdateEpoch)
|
||||
|
||||
discard setTimer(Moment.fromNow(timeToNextEpochBoundry), executeUpdateEpoch)
|
||||
|
||||
|
||||
proc initProtocolHandler*(wl: WakuLightPush) =
|
||||
|
||||
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
let message = await conn.readLp(MaxRpcSize.int)
|
||||
let res = PushRPC.init(message)
|
||||
if res.isErr():
|
||||
error "failed to decode rpc"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
let rpc = res.get()
|
||||
|
||||
if rpc.request != PushRequest():
|
||||
info "lightpush push request"
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
|
||||
let
|
||||
pubSubTopic = rpc.request.pubSubTopic
|
||||
message = rpc.request.message
|
||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
||||
|
||||
var response: PushResponse
|
||||
|
||||
if wl.wakuDandelionStem.isSome and wl.wakuDandelionStem.get().isStemState: # Node is in Dandelion Stem State
|
||||
let rpc = PushRequest(pubSubTopic: pubsubTopic, message: message)
|
||||
discard wl.request(rpc, wl.wakuDandelionStem.get().dandelionRelay)
|
||||
response = PushResponse(is_success: true, info: "Totally.") # do not tell that we are in stem state
|
||||
|
||||
if not wl.relayReference.isNil():
|
||||
let data = message.encode().buffer
|
||||
|
||||
# Assumimng success, should probably be extended to check for network, peers, etc
|
||||
discard wl.relayReference.publish(pubSubTopic, data)
|
||||
response = PushResponse(is_success: true, info: "Totally.")
|
||||
else:
|
||||
debug "No relay protocol present, unsuccesssful push"
|
||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
||||
|
||||
let rpc = PushRPC(requestId: rpc.requestId, response: response)
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
if rpc.response != PushResponse():
|
||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||
if rpc.response.isSuccess:
|
||||
info "lightpush message success"
|
||||
else:
|
||||
info "lightpush message failure", info=rpc.response.info
|
||||
|
||||
wl.handler = handler
|
||||
wl.codec = WakuLightPushCodec
|
||||
|
||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil, dandelion: bool = false): T =
|
||||
debug "init"
|
||||
let rng = crypto.newRng()
|
||||
|
||||
var wdOption: Option[WakuDandelionStem]
|
||||
if dandelion:
|
||||
let wd = WakuDandelionStem()
|
||||
wdOption = some(wd)
|
||||
|
||||
let wl = WakuLightPush(rng: rng,
|
||||
peerManager: peerManager,
|
||||
requestHandler: handler,
|
||||
relayReference: relay,
|
||||
wakuDandelionStem: wdOption)
|
||||
wl.initProtocolHandler()
|
||||
|
||||
if dandelion:
|
||||
wl.startDandelionEpochLoop()
|
||||
|
||||
return wl
|
||||
|
||||
|
||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||
waku_lightpush_peers.inc()
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user