mirror of https://github.com/waku-org/nwaku.git
429 lines
13 KiB
Nim
429 lines
13 KiB
Nim
{.push raises: [].}
|
|
|
|
import
|
|
std/[sequtils, strutils, options, sets, net, json],
|
|
results,
|
|
chronos,
|
|
chronicles,
|
|
metrics,
|
|
libp2p/multiaddress,
|
|
eth/keys as eth_keys,
|
|
eth/p2p/discoveryv5/node,
|
|
eth/p2p/discoveryv5/protocol
|
|
import
|
|
../node/peer_manager/peer_manager,
|
|
../waku_core,
|
|
../waku_enr,
|
|
../factory/external_config
|
|
|
|
export protocol, waku_enr
|
|
|
|
# Gauges updated by this module: `waku_discv5_discovered` is incremented by
# `findRandomPeers`, `waku_discv5_errors` (labelled by "type") by `searchLoop`.
declarePublicGauge(waku_discv5_discovered, "number of nodes discovered")
declarePublicGauge(
  waku_discv5_errors, "number of waku discv5 errors", ["type"]
)

# Log topic attached to every log statement emitted from this module.
logScope:
  topics = "waku discv5"
|
|
|
|
## Config
|
|
|
|
type
  WakuDiscoveryV5Config* = object
    ## Settings that `WakuDiscoveryV5.new` forwards to `newProtocol`.

    # Optional discv5 tuning; `new` falls back to
    # `protocol.defaultDiscoveryConfig` when this is `none`.
    discv5Config*: Option[DiscoveryConfig]
    # IP address handed to `newProtocol` as `bindIp`.
    address*: IpAddress
    # Port handed to `newProtocol` as `bindPort`.
    port*: Port
    # Key handed to `newProtocol` as `privKey`.
    privateKey*: eth_keys.PrivateKey
    # Bootstrap ENRs; also used as the bootnode exception list of the
    # sharding predicate built in `new`.
    bootstrapRecords*: seq[waku_enr.Record]
    # Forwarded to `newProtocol` as `enrAutoUpdate`.
    autoupdateRecord*: bool
|
|
|
|
## Protocol
|
|
|
|
# Filter callback applied to discovered ENRs: `findRandomPeers` keeps only the
# records for which it returns `true`. It must be gcsafe and must not raise.
type
  WakuDiscv5Predicate* =
    proc(record: waku_enr.Record): bool {.closure, gcsafe, raises: [].}
|
|
|
|
type
  WakuDiscoveryV5* = ref object
    ## State of the Waku discovery v5 service.

    # Configuration the instance was created with.
    conf: WakuDiscoveryV5Config
    # Underlying discv5 protocol instance.
    protocol*: protocol.Protocol
    # Set by `start`, cleared by `stop`; the background loops run while true.
    listening*: bool
    # Default record filter used by `findRandomPeers` when no override is given.
    predicate: Option[WakuDiscv5Predicate]
    # When present, `searchLoop` adds discovered peers to it.
    peerManager: Option[PeerManager]
    # Source of the subscription events consumed by `subscriptionsListener`.
    topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
|
|
|
|
proc shardingPredicate*(
    record: Record, bootnodes: seq[Record] = @[]
): Option[WakuDiscv5Predicate] =
  ## Builds a peer filter from the relay sharding information found in
  ## `record`.
  ##
  ## Returns `none` when `record` cannot be converted to a typed record or
  ## carries no relay sharding information. Otherwise returns a predicate that
  ## accepts a candidate record when it is one of `bootnodes`, or when it
  ## advertises at least one capability and contains at least one of the
  ## shards of `record` (same cluster id).
  let typedRecord = record.toTyped().valueOr:
    debug "peer filtering failed", reason = error
    return none(WakuDiscv5Predicate)

  let nodeShard = typedRecord.relaySharding().valueOr:
    debug "no relay sharding information, peer filtering disabled"
    return none(WakuDiscv5Predicate)

  debug "peer filtering updated"

  let predicate = proc(candidate: waku_enr.Record): bool =
    # Temp. Bootnode exception
    if bootnodes.contains(candidate):
      return true

    # RFC 31 requirement
    if candidate.getCapabilities().len == 0:
      return false

    # RFC 64 guideline
    nodeShard.shardIds.anyIt(candidate.containsShard(nodeShard.clusterId, it))

  return some(predicate)
