mirror of https://github.com/waku-org/nwaku.git
chore: Enabling to use a full node for lightpush via rest api without lightpush client configured (#2626)
* Enabling to use a full node for lightpush via rest api without light push client configured
This commit is contained in:
parent
1d7ff2881b
commit
2a4c0f1543
|
@ -44,6 +44,7 @@ import
|
||||||
../waku_lightpush/client as lightpush_client,
|
../waku_lightpush/client as lightpush_client,
|
||||||
../waku_lightpush/common,
|
../waku_lightpush/common,
|
||||||
../waku_lightpush/protocol,
|
../waku_lightpush/protocol,
|
||||||
|
../waku_lightpush/self_req_handler,
|
||||||
../waku_enr,
|
../waku_enr,
|
||||||
../waku_peer_exchange,
|
../waku_peer_exchange,
|
||||||
../waku_rln_relay,
|
../waku_rln_relay,
|
||||||
|
@ -913,7 +914,7 @@ proc mountLightPush*(
|
||||||
|
|
||||||
if publishedCount == 0:
|
if publishedCount == 0:
|
||||||
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
|
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
|
||||||
debug("Lightpush request has not been published to any peers")
|
debug "Lightpush request has not been published to any peers"
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
@ -942,15 +943,30 @@ proc lightpushPublish*(
|
||||||
## Returns whether relaying was successful or not.
|
## Returns whether relaying was successful or not.
|
||||||
## `WakuMessage` should contain a `contentTopic` field for light node
|
## `WakuMessage` should contain a `contentTopic` field for light node
|
||||||
## functionality.
|
## functionality.
|
||||||
if node.wakuLightpushClient.isNil():
|
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
|
||||||
return err("waku lightpush client is nil")
|
error "failed to publish message as lightpush not available"
|
||||||
|
return err("Waku lightpush not available")
|
||||||
|
|
||||||
if pubsubTopic.isSome():
|
let internalPublish = proc(
|
||||||
|
node: WakuNode,
|
||||||
|
pubsubTopic: PubsubTopic,
|
||||||
|
message: WakuMessage,
|
||||||
|
peer: RemotePeerInfo,
|
||||||
|
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
|
if not node.wakuLightpushClient.isNil():
|
||||||
debug "publishing message with lightpush",
|
debug "publishing message with lightpush",
|
||||||
pubsubTopic = pubsubTopic.get(),
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = message.contentTopic,
|
contentTopic = message.contentTopic,
|
||||||
peer = peer.peerId
|
peer = peer.peerId
|
||||||
return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer)
|
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
|
||||||
|
|
||||||
|
if not node.wakuLightPush.isNil():
|
||||||
|
debug "publishing message with self hosted lightpush",
|
||||||
|
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
|
||||||
|
return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message)
|
||||||
|
|
||||||
|
if pubsubTopic.isSome():
|
||||||
|
return await internalPublish(node, pubsubTopic.get(), message, peer)
|
||||||
|
|
||||||
let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic)
|
let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic)
|
||||||
|
|
||||||
|
@ -961,9 +977,7 @@ proc lightpushPublish*(
|
||||||
topicMapRes.get()
|
topicMapRes.get()
|
||||||
|
|
||||||
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
|
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
|
||||||
debug "publishing message with lightpush",
|
return await internalPublish(node, $pubsub, message, peer)
|
||||||
pubsubTopic = pubsub, contentTopic = message.contentTopic, peer = peer.peerId
|
|
||||||
return await node.wakuLightpushClient.publish($pubsub, message, peer)
|
|
||||||
|
|
||||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||||
proc lightpushPublish*(
|
proc lightpushPublish*(
|
||||||
|
@ -971,16 +985,19 @@ proc lightpushPublish*(
|
||||||
): Future[WakuLightPushResult[void]] {.
|
): Future[WakuLightPushResult[void]] {.
|
||||||
async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead"
|
async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead"
|
||||||
.} =
|
.} =
|
||||||
if node.wakuLightpushClient.isNil():
|
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
|
||||||
let msg = "waku lightpush client is nil"
|
error "failed to publish message as lightpush not available"
|
||||||
error "failed to publish message", msg = msg
|
return err("waku lightpush not available")
|
||||||
return err(msg)
|
|
||||||
|
|
||||||
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
|
var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo)
|
||||||
|
if not node.wakuLightpushClient.isNil():
|
||||||
|
peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
|
||||||
if peerOpt.isNone():
|
if peerOpt.isNone():
|
||||||
let msg = "no suitable remote peers"
|
let msg = "no suitable remote peers"
|
||||||
error "failed to publish message", msg = msg
|
error "failed to publish message", msg = msg
|
||||||
return err(msg)
|
return err(msg)
|
||||||
|
elif not node.wakuLightPush.isNil():
|
||||||
|
peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId))
|
||||||
|
|
||||||
let publishRes =
|
let publishRes =
|
||||||
await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())
|
await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())
|
||||||
|
|
|
@ -177,7 +177,10 @@ proc startRestServerProtocolSupport*(
|
||||||
rest_store_legacy_api.installStoreApiHandlers(router, node, storeDiscoHandler)
|
rest_store_legacy_api.installStoreApiHandlers(router, node, storeDiscoHandler)
|
||||||
|
|
||||||
## Light push API
|
## Light push API
|
||||||
if conf.lightpushnode != "" and node.wakuLightpushClient != nil:
|
## Install it either if lightpushnode (lightpush service node) is configured and client is mounted)
|
||||||
|
## or install it to be used with self-hosted lightpush service
|
||||||
|
if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or
|
||||||
|
(conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil):
|
||||||
let lightDiscoHandler =
|
let lightDiscoHandler =
|
||||||
if wakuDiscv5.isSome():
|
if wakuDiscv5.isSome():
|
||||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush))
|
some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush))
|
||||||
|
|
|
@ -16,6 +16,7 @@ import
|
||||||
../../waku/node/peer_manager,
|
../../waku/node/peer_manager,
|
||||||
../../../waku_node,
|
../../../waku_node,
|
||||||
../../waku/waku_lightpush/common,
|
../../waku/waku_lightpush/common,
|
||||||
|
../../waku/waku_lightpush/self_req_handler,
|
||||||
../../handlers,
|
../../handlers,
|
||||||
../serdes,
|
../serdes,
|
||||||
../responses,
|
../responses,
|
||||||
|
@ -35,6 +36,9 @@ const NoPeerNoDiscoError =
|
||||||
const NoPeerNoneFoundError =
|
const NoPeerNoneFoundError =
|
||||||
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
|
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
|
||||||
|
|
||||||
|
proc useSelfHostedLightPush(node: WakuNode): bool =
|
||||||
|
return node.wakuLightPush != nil and node.wakuLightPushClient == nil
|
||||||
|
|
||||||
#### Request handlers
|
#### Request handlers
|
||||||
|
|
||||||
const ROUTE_LIGHTPUSH* = "/lightpush/v1/message"
|
const ROUTE_LIGHTPUSH* = "/lightpush/v1/message"
|
||||||
|
@ -60,7 +64,11 @@ proc installLightPushRequestHandler*(
|
||||||
let msg = req.message.toWakuMessage().valueOr:
|
let msg = req.message.toWakuMessage().valueOr:
|
||||||
return RestApiResponse.badRequest("Invalid message: " & $error)
|
return RestApiResponse.badRequest("Invalid message: " & $error)
|
||||||
|
|
||||||
let peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId)
|
||||||
|
if useSelfHostedLightPush(node):
|
||||||
|
discard
|
||||||
|
else:
|
||||||
|
peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||||
let handler = discHandler.valueOr:
|
let handler = discHandler.valueOr:
|
||||||
return NoPeerNoDiscoError
|
return NoPeerNoDiscoError
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
## Notice that the REST /lightpush requests normally assume that the node
|
||||||
|
## is acting as a lightpush-client that will trigger the service provider node
|
||||||
|
## to relay the message.
|
||||||
|
## In this module, we allow that a lightpush service node (full node) can be
|
||||||
|
## triggered directly through the REST /lightpush endpoint.
|
||||||
|
## The typical use case for that is when using `nwaku-compose`,
|
||||||
|
## which spawn a full service Waku node
|
||||||
|
## that could be used also as a lightpush client, helping testing and development.
|
||||||
|
|
||||||
|
import stew/results, chronos, chronicles, std/options, metrics
|
||||||
|
import
|
||||||
|
../waku_core,
|
||||||
|
./protocol,
|
||||||
|
./common,
|
||||||
|
./rpc,
|
||||||
|
./rpc_codec,
|
||||||
|
./protocol_metrics,
|
||||||
|
../utils/requests
|
||||||
|
|
||||||
|
proc handleSelfLightPushRequest*(
|
||||||
|
self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage
|
||||||
|
): Future[WakuLightPushResult[void]] {.async.} =
|
||||||
|
## Handles the lightpush requests made by the node to itself.
|
||||||
|
## Normally used in REST-lightpush requests
|
||||||
|
|
||||||
|
try:
|
||||||
|
# provide self peerId as now this node is used directly, thus there is no light client sender peer.
|
||||||
|
let selfPeerId = self.peerManager.switch.peerInfo.peerId
|
||||||
|
|
||||||
|
let req = PushRequest(pubSubTopic: pubSubTopic, message: message)
|
||||||
|
let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req))
|
||||||
|
|
||||||
|
let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer)
|
||||||
|
|
||||||
|
if respRpc.response.isNone():
|
||||||
|
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
|
||||||
|
return err(emptyResponseBodyFailure)
|
||||||
|
|
||||||
|
let response = respRpc.response.get()
|
||||||
|
if not response.isSuccess:
|
||||||
|
if response.info.isSome():
|
||||||
|
return err(response.info.get())
|
||||||
|
else:
|
||||||
|
return err("unknown failure")
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
except Exception:
|
||||||
|
return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())
|
Loading…
Reference in New Issue