deploy: 2053a7a51d915bc69453c485a9e1bdb0f3d02f0a

This commit is contained in:
kaiserd 2022-03-17 17:05:11 +00:00
parent ba9c1d8cd3
commit c9065e020e
5 changed files with 121 additions and 57 deletions

View File

@ -52,7 +52,7 @@ procSuite "Waku Discovery v5":
some(extIp), some(nodeTcpPort1), some(nodeUdpPort1), some(extIp), some(nodeTcpPort1), some(nodeUdpPort1),
bindIp, bindIp,
nodeUdpPort1, nodeUdpPort1,
@[], newSeq[string](),
false, false,
keys.PrivateKey(nodeKey1.skkey), keys.PrivateKey(nodeKey1.skkey),
flags, flags,

View File

@ -45,7 +45,7 @@ proc parseBootstrapAddress(address: TaintedString):
else: else:
return err "Ignoring unrecognized bootstrap address type" return err "Ignoring unrecognized bootstrap address type"
proc addBootstrapNode(bootstrapAddr: string, proc addBootstrapNode*(bootstrapAddr: string,
bootstrapEnrs: var seq[enr.Record]) = bootstrapEnrs: var seq[enr.Record]) =
# Ignore empty lines or lines starting with # # Ignore empty lines or lines starting with #
if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#': if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#':
@ -103,18 +103,13 @@ proc new*(T: type WakuDiscoveryV5,
extTcpPort, extUdpPort: Option[Port], extTcpPort, extUdpPort: Option[Port],
bindIP: ValidIpAddress, bindIP: ValidIpAddress,
discv5UdpPort: Port, discv5UdpPort: Port,
bootstrapNodes: seq[string], bootstrapEnrs: seq[enr.Record],
enrAutoUpdate = false, enrAutoUpdate = false,
privateKey: keys.PrivateKey, privateKey: keys.PrivateKey,
flags: WakuEnrBitfield, flags: WakuEnrBitfield,
enrFields: openArray[(string, seq[byte])], enrFields: openArray[(string, seq[byte])],
rng: ref BrHmacDrbgContext, rng: ref BrHmacDrbgContext,
discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T =
var bootstrapEnrs: seq[enr.Record]
for node in bootstrapNodes:
addBootstrapNode(node, bootstrapEnrs)
## TODO: consider loading from a configurable bootstrap file ## TODO: consider loading from a configurable bootstrap file
## We always add the waku field as specified ## We always add the waku field as specified
@ -134,6 +129,38 @@ proc new*(T: type WakuDiscoveryV5,
return WakuDiscoveryV5(protocol: protocol, listening: false) return WakuDiscoveryV5(protocol: protocol, listening: false)
# constructor that takes bootstrap Enrs in Enr Uri form
proc new*(T: type WakuDiscoveryV5,
extIp: Option[ValidIpAddress],
extTcpPort, extUdpPort: Option[Port],
bindIP: ValidIpAddress,
discv5UdpPort: Port,
bootstrapNodes: seq[string],
enrAutoUpdate = false,
privateKey: keys.PrivateKey,
flags: WakuEnrBitfield,
enrFields: openArray[(string, seq[byte])],
rng: ref BrHmacDrbgContext,
discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T =
var bootstrapEnrs: seq[enr.Record]
for node in bootstrapNodes:
addBootstrapNode(node, bootstrapEnrs)
return WakuDiscoveryV5.new(
extIP, extTcpPort, extUdpPort,
bindIP,
discv5UdpPort,
bootstrapEnrs,
enrAutoUpdate,
privateKey,
flags,
enrFields,
rng,
discv5Config
)
proc open*(wakuDiscv5: WakuDiscoveryV5) {.raises: [Defect, CatchableError].} = proc open*(wakuDiscv5: WakuDiscoveryV5) {.raises: [Defect, CatchableError].} =
debug "Opening Waku discovery v5 ports" debug "Opening Waku discovery v5 ports"

View File

@ -963,7 +963,7 @@ when isMainModule:
# Setup functions # # Setup functions #
################### ###################
# 1/6 Setup storage # 1/7 Setup storage
proc setupStorage(conf: WakuNodeConf): proc setupStorage(conf: WakuNodeConf):
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] = SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] =
@ -1013,9 +1013,35 @@ when isMainModule:
ok(storeTuple) ok(storeTuple)
# 2/6 Initialize node # 2/7 Retrieve dynamic bootstrap nodes
proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePeerInfo]] =
# DNS discovery
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl
var nameServers: seq[TransportAddress]
for ip in conf.dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain=domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
resolver)
if wakuDnsDiscovery.isOk:
return wakuDnsDiscovery.get().findPeers()
.mapErr(proc (e: cstring): string = $e)
else:
warn "Failed to init Waku DNS discovery"
# 3/7 Initialize node
proc initNode(conf: WakuNodeConf, proc initNode(conf: WakuNodeConf,
pStorage: WakuPeerStorage = nil): SetupResult[WakuNode] = pStorage: WakuPeerStorage = nil,
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] =
## Setup a basic Waku v2 node based on a supplied configuration ## Setup a basic Waku v2 node based on a supplied configuration
## file. Optionally include persistent peer storage. ## file. Optionally include persistent peer storage.
@ -1075,11 +1101,26 @@ when isMainModule:
discoveryConfig = DiscoveryConfig.init( discoveryConfig = DiscoveryConfig.init(
conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop) conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop)
# select dynamic bootstrap nodes that have an ENR containing a udp port.
# Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md)
var discv5BootstrapEnrs: seq[enr.Record]
for n in dynamicBootstrapNodes:
if n.enr.isSome():
let
enr = n.enr.get()
tenrRes = enr.toTypedRecord()
if tenrRes.isOk() and (tenrRes.get().udp.isSome() or tenrRes.get().udp6.isSome()):
discv5BootstrapEnrs.add(enr)
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
node.wakuDiscv5 = WakuDiscoveryV5.new( node.wakuDiscv5 = WakuDiscoveryV5.new(
extIP, extPort, some(discv5UdpPort), extIP, extPort, some(discv5UdpPort),
conf.listenAddress, conf.listenAddress,
discv5UdpPort, discv5UdpPort,
conf.discv5BootstrapNodes, discv5BootstrapEnrs,
conf.discv5EnrAutoUpdate, conf.discv5EnrAutoUpdate,
keys.PrivateKey(conf.nodekey.skkey), keys.PrivateKey(conf.nodekey.skkey),
wakuFlags, wakuFlags,
@ -1090,7 +1131,7 @@ when isMainModule:
ok(node) ok(node)
# 3/6 Mount and initialize configured protocols # 4/7 Mount and initialize configured protocols
proc setupProtocols(node: var WakuNode, proc setupProtocols(node: var WakuNode,
conf: WakuNodeConf, conf: WakuNodeConf,
mStorage: WakuMessageStore = nil): SetupResult[bool] = mStorage: WakuMessageStore = nil): SetupResult[bool] =
@ -1161,8 +1202,9 @@ when isMainModule:
ok(true) # Success ok(true) # Success
# 4/6 Start node and mounted protocols # 5/7 Start node and mounted protocols
proc startNode(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] = proc startNode(node: WakuNode, conf: WakuNodeConf,
dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): SetupResult[bool] =
## Start a configured node and all mounted protocols. ## Start a configured node and all mounted protocols.
## Resume history, connect to static nodes and start ## Resume history, connect to static nodes and start
## keep-alive, if configured. ## keep-alive, if configured.
@ -1170,7 +1212,7 @@ when isMainModule:
# Start Waku v2 node # Start Waku v2 node
waitFor node.start() waitFor node.start()
# start discv5 and connect to discovered nodes # Start discv5 and connect to discovered nodes
if conf.discv5Discovery: if conf.discv5Discovery:
if not waitFor node.startDiscv5(): if not waitFor node.startDiscv5():
error "could not start Discovery v5" error "could not start Discovery v5"
@ -1183,38 +1225,16 @@ when isMainModule:
if conf.staticnodes.len > 0: if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes, "static") waitFor connectToNodes(node, conf.staticnodes, "static")
# Connect to discovered nodes info "Connecting to dynamic bootstrap peers"
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "": waitFor connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl
var nameServers: seq[TransportAddress]
for ip in conf.dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain=domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
resolver)
if wakuDnsDiscovery.isOk:
let discoveredPeers = wakuDnsDiscovery.get().findPeers()
if discoveredPeers.isOk:
info "Connecting to discovered peers"
waitFor connectToNodes(node, discoveredPeers.get(), "dnsdisc")
else:
warn "Failed to init Waku DNS discovery"
# Start keepalive, if enabled # Start keepalive, if enabled
if conf.keepAlive: if conf.keepAlive:
node.startKeepalive() node.startKeepalive()
ok(true) # Success ok(true) # Success
# 5/6 Start monitoring tools and external interfaces # 6/7 Start monitoring tools and external interfaces
proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] = proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] =
## Start configured external interfaces and monitoring tools ## Start configured external interfaces and monitoring tools
## on a Waku v2 node, including the RPC API and metrics ## on a Waku v2 node, including the RPC API and metrics
@ -1242,7 +1262,7 @@ when isMainModule:
# Node setup # # Node setup #
############## ##############
debug "1/6 Setting up storage" debug "1/7 Setting up storage"
var var
pStorage: WakuPeerStorage pStorage: WakuPeerStorage
@ -1251,44 +1271,53 @@ when isMainModule:
let setupStorageRes = setupStorage(conf) let setupStorageRes = setupStorage(conf)
if setupStorageRes.isErr: if setupStorageRes.isErr:
error "1/6 Setting up storage failed. Continuing without storage." error "1/7 Setting up storage failed. Continuing without storage."
else: else:
(pStorage, mStorage) = setupStorageRes.get() (pStorage, mStorage) = setupStorageRes.get()
debug "2/6 Initializing node" debug "2/7 Retrieve dynamic bootstrap nodes"
var dynamicBootstrapNodes: seq[RemotePeerInfo]
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf)
if dynamicBootstrapNodesRes.isErr:
error "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes."
else:
dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
let initNodeRes = initNode(conf, pStorage) debug "3/7 Initializing node"
let initNodeRes = initNode(conf, pStorage, dynamicBootstrapNodes)
if initNodeRes.isErr: if initNodeRes.isErr:
error "2/6 Initializing node failed. Quitting." error "3/7 Initializing node failed. Quitting."
quit(QuitFailure) quit(QuitFailure)
else: else:
node = initNodeRes.get() node = initNodeRes.get()
debug "3/6 Mounting protocols" debug "4/7 Mounting protocols"
let setupProtocolsRes = setupProtocols(node, conf, mStorage) let setupProtocolsRes = setupProtocols(node, conf, mStorage)
if setupProtocolsRes.isErr: if setupProtocolsRes.isErr:
error "3/6 Mounting protocols failed. Continuing in current state." error "4/7 Mounting protocols failed. Continuing in current state."
debug "4/6 Starting node and mounted protocols" debug "5/7 Starting node and mounted protocols"
let startNodeRes = startNode(node, conf) let startNodeRes = startNode(node, conf, dynamicBootstrapNodes)
if startNodeRes.isErr: if startNodeRes.isErr:
error "4/6 Starting node and mounted protocols failed. Continuing in current state." error "5/7 Starting node and mounted protocols failed. Continuing in current state."
debug "5/6 Starting monitoring and external interfaces" debug "6/7 Starting monitoring and external interfaces"
let startExternalRes = startExternal(node, conf) let startExternalRes = startExternal(node, conf)
if startExternalRes.isErr: if startExternalRes.isErr:
error "5/6 Starting monitoring and external interfaces failed. Continuing in current state." error "6/7 Starting monitoring and external interfaces failed. Continuing in current state."
debug "6/6 Setting up shutdown hooks" debug "7/7 Setting up shutdown hooks"
# 6/6 Setup graceful shutdown hooks # 7/7 Setup graceful shutdown hooks
## Setup shutdown hooks for this process. ## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown. ## Stop node gracefully on shutdown.