|
|
|
|
proc new*(
    T: type WakuDiscoveryV5,
    rng: ref HmacDrbgContext,
    conf: WakuDiscoveryV5Config,
    record: Option[waku_enr.Record],
    peerManager: Option[PeerManager] = none(PeerManager),
    queue: AsyncEventQueue[SubscriptionEvent] =
      newAsyncEventQueue[SubscriptionEvent](30),
): T =
  ## Creates a discovery v5 service that is not yet listening.
  ##
  ## `conf` supplies the bind address/port, key, bootstrap records and
  ## optional discv5 tuning (the protocol default is used when unset).
  ## `record`, when present, is passed to the protocol as the previous ENR and
  ## is used to derive the sharding predicate; without it no predicate is set.
  let discv5Protocol = newProtocol(
    rng = rng,
    config = conf.discv5Config.get(protocol.defaultDiscoveryConfig),
    bindPort = conf.port,
    bindIp = conf.address,
    privKey = conf.privateKey,
    bootstrapRecords = conf.bootstrapRecords,
    enrAutoUpdate = conf.autoupdateRecord,
    previousRecord = record,
    enrIp = none(IpAddress),
    enrTcpPort = none(Port),
    enrUdpPort = none(Port),
  )

  # Peer filtering is only possible when we know our own record.
  var shardPred = none(WakuDiscv5Predicate)
  if record.isSome():
    shardPred = shardingPredicate(record.get(), conf.bootstrapRecords)

  WakuDiscoveryV5(
    conf: conf,
    protocol: discv5Protocol,
    listening: false,
    predicate: shardPred,
    peerManager: peerManager,
    topicSubscriptionQueue: queue,
  )
|
|
|
|
proc updateENRShards(
    wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool
): Result[void, string] =
  ## Adds (`add == true`) or removes (`add == false`) the shards derived from
  ## `newTopics` to/from the local Discv5 ENR.
  ##
  ## Returns `ok()` without touching the ENR when `newTopics` maps to no relay
  ## shards, or when a removal is requested while the ENR has no sharding
  ## information. Returns an error when the topics cannot be converted, the
  ## cluster ids differ, a removal would leave no shards, or the ENR update
  ## itself fails.
  let newShardOp = topicsToRelayShards(newTopics).valueOr:
    return err("ENR update failed: " & error)

  let newShard = newShardOp.valueOr:
    return ok()

  let typedRecord = wd.protocol.localNode.record.toTyped().valueOr:
    return err("ENR update failed: " & $error)

  let currentShardsOp = typedRecord.relaySharding()

  let resultShard =
    if currentShardsOp.isNone():
      # Nothing to remove from; for additions the new shards are used as-is.
      if not add:
        return ok()

      newShard
    else:
      let currentShard = currentShardsOp.get()

      if currentShard.clusterId != newShard.clusterId:
        return err("ENR update failed: clusterId id mismatch")

      let shardIds =
        if add:
          currentShard.shardIds & newShard.shardIds
        else:
          let remaining = toSeq(
            toHashSet(currentShard.shardIds) - toHashSet(newShard.shardIds)
          )

          if remaining.len == 0:
            return err("ENR update failed: cannot remove all shards")

          remaining

      RelayShards.init(currentShard.clusterId, shardIds).valueOr:
        return err("ENR update failed: " & error)

  # Short shard lists are stored as an indices list, longer ones as a bit
  # vector.
  let (field, value) =
    if resultShard.shardIds.len < ShardingIndicesListMaxLength:
      let list = resultShard.toIndicesList().valueOr:
        return err("ENR update failed: " & $error)

      (ShardingIndicesListEnrField, list)
    else:
      (ShardingBitVectorEnrField, resultShard.toBitVector())

  wd.protocol.updateRecord([(field, value)]).isOkOr:
    return err("ENR update failed: " & $error)

  ok()
