Merge branch 'master' into add-new-message-hash-column

This commit is contained in:
Abhimanyu 2023-10-30 12:34:09 +01:00 committed by GitHub
commit a20bf19534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 582 additions and 191 deletions

View File

@ -59,7 +59,7 @@ jobs:
echo "commit_hash=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
docker login -u ${QUAY_USER} -p ${QUAY_PASSWORD} quay.io
docker build -t ${IMAGE} -f docker/binaries/Dockerfile.bn.amd64 --label quay.expires-after=7d .
docker build -t ${IMAGE} -f docker/binaries/Dockerfile.bn.amd64 --label quay.expires-after=30d .
docker push ${IMAGE}
env:
QUAY_PASSWORD: ${{ secrets.QUAY_PASSWORD }}

View File

@ -1,3 +1,26 @@
## v0.21.1 (2023-10-26)
This patch release addresses the following issues:
- WSS connections being suddenly terminated under rare conditions
- Ability for the user to control announced multiaddresses
### Changes
- adding ext-multiaddr-only CLI flag ([#2141](https://github.com/waku-org/nwaku/issues/2141)) ([e2dfc2ed](https://github.com/waku-org/nwaku/commit/e2dfc2ed))
- bumping nim-libp2p to include WSS fix ([#2150](https://github.com/waku-org/nwaku/issues/2150)) ([18b5149a](https://github.com/waku-org/nwaku/commit/18b5149a))
This is a patch release that is fully backwards-compatible with release `v0.21.0`.
It supports the same [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
| Protocol | Spec status | Protocol id |
| ---: | :---: | :--- |
| [`11/WAKU2-RELAY`](https://rfc.vac.dev/spec/11/) | `stable` | `/vac/waku/relay/2.0.0` |
| [`12/WAKU2-FILTER`](https://rfc.vac.dev/spec/12/) | `draft` | `/vac/waku/filter/2.0.0-beta1` <br />`/vac/waku/filter-subscribe/2.0.0-beta1` <br />`/vac/waku/filter-push/2.0.0-beta1` |
| [`13/WAKU2-STORE`](https://rfc.vac.dev/spec/13/) | `draft` | `/vac/waku/store/2.0.0-beta4` |
| [`19/WAKU2-LIGHTPUSH`](https://rfc.vac.dev/spec/19/) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
The Waku v1 implementation has been removed from this repository and can be found in a separate [Waku Legacy](https://github.com/waku-org/waku-legacy) repository.
## v0.21.0 (2023-10-18)
> Note: This is the last release supporting the `--topic` option. It is being deprecated in favor of a more specific options `--pubsub-topic` & `--content-topic`

View File

@ -8,6 +8,8 @@ import
stew/results,
chronicles,
chronos,
libp2p/wire,
libp2p/multicodec,
libp2p/crypto/crypto,
libp2p/nameresolving/dnsresolver,
libp2p/protocols/pubsub/gossipsub,
@ -29,7 +31,7 @@ import
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
@ -117,39 +119,8 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
quit(QuitFailure)
else: netConfigRes.get()
var enrBuilder = EnrBuilder.init(key)
let recordRes = enrConfiguration(conf, netConfig, key)
enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)
if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
let topics =
if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0:
let shardsRes = conf.contentTopics.mapIt(getShard(it))
for res in shardsRes:
if res.isErr():
error "failed to shard content topic", error=res.error
quit(QuitFailure)
let shards = shardsRes.mapIt(it.get())
conf.pubsubTopics & shards
else:
conf.topics
let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error=addShardedTopics.error
quit(QuitFailure)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create record", error=recordRes.error
@ -329,6 +300,72 @@ proc setupWakuApp*(app: var App): AppResult[void] =
ok()
proc getPorts(listenAddrs: seq[MultiAddress]):
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
var tcpPort, websocketPort = none(Port)
for a in listenAddrs:
if a.isWsAddress():
if websocketPort.isNone():
let wsAddress = initTAddress(a).valueOr:
return err("getPorts wsAddr error:" & $error)
websocketPort = some(wsAddress.port)
elif tcpPort.isNone():
let tcpAddress = initTAddress(a).valueOr:
return err("getPorts tcpAddr error:" & $error)
tcpPort = some(tcpAddress.port)
return ok((tcpPort: tcpPort, websocketPort: websocketPort))
proc updateNetConfig(app: var App): AppResult[void] =
var conf = app.conf
let (tcpPort, websocketPort) = getPorts(app.node.switch.peerInfo.listenAddrs).valueOr:
return err("Could not retrieve ports " & error)
if tcpPort.isSome():
conf.tcpPort = tcpPort.get()
if websocketPort.isSome():
conf.websocketPort = websocketPort.get()
# Rebuild NetConfig with bound port values
let netConf = networkConfiguration(conf, clientId).valueOr:
return err("Could not update NetConfig: " & error)
app.netConf = netConf
return ok()
proc updateEnr(app: var App): AppResult[void] =
let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr:
return err(error)
app.record = record
app.node.enr = record
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())
return ok()
proc updateApp(app: var App): AppResult[void] =
if app.conf.tcpPort == Port(0) or app.conf.websocketPort == Port(0):
updateNetConfig(app).isOkOr:
return err("error calling updateNetConfig: " & $error)
updateEnr(app).isOkOr:
return err("error calling updateEnr: " & $error)
app.node.announcedAddresses = app.netConf.announcedAddresses
printNodeNetworkInfo(app.node)
return ok()
## Mount protocols
@ -548,7 +585,18 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
return ok()
proc startApp*(app: App): Future[AppResult[void]] {.async.} =
proc startApp*(app: var App): AppResult[void] =
try:
(waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr:
return err(error)
except CatchableError:
return err("exception starting node: " & getCurrentExceptionMsg())
# Update app data that is set dynamically on node start
app.updateApp().isOkOr:
return err("Error in updateApp: " & $error)
if app.wakuDiscv5.isSome():
let wakuDiscv5 = app.wakuDiscv5.get()
@ -559,17 +607,40 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} =
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)
return await startNode(
app.node,
app.conf,
app.dynamicBootstrapNodes
)
return ok()
## Monitoring and external interfaces
proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] =
let server = ? newRestHttpServer(address, port)
# Used to register api endpoints that are not currently installed as keys,
# values are holding error messages to be returned to the client
var notInstalledTab: Table[string, string] = initTable[string, string]()
proc requestErrorHandler(error: RestRequestError,
request: HttpRequestRef):
Future[HttpResponseRef] {.async.} =
case error
of RestRequestError.Invalid:
return await request.respond(Http400, "Invalid request", HttpTable.init())
of RestRequestError.NotFound:
let rootPath = request.rawPath.split("/")[1]
if notInstalledTab.hasKey(rootPath):
return await request.respond(Http404, notInstalledTab[rootPath], HttpTable.init())
else:
return await request.respond(Http400, "Bad request initiated. Invalid path or method used.", HttpTable.init())
of RestRequestError.InvalidContentBody:
return await request.respond(Http400, "Invalid content body", HttpTable.init())
of RestRequestError.InvalidContentType:
return await request.respond(Http400, "Invalid content type", HttpTable.init())
of RestRequestError.Unexpected:
return defaultResponse()
return defaultResponse()
let server = ? newRestHttpServer(address, port, requestErrorHandler = requestErrorHandler)
## Admin REST API
installAdminApiHandlers(server.router, app.node)
@ -596,6 +667,8 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
installRelayApiHandlers(server.router, app.node, cache)
else:
notInstalledTab["relay"] = "/relay endpoints are not available. Please check your configuration: --relay"
## Filter REST API
if conf.filternode != "" and
@ -606,15 +679,40 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)
let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)
let filterDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
else: none(DiscoveryHandler)
rest_filter_api.installFilterRestApiHandlers(
server.router,
app.node,
filterCache,
filterDiscoHandler,
)
else:
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"
## Store REST API
installStoreApiHandlers(server.router, app.node)
let storeDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
else: none(DiscoveryHandler)
installStoreApiHandlers(server.router, app.node, storeDiscoHandler)
## Light push API
if conf.lightpushnode != "" and
app.node.wakuLightpushClient != nil:
rest_lightpush_api.installLightPushRequestHandler(server.router, app.node)
let lightDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
else: none(DiscoveryHandler)
rest_lightpush_api.installLightPushRequestHandler(server.router, app.node, lightDiscoHandler)
else:
notInstalledTab["lightpush"] = "/lightpush endpoints are not available. Please check your configuration: --lightpushnode"
server.start()
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"

View File

@ -92,6 +92,11 @@ type
desc: "External multiaddresses to advertise to the network. Argument may be repeated."
name: "ext-multiaddr" }: seq[string]
extMultiAddrsOnly* {.
desc: "Only announce external multiaddresses",
defaultValue: false,
name: "ext-multiaddr-only" }: bool
maxConnections* {.
desc: "Maximum allowed number of libp2p connections."
defaultValue: 50

View File

@ -4,15 +4,61 @@ import
libp2p/crypto/crypto,
libp2p/multiaddress,
libp2p/nameresolving/dnsresolver,
std/options,
std/[options, sequtils],
stew/results,
stew/shims/net
import
../../waku/common/utils/nat,
../../waku/node/config,
../../waku/waku_enr/capabilities,
../../waku/waku_enr,
../../waku/waku_core,
./external_config
proc enrConfiguration*(conf: WakuNodeConf, netConfig: NetConfig, key: crypto.PrivateKey):
Result[enr.Record, string] =
var enrBuilder = EnrBuilder.init(key)
enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)
if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
let topics =
if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0:
let shardsRes = conf.contentTopics.mapIt(getShard(it))
for res in shardsRes:
if res.isErr():
error "failed to shard content topic", error=res.error
return err($res.error)
let shards = shardsRes.mapIt(it.get())
conf.pubsubTopics & shards
else:
conf.topics
let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error=addShardedTopics.error
return err($addShardedTopics.error)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create record", error=recordRes.error
return err($recordRes.error)
else: recordRes.get()
return ok(record)
proc validateExtMultiAddrs*(vals: seq[string]):
Result[seq[MultiAddress], string] =
var multiaddrs: seq[MultiAddress]
@ -111,6 +157,7 @@ proc networkConfiguration*(conf: WakuNodeConf,
extIp = extIp,
extPort = extPort,
extMultiAddrs = extMultiAddrs,
extMultiAddrsOnly = conf.extMultiAddrsOnly,
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
wsEnabled = conf.websocketSupport,
wssEnabled = conf.websocketSecureSupport,

View File

@ -87,7 +87,7 @@ when isMainModule:
debug "5/7 Starting node and mounted protocols"
let res6 = waitFor wakunode2.startApp()
let res6 = wakunode2.startApp()
if res6.isErr():
error "5/7 Starting node and protocols failed", error=res6.error
quit(QuitFailure)

View File

@ -71,7 +71,7 @@ Ensure all items in this list are ticked:
### After the release
1. Announce the release on Twitter, Discord and other channels.
2. Deploy the release image to [Dockerhub](https://hub.docker.com/layers/wakuorg/nwaku/a5f8b9/images/sha256-88691a8f82bd6a4242fa99053a65b7fc4762b23a2b4e879d0f8b578c798a0e09?context=explore) by triggering [the manual Jenkins deployment job](https://ci.infra.status.im/job/nim-waku/job/manual/build).
2. Deploy the release image to [Dockerhub](https://hub.docker.com/r/wakuorg/nwaku) by triggering [the manual Jenkins deployment job](https://ci.infra.status.im/job/nim-waku/job/docker-manual/).
> Ensure the following build parameters are set:
> - `MAKE_TARGET`: `wakunode2`
> - `IMAGE_TAG`: the release tag (e.g. `v0.16.0`)
@ -87,3 +87,24 @@ Ensure all items in this list are ticked:
- Deploy release to the `wakuv2.prod` fleet from [Jenkins](https://ci.infra.status.im/job/nim-waku/job/deploy-wakuv2-prod/).
- Ensure that nodes successfully start up and monitor health using [Grafana](https://grafana.infra.status.im/d/qrp_ZCTGz/nim-waku-v2?orgId=1) and [Kibana](https://kibana.infra.status.im/goto/a7728e70-eb26-11ec-81d1-210eb3022c76).
- If necessary, revert by deploying the previous release. Download logs and open a bug report issue.
### Performing a patch release
1. Cherry-pick the relevant commits from master to the release branch
```
git cherry-pick <commit-hash>
```
2. Create a release-candidate tag with the same name as release and `-rc.N` suffix
3. Update `CHANGELOG.md`. From the release branch, use the helper Make target after having cherry-picked the commits.
```
make release-notes
```
Create a new branch and raise a PR with the changelog updates to master.
4. Once the release-candidate has been validated and changelog PR got merged, cherry-pick the changelog update from master to the release branch. Create a final release tag and push it.
5. Create a [Github release](https://github.com/waku-org/nwaku/releases) from the release tag and follow the same post-release process as usual.

View File

@ -338,19 +338,63 @@ suite "Waku NetConfig":
check:
netConfig.enrMultiaddrs.contains(dns4TcpEndPoint(dns4DomainName, extPort))
asyncTest "wsHostAddress is not announced if a WS address is provided in extMultiAddrs":
asyncTest "wsHostAddress is not announced if a WS/WSS address is provided in extMultiAddrs":
var
conf = defaultTestWakuNodeConf()
extAddIp = ValidIpAddress.init("1.2.3.4")
extAddPort = Port(1234)
wsEnabled = true
wssEnabled = false
extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))]
var netConfigRes = NetConfig.init(
bindIp = conf.listenAddress,
bindPort = conf.tcpPort,
extMultiAddrs = extMultiAddrs,
wsEnabled = wsEnabled
)
assert netConfigRes.isOk(), $netConfigRes.error
var netConfig = netConfigRes.get()
check:
netConfig.announcedAddresses.len == 2 # Bind address + extAddress
netConfig.announcedAddresses[1] == extMultiAddrs[0]
# Now same test for WSS external address
wssEnabled = true
extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))]
netConfigRes = NetConfig.init(
bindIp = conf.listenAddress,
bindPort = conf.tcpPort,
extMultiAddrs = extMultiAddrs,
wssEnabled = wssEnabled
)
assert netConfigRes.isOk(), $netConfigRes.error
netConfig = netConfigRes.get()
check:
netConfig.announcedAddresses.len == 2 # Bind address + extAddress
netConfig.announcedAddresses[1] == extMultiAddrs[0]
asyncTest "Only extMultiAddrs are published when enabling extMultiAddrsOnly flag":
let
conf = defaultTestWakuNodeConf()
extAddIp = ValidIpAddress.init("1.2.3.4")
extAddPort = Port(1234)
wssEnabled = false
extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))]
extMultiAddrs = @[ip4TcpEndPoint(extAddIp, extAddPort)]
let netConfigRes = NetConfig.init(
bindIp = conf.listenAddress,
bindPort = conf.tcpPort,
extMultiAddrs = extMultiAddrs
extMultiAddrs = extMultiAddrs,
extMultiAddrsOnly = true
)
assert netConfigRes.isOk(), $netConfigRes.error
@ -358,6 +402,6 @@ suite "Waku NetConfig":
let netConfig = netConfigRes.get()
check:
netConfig.announcedAddresses.len == 2 # Bind address + extAddress
netConfig.announcedAddresses[1] == extMultiAddrs[0]
netConfig.announcedAddresses.len == 1 # ExtAddress
netConfig.announcedAddresses[0] == extMultiAddrs[0]