View File

@ -200,6 +200,7 @@ proc fwdPage(storeQueue: StoreQueueRef,
outSeq = @[] outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD) outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD)
outError = HistoryResponseError.INVALID_CURSOR outError = HistoryResponseError.INVALID_CURSOR
w.destroy
return (outSeq, outPagingInfo, outError) return (outSeq, outPagingInfo, outError)
# Advance walker once more # Advance walker once more
@ -271,6 +272,7 @@ proc bwdPage(storeQueue: StoreQueueRef,
outSeq = @[] outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD) outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.INVALID_CURSOR outError = HistoryResponseError.INVALID_CURSOR
w.destroy
return (outSeq, outPagingInfo, outError) return (outSeq, outPagingInfo, outError)
# Step walker one more step back # Step walker one more step back

View File

@ -17,6 +17,7 @@ type
RemotePeerInfo* = ref object of RootObj RemotePeerInfo* = ref object of RootObj
peerId*: PeerID peerId*: PeerID
addrs*: seq[MultiAddress] addrs*: seq[MultiAddress]
enr*: Option[enr.Record]
protocols*: seq[string] protocols*: seq[string]
func `$`*(remotePeerInfo: RemotePeerInfo): string = func `$`*(remotePeerInfo: RemotePeerInfo): string =
@ -26,11 +27,13 @@ proc init*(
p: typedesc[RemotePeerInfo], p: typedesc[RemotePeerInfo],
peerId: PeerID, peerId: PeerID,
addrs: seq[MultiAddress] = @[], addrs: seq[MultiAddress] = @[],
enr: Option[enr.Record] = none(enr.Record),
protocols: seq[string] = @[]): RemotePeerInfo = protocols: seq[string] = @[]): RemotePeerInfo =
let remotePeerInfo = RemotePeerInfo( let remotePeerInfo = RemotePeerInfo(
peerId: peerId, peerId: peerId,
addrs: addrs, addrs: addrs,
enr: enr,
protocols: protocols) protocols: protocols)
return remotePeerInfo return remotePeerInfo
@ -38,12 +41,14 @@ proc init*(
proc init*(p: typedesc[RemotePeerInfo], proc init*(p: typedesc[RemotePeerInfo],
peerId: string, peerId: string,
addrs: seq[MultiAddress] = @[], addrs: seq[MultiAddress] = @[],
enr: Option[enr.Record] = none(enr.Record),
protocols: seq[string] = @[]): RemotePeerInfo protocols: seq[string] = @[]): RemotePeerInfo
{.raises: [Defect, ResultError[cstring], LPError].} = {.raises: [Defect, ResultError[cstring], LPError].} =
let remotePeerInfo = RemotePeerInfo( let remotePeerInfo = RemotePeerInfo(
peerId: PeerID.init(peerId).tryGet(), peerId: PeerID.init(peerId).tryGet(),
addrs: addrs, addrs: addrs,
enr: enr,
protocols: protocols) protocols: protocols)
return remotePeerInfo return remotePeerInfo
@ -145,11 +150,12 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
if addrs.len == 0: if addrs.len == 0:
return err("enr: no addresses in record") return err("enr: no addresses in record")
return ok(RemotePeerInfo.init(peerId, addrs)) return ok(RemotePeerInfo.init(peerId, addrs, some(enr)))
## Converts the local peerInfo to dialable RemotePeerInfo ## Converts the local peerInfo to dialable RemotePeerInfo
## Useful for testing or internal connections ## Useful for testing or internal connections
proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo = proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
RemotePeerInfo.init(peerInfo.peerId, RemotePeerInfo.init(peerInfo.peerId,
peerInfo.addrs, peerInfo.addrs,
none(enr.Record), # we could generate an ENR from PeerInfo
peerInfo.protocols) peerInfo.protocols)