|
|
|
|
proc logDiscv5FoundPeers(discoveredRecords: seq[waku_enr.Record]) =
  ## Logs, at notice level, the addresses, ENR URI, capabilities and shards of
  ## every record in `discoveredRecords`. Records that cannot be converted to
  ## a typed record or to a remote peer info are reported with a warning and
  ## skipped.
  for record in discoveredRecords:
    let
      recordUri = record.toURI()
      capabilities = record.getCapabilities()

    let typedRecord = record.toTyped().valueOr:
      warn "Could not parse to typed record", error = error, enr = recordUri
      continue

    let peerInfo = record.toRemotePeerInfo().valueOr:
      warn "Could not generate remote peer info", error = error, enr = recordUri
      continue

    let
      addrs = peerInfo.constructMultiaddrStr()
      rs = typedRecord.relaySharding()
      shardsStr = if rs.isSome(): $rs.get() else: "no shards found"

    notice "Received discv5 node",
      addrs = addrs, enr = recordUri, capabilities = capabilities, shards = shardsStr
|
|
|
|
proc findRandomPeers*(
    wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predicate)
): Future[seq[waku_enr.Record]] {.async.} =
  ## Runs a random discv5 query and returns the ENRs of the nodes found.
  ##
  ## The records are filtered with `overridePred` when given, otherwise with
  ## the instance predicate when one is set; with neither, nothing is
  ## filtered. The `waku_discv5_discovered` gauge is increased by the number of
  ## records returned.
  let discoveredNodes = await wd.protocol.queryRandom()

  var discoveredRecords = discoveredNodes.mapIt(it.record)

  when defined(debugDiscv5):
    logDiscv5FoundPeers(discoveredRecords)

  # The override takes precedence over the instance predicate.
  let activePred =
    if overridePred.isSome(): overridePred else: wd.predicate

  # Filter out nodes that do not match the predicate
  if activePred.isSome():
    discoveredRecords = discoveredRecords.filter(activePred.get())

  waku_discv5_discovered.inc(discoveredRecords.len)

  return discoveredRecords
|
|
|
|
proc searchLoop(wd: WakuDiscoveryV5) {.async.} =
  ## Continuously adds newly discovered nodes to the peer manager.
  ##
  ## Returns immediately when no peer manager is configured. Otherwise, while
  ## `wd.listening` is set, it queries random peers, converts their records to
  ## `RemotePeerInfo` (counting failures in `waku_discv5_errors`), adds the
  ## converted peers to the peer manager and sleeps 5 seconds.
  if wd.peerManager.isNone():
    return

  let manager = wd.peerManager.get()

  info "Starting discovery v5 search"

  while wd.listening:
    trace "running discv5 discovery loop"
    let records = await wd.findRandomPeers()

    var peers: seq[RemotePeerInfo]
    ## reasons why certain records could not be converted to RemotePeerInfo
    var rejected: seq[tuple[record: string, errorDescription: string]]

    for record in records:
      let infoRes = record.toRemotePeerInfo()

      if infoRes.isErr():
        ## in case of error, we keep track of it for debugging purposes
        rejected.add(($record, $infoRes.error))
        waku_discv5_errors.inc(labelValues = [$infoRes.error])
        continue

      peers.add(infoRes.get())

    trace "discv5 discovered peers",
      num_discovered_peers = peers.len,
      peers = toSeq(peers.mapIt(shortLog(it.peerId)))

    trace "discv5 discarded wrong records",
      wrong_records =
        rejected.mapIt("(" & it.record & "," & it.errorDescription & ")")

    for peer in peers:
      # Peers added are filtered by the peer manager
      manager.addPeer(peer, PeerOrigin.Discv5)

    # Discovery `queryRandom` can have a synchronous fast path for example
    # when no peers are in the routing table. Don't run it in continuous loop.
    #
    # Also, give some time to dial the discovered nodes and update stats, etc.
    await sleepAsync(5.seconds)