View File

@ -12,7 +12,9 @@ import
import
../testlib/common,
../testlib/wakucore,
../testlib/wakunode,
../testlib/wakunode
include
../../apps/wakunode2/app
suite "Wakunode2 - App":
@ -27,7 +29,7 @@ suite "Wakunode2 - App":
## Then
check:
version == app.git_version
version == git_version
suite "Wakunode2 - App initialization":
test "peer persistence setup should be successfully mounted":
@ -53,7 +55,7 @@ suite "Wakunode2 - App initialization":
require wakunode2.setupDyamicBootstrapNodes().isOk()
require wakunode2.setupWakuApp().isOk()
require isOk(waitFor wakunode2.setupAndMountProtocols())
require isOk(waitFor wakunode2.startApp())
require isOk(wakunode2.startApp())
require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
## Then
@ -67,3 +69,43 @@ suite "Wakunode2 - App initialization":
## Cleanup
waitFor wakunode2.stop()
test "app properly handles dynamic port configuration":
## Given
var conf = defaultTestWakuNodeConf()
conf.tcpPort = Port(0)
## When
var wakunode2 = App.init(rng(), conf)
require wakunode2.setupPeerPersistence().isOk()
require wakunode2.setupDyamicBootstrapNodes().isOk()
require wakunode2.setupWakuApp().isOk()
require isOk(waitFor wakunode2.setupAndMountProtocols())
require isOk(wakunode2.startApp())
require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
## Then
let
node = wakunode2.node
typedNodeEnr = node.enr.toTypedRecord()
typedAppEnr = wakunode2.record.toTypedRecord()
assert typedNodeEnr.isOk(), $typedNodeEnr.error
assert typedAppEnr.isOk(), $typedAppEnr.error
check:
# App started properly
not node.isNil()
node.wakuArchive.isNil()
node.wakuStore.isNil()
not node.wakuStoreClient.isNil()
not node.rendezvous.isNil()
# DS structures are updated with dynamic ports
wakunode2.netConf.bindPort != Port(0)
wakunode2.netConf.enrPort.get() != Port(0)
typedNodeEnr.get().tcp.get() != 0
typedAppEnr.get().tcp.get() != 0
## Cleanup
waitFor wakunode2.stop()

