chore: Discovery in libwaku (#2711)

* cwaku_example: add discoveryv5-discovery bool option
* libwaku: implement discovery capabilities
* node_lifecycle_request.nim: better control of possible errors when parsing config
This commit is contained in:
Ivan FB 2024-05-21 18:37:50 +02:00 committed by GitHub
parent 5ee4cba534
commit 7464684842
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 232 additions and 14 deletions

View File

@ -267,8 +267,8 @@ int main(int argc, char** argv) {
show_help_and_exit();
}
char jsonConfig[2048];
snprintf(jsonConfig, 2048, "{ \
char jsonConfig[5000];
snprintf(jsonConfig, 5000, "{ \
\"listenAddress\": \"%s\", \
\"tcpPort\": %d, \
\"nodekey\": \"%s\", \
@ -277,7 +277,14 @@ int main(int argc, char** argv) {
\"storeMessageDbUrl\": \"%s\", \
\"storeMessageRetentionPolicy\": \"%s\", \
\"storeMaxNumDbConnections\": %d , \
\"logLevel\": \"DEBUG\" \
\"logLevel\": \"DEBUG\", \
\"discv5Discovery\": true, \
\"discv5BootstrapNodes\": \
[\"enr:-QESuEB4Dchgjn7gfAvwB00CxTA-nGiyk-aALI-H4dYSZD3rUk7bZHmP8d2U6xDiQ2vZffpo45Jp7zKNdnwDUx6g4o6XAYJpZIJ2NIJpcIRA4VDAim11bHRpYWRkcnO4XAArNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwAtNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOvD3S3jUNICsrOILlmhENiWAMmMVlAl6-Q8wRB7hidY4N0Y3CCdl-DdWRwgiMohXdha3UyDw\", \"enr:-QEkuEBIkb8q8_mrorHndoXH9t5N6ZfD-jehQCrYeoJDPHqT0l0wyaONa2-piRQsi3oVKAzDShDVeoQhy0uwN1xbZfPZAYJpZIJ2NIJpcIQiQlleim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQKnGt-GSgqPSf3IAPM7bFgTlpczpMZZLF3geeoNNsxzSoN0Y3CCdl-DdWRwgiMohXdha3UyDw\"], \
\"discv5UdpPort\": 9999, \
\"dnsDiscovery\": true, \
\"dnsDiscoveryUrl\": \"enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im\", \
\"dnsDiscoveryNameServers\": [\"8.8.8.8\", \"1.0.0.1\"] \
}", cfgNode.host,
cfgNode.port,
cfgNode.key,
@ -313,6 +320,11 @@ int main(int argc, char** argv) {
event_handler,
userData) );
WAKU_CALL( waku_discv5_update_bootnodes(ctx,
"[\"enr:-QEkuEBIkb8q8_mrorHndoXH9t5N6ZfD-jehQCrYeoJDPHqT0l0wyaONa2-piRQsi3oVKAzDShDVeoQhy0uwN1xbZfPZAYJpZIJ2NIJpcIQiQlleim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQKnGt-GSgqPSf3IAPM7bFgTlpczpMZZLF3geeoNNsxzSoN0Y3CCdl-DdWRwgiMohXdha3UyDw\",\"enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ66F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZIJ2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQUlqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw\"]",
event_handler,
userData) );
show_main_menu();
while(1) {
handle_user_input();

View File

@ -92,6 +92,25 @@ int waku_listen_addresses(void* ctx,
WakuCallBack callback,
void* userData);
// Returns a list of multiaddress given a url to a DNS discoverable ENR tree
// Parameters
// char* entTreeUrl: URL containing a discoverable ENR tree
// char* nameDnsServer: The nameserver to resolve the ENR tree url.
// int timeoutMs: Timeout value in milliseconds to execute the call.
int waku_dns_discovery(void* ctx,
const char* entTreeUrl,
const char* nameDnsServer,
int timeoutMs,
WakuCallBack callback,
void* userData);
// Updates the bootnode list used for discovering new peers via DiscoveryV5
// bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
int waku_discv5_update_bootnodes(void* ctx,
char* bootnodes,
WakuCallBack callback,
void* userData);
#ifdef __cplusplus
}
#endif

View File

@ -17,6 +17,7 @@ import
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/requests/debug_node_request,
./waku_thread/inter_thread_communication/requests/discovery_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc,
./callback
@ -397,5 +398,52 @@ proc waku_listen_addresses(
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
proc waku_dns_discovery(
ctx: ptr Context,
entTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
ctx[].userData = userData
let bootstrapPeers = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createRetrieveBootstrapNodesRequest(
DiscoveryMsgType.GET_BOOTSTRAP_NODES, entTreeUrl, nameDnsServer, timeoutMs
),
).valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let msg = $bootstrapPeers
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
proc waku_discv5_update_bootnodes(
ctx: ptr Context, bootnodes: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
## Updates the bootnode list used for discovering new peers via DiscoveryV5
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
ctx[].userData = userData
let resp = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createUpdateBootstrapNodesRequest(
DiscoveryMsgType.UPDATE_DISCV5_BOOTSTRAP_NODES, bootnodes
),
).valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let msg = $resp
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
### End of exported procs
################################################################################

View File

@ -0,0 +1,99 @@
import std/[json, sequtils]
import chronos, stew/results, libp2p/multiaddress
import
../../../../waku/factory/waku,
../../../../waku/discovery/waku_dnsdisc,
../../../../waku/discovery/waku_discv5,
../../../../waku/waku_core/peers,
../../../alloc
type DiscoveryMsgType* = enum
GET_BOOTSTRAP_NODES
UPDATE_DISCV5_BOOTSTRAP_NODES
type DiscoveryRequest* = object
operation: DiscoveryMsgType
## used in GET_BOOTSTRAP_NODES
enrTreeUrl: cstring
nameDnsServer: cstring
timeoutMs: cint
## used in UPDATE_DISCV5_BOOTSTRAP_NODES
nodes: cstring
proc createShared(
T: type DiscoveryRequest,
op: DiscoveryMsgType,
enrTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
nodes: cstring,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].enrTreeUrl = enrTreeUrl.alloc()
ret[].nameDnsServer = nameDnsServer.alloc()
ret[].timeoutMs = timeoutMs
ret[].nodes = nodes.alloc()
return ret
proc createRetrieveBootstrapNodesRequest*(
T: type DiscoveryRequest,
op: DiscoveryMsgType,
enrTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
): ptr type T =
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "")
proc createUpdateBootstrapNodesRequest*(
T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring
): ptr type T =
return T.createShared(op, "", "", 0, nodes)
proc destroyShared(self: ptr DiscoveryRequest) =
deallocShared(self[].enrTreeUrl)
deallocShared(self[].nameDnsServer)
deallocShared(self)
proc retrieveBootstrapNodes(
enrTreeUrl: string, ipDnsServer: string
): Result[seq[string], string] =
let dnsNameServers = @[parseIpAddress(ipDnsServer)]
let discoveredPeers: seq[RemotePeerInfo] = retrieveDynamicBootstrapNodes(
true, enrTreeUrl, dnsNameServers
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/" & $discPeer)
return ok(multiAddresses)
proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, string] =
waku.wakuDiscv5.updateBootstrapRecords(nodes).isOkOr:
return err("error in updateDiscv5BootstrapNodes: " & $error)
return ok()
proc process*(
self: ptr DiscoveryRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of GET_BOOTSTRAP_NODES:
let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr:
return err($error)
return ok($(%*nodes))
of UPDATE_DISCV5_BOOTSTRAP_NODES:
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
return err($error)
return ok("discovery request processed correctly")
return err("discovery request not handled")

View File

@ -36,17 +36,28 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
var errorResp: string
var jsonNode: JsonNode
try:
let jsonNode = parseJson($configJson)
for confField, confValue in fieldPairs(conf):
if jsonNode.contains(confField):
# Make sure string doesn't contain the leading or trailing " character
let formattedString = ($jsonNode[confField]).strip(chars = {'\"'})
# Override conf field with the value set in the json-string
confValue = parseCmdArg(typeof(confValue), formattedString)
jsonNode = parseJson($configJson)
except Exception:
return err("exception parsing configuration: " & getCurrentExceptionMsg())
return err(
"exception in createWaku when calling parseJson: " & getCurrentExceptionMsg() &
" configJson string: " & $configJson
)
for confField, confValue in fieldPairs(conf):
if jsonNode.contains(confField):
# Make sure string doesn't contain the leading or trailing " character
let formattedString = ($jsonNode[confField]).strip(chars = {'\"'})
# Override conf field with the value set in the json-string
try:
confValue = parseCmdArg(typeof(confValue), formattedString)
except Exception:
return err(
"exception in createWaku when parsing configuration. exc: " &
getCurrentExceptionMsg() & ". string that could not be parsed: " &
formattedString & ". expected type: " & $typeof(confValue)
)
let wakuRes = Waku.init(conf).valueOr:
error "waku initialization failed", error = error

View File

@ -10,7 +10,8 @@ import
./requests/peer_manager_request,
./requests/protocols/relay_request,
./requests/protocols/store_request,
./requests/debug_node_request
./requests/debug_node_request,
./requests/discovery_request
type RequestType* {.pure.} = enum
LIFECYCLE
@ -18,6 +19,7 @@ type RequestType* {.pure.} = enum
RELAY
STORE
DEBUG
DISCOVERY
type InterThreadRequest* = object
reqType: RequestType
@ -52,6 +54,8 @@ proc process*(
cast[ptr StoreRequest](request[].reqContent).process(waku)
of DEBUG:
cast[ptr DebugNodeRequest](request[].reqContent).process(waku[])
of DISCOVERY:
cast[ptr DiscoveryRequest](request[].reqContent).process(waku)
return await retFut

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[sequtils, strutils, options, sets, net],
std/[sequtils, strutils, options, sets, net, json],
stew/results,
chronos,
chronicles,
@ -373,3 +373,28 @@ proc setupDiscoveryV5*(
WakuDiscoveryV5.new(
rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
)
proc updateBootstrapRecords*(
self: var WakuDiscoveryV5, newRecordsString: string
): Result[void, string] =
## newRecordsString - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
var newRecords = newSeq[waku_enr.Record]()
var jsonNode: JsonNode
try:
jsonNode = parseJson(newRecordsString)
except Exception:
return err("exception parsing json enr records: " & getCurrentExceptionMsg())
if jsonNode.kind != JArray:
return err("updateBootstrapRecords should receive a json array containing ENRs")
for enr in jsonNode:
let enrWithoutQuotes = ($enr).replace("\"", "")
var bootstrapNodeEnr: waku_enr.Record
if not bootstrapNodeEnr.fromURI(enrWithoutQuotes):
return err("wrong enr given: " & enrWithoutQuotes)
self.protocol.bootstrapRecords = newRecords
return ok()