|
|
|
|
proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
  ## Listens for pubsub topic subscription changes and mirrors them into the
  ## local ENR.
  ##
  ## While `wd.listening` is set, each batch of events is split into
  ## subscriptions and unsubscriptions; removals are applied to the ENR first,
  ## then additions. Unless both updates fail, the sharding predicate is
  ## rebuilt from the updated local record. The queue registration is released
  ## once the loop ends.
  let key = wd.topicSubscriptionQueue.register()

  while wd.listening:
    let events = await wd.topicSubscriptionQueue.waitEvents(key)

    # Since we don't know the events we will receive we have to anticipate.
    var
      subs: seq[PubsubTopic]
      unsubs: seq[PubsubTopic]

    for event in events:
      if event.kind == PubsubSub:
        subs.add(event.topic)
      elif event.kind == PubsubUnsub:
        unsubs.add(event.topic)

    if subs.len == 0 and unsubs.len == 0:
      continue

    let
      unsubRes = wd.updateENRShards(unsubs, false)
      subRes = wd.updateENRShards(subs, true)

    if subRes.isErr():
      debug "ENR shard addition failed", reason = $subRes.error

    if unsubRes.isErr():
      debug "ENR shard removal failed", reason = $unsubRes.error

    # Only skip the predicate refresh when nothing could be applied at all.
    if subRes.isErr() and unsubRes.isErr():
      continue

    debug "ENR updated successfully"

    wd.predicate =
      shardingPredicate(wd.protocol.localNode.record, wd.protocol.bootstrapRecords)

  wd.topicSubscriptionQueue.unregister(key)
|
|
|
|
proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises: []).} =
  ## Opens the discv5 transport, starts the protocol and spawns the search
  ## and subscription-listener loops.
  ##
  ## Returns an error when the service is already listening or when opening
  ## the protocol raises; `listening` is only set after a successful open.
  if wd.listening:
    return err("already listening")

  info "Starting discovery v5 service"

  debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
  try:
    wd.protocol.open()
  except CatchableError as exc:
    return err("failed to open udp port: " & exc.msg)

  wd.listening = true

  trace "start discv5 service"
  wd.protocol.start()

  # Both loops check `wd.listening`, which is already set at this point.
  asyncSpawn wd.searchLoop()
  asyncSpawn wd.subscriptionsListener()

  debug "Successfully started discovery v5 service"
  info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()

  return ok()
|
|
|
|
proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
  ## Stops the discovery v5 service: clears `listening` (which ends the
  ## background loops' `while` conditions) and waits for the protocol to
  ## close. Does nothing when the service is not listening.
  if wd.listening:
    info "Stopping discovery v5 service"

    wd.listening = false
    trace "Stop listening on discv5 port"
    await wd.protocol.closeWait()

    debug "Successfully stopped discovery v5 service"
|
|
|
|
## Helper functions
|
|
|
|
proc parseBootstrapAddress(address: string): Result[enr.Record, cstring] =
  ## Parses a bootstrap address into an ENR.
  ##
  ## Only `enr:` URIs are accepted (the prefix is matched case-insensitively
  ## before handing the original string to `fromURI`). Multiaddresses
  ## (leading '/'), `enode:` URIs and anything else, including the empty
  ## string, yield an error describing why the address was rejected.
  logScope:
    address = address

  # `startsWith(char)` is safe on an empty string, unlike `address[0]`, which
  # would raise an IndexDefect.
  if address.startsWith('/'):
    return err("MultiAddress bootstrap addresses are not supported")

  let lowerCaseAddress = toLowerAscii(address)
  if lowerCaseAddress.startsWith("enr:"):
    var enrRec: enr.Record
    if not enrRec.fromURI(address):
      return err("Invalid ENR bootstrap record")

    return ok(enrRec)
  elif lowerCaseAddress.startsWith("enode:"):
    return err("ENode bootstrap addresses are not supported")
  else:
    return err("Ignoring unrecognized bootstrap address type")