View File

@ -513,7 +513,7 @@ procSuite "Waku v2 Rest API - Store":
response.status == 412
$response.contentType == $MIMETYPE_TEXT
response.data.messages.len == 0
response.data.error_message.get == "Missing known store-peer node"
response.data.error_message.get == NoPeerNoDiscError.errobj.message
# Now add the storenode from "config"
node.peerManager.addServicePeer(remotePeerInfo,

2
vendor/nim-presto vendored

@ -1 +1 @@
Subproject commit 2ae448ff5b0808c8f562c6f0a70bbd7a05407a37
Subproject commit 81250a419bc097a9e93f2ab69de60543eee07138

View File

@ -59,7 +59,7 @@ proc formatListenAddress(inputMultiAdd: MultiAddress): MultiAddress =
# If MultiAddress contains "0.0.0.0", replace it for "127.0.0.1"
return MultiAddress.init(inputStr.replace("0.0.0.0", "127.0.0.1")).get()
proc isWsAddress(ma: MultiAddress): bool =
proc isWsAddress*(ma: MultiAddress): bool =
let
isWs = ma.contains(multiCodec("ws")).get()
isWss = ma.contains(multiCodec("wss")).get()
@ -75,6 +75,7 @@ proc init*(T: type NetConfig,
extIp = none(ValidIpAddress),
extPort = none(Port),
extMultiAddrs = newSeq[MultiAddress](),
extMultiAddrsOnly: bool = false,
wsBindPort: Port = Port(8000),
wsEnabled: bool = false,
wssEnabled: bool = false,
@ -125,21 +126,22 @@ proc init*(T: type NetConfig,
var announcedAddresses = newSeq[MultiAddress]()
if hostExtAddress.isSome():
announcedAddresses.add(hostExtAddress.get())
else:
announcedAddresses.add(formatListenAddress(hostAddress)) # We always have at least a bind address for the host
if not extMultiAddrsOnly:
if hostExtAddress.isSome():
announcedAddresses.add(hostExtAddress.get())
else:
announcedAddresses.add(formatListenAddress(hostAddress)) # We always have at least a bind address for the host
if wsExtAddress.isSome():
announcedAddresses.add(wsExtAddress.get())
elif wsHostAddress.isSome() and not containsWsAddress(extMultiAddrs):
# Only publish wsHostAddress if a WS address is not set in extMultiAddrs
announcedAddresses.add(wsHostAddress.get())
# External multiaddrs that the operator may have configured
if extMultiAddrs.len > 0:
announcedAddresses.add(extMultiAddrs)
if wsExtAddress.isSome():
announcedAddresses.add(wsExtAddress.get())
elif wsHostAddress.isSome() and not containsWsAddress(extMultiAddrs):
# Only publish wsHostAddress if a WS address is not set in extMultiAddrs
announcedAddresses.add(wsHostAddress.get())
let
# enrMultiaddrs are just addresses which cannot be represented in ENR, as described in
# https://rfc.vac.dev/spec/31/#many-connection-types

View File

@ -1073,6 +1073,26 @@ proc mountRendezvous*(node: WakuNode) {.async, raises: [Defect, LPError].} =
node.switch.mount(node.rendezvous)
proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
let inputStr = $inputMultiAdd
if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"):
return true
return false
proc printNodeNetworkInfo*(node: WakuNode): void =
let peerInfo = node.switch.peerInfo
var listenStr = ""
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
for address in node.announcedAddresses:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr
info "DNS: discoverable ENR ", enr = node.enr.toUri()
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
@ -1081,16 +1101,15 @@ proc start*(node: WakuNode) {.async.} =
waku_version.set(1, labelValues=[git_version])
info "Starting Waku node", version=git_version
let peerInfo = node.switch.peerInfo
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
var listenStr = ""
var zeroPortPresent = false
for address in node.announcedAddresses:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr
if isBindIpWithZeroPort(address):
zeroPortPresent = true
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr
info "DNS: discoverable ENR ", enr = node.enr.toUri()
if not zeroPortPresent:
printNodeNetworkInfo(node)
else:
info "Listening port is dynamically allocated, address and ENR generation postponed"
# Perform relay-specific startup tasks TODO: this should be rethought
if not node.wakuRelay.isNil():

View File

@ -1,23 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
chronicles
import
../waku_relay,
../waku_core,
./message_cache
##### Message handler
proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)

View File

@ -0,0 +1,50 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
chronicles,
std/[options, sequtils],
stew/results
import
../waku_discv5,
../waku_relay,
../waku_core,
./message_cache
### Discovery
type DiscoveryHandler* = proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.}
proc defaultDiscoveryHandler*(discv5: WakuDiscoveryV5, cap: Capabilities): DiscoveryHandler =
proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.} =
#Discv5 is already filtering peers by shards no need to pass a predicate.
let findPeers = discv5.findRandomPeers()
if not await findPeers.withTimeout(60.seconds):
return err("discovery process timed out!")
var peers = findPeers.read()
peers.keepItIf(it.supportsCapability(cap))
if peers.len == 0:
return ok(none(RemotePeerInfo))
let remotePeerInfo = peers[0].toRemotePeerInfo().valueOr:
return err($error)
return ok(some(remotePeerInfo))
### Message Cache
proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)