|
|
|
|
proc addBootstrapNode*(bootstrapAddr: string, bootstrapEnrs: var seq[enr.Record]) =
  ## Parses `bootstrapAddr` and appends the resulting ENR to `bootstrapEnrs`.
  ##
  ## Empty lines and lines starting with '#' are skipped silently; addresses
  ## that fail to parse are skipped with a debug log.
  if bootstrapAddr.len == 0 or bootstrapAddr.startsWith('#'):
    return

  let record = parseBootstrapAddress(bootstrapAddr).valueOr:
    debug "ignoring invalid bootstrap address", reason = error
    return

  bootstrapEnrs.add(record)
|
|
|
|
proc setupDiscoveryV5*(
    myENR: enr.Record,
    nodePeerManager: PeerManager,
    nodeTopicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent],
    conf: WakuNodeConf,
    dynamicBootstrapNodes: seq[RemotePeerInfo],
    rng: ref HmacDrbgContext,
    key: crypto.PrivateKey,
): WakuDiscoveryV5 =
  ## Builds a `WakuDiscoveryV5` instance from the node configuration.
  ##
  ## The bootstrap list is made of the ENRs parsed from
  ## `conf.discv5BootstrapNodes` followed by the ENRs of the dynamic bootstrap
  ## nodes that have a UDP port. The UDP port is `conf.discv5UdpPort` shifted
  ## by `conf.portsShift`.

  # ENRs of the dynamic bootstrap nodes usable for discv5 (UDP port present)
  var dynamicEnrs: seq[enr.Record]
  for peer in dynamicBootstrapNodes:
    if peer.hasUdpPort():
      dynamicEnrs.add(peer.enr.get())

  # parse enrURIs from the configuration and add the resulting ENRs
  var bootstrapEnrs: seq[enr.Record]
  for enrUri in conf.discv5BootstrapNodes:
    addBootstrapNode(enrUri, bootstrapEnrs)

  bootstrapEnrs.add(dynamicEnrs)

  let discv5Conf = WakuDiscoveryV5Config(
    discv5Config: some(
      DiscoveryConfig.init(
        conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop
      )
    ),
    address: conf.listenAddress,
    port: Port(uint16(conf.discv5UdpPort) + conf.portsShift),
    privateKey: eth_keys.PrivateKey(key.skkey),
    bootstrapRecords: bootstrapEnrs,
    autoupdateRecord: conf.discv5EnrAutoUpdate,
  )

  WakuDiscoveryV5.new(
    rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
  )
|
|
|
|
proc updateBootstrapRecords*(
    self: var WakuDiscoveryV5, newRecordsString: string
): Result[void, string] =
  ## Replaces the protocol's bootstrap records with the ENRs listed in
  ## `newRecordsString`.
  ##
  ## newRecordsString - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
  ##
  ## Returns an error, leaving the current bootstrap records untouched, when
  ## the input is not valid JSON, is not a JSON array, or contains an entry
  ## that is not a valid ENR URI.
  var jsonNode: JsonNode
  try:
    jsonNode = parseJson(newRecordsString)
  except Exception:
    # NOTE(review): `except Exception` also catches Defects; kept as in the
    # original because this module pushes `raises: []` - confirm whether
    # `CatchableError` is sufficient for `parseJson` before narrowing it.
    return err("exception parsing json enr records: " & getCurrentExceptionMsg())

  if jsonNode.kind != JArray:
    return err("updateBootstrapRecords should receive a json array containing ENRs")

  var newRecords = newSeq[waku_enr.Record]()

  for enrNode in jsonNode:
    # `$` on a JSON string node keeps the surrounding quotes; strip them.
    let enrWithoutQuotes = ($enrNode).replace("\"", "")
    var bootstrapNodeEnr: waku_enr.Record
    if not bootstrapNodeEnr.fromURI(enrWithoutQuotes):
      return err("wrong enr given: " & enrWithoutQuotes)

    # Collect the parsed record; without this the bootstrap list would always
    # be replaced by an empty seq.
    newRecords.add(bootstrapNodeEnr)

  self.protocol.bootstrapRecords = newRecords

  return ok()
|