View File

@ -17,7 +17,7 @@ import
../../../waku_rln_relay/rln/wrappers,
../../../waku_node,
../../message_cache,
../../cache_handlers,
../../handlers,
../message
from std/times import getTime

View File

@ -21,6 +21,7 @@ import
../../../waku_filter_v2/client as filter_protocol_client,
../../../waku_filter_v2/common as filter_protocol_type,
../../message_cache,
../../handlers,
../serdes,
../responses,
./types
@ -145,11 +146,18 @@ proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type
return resp.get()
proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache):
Future[RestApiResponse]
{.async.} =
const NoPeerNoDiscoError = FilterSubscribeError.serviceUnavailable(
"No suitable service peer & no discovery method")
const NoPeerNoneFoundError = FilterSubscribeError.serviceUnavailable(
"No suitable service peer & none discovered")
proc filterPostPutSubscriptionRequestHandler(
node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
): Future[RestApiResponse] {.async.} =
## handles any filter subscription requests, adds or modifies.
let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)
@ -159,14 +167,17 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
let req: FilterSubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)
if peerOpt.isNone():
return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)
let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
peerOpt.get())
peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)
let subFut = node.filterSubscribe(req.pubsubTopic, req.contentFilters, peer)
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
@ -178,29 +189,36 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
return makeRestResponse(req.requestId, subFut.read())
proc installFilterPostSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterPostSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler)
proc installFilterPutSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterPutSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic
debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler)
proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterDeleteSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
@ -213,13 +231,18 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
let req: FilterUnsubscribeRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)
if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)
peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)
let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peer)
let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
@ -233,9 +256,12 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())
proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterDeleteAllSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody
@ -248,13 +274,18 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
let req: FilterUnsubscribeAllRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)
if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)
let unsubFut = node.filterUnsubscribeAll(peerOpt.get())
peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)
let unsubFut = node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
@ -268,18 +299,26 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}"
proc installFilterPingSubscriberHandler(router: var RestRouter,
node: WakuNode) =
proc installFilterPingSubscriberHandler(
router: var RestRouter,
node: WakuNode,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse:
## Checks if a node has valid subscription or not.
debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId
let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("No suitable remote filter peers"))
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(requestId.get(), NoPeerNoDiscoError)
let pingFutRes = node.wakuFilterClient.ping(peerOpt.get())
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)
peerOp.valueOr:
return makeRestResponse(requestId.get(), NoPeerNoneFoundError)
let pingFutRes = node.wakuFilterClient.ping(peer)
if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to ping filter service peer due to timeout!"
@ -325,12 +364,15 @@ proc installFilterGetMessagesHandler(router: var RestRouter,
return resp.get()
proc installFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterPingSubscriberHandler(router, node)
installFilterPostSubscriptionsHandler(router, node, cache)
installFilterPutSubscriptionsHandler(router, node, cache)
installFilterDeleteSubscriptionsHandler(router, node, cache)
installFilterDeleteAllSubscriptionsHandler(router, node, cache)
proc installFilterRestApiHandlers*(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
installFilterPingSubscriberHandler(router, node, discHandler)
installFilterPostSubscriptionsHandler(router, node, cache, discHandler)
installFilterPutSubscriptionsHandler(router, node, cache, discHandler)
installFilterDeleteSubscriptionsHandler(router, node, cache, discHandler)
installFilterDeleteAllSubscriptionsHandler(router, node, cache, discHandler)
installFilterGetMessagesHandler(router, node, cache)

View File

@ -18,6 +18,7 @@ import
../../waku/node/peer_manager,
../../../waku_node,
../../waku/waku_lightpush,
../../handlers,
../serdes,
../responses,
./types
@ -29,6 +30,12 @@ logScope:
const futTimeoutForPushRequestProcessing* = 5.seconds
const NoPeerNoDiscoError = RestApiResponse.serviceUnavailable(
"No suitable service peer & no discovery method")
const NoPeerNoneFoundError = RestApiResponse.serviceUnavailable(
"No suitable service peer & none discovered")
#### Request handlers
const ROUTE_LIGHTPUSH* = "/lightpush/v1/message"
@ -50,8 +57,11 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR
return ok(requestResult.get())
proc installLightPushRequestHandler*(router: var RestRouter,
node: WakuNode) =
proc installLightPushRequestHandler*(
router: var RestRouter,
node: WakuNode,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPost, ROUTE_LIGHTPUSH) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Send a request to push a waku message
@ -63,24 +73,29 @@ proc installLightPushRequestHandler*(router: var RestRouter,
return decodedBody.error()
let req: PushRequest = decodedBody.value()
let msg = req.message.toWakuMessage()
if msg.isErr():
let msg = req.message.toWakuMessage().valueOr:
return RestApiResponse.badRequest("Invalid message: {msg.error}")
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
return RestApiResponse.serviceUnavailable("No suitable remote lightpush peers")
let peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscoError
let subFut = node.lightpushPublish(req.pubsubTopic,
msg.value(),
peerOpt.get())
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)
peerOp.valueOr:
return NoPeerNoneFoundError
let subFut = node.lightpushPublish(req.pubsubTopic, msg, peer)
if not await subFut.withTimeout(futTimeoutForPushRequestProcessing):
error "Failed to request a message push due to timeout!"
return RestApiResponse.serviceUnavailable("Push request timed out")
if subFut.value().isErr():
return RestApiResponse.serviceUnavailable(fmt("Failed to request a message push: {subFut.value().error}"))
return RestApiResponse.serviceUnavailable(
fmt("Failed to request a message push: {subFut.value().error}")
)
return RestApiResponse.ok()

View File

@ -17,7 +17,7 @@ import
../../../waku_rln_relay,
../../../node/waku_node,
../../message_cache,
../../cache_handlers,
../../handlers,
../serdes,
../responses,
./types

View File

@ -59,7 +59,8 @@ proc getRouter(allowedOrigin: Option[string]): RestRouter =
proc init*(T: type RestServerRef,
ip: ValidIpAddress, port: Port,
allowedOrigin=none(string),
conf=RestServerConf.default()): RestServerResult[T] =
conf=RestServerConf.default(),
requestErrorHandler: RestRequestErrorHandler = nil): RestServerResult[T] =
let address = initTAddress(ip, port)
let serverFlags = {
HttpServerFlags.QueryCommaSeparatedArray,
@ -82,7 +83,8 @@ proc init*(T: type RestServerRef,
serverFlags = serverFlags,
httpHeadersTimeout = headersTimeout,
maxHeadersSize = maxHeadersSize,
maxRequestBodySize = maxRequestBodySize
maxRequestBodySize = maxRequestBodySize,
requestErrorHandler = requestErrorHandler
)
except CatchableError:
return err(getCurrentExceptionMsg())
@ -92,5 +94,7 @@ proc init*(T: type RestServerRef,
proc newRestHttpServer*(ip: ValidIpAddress, port: Port,
allowedOrigin=none(string),
conf=RestServerConf.default()): RestServerResult[RestServerRef] =
RestServerRef.init(ip, port, allowedOrigin, conf)
conf=RestServerConf.default(),
requestErrorHandler: RestRequestErrorHandler = nil):
RestServerResult[RestServerRef] =
RestServerRef.init(ip, port, allowedOrigin, conf, requestErrorHandler)

View File

@ -15,6 +15,7 @@ import
../../../waku_store/common,
../../../waku_node,
../../../node/peer_manager,
../../handlers,
../responses,
../serdes,
./types
@ -26,6 +27,9 @@ logScope:
const futTimeout* = 5.seconds # Max time to wait for futures
const NoPeerNoDiscError* = RestApiResponse.preconditionFailed(
"No suitable service peer & no discovery method")
# Queries the store-node with the query parameters and
# returns a RestApiResponse that is sent back to the api client.
proc performHistoryQuery(selfNode: WakuNode,
@ -182,10 +186,12 @@ proc toOpt(self: Option[Result[string, cstring]]): Option[string] =
if self.isSome() and self.get().value != "":
return some(self.get().value)
# Subscribes the rest handler to attend "/store/v1/messages" requests
proc installStoreV1Handler(router: var RestRouter,
node: WakuNode) =
proc installStoreApiHandlers*(
router: var RestRouter,
node: WakuNode,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
# Handles the store-query request according to the passed parameters
router.api(MethodGet,
@ -209,18 +215,20 @@ proc installStoreV1Handler(router: var RestRouter,
# /store/v1/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\&pubsubTopic=my-waku-topic
# Parse the peer address parameter
var parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt())
if not parsedPeerAddr.isOk():
return RestApiResponse.badRequest(parsedPeerAddr.error)
let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr:
return RestApiResponse.badRequest(error)
var peerOpt = none(RemotePeerInfo)
if parsedPeerAddr.value.isSome():
peerOpt = parsedPeerAddr.value
else:
# The user didn't specify any store peer address.
peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
return RestApiResponse.preconditionFailed("Missing known store-peer node")
let peerAddr = parsedPeerAddr.valueOr:
node.peerManager.selectPeer(WakuStoreCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscError
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)
peerOp.valueOr:
return RestApiResponse.preconditionFailed(
"No suitable service peer & none discovered")
# Parse the rest of the parameters and create a HistoryQuery
let histQuery = createHistoryQuery(
@ -238,10 +246,4 @@ proc installStoreV1Handler(router: var RestRouter,
if not histQuery.isOk():
return RestApiResponse.badRequest(histQuery.error)
return await node.performHistoryQuery(histQuery.value,
peerOpt.get())
# Registers the Api Handlers
proc installStoreApiHandlers*(router: var RestRouter,
node: WakuNode) =
installStoreV1Handler(router, node)
return await node.performHistoryQuery(histQuery.value, peerAddr)