feat: lightpush v3 (#3279)

* Separate new lightpush protocol
New RPC defined
Rename al occurence of old lightpush to legacy lightpush, fix rest tests of lightpush
New lightpush protocol added back
Setup new lightpush protocol, mounting and rest api for it

	modified:   apps/chat2/chat2.nim
	modified:   tests/node/test_wakunode_lightpush.nim
	modified:   tests/node/test_wakunode_sharding.nim
	modified:   tests/test_peer_manager.nim
	modified:   tests/test_wakunode_lightpush.nim
	renamed:    tests/waku_lightpush/lightpush_utils.nim -> tests/waku_lightpush_legacy/lightpush_utils.nim
	renamed:    tests/waku_lightpush/test_all.nim -> tests/waku_lightpush_legacy/test_all.nim
	renamed:    tests/waku_lightpush/test_client.nim -> tests/waku_lightpush_legacy/test_client.nim
	renamed:    tests/waku_lightpush/test_ratelimit.nim -> tests/waku_lightpush_legacy/test_ratelimit.nim
	modified:   tests/wakunode_rest/test_all.nim
	renamed:    tests/wakunode_rest/test_rest_lightpush.nim -> tests/wakunode_rest/test_rest_lightpush_legacy.nim
	modified:   waku/factory/node_factory.nim
	modified:   waku/node/waku_node.nim
	modified:   waku/waku_api/rest/admin/handlers.nim
	modified:   waku/waku_api/rest/builder.nim
	new file:   waku/waku_api/rest/legacy_lightpush/client.nim
	new file:   waku/waku_api/rest/legacy_lightpush/handlers.nim
	new file:   waku/waku_api/rest/legacy_lightpush/types.nim
	modified:   waku/waku_api/rest/lightpush/client.nim
	modified:   waku/waku_api/rest/lightpush/handlers.nim
	modified:   waku/waku_api/rest/lightpush/types.nim
	modified:   waku/waku_core/codecs.nim
	modified:   waku/waku_lightpush.nim
	modified:   waku/waku_lightpush/callbacks.nim
	modified:   waku/waku_lightpush/client.nim
	modified:   waku/waku_lightpush/common.nim
	modified:   waku/waku_lightpush/protocol.nim
	modified:   waku/waku_lightpush/rpc.nim
	modified:   waku/waku_lightpush/rpc_codec.nim
	modified:   waku/waku_lightpush/self_req_handler.nim
	new file:   waku/waku_lightpush_legacy.nim
	renamed:    waku/waku_lightpush/README.md -> waku/waku_lightpush_legacy/README.md
	new file:   waku/waku_lightpush_legacy/callbacks.nim
	new file:   waku/waku_lightpush_legacy/client.nim
	new file:   waku/waku_lightpush_legacy/common.nim
	new file:   waku/waku_lightpush_legacy/protocol.nim
	new file:   waku/waku_lightpush_legacy/protocol_metrics.nim
	new file:   waku/waku_lightpush_legacy/rpc.nim
	new file:   waku/waku_lightpush_legacy/rpc_codec.nim
	new file:   waku/waku_lightpush_legacy/self_req_handler.nim

Adapt to non-invasive libp2p observers

cherry pick latest lightpush (v1) changes into legacy lightpush code after rebase to latest master

Fix vendor dependencies from origin/master after failed rebase of them

Adjust examples, test to new lightpush - keep using of legacy

Fixup error code mappings

Fix REST admin interface with distinct legacy and new lightpush

Fix lightpush v2 tests

* Utilize new publishEx interface of pubsub libp2p

* Adapt to latest libp2p pubslih design changes. publish returns an outcome as Result error.

* Fix review findings

* Fix tests, re-added lost one

* Fix rebase

* Apply suggestions from code review

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Addressing review comments

* Fix incentivization tests

* Fix build failed on libwaku

* Change new lightpush endpoint version to 3 instead of 2. Noticed that old and new lightpush metrics can cause trouble in monitoring dashboards so decided to give new name as v3 for the new lightpush metrics and change legacy ones back - temporarly till old lightpush will be decommissioned

* Fixing flaky test with rate limit timing

* Fixing logscope of lightpush and legacy lightpush

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
NagyZoltanPeter 2025-03-05 12:07:56 +01:00 committed by GitHub
parent c07e278d82
commit e0b563ffe5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
62 changed files with 2180 additions and 490 deletions

View File

@ -33,8 +33,8 @@ import
import
waku/[
waku_core,
waku_lightpush/common,
waku_lightpush/rpc,
waku_lightpush_legacy/common,
waku_lightpush_legacy/rpc,
waku_enr,
discovery/waku_dnsdisc,
waku_store_legacy,
@ -227,9 +227,9 @@ proc publish(c: Chat, line: string) =
c.node.wakuRlnRelay.lastEpoch = proof.epoch
try:
if not c.node.wakuLightPush.isNil():
if not c.node.wakuLegacyLightPush.isNil():
# Attempt lightpush
(waitFor c.node.lightpushPublish(some(DefaultPubsubTopic), message)).isOkOr:
(waitFor c.node.legacyLightpushPublish(some(DefaultPubsubTopic), message)).isOkOr:
error "failed to publish lightpush message", error = error
else:
(waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
@ -502,8 +502,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if conf.lightpushnode != "":
let peerInfo = parsePeerInfo(conf.lightpushnode)
if peerInfo.isOk():
await mountLightPush(node)
node.mountLightPushClient()
await mountLegacyLightPush(node)
node.mountLegacyLightPushClient()
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
else:
error "LightPush not mounted. Couldn't parse conf.lightpushnode",

View File

@ -145,7 +145,7 @@ proc publishMessages(
lightpushContentTopic,
renderMsgSize,
)
let wlpRes = await wakuNode.lightpushPublish(
let wlpRes = await wakuNode.legacyLightpushPublish(
some(lightpushPubsubTopic), message, actualServicePeer
)
@ -209,7 +209,7 @@ proc setupAndPublish*(
if isNil(wakuNode.wakuLightpushClient):
# if we have not yet initialized lightpush client, then do it as the only way we can get here is
# by having a service peer discovered.
wakuNode.mountLightPushClient()
wakuNode.mountLegacyLightPushClient()
# give some time to receiver side to set up
let waitTillStartTesting = conf.startPublishingAfter.seconds

View File

@ -202,7 +202,7 @@ when isMainModule:
var codec = WakuLightPushCodec
# mounting relevant client, for PX filter client must be mounted ahead
if conf.testFunc == TesterFunctionality.SENDER:
wakuApp.node.mountLightPushClient()
wakuApp.node.mountLegacyLightPushClient()
codec = WakuLightPushCodec
else:
waitFor wakuApp.node.mountFilterClient()

View File

@ -70,7 +70,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
let node = builder.build().tryGet()
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
node.mountLightPushClient()
node.mountLegacyLightPushClient()
await node.start()
node.peerManager.start()
@ -87,8 +87,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
let lightpushPeer = parsePeerInfo(LightpushPeer).get()
let res =
await node.lightpushPublish(some(LightpushPubsubTopic), message, lightpushPeer)
let res = await node.legacyLightpushPublish(
some(LightpushPubsubTopic), message, lightpushPeer
)
if res.isOk:
notice "published message",

View File

@ -2,12 +2,13 @@ import options
import chronicles, chronos, results
import
../../../../../waku/waku_core/message/message,
../../../../../waku/waku_core/codecs,
../../../../../waku/factory/waku,
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_lightpush/client,
../../../../../waku/waku_lightpush/common,
../../../../../waku/waku_lightpush_legacy/client,
../../../../../waku/waku_lightpush_legacy/common,
../../../../../waku/node/peer_manager/peer_manager,
../../../../alloc
@ -98,7 +99,7 @@ proc process*(
return err(errorMsg)
let msgHashHex = (
await waku.node.wakuLightpushClient.publish(
await waku.node.wakuLegacyLightpushClient.publish(
pubsubTopic, msg, peer = peerOpt.get()
)
).valueOr:

View File

@ -115,14 +115,13 @@ proc process*(
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic
let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0:
let errorMsg = "Message not sent because no peers found."
(await waku.node.wakuRelay.publish(pubsubTopic, msg)).isOkOr:
let errorMsg = "Message not sent." & $error
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
elif numPeers > 0:
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash)
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash)
of LIST_CONNECTED_PEERS:
let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error

View File

@ -65,6 +65,7 @@ import
./node/test_all,
./waku_filter_v2/test_all,
./waku_peer_exchange/test_all,
./waku_lightpush_legacy/test_all,
./waku_lightpush/test_all,
./waku_relay/test_all,
./incentivization/test_all
@ -72,7 +73,6 @@ import
import
# Waku v2 tests
./test_wakunode,
./test_wakunode_lightpush,
./test_peer_store_extended,
./test_message_cache,
./test_peer_manager,
@ -98,7 +98,7 @@ import
./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_lightpush,
./wakunode_rest/test_rest_lightpush_legacy,
./wakunode_rest/test_rest_admin,
./wakunode_rest/test_rest_cors,
./wakunode_rest/test_rest_health

View File

@ -11,7 +11,7 @@ import
import
waku/[node/peer_manager, waku_core],
waku/incentivization/[rpc, reputation_manager],
waku/waku_lightpush/rpc
waku/waku_lightpush_legacy/rpc
suite "Waku Incentivization PoC Reputation":
var manager {.threadvar.}: ReputationManager

View File

@ -1,5 +1,6 @@
import
./test_wakunode_filter,
./test_wakunode_legacy_lightpush,
./test_wakunode_lightpush,
./test_wakunode_peer_exchange,
./test_wakunode_store,

View File

@ -0,0 +1,233 @@
{.used.}
import
std/[options, tables, sequtils, tempfiles, strutils],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
chronicles,
std/strformat,
os,
libp2p/[peerstore, crypto/crypto]
import
waku/[
waku_core,
node/peer_manager,
node/waku_node,
waku_filter_v2,
waku_filter_v2/client,
waku_filter_v2/subscriptions,
waku_lightpush_legacy,
waku_lightpush_legacy/common,
waku_lightpush_legacy/client,
waku_lightpush_legacy/protocol_metrics,
waku_lightpush_legacy/rpc,
waku_rln_relay,
],
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils],
../resources/payloads
suite "Waku Legacy Lightpush - End To End":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler
server {.threadvar.}: WakuNode
client {.threadvar.}: WakuNode
serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage
asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(server.start(), client.start())
await server.start()
await server.mountRelay()
await server.mountLegacyLightpush() # without rln-relay
client.mountLegacyLightpushClient()
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()
asyncTeardown:
await server.stop()
suite "Assessment of Message Relaying Mechanisms":
asyncTest "Via 11/WAKU2-RELAY from Relay/Full Node":
# Given a light lightpush client
let lightpushClient =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
lightpushClient.mountLegacyLightpushClient()
# When the client publishes a message
let publishResponse = await lightpushClient.legacyLightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)
if not publishResponse.isOk():
echo "Publish failed: ", publishResponse.error()
# Then the message is not relayed but not due to RLN
assert publishResponse.isErr(), "We expect an error response"
assert (publishResponse.error == protocol_metrics.notPublishedAnyPeer),
"incorrect error response"
suite "Waku LightPush Validation Tests":
asyncTest "Validate message size exceeds limit":
let msgOverLimit = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024),
)
# When the client publishes an over-limit message
let publishResponse = await client.legacyLightpushPublish(
some(pubsubTopic), msgOverLimit, serverRemotePeerInfo
)
check:
publishResponse.isErr()
publishResponse.error ==
fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
suite "RLN Proofs as a Lightpush Service":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler
server {.threadvar.}: WakuNode
client {.threadvar.}: WakuNode
serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage
asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
# mount rln-relay
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
)
await allFutures(server.start(), client.start())
await server.start()
await server.mountRelay()
await server.mountRlnRelay(wakuRlnConfig)
await server.mountLegacyLightPush()
client.mountLegacyLightPushClient()
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()
asyncTeardown:
await server.stop()
suite "Lightpush attaching RLN proofs":
asyncTest "Message is published when RLN enabled":
# Given a light lightpush client
let lightpushClient =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
lightpushClient.mountLegacyLightPushClient()
# When the client publishes a message
let publishResponse = await lightpushClient.legacyLightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)
if not publishResponse.isOk():
echo "Publish failed: ", publishResponse.error()
# Then the message is not relayed but not due to RLN
assert publishResponse.isErr(), "We expect an error response"
check publishResponse.error == protocol_metrics.notPublishedAnyPeer
suite "Waku Legacy Lightpush message delivery":
asyncTest "Legacy lightpush message flow succeed":
## Setup
let
lightNodeKey = generateSecp256k1Key()
lightNode = newTestWakuNode(lightNodeKey, parseIpAddress("0.0.0.0"), Port(0))
bridgeNodeKey = generateSecp256k1Key()
bridgeNode = newTestWakuNode(bridgeNodeKey, parseIpAddress("0.0.0.0"), Port(0))
destNodeKey = generateSecp256k1Key()
destNode = newTestWakuNode(destNodeKey, parseIpAddress("0.0.0.0"), Port(0))
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountLegacyLightPush()
lightNode.mountLegacyLightPushClient()
discard await lightNode.peerManager.dialPeer(
bridgeNode.peerInfo.toRemotePeerInfo(), WakuLegacyLightPushCodec
)
await sleepAsync(100.milliseconds)
await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()])
## Given
let message = fakeWakuMessage()
var completionFutRelay = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
# Wait for subscription to take effect
await sleepAsync(100.millis)
## When
let res = await lightNode.legacyLightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error
## Then
check await completionFutRelay.withTimeout(5.seconds)
## Cleanup
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())

View File

@ -19,15 +19,13 @@ import
waku_filter_v2/client,
waku_filter_v2/subscriptions,
waku_lightpush,
waku_lightpush/common,
waku_lightpush/client,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_rln_relay,
],
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils],
../resources/payloads
const PublishedToOnePeer = 1
suite "Waku Lightpush - End To End":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
@ -45,9 +43,9 @@ suite "Waku Lightpush - End To End":
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
return ok(PublishedToOnePeer)
let
serverKey = generateSecp256k1Key()
@ -80,16 +78,16 @@ suite "Waku Lightpush - End To End":
# When the client publishes a message
let publishResponse = await lightpushClient.lightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
some(pubsubTopic), message, some(serverRemotePeerInfo)
)
if not publishResponse.isOk():
echo "Publish failed: ", publishResponse.error()
echo "Publish failed: ", publishResponse.error.code
# Then the message is not relayed but not due to RLN
assert publishResponse.isErr(), "We expect an error response"
assert (publishResponse.error == protocol_metrics.notPublishedAnyPeer),
assert (publishResponse.error.code == NO_PEERS_TO_RELAY),
"incorrect error response"
suite "Waku LightPush Validation Tests":
@ -101,13 +99,14 @@ suite "Waku Lightpush - End To End":
# When the client publishes an over-limit message
let publishResponse = await client.lightpushPublish(
some(pubsubTopic), msgOverLimit, serverRemotePeerInfo
some(pubsubTopic), msgOverLimit, some(serverRemotePeerInfo)
)
check:
publishResponse.isErr()
publishResponse.error ==
fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
publishResponse.error.code == INVALID_MESSAGE_ERROR
publishResponse.error.desc ==
some(fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes")
suite "RLN Proofs as a Lightpush Service":
var
@ -126,9 +125,9 @@ suite "RLN Proofs as a Lightpush Service":
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
return ok(PublishedToOnePeer)
let
serverKey = generateSecp256k1Key()
@ -151,8 +150,8 @@ suite "RLN Proofs as a Lightpush Service":
await server.mountRelay()
await server.mountRlnRelay(wakuRlnConfig)
await server.mountLightpush()
client.mountLightpushClient()
await server.mountLightPush()
client.mountLightPushClient()
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
@ -167,11 +166,11 @@ suite "RLN Proofs as a Lightpush Service":
# Given a light lightpush client
let lightpushClient =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
lightpushClient.mountLightpushClient()
lightpushClient.mountLightPushClient()
# When the client publishes a message
let publishResponse = await lightpushClient.lightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
some(pubsubTopic), message, some(serverRemotePeerInfo)
)
if not publishResponse.isOk():
@ -179,5 +178,55 @@ suite "RLN Proofs as a Lightpush Service":
# Then the message is not relayed but not due to RLN
assert publishResponse.isErr(), "We expect an error response"
assert (publishResponse.error == protocol_metrics.notPublishedAnyPeer),
"incorrect error response"
check publishResponse.error.code == NO_PEERS_TO_RELAY
suite "Waku Lightpush message delivery":
asyncTest "lightpush message flow succeed":
## Setup
let
lightNodeKey = generateSecp256k1Key()
lightNode = newTestWakuNode(lightNodeKey, parseIpAddress("0.0.0.0"), Port(0))
bridgeNodeKey = generateSecp256k1Key()
bridgeNode = newTestWakuNode(bridgeNodeKey, parseIpAddress("0.0.0.0"), Port(0))
destNodeKey = generateSecp256k1Key()
destNode = newTestWakuNode(destNodeKey, parseIpAddress("0.0.0.0"), Port(0))
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()
discard await lightNode.peerManager.dialPeer(
bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec
)
await sleepAsync(100.milliseconds)
await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()])
## Given
let message = fakeWakuMessage()
var completionFutRelay = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
# Wait for subscription to take effect
await sleepAsync(100.millis)
## When
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error
## Then
check await completionFutRelay.withTimeout(5.seconds)
## Cleanup
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())

View File

@ -286,7 +286,7 @@ suite "Sharding":
asyncTest "lightpush":
# Given a connected server and client subscribed to the same pubsub topic
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()
let
@ -299,7 +299,7 @@ suite "Sharding":
let
msg =
WakuMessage(payload: "message".toBytes(), contentTopic: "myContentTopic")
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(topic), msg, server.switch.peerInfo.toRemotePeerInfo()
)
@ -409,7 +409,7 @@ suite "Sharding":
asyncTest "lightpush (automatic sharding filtering)":
# Given a connected server and client using the same content topic (with two different formats)
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()
let
@ -424,7 +424,7 @@ suite "Sharding":
let
msg =
WakuMessage(payload: "message".toBytes(), contentTopic: contentTopicFull)
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(pubsubTopic), msg, server.switch.peerInfo.toRemotePeerInfo()
)
@ -567,7 +567,7 @@ suite "Sharding":
asyncTest "lightpush - exclusion (automatic sharding filtering)":
# Given a connected server and client using different content topics
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()
let
@ -584,7 +584,7 @@ suite "Sharding":
# When a peer publishes a message in the server's subscribed topic (the client, for testing easeness)
let
msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic2)
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(pubsubTopic2), msg, server.switch.peerInfo.toRemotePeerInfo()
)
@ -854,12 +854,12 @@ suite "Sharding":
(await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Protocol with Unconfigured PubSub Topic Fails":
# Given a
# Given a
let
contentTopic = "myContentTopic"
topic = "/waku/2/rs/0/1"
# Using a different topic to simulate "unconfigured" pubsub topic
# but to have a handler (and be able to assert the test)
# but to have a handler (and be able to assert the test)
serverHandler = server.subscribeCompletionHandler("/waku/2/rs/0/0")
clientHandler = client.subscribeCompletionHandler("/waku/2/rs/0/0")
@ -878,7 +878,7 @@ suite "Sharding":
asyncTest "Waku LightPush Sharding (Static Sharding)":
# Given a connected server and client using two different pubsub topics
client.mountLightPushClient()
client.mountLegacyLightPushClient()
await server.mountLightpush()
# Given a connected server and client subscribed to multiple pubsub topics
@ -898,7 +898,7 @@ suite "Sharding":
# When a peer publishes a message (the client, for testing easeness) in topic1
let
msg1 = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
lightpublishRespnse = await client.lightpushPublish(
lightpublishRespnse = await client.legacyLightpushPublish(
some(topic1), msg1, server.switch.peerInfo.toRemotePeerInfo()
)
@ -916,7 +916,7 @@ suite "Sharding":
clientHandler2.reset()
let
msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
lightpublishResponse2 = await client.lightpushPublish(
lightpublishResponse2 = await client.legacyLightpushPublish(
some(topic2), msg2, server.switch.peerInfo.toRemotePeerInfo()
)

View File

@ -770,7 +770,7 @@ procSuite "Peer Manager":
# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyLightPushCodec)
node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec)
# relay peers (should not be added)
@ -788,7 +788,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLegacyLightPushCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId
# but the relay peer is not
@ -917,13 +917,13 @@ procSuite "Peer Manager":
selectedPeer2.get().peerId == peers[0].peerId
# And return none if we dont have any peer for that protocol
let selectedPeer3 = pm.selectPeer(WakuLightPushCodec)
let selectedPeer3 = pm.selectPeer(WakuLegacyLightPushCodec)
check:
selectedPeer3.isSome() == false
# Now we add service peers for different protocols peer[1..3]
pm.addServicePeer(peers[1], WakuStoreCodec)
pm.addServicePeer(peers[2], WakuLightPushCodec)
pm.addServicePeer(peers[2], WakuLegacyLightPushCodec)
# We no longer get one from the peerstore. Slots are being used instead.
let selectedPeer4 = pm.selectPeer(WakuStoreCodec)
@ -931,7 +931,7 @@ procSuite "Peer Manager":
selectedPeer4.isSome() == true
selectedPeer4.get().peerId == peers[1].peerId
let selectedPeer5 = pm.selectPeer(WakuLightPushCodec)
let selectedPeer5 = pm.selectPeer(WakuLegacyLightPushCodec)
check:
selectedPeer5.isSome() == true
selectedPeer5.get().peerId == peers[2].peerId

View File

@ -1,7 +1,7 @@
{.used.}
import std/[options, sequtils], stew/results, testutils/unittests
import waku/waku_core, waku/waku_enr, ./testlib/wakucore, waku/waku_core/codecs
import waku/waku_core, waku/waku_enr, ./testlib/wakucore
suite "Waku ENR - Capabilities bitfield":
test "check capabilities support":

View File

@ -1,58 +0,0 @@
{.used.}
import std/options, stew/shims/net as stewNet, testutils/unittests, chronos
import
waku/[waku_core, waku_lightpush/common, node/peer_manager, waku_node],
./testlib/wakucore,
./testlib/wakunode
suite "WakuNode - Lightpush":
asyncTest "Lightpush message return success":
## Setup
let
lightNodeKey = generateSecp256k1Key()
lightNode = newTestWakuNode(lightNodeKey, parseIpAddress("0.0.0.0"), Port(0))
bridgeNodeKey = generateSecp256k1Key()
bridgeNode = newTestWakuNode(bridgeNodeKey, parseIpAddress("0.0.0.0"), Port(0))
destNodeKey = generateSecp256k1Key()
destNode = newTestWakuNode(destNodeKey, parseIpAddress("0.0.0.0"), Port(0))
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()
discard await lightNode.peerManager.dialPeer(
bridgeNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec
)
await sleepAsync(100.milliseconds)
await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()])
## Given
let message = fakeWakuMessage()
var completionFutRelay = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
# Wait for subscription to take effect
await sleepAsync(100.millis)
## When
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error
## Then
check await completionFutRelay.withTimeout(5.seconds)
## Cleanup
await allFutures(lightNode.stop(), bridgeNode.stop(), destNode.stop())

View File

@ -7,7 +7,7 @@ import
libp2p/peerid,
libp2p/errors,
confutils/toml/std/net
import waku/[waku_core, waku_core/codecs, waku_enr], ../testlib/wakucore
import waku/[waku_core, waku_enr], ../testlib/wakucore
suite "Waku Core - Peers":
test "Peer info parses correctly":

View File

@ -5,6 +5,7 @@ import std/options, chronicles, chronos, libp2p/crypto/crypto
import
waku/node/peer_manager,
waku/waku_core,
waku/waku_core/topics/sharding,
waku/waku_lightpush,
waku/waku_lightpush/[client, common],
waku/common/rate_limit/setting,
@ -17,7 +18,8 @@ proc newTestWakuLightpushNode*(
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting)
wakuSharding = Sharding(clusterId: 1, shardCountGenZero: 8)
proto = WakuLightPush.new(peerManager, rng, handler, wakuSharding, rateLimitSetting)
await proto.start()
switch.mount(proto)

View File

@ -13,10 +13,7 @@ import
waku_core,
waku_lightpush,
waku_lightpush/client,
waku_lightpush/common,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
@ -42,12 +39,14 @@ suite "Waku Lightpush Client":
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
let msgLen = message.encode().buffer.len
if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024:
return err("length greater than maxMessageSize")
return
lighpushErrorResult(PAYLOAD_TOO_LARGE, "length greater than maxMessageSize")
handlerFuture.complete((pubsubTopic, message))
return ok()
# return that we published the message to 1 peer.
return ok(1)
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
@ -80,7 +79,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -92,8 +91,9 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse2 =
await client.publish(pubsub_topics.CURRENT, message2, serverRemotePeerInfo)
let publishResponse2 = await client.publish(
some(pubsub_topics.CURRENT), message2, serverRemotePeerInfo
)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -106,7 +106,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse3 = await client.publish(
pubsub_topics.CURRENT_NESTED, message3, serverRemotePeerInfo
some(pubsub_topics.CURRENT_NESTED), message3, serverRemotePeerInfo
)
# Then the message is received by the server
@ -119,8 +119,9 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse4 =
await client.publish(pubsub_topics.SHARDING, message4, serverRemotePeerInfo)
let publishResponse4 = await client.publish(
some(pubsub_topics.SHARDING), message4, serverRemotePeerInfo
)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -133,7 +134,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse5 =
await client.publish(pubsub_topics.PLAIN, message5, serverRemotePeerInfo)
await client.publish(some(pubsub_topics.PLAIN), message5, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -146,7 +147,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse6 =
await client.publish(pubsub_topics.LEGACY, message6, serverRemotePeerInfo)
await client.publish(some(pubsub_topics.LEGACY), message6, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -159,7 +160,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse7 = await client.publish(
pubsub_topics.LEGACY_NESTED, message7, serverRemotePeerInfo
some(pubsub_topics.LEGACY_NESTED), message7, serverRemotePeerInfo
)
# Then the message is received by the server
@ -173,7 +174,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse8 = await client.publish(
pubsub_topics.LEGACY_ENCODING, message8, serverRemotePeerInfo
some(pubsub_topics.LEGACY_ENCODING), message8, serverRemotePeerInfo
)
# Then the message is received by the server
@ -187,7 +188,7 @@ suite "Waku Lightpush Client":
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse9 =
await client.publish(pubsubTopic, message9, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message9, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -221,7 +222,7 @@ suite "Waku Lightpush Client":
# When publishing the 1KiB payload
let publishResponse1 =
await client.publish(pubsubTopic, message1, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message1, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse1
@ -230,7 +231,7 @@ suite "Waku Lightpush Client":
# When publishing the 10KiB payload
handlerFuture = newPushHandlerFuture()
let publishResponse2 =
await client.publish(pubsubTopic, message2, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message2, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse2
@ -239,7 +240,7 @@ suite "Waku Lightpush Client":
# When publishing the 100KiB payload
handlerFuture = newPushHandlerFuture()
let publishResponse3 =
await client.publish(pubsubTopic, message3, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message3, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse3
@ -248,7 +249,7 @@ suite "Waku Lightpush Client":
# When publishing the 1MiB + 63KiB + 911B payload (1113999B)
handlerFuture = newPushHandlerFuture()
let publishResponse4 =
await client.publish(pubsubTopic, message4, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message4, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse4
@ -257,11 +258,12 @@ suite "Waku Lightpush Client":
# When publishing the 1MiB + 63KiB + 912B payload (1114000B)
handlerFuture = newPushHandlerFuture()
let publishResponse5 =
await client.publish(pubsubTopic, message5, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message5, serverRemotePeerInfo)
# Then the message is not received by the server
check:
not publishResponse5.isOk()
publishResponse5.isErr()
publishResponse5.error.code == PAYLOAD_TOO_LARGE
(await handlerFuture.waitForResult()).isErr()
asyncTest "Invalid Encoding Payload":
@ -271,16 +273,12 @@ suite "Waku Lightpush Client":
# When publishing the payload
let publishResponse = await server.handleRequest(clientPeerId, fakeBuffer)
# Then the response is negative
check:
publishResponse.requestId == ""
# And the error is returned
let response = publishResponse.response.get()
check:
response.isSuccess == false
response.info.isSome()
scanf(response.info.get(), decodeRpcFailure)
publishResponse.requestId == "N/A"
publishResponse.statusCode == LightpushStatusCode.BAD_REQUEST.uint32
publishResponse.statusDesc.isSome()
scanf(publishResponse.statusDesc.get(), decodeRpcFailure)
asyncTest "Handle Error":
# Given a lightpush server that fails
@ -289,9 +287,9 @@ suite "Waku Lightpush Client":
handlerFuture2 = newFuture[void]()
handler2 = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
handlerFuture2.complete()
return err(handlerError)
return lighpushErrorResult(PAYLOAD_TOO_LARGE, handlerError)
let
serverSwitch2 = newTestSwitch()
@ -303,11 +301,12 @@ suite "Waku Lightpush Client":
# When publishing a payload
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo2)
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo2)
# Then the response is negative
check:
publishResponse.error() == handlerError
publishResponse.error.code == PAYLOAD_TOO_LARGE
publishResponse.error.desc == some(handlerError)
(await handlerFuture2.waitForResult()).isOk()
# Cleanup
@ -317,7 +316,7 @@ suite "Waku Lightpush Client":
asyncTest "Positive Responses":
# When sending a valid PushRequest
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo)
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo)
# Then the response is positive
assertResultOk publishResponse
@ -333,7 +332,8 @@ suite "Waku Lightpush Client":
# When sending an invalid PushRequest
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo2)
await client.publish(some(pubsubTopic), message, serverRemotePeerInfo2)
# Then the response is negative
check not publishResponse.isOk()
check publishResponse.error.code == LightpushStatusCode.NO_PEERS_TO_RELAY

View File

@ -14,10 +14,7 @@ import
waku_core,
waku_lightpush,
waku_lightpush/client,
waku_lightpush/common,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
@ -36,9 +33,9 @@ suite "Rate limited push service":
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler: PushMessageHandler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
return lightpushSuccessResult(1) # succeed to publish to 1 peer.
let
tokenPeriod = 500.millis
@ -53,12 +50,13 @@ suite "Rate limited push service":
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
check await handlerFuture.withTimeout(50.millis)
assert requestRes.isOk(), requestRes.error
check handlerFuture.finished()
check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
@ -98,9 +96,9 @@ suite "Rate limited push service":
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
return lightpushSuccessResult(1)
let
server =
@ -114,7 +112,7 @@ suite "Rate limited push service":
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
@ -129,12 +127,13 @@ suite "Rate limited push service":
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"
requestRes.error.code == TOO_MANY_REQUESTS
requestRes.error.desc == some(TooManyRequestsMessage)
for testCnt in 0 .. 2:
await successProc()

View File

@ -0,0 +1,29 @@
{.used.}
import std/options, chronicles, chronos, libp2p/crypto/crypto
import
waku/node/peer_manager,
waku/waku_core,
waku/waku_lightpush_legacy,
waku/waku_lightpush_legacy/[client, common],
waku/common/rate_limit/setting,
../testlib/[common, wakucore]
proc newTestWakuLegacyLightpushNode*(
switch: Switch,
handler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): Future[WakuLegacyLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuLegacyLightPush.new(peerManager, rng, handler, rateLimitSetting)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuLegacyLightpushClient*(switch: Switch): WakuLegacyLightPushClient =
let peerManager = PeerManager.new(switch)
WakuLegacyLightPushClient.new(peerManager, rng)

View File

@ -0,0 +1 @@
import ./test_client, ./test_ratelimit

View File

@ -0,0 +1,339 @@
{.used.}
import
std/[options, strscans],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto
import
waku/[
node/peer_manager,
waku_core,
waku_lightpush_legacy,
waku_lightpush_legacy/client,
waku_lightpush_legacy/common,
waku_lightpush_legacy/protocol_metrics,
waku_lightpush_legacy/rpc,
waku_lightpush_legacy/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
../resources/[pubsub_topics, content_topics, payloads]
suite "Waku Legacy Lightpush Client":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler
serverSwitch {.threadvar.}: Switch
clientSwitch {.threadvar.}: Switch
server {.threadvar.}: WakuLegacyLightPush
client {.threadvar.}: WakuLegacyLightPushClient
serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
clientPeerId {.threadvar.}: PeerId
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage
asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
let msgLen = message.encode().buffer.len
if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024:
return err("length greater than maxMessageSize")
handlerFuture.complete((pubsubTopic, message))
return ok()
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
server = await newTestWakuLegacyLightpushNode(serverSwitch, handler)
client = newTestWakuLegacyLightpushClient(clientSwitch)
await allFutures(serverSwitch.start(), clientSwitch.start())
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
clientPeerId = clientSwitch.peerInfo.peerId
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()
asyncTeardown:
await allFutures(clientSwitch.stop(), serverSwitch.stop())
suite "Verification of PushRequest Payload":
asyncTest "Valid Payload Types":
# Given the following payloads
let
message2 = fakeWakuMessage(payloads.ALPHABETIC, content_topics.CURRENT)
message3 = fakeWakuMessage(payloads.ALPHANUMERIC, content_topics.TESTNET)
message4 = fakeWakuMessage(payloads.ALPHANUMERIC_SPECIAL, content_topics.PLAIN)
message5 = fakeWakuMessage(payloads.EMOJI, content_topics.CURRENT)
message6 = fakeWakuMessage(payloads.CODE, content_topics.TESTNET)
message7 = fakeWakuMessage(payloads.QUERY, content_topics.PLAIN)
message8 = fakeWakuMessage(payloads.TEXT_SMALL, content_topics.CURRENT)
message9 = fakeWakuMessage(payloads.TEXT_LARGE, content_topics.TESTNET)
# When publishing a valid payload
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsubTopic, message) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse2 =
await client.publish(pubsub_topics.CURRENT, message2, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse2
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.CURRENT, message2) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse3 = await client.publish(
pubsub_topics.CURRENT_NESTED, message3, serverRemotePeerInfo
)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse3
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.CURRENT_NESTED, message3) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse4 =
await client.publish(pubsub_topics.SHARDING, message4, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse4
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.SHARDING, message4) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse5 =
await client.publish(pubsub_topics.PLAIN, message5, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse5
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.PLAIN, message5) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse6 =
await client.publish(pubsub_topics.LEGACY, message6, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse6
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.LEGACY, message6) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse7 = await client.publish(
pubsub_topics.LEGACY_NESTED, message7, serverRemotePeerInfo
)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse7
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.LEGACY_NESTED, message7) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse8 = await client.publish(
pubsub_topics.LEGACY_ENCODING, message8, serverRemotePeerInfo
)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse8
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsub_topics.LEGACY_ENCODING, message8) == handlerFuture.read()
# When publishing a valid payload
handlerFuture = newPushHandlerFuture()
let publishResponse9 =
await client.publish(pubsubTopic, message9, serverRemotePeerInfo)
# Then the message is received by the server
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
assertResultOk publishResponse9
check handlerFuture.finished()
# And the message is received with the correct topic and payload
check (pubsubTopic, message9) == handlerFuture.read()
asyncTest "Valid Payload Sizes":
# Given some valid payloads
let
overheadBytes: uint64 = 112
message1 =
fakeWakuMessage(contentTopic = contentTopic, payload = getByteSequence(1024))
# 1KiB
message2 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
) # 10KiB
message3 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB
message4 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize - overheadBytes - 1),
) # Inclusive Limit
message5 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024),
) # Exclusive Limit
# When publishing the 1KiB payload
let publishResponse1 =
await client.publish(pubsubTopic, message1, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse1
check (pubsubTopic, message1) == (await handlerFuture.waitForResult()).value()
# When publishing the 10KiB payload
handlerFuture = newPushHandlerFuture()
let publishResponse2 =
await client.publish(pubsubTopic, message2, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse2
check (pubsubTopic, message2) == (await handlerFuture.waitForResult()).value()
# When publishing the 100KiB payload
handlerFuture = newPushHandlerFuture()
let publishResponse3 =
await client.publish(pubsubTopic, message3, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse3
check (pubsubTopic, message3) == (await handlerFuture.waitForResult()).value()
# When publishing the 1MiB + 63KiB + 911B payload (1113999B)
handlerFuture = newPushHandlerFuture()
let publishResponse4 =
await client.publish(pubsubTopic, message4, serverRemotePeerInfo)
# Then the message is received by the server
assertResultOk publishResponse4
check (pubsubTopic, message4) == (await handlerFuture.waitForResult()).value()
# When publishing the 1MiB + 63KiB + 912B payload (1114000B)
handlerFuture = newPushHandlerFuture()
let publishResponse5 =
await client.publish(pubsubTopic, message5, serverRemotePeerInfo)
# Then the message is not received by the server
check:
not publishResponse5.isOk()
(await handlerFuture.waitForResult()).isErr()
asyncTest "Invalid Encoding Payload":
# Given a payload with an invalid encoding
let fakeBuffer = @[byte(42)]
# When publishing the payload
let publishResponse = await server.handleRequest(clientPeerId, fakeBuffer)
# Then the response is negative
check:
publishResponse.requestId == ""
# And the error is returned
let response = publishResponse.response.get()
check:
response.isSuccess == false
response.info.isSome()
scanf(response.info.get(), decodeRpcFailure)
asyncTest "Handle Error":
# Given a lightpush server that fails
let
handlerError = "handler-error"
handlerFuture2 = newFuture[void]()
handler2 = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture2.complete()
return err(handlerError)
let
serverSwitch2 = newTestSwitch()
server2 = await newTestWakuLegacyLightpushNode(serverSwitch2, handler2)
await serverSwitch2.start()
let serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo()
# When publishing a payload
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo2)
# Then the response is negative
check:
publishResponse.error() == handlerError
(await handlerFuture2.waitForResult()).isOk()
# Cleanup
await serverSwitch2.stop()
suite "Verification of PushResponse Payload":
asyncTest "Positive Responses":
# When sending a valid PushRequest
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo)
# Then the response is positive
assertResultOk publishResponse
# TODO: Improve: Add more negative responses variations
asyncTest "Negative Responses":
# Given a server that does not support Waku Lightpush
let
serverSwitch2 = newTestSwitch()
serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo()
await serverSwitch2.start()
# When sending an invalid PushRequest
let publishResponse =
await client.publish(pubsubTopic, message, serverRemotePeerInfo2)
# Then the response is negative
check not publishResponse.isOk()

View File

@ -0,0 +1,153 @@
{.used.}
import
std/[options, strscans],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto
import
waku/[
node/peer_manager,
common/rate_limit/setting,
waku_core,
waku_lightpush_legacy,
waku_lightpush_legacy/client,
waku_lightpush_legacy/common,
waku_lightpush_legacy/protocol_metrics,
waku_lightpush_legacy/rpc,
waku_lightpush_legacy/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
../resources/[pubsub_topics, content_topics, payloads]
suite "Rate limited push service":
asyncTest "push message with rate limit not violated":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler: PushMessageHandler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
tokenPeriod = 500.millis
server = await newTestWakuLegacyLightpushNode(
serverSwitch, handler, some((3, tokenPeriod))
)
client = newTestWakuLegacyLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let sendMsgProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
check await handlerFuture.withTimeout(50.millis)
assert requestRes.isOk(), requestRes.error
check handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let waitInBetweenFor = 20.millis
# Test cannot be too explicit about the time when the TokenBucket resets
# the internal timer, although in normal use there is no use case to care about it.
var firstWaitExtend = 300.millis
for runCnt in 0 ..< 3:
let startTime = Moment.now()
for testCnt in 0 ..< 3:
await sendMsgProc()
await sleepAsync(20.millis)
var endTime = Moment.now()
var elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
firstWaitEXtend = 100.millis
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "push message with rate limit reject":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
server = await newTestWakuLegacyLightpushNode(
serverSwitch, handler, some((3, 500.millis))
)
client = newTestWakuLegacyLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic
let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"
for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)
await rejectProc()
await sleepAsync(500.millis)
## next one shall succeed due to the rate limit time window has passed
await successProc()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

View File

@ -4,6 +4,7 @@ import
./test_rest_debug_serdes,
./test_rest_debug,
./test_rest_filter,
./test_rest_lightpush_legacy,
./test_rest_health,
./test_rest_relay_serdes,
./test_rest_relay,

View File

@ -15,13 +15,13 @@ import
waku_core,
waku_node,
node/peer_manager,
waku_lightpush/common,
waku_lightpush_legacy/common,
waku_api/rest/server,
waku_api/rest/client,
waku_api/rest/responses,
waku_api/rest/lightpush/types,
waku_api/rest/lightpush/handlers as lightpush_api,
waku_api/rest/lightpush/client as lightpush_api_client,
waku_api/rest/legacy_lightpush/types,
waku_api/rest/legacy_lightpush/handlers as lightpush_api,
waku_api/rest/legacy_lightpush/client as lightpush_api_client,
waku_relay,
common/rate_limit/setting,
],
@ -61,8 +61,8 @@ proc init(
await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountLightPush(rateLimit)
testSetup.pushNode.mountLightPushClient()
await testSetup.serviceNode.mountLegacyLightPush(rateLimit)
testSetup.pushNode.mountLegacyLightPushClient()
testSetup.serviceNode.peerManager.addServicePeer(
testSetup.consumerNode.peerInfo.toRemotePeerInfo(), WakuRelayCodec
@ -73,7 +73,7 @@ proc init(
)
testSetup.pushNode.peerManager.addServicePeer(
testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec
testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLegacyLightPushCodec
)
var restPort = Port(0)
@ -209,8 +209,7 @@ suite "Waku v2 Rest API - lightpush":
await restLightPushTest.shutdown()
# disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500
xasyncTest "Request rate limit push message":
asyncTest "Request rate limit push message":
# Given
let budgetCap = 3
let tokenPeriod = 500.millis
@ -273,7 +272,7 @@ suite "Waku v2 Rest API - lightpush":
let endTime = Moment.now()
let elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed)
await sleepAsync(tokenPeriod - elapsed + 10.millis)
await restLightPushTest.shutdown()

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit a4f0a638e718f05ecec01ae3a6ad2838714e7e40
Subproject commit 78a434405435b69a24e8b263d48d622d57c4db5b

View File

@ -16,6 +16,7 @@ import
../waku_enr/sharding,
../waku_node,
../waku_core,
../waku_core/codecs,
../waku_rln_relay,
../discovery/waku_dnsdisc,
../waku_archive/retention_policy as policy,
@ -33,7 +34,7 @@ import
../node/peer_manager,
../node/peer_manager/peer_store/waku_peer_storage,
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../waku_lightpush/common,
../waku_lightpush_legacy/common,
../common/utils/parse_size_units,
../common/rate_limit/setting,
../common/databases/dburl
@ -359,14 +360,17 @@ proc setupProtocols(
if conf.lightpush:
try:
await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))
await mountLegacyLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))
except CatchableError:
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
mountLightPushClient(node)
mountLegacyLightPushClient(node)
if conf.lightpushnode != "":
let lightPushNode = parsePeerInfo(conf.lightpushnode)
if lightPushNode.isOk():
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
node.peerManager.addServicePeer(lightPushNode.value, WakuLegacyLightPushCodec)
else:
return err("failed to set node waku lightpush peer: " & lightPushNode.error)

View File

@ -1,5 +1,5 @@
import tables, std/options
import waku/waku_lightpush/rpc
import ../waku_lightpush_legacy/rpc
type
PeerId = string

View File

@ -20,7 +20,7 @@ proc new*(
T: type DeliveryMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightPushClient,
wakuLightpushClient: WakuLightpushClient,
wakuFilterClient: WakuFilterClient,
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryMonitor

View File

@ -171,9 +171,9 @@ proc processMessages(self: SendMonitor) {.async.} =
let msg = deliveryInfo.msg
if not self.wakuRelay.isNil():
debug "trying to publish again with wakuRelay", msgHash, pubsubTopic
let ret = await self.wakuRelay.publish(pubsubTopic, msg)
if ret == 0:
error "could not publish with wakuRelay.publish", msgHash, pubsubTopic
(await self.wakuRelay.publish(pubsubTopic, msg)).isOkOr:
error "could not publish with wakuRelay.publish",
msgHash, pubsubTopic, error = $error
continue
if not self.wakuLightpushClient.isNil():

View File

@ -20,7 +20,8 @@ import
libp2p/builders,
libp2p/transports/transport,
libp2p/transports/tcptransport,
libp2p/transports/wstransport
libp2p/transports/wstransport,
libp2p/utility
import
../waku_core,
../waku_core/topics/sharding,
@ -40,11 +41,10 @@ import
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_rendezvous/protocol,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
../waku_lightpush/self_req_handler,
../waku_lightpush/callbacks,
../waku_lightpush_legacy/client as legacy_ligntpuhs_client,
../waku_lightpush_legacy as legacy_lightpush_protocol,
../waku_lightpush/client as ligntpuhs_client,
../waku_lightpush as lightpush_protocol,
../waku_enr,
../waku_peer_exchange,
../waku_rln_relay,
@ -105,6 +105,8 @@ type
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuRlnRelay*: WakuRLNRelay
wakuLegacyLightPush*: WakuLegacyLightPush
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
@ -250,7 +252,6 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) =
return
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msg_hash = topic.computeMessageHash(msg).to0xHex()
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
@ -979,53 +980,53 @@ proc setupStoreResume*(node: WakuNode) =
return
## Waku lightpush
proc mountLightPush*(
proc mountLegacyLightPush*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
info "mounting light push"
info "mounting legacy light push"
var pushHandler =
let pushHandler =
if node.wakuRelay.isNil:
debug "mounting lightpush without relay (nil)"
getNilPushHandler()
debug "mounting legacy lightpush without relay (nil)"
legacy_lightpush_protocol.getNilPushHandler()
else:
debug "mounting lightpush with relay"
debug "mounting legacy lightpush with relay"
let rlnPeer =
if isNil(node.wakuRlnRelay):
debug "mounting lightpush without rln-relay"
debug "mounting legacy lightpush without rln-relay"
none(WakuRLNRelay)
else:
debug "mounting lightpush with rln-relay"
debug "mounting legacy lightpush with rln-relay"
some(node.wakuRlnRelay)
getRelayPushHandler(node.wakuRelay, rlnPeer)
legacy_lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer)
node.wakuLightPush =
WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))
node.wakuLegacyLightPush =
WakuLegacyLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))
if node.started:
# Node has started already. Let's start lightpush too.
await node.wakuLightPush.start()
await node.wakuLegacyLightPush.start()
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
node.switch.mount(node.wakuLegacyLightPush, protocolMatcher(WakuLegacyLightPushCodec))
proc mountLightPushClient*(node: WakuNode) =
info "mounting light push client"
proc mountLegacyLightPushClient*(node: WakuNode) =
info "mounting legacy light push client"
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
node.wakuLegacyLightpushClient =
WakuLegacyLightPushClient.new(node.peerManager, node.rng)
proc lightpushPublish*(
proc legacyLightpushPublish*(
node: WakuNode,
pubsubTopic: Option[PubsubTopic],
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality.
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
error "failed to publish message as lightpush not available"
if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil():
error "failed to publish message as legacy lightpush not available"
return err("Waku lightpush not available")
let internalPublish = proc(
@ -1033,23 +1034,24 @@ proc lightpushPublish*(
pubsubTopic: PubsubTopic,
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with lightpush",
if not node.wakuLegacyLightpushClient.isNil():
notice "publishing message with legacy lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
return await node.wakuLegacyLightpushClient.publish(pubsubTopic, message, peer)
if not node.wakuLightPush.isNil():
notice "publishing message with self hosted lightpush",
if not node.wakuLegacyLightPush.isNil():
notice "publishing message with self hosted legacy lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message)
return
await node.wakuLegacyLightPush.handleSelfLightPushRequest(pubsubTopic, message)
try:
if pubsubTopic.isSome():
return await internalPublish(node, pubsubTopic.get(), message, peer)
@ -1068,26 +1070,119 @@ proc lightpushPublish*(
return err(getCurrentExceptionMsg())
# TODO: Move to application module (e.g., wakunode2.nim)
proc lightpushPublish*(
proc legacyLightpushPublish*(
node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult[string]] {.
async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead"
): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.
async, gcsafe, deprecated: "Use 'node.legacyLightpushPublish()' instead"
.} =
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
error "failed to publish message as lightpush not available"
return err("waku lightpush not available")
if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil():
error "failed to publish message as legacy lightpush not available"
return err("waku legacy lightpush not available")
var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo)
if not node.wakuLightpushClient.isNil():
peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if not node.wakuLegacyLightpushClient.isNil():
peerOpt = node.peerManager.selectPeer(WakuLegacyLightPushCodec)
if peerOpt.isNone():
let msg = "no suitable remote peers"
error "failed to publish message", err = msg
return err(msg)
elif not node.wakuLightPush.isNil():
elif not node.wakuLegacyLightPush.isNil():
peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId))
return await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())
return await node.legacyLightpushPublish(pubsubTopic, message, peer = peerOpt.get())
proc mountLightPush*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
info "mounting light push"
let pushHandler =
if node.wakuRelay.isNil():
debug "mounting lightpush v2 without relay (nil)"
lightpush_protocol.getNilPushHandler()
else:
debug "mounting lightpush with relay"
let rlnPeer =
if isNil(node.wakuRlnRelay):
debug "mounting lightpush without rln-relay"
none(WakuRLNRelay)
else:
debug "mounting lightpush with rln-relay"
some(node.wakuRlnRelay)
lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer)
node.wakuLightPush = WakuLightPush.new(
node.peerManager, node.rng, pushHandler, node.wakuSharding, some(rateLimit)
)
if node.started:
# Node has started already. Let's start lightpush too.
await node.wakuLightPush.start()
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
proc mountLightPushClient*(node: WakuNode) =
info "mounting light push client"
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
proc lightpushPublishHandler(
node: WakuNode,
pubsubTopic: PubsubTopic,
message: WakuMessage,
peer: RemotePeerInfo | PeerInfo,
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with legacy lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
if not node.wakuLightPush.isNil():
notice "publishing message with self hosted legacy lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return
await node.wakuLightPush.handleSelfLightPushRequest(some(pubsubTopic), message)
proc lightpushPublish*(
node: WakuNode,
pubsubTopic: Option[PubsubTopic],
message: WakuMessage,
peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo),
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
error "failed to publish message as lightpush not available"
return lighpushErrorResult(SERVICE_NOT_AVAILABLE, "Waku lightpush not available")
let toPeer: RemotePeerInfo = peerOpt.valueOr:
if not node.wakuLightPush.isNil():
RemotePeerInfo.init(node.peerId())
elif not node.wakuLightpushClient.isNil():
node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
let msg = "no suitable remote peers"
error "failed to publish message", msg = msg
return lighpushErrorResult(NO_PEERS_TO_RELAY, msg)
else:
return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers")
let pubsubForPublish = pubSubTopic.valueOr:
let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr:
let msg = "Invalid content-topic:" & $error
error "lightpush request handling error", error = msg
return lighpushErrorResult(INVALID_MESSAGE_ERROR, msg)
node.wakuSharding.getShard(parsedTopic).valueOr:
let msg = "Autosharding error: " & error
error "lightpush publish error", error = msg
return lighpushErrorResult(INTERNAL_SERVER_ERROR, msg)
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer)
## Waku RLN Relay
proc mountRlnRelay*(

View File

@ -12,7 +12,7 @@ import
../../../waku_store_legacy/common,
../../../waku_store/common,
../../../waku_filter_v2,
../../../waku_lightpush/common,
../../../waku_lightpush_legacy/common,
../../../waku_relay,
../../../waku_peer_exchange,
../../../waku_node,
@ -85,6 +85,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
tuplesToWakuPeers(peers, legacyStorePeers)
let legacyLightpushPeers = node.peerManager.wakuPeerStore
.peers(WakuLegacyLightPushCodec)
.mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuLegacyLightPushCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, legacyLightpushPeers)
let lightpushPeers = node.peerManager.wakuPeerStore.peers(WakuLightPushCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),

View File

@ -12,6 +12,7 @@ import
waku/waku_api/rest/debug/handlers as rest_debug_api,
waku/waku_api/rest/relay/handlers as rest_relay_api,
waku/waku_api/rest/filter/handlers as rest_filter_api,
waku/waku_api/rest/legacy_lightpush/handlers as rest_legacy_lightpush_api,
waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
waku/waku_api/rest/store/handlers as rest_store_api,
waku/waku_api/rest/legacy_store/handlers as rest_store_legacy_api,
@ -176,14 +177,17 @@ proc startRestServerProtocolSupport*(
## Light push API
## Install it either if lightpushnode (lightpush service node) is configured and client is mounted)
## or install it to be used with self-hosted lightpush service
if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or
(conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil):
if (conf.lightpushnode != "" and node.wakuLegacyLightpushClient != nil) or
(conf.lightpush and node.wakuLegacyLightPush != nil and node.wakuRelay != nil):
let lightDiscoHandler =
if not wakuDiscv5.isNil():
some(defaultDiscoveryHandler(wakuDiscv5, Lightpush))
else:
none(DiscoveryHandler)
rest_legacy_lightpush_api.installLightPushRequestHandler(
router, node, lightDiscoHandler
)
rest_lightpush_api.installLightPushRequestHandler(router, node, lightDiscoHandler)
else:
restServerNotInstalledTab["lightpush"] =

View File

@ -0,0 +1,23 @@
{.push raises: [].}
import
json,
std/sets,
stew/byteutils,
strformat,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import ../../../waku_core, ../serdes, ../responses, ../rest_serdes, ./types
export types
proc encodeBytes*(value: PushRequest, contentType: string): RestResult[seq[byte]] =
return encodeBytesOf(value, contentType)
proc sendPushRequest*(
body: PushRequest
): RestResponse[string] {.
rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost
.}

View File

@ -0,0 +1,91 @@
{.push raises: [].}
import
std/strformat,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/route,
presto/common
import
waku/node/peer_manager,
waku/waku_lightpush_legacy/common,
../../../waku_node,
../../handlers,
../serdes,
../responses,
../rest_serdes,
./types
export types
logScope:
topics = "waku node rest legacy lightpush api"
const FutTimeoutForPushRequestProcessing* = 5.seconds
const NoPeerNoDiscoError =
RestApiResponse.serviceUnavailable("No suitable service peer & no discovery method")
const NoPeerNoneFoundError =
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
proc useSelfHostedLightPush(node: WakuNode): bool =
return node.wakuLegacyLightPush != nil and node.wakuLegacyLightPushClient == nil
#### Request handlers
const ROUTE_LIGHTPUSH = "/lightpush/v1/message"
proc installLightPushRequestHandler*(
router: var RestRouter,
node: WakuNode,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPost, ROUTE_LIGHTPUSH) do(
contentBody: Option[ContentBody]
) -> RestApiResponse:
## Send a request to push a waku message
debug "post", ROUTE_LIGHTPUSH, contentBody
let decodedBody = decodeRequestBody[PushRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error()
let req: PushRequest = decodedBody.value()
let msg = req.message.toWakuMessage().valueOr:
return RestApiResponse.badRequest("Invalid message: " & $error)
var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId)
if useSelfHostedLightPush(node):
discard
else:
peer = node.peerManager.selectPeer(WakuLegacyLightPushCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscoError
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError("No value in peerOp: " & $error)
peerOp.valueOr:
return NoPeerNoneFoundError
let subFut = node.legacyLightpushPublish(req.pubsubTopic, msg, peer)
if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing):
error "Failed to request a message push due to timeout!"
return RestApiResponse.serviceUnavailable("Push request timed out")
if subFut.value().isErr():
if subFut.value().error == TooManyRequestsMessage:
return RestApiResponse.tooManyRequests("Request rate limmit reached")
return RestApiResponse.serviceUnavailable(
fmt("Failed to request a message push: {subFut.value().error}")
)
return RestApiResponse.ok()

View File

@ -0,0 +1,67 @@
{.push raises: [].}
import
std/[sets, strformat],
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client]
import ../../../waku_core, ../relay/types as relay_types, ../serdes
export relay_types
#### Types
type PushRequest* = object
pubsubTopic*: Option[PubSubTopic]
message*: RelayWakuMessage
#### Serialization and deserialization
proc writeValue*(
writer: var JsonWriter[RestJson], value: PushRequest
) {.raises: [IOError].} =
writer.beginRecord()
if value.pubsubTopic.isSome():
writer.writeField("pubsubTopic", value.pubsubTopic.get())
writer.writeField("message", value.message)
writer.endRecord()
proc readValue*(
reader: var JsonReader[RestJson], value: var PushRequest
) {.raises: [SerializationError, IOError].} =
var
pubsubTopic = none(PubsubTopic)
message = none(RelayWakuMessage)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err =
try:
fmt"Multiple `{fieldName}` fields found"
except CatchableError:
"Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "PushRequest")
case fieldName
of "pubsubTopic":
pubsubTopic = some(reader.readValue(PubsubTopic))
of "message":
message = some(reader.readValue(RelayWakuMessage))
else:
unrecognizedFieldWarning(value)
if message.isNone():
reader.raiseUnexpectedValue("Field `message` is missing")
value = PushRequest(
pubsubTopic:
if pubsubTopic.isNone() or pubsubTopic.get() == "":
none(string)
else:
some(pubsubTopic.get()),
message: message.get(),
)

View File

@ -13,14 +13,11 @@ import ../../../waku_core, ../serdes, ../responses, ../rest_serdes, ./types
export types
logScope:
topics = "waku node rest client v2"
proc encodeBytes*(value: PushRequest, contentType: string): RestResult[seq[byte]] =
return encodeBytesOf(value, contentType)
proc sendPushRequest*(
body: PushRequest
): RestResponse[string] {.
rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost
): RestResponse[PushResponse] {.
rest, endpoint: "/lightpush/v3/message", meth: HttpMethod.MethodPost
.}

View File

@ -24,7 +24,7 @@ export types
logScope:
topics = "waku node rest lightpush api"
const futTimeoutForPushRequestProcessing* = 5.seconds
const FutTimeoutForPushRequestProcessing* = 5.seconds
const NoPeerNoDiscoError =
RestApiResponse.serviceUnavailable("No suitable service peer & no discovery method")
@ -33,11 +33,32 @@ const NoPeerNoneFoundError =
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
proc useSelfHostedLightPush(node: WakuNode): bool =
return node.wakuLightPush != nil and node.wakuLightPushClient == nil
return node.wakuLegacyLightPush != nil and node.wakuLegacyLightPushClient == nil
proc convertErrorKindToHttpStatus(statusCode: LightpushStatusCode): HttpCode =
## Lightpush status codes are matching HTTP status codes by design
return HttpCode(statusCode.int32)
proc makeRestResponse(response: WakuLightPushResult): RestApiResponse =
var httpStatus: HttpCode = Http200
var apiResponse: PushResponse
if response.isOk():
apiResponse.relayPeerCount = some(response.get())
else:
httpStatus = convertErrorKindToHttpStatus(response.error().code)
apiResponse.statusDesc = response.error().desc
let restResp = RestApiResponse.jsonResponse(apiResponse, status = httpStatus).valueOr:
error "An error ocurred while building the json respose: ", error = error
return RestApiResponse.internalServerError(
fmt("An error ocurred while building the json respose: {error}")
)
return restResp
#### Request handlers
const ROUTE_LIGHTPUSH* = "/lightpush/v1/message"
const ROUTE_LIGHTPUSH = "/lightpush/v3/message"
proc installLightPushRequestHandler*(
router: var RestRouter,
@ -50,21 +71,17 @@ proc installLightPushRequestHandler*(
## Send a request to push a waku message
debug "post", ROUTE_LIGHTPUSH, contentBody
let decodedBody = decodeRequestBody[PushRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error()
let req: PushRequest = decodedBody.value()
let req: PushRequest = decodeRequestBody[PushRequest](contentBody).valueOr:
return RestApiResponse.badRequest("Invalid push request: " & $error)
let msg = req.message.toWakuMessage().valueOr:
return RestApiResponse.badRequest("Invalid message: " & $error)
var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId)
var toPeer = none(RemotePeerInfo)
if useSelfHostedLightPush(node):
discard
else:
peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
let aPeer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscoError
@ -73,19 +90,12 @@ proc installLightPushRequestHandler*(
peerOp.valueOr:
return NoPeerNoneFoundError
toPeer = some(aPeer)
let subFut = node.lightpushPublish(req.pubsubTopic, msg, peer)
let subFut = node.lightpushPublish(req.pubsubTopic, msg, toPeer)
if not await subFut.withTimeout(futTimeoutForPushRequestProcessing):
if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing):
error "Failed to request a message push due to timeout!"
return RestApiResponse.serviceUnavailable("Push request timed out")
if subFut.value().isErr():
if subFut.value().error == TooManyRequestsMessage:
return RestApiResponse.tooManyRequests("Request rate limmit reached")
return RestApiResponse.serviceUnavailable(
fmt("Failed to request a message push: {subFut.value().error}")
)
return RestApiResponse.ok()
return makeRestResponse(subFut.value())

View File

@ -13,12 +13,16 @@ export relay_types
#### Types
type PushRequest* = object
pubsubTopic*: Option[PubSubTopic]
message*: RelayWakuMessage
type
PushRequest* = object
pubsubTopic*: Option[PubSubTopic]
message*: RelayWakuMessage
PushResponse* = object
statusDesc*: Option[string]
relayPeerCount*: Option[uint32]
#### Serialization and deserialization
proc writeValue*(
writer: var JsonWriter[RestJson], value: PushRequest
) {.raises: [IOError].} =
@ -65,3 +69,46 @@ proc readValue*(
some(pubsubTopic.get()),
message: message.get(),
)
proc writeValue*(
writer: var JsonWriter[RestJson], value: PushResponse
) {.raises: [IOError].} =
writer.beginRecord()
if value.statusDesc.isSome():
writer.writeField("statusDesc", value.statusDesc.get())
if value.relayPeerCount.isSome():
writer.writeField("relayPeerCount", value.relayPeerCount.get())
writer.endRecord()
proc readValue*(
reader: var JsonReader[RestJson], value: var PushResponse
) {.raises: [SerializationError, IOError].} =
var
statusDesc = none(string)
relayPeerCount = none(uint32)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err =
try:
fmt"Multiple `{fieldName}` fields found"
except CatchableError:
"Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "PushResponse")
case fieldName
of "statusDesc":
statusDesc = some(reader.readValue(string))
of "relayPeerCount":
relayPeerCount = some(reader.readValue(uint32))
else:
unrecognizedFieldWarning(value)
if relayPeerCount.isNone() and statusDesc.isNone():
reader.raiseUnexpectedValue(
"Fields are missing, either `relayPeerCount` or `statusDesc` must be present"
)
value = PushResponse(statusDesc: statusDesc, relayPeerCount: relayPeerCount)

View File

@ -4,6 +4,7 @@ import
./waku_core/message,
./waku_core/peers,
./waku_core/subscription,
./waku_core/multiaddrstr
./waku_core/multiaddrstr,
./waku_core/codecs
export topics, time, message, peers, subscription, multiaddrstr
export topics, time, message, peers, subscription, multiaddrstr, codecs

View File

@ -3,7 +3,8 @@ const
WakuStoreCodec* = "/vac/waku/store-query/3.0.0"
WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1"
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/3.0.0"
WakuLegacyLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"

View File

@ -257,7 +257,7 @@ proc parseUrlPeerAddr*(
proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
## Converts an ENR to dialable RemotePeerInfo
let typedR = ?enr.toTypedRecord()
let typedR = TypedRecord.fromRecord(enr)
if not typedR.secp256k1.isSome():
return err("enr: no secp256k1 key in record")
@ -351,12 +351,8 @@ func hasUdpPort*(peer: RemotePeerInfo): bool =
let
enr = peer.enr.get()
typedEnrRes = enr.toTypedRecord()
typedEnr = TypedRecord.fromRecord(enr)
if typedEnrRes.isErr():
return false
let typedEnr = typedEnrRes.get()
typedEnr.udp.isSome() or typedEnr.udp6.isSome()
proc getAgent*(peer: RemotePeerInfo): string =

View File

@ -1,3 +1,3 @@
import ./waku_lightpush/protocol
import ./waku_lightpush/[protocol, common, rpc, rpc_codec, callbacks, self_req_handler]
export protocol
export protocol, common, rpc, rpc_codec, callbacks, self_req_handler

View File

@ -1,5 +1,7 @@
{.push raises: [].}
import stew/results
import
../waku_core,
../waku_relay,
@ -32,28 +34,28 @@ proc checkAndGenerateRLNProof*(
proc getNilPushHandler*(): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
): Future[WakuLightPushResult] {.async.} =
return lightpushResultInternalError("no waku relay found")
proc getRelayPushHandler*(
wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
): Future[WakuLightPushResult] {.async.} =
# append RLN proof
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
if msgWithProof.isErr():
return err(msgWithProof.error)
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message).valueOr:
return lighpushErrorResult(OUT_OF_RLN_PROOF, error)
(await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr:
return err(error)
(await wakuRelay.validateMessage(pubSubTopic, msgWithProof)).isOkOr:
return lighpushErrorResult(INVALID_MESSAGE_ERROR, $error)
let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof.value)
if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let publishedResult = await wakuRelay.publish(pubsubTopic, msgWithProof)
if publishedResult.isErr():
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers", msg_hash = msgHash
return err(protocol_metrics.notPublishedAnyPeer)
notice "Lightpush request has not been published to any peers",
msg_hash = msgHash, reason = $publishedResult.error
return mapPubishingErrorToPushResult(publishedResult.error)
return ok()
return lightpushSuccessResult(publishedResult.get().uint32)

View File

@ -30,80 +30,83 @@ proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
proc sendPushRequest(
wl: WakuLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
if connOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
let connection = connOpt.get()
wl: WakuLightPushClient, req: LightPushRequest, peer: PeerId | RemotePeerInfo
): Future[WakuLightPushResult] {.async.} =
let connection = (await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr:
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
return lighpushErrorResult(
NO_PEERS_TO_RELAY, dialFailure & ": " & $peer & " is not accessible"
)
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
await connection.writeLP(rpc.encode().buffer)
await connection.writeLP(req.encode().buffer)
var buffer: seq[byte]
try:
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
return err("Exception reading: " & getCurrentExceptionMsg())
error "Failed to read responose from peer", error = getCurrentExceptionMsg()
return lightpushResultInternalError(
"Failed to read response from peer: " & getCurrentExceptionMsg()
)
let decodeRespRes = PushRPC.decode(buffer)
if decodeRespRes.isErr():
let response = LightpushResponse.decode(buffer).valueOr:
error "failed to decode response"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure)
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
return lightpushResultInternalError(decodeRpcFailure)
let pushResponseRes = decodeRespRes.get()
if pushResponseRes.response.isNone():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)
if response.requestId != req.requestId and
response.statusCode != TOO_MANY_REQUESTS.uint32:
error "response failure, requestId mismatch",
requestId = req.requestId, responseRequestId = response.requestId
return lightpushResultInternalError("response failure, requestId mismatch")
let response = pushResponseRes.response.get()
if not response.isSuccess:
if response.info.isSome():
return err(response.info.get())
else:
return err("unknown failure")
return ok()
return toPushResult(response)
proc publish*(
wl: WakuLightPushClient,
pubSubTopic: PubsubTopic,
pubSubTopic: Option[PubsubTopic] = none(PubsubTopic),
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## On success, returns the msg_hash of the published message
let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex()
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
peer: PeerId | RemotePeerInfo,
): Future[WakuLightPushResult] {.async, gcsafe.} =
when peer is PeerId:
info "publish",
peerId = shortLog(peer),
msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
else:
info "publish",
peerId = shortLog(peer.peerId),
msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
let pushRequest = LightpushRequest(
requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message
)
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
obs.onMessagePublished(pubSubTopic.get(""), message)
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msg_hash_hex_str
return ok(msg_hash_hex_str)
return lightpushSuccessResult(publishedCount)
proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
): Future[WakuLightPushResult] {.async, gcsafe.} =
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead we get it from peer manager
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers")
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
let pushRequest = LightpushRequest(
requestId: generateRequestId(wl.rng),
pubSubTopic: some(pubSubTopic),
message: message,
)
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok()
return lightpushSuccessResult(publishedCount)

View File

@ -1,15 +1,82 @@
{.push raises: [].}
import results, chronos, libp2p/peerid
import ../waku_core
import std/options, results, chronos, libp2p/peerid
import ../waku_core, ./rpc, ../waku_relay/protocol
from ../waku_core/codecs import WakuLightPushCodec
export WakuLightPushCodec
type WakuLightPushResult*[T] = Result[T, string]
type LightpushStatusCode* = enum
SUCCESS = uint32(200)
BAD_REQUEST = uint32(400)
PAYLOAD_TOO_LARGE = uint32(413)
INVALID_MESSAGE_ERROR = uint32(420)
UNSUPPORTED_PUBSUB_TOPIC = uint32(421)
TOO_MANY_REQUESTS = uint32(429)
INTERNAL_SERVER_ERROR = uint32(500)
NO_PEERS_TO_RELAY = uint32(503)
OUT_OF_RLN_PROOF = uint32(504)
SERVICE_NOT_AVAILABLE = uint32(505)
type ErrorStatus* = tuple[code: LightpushStatusCode, desc: Option[string]]
type WakuLightPushResult* = Result[uint32, ErrorStatus]
type PushMessageHandler* = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.}
): Future[WakuLightPushResult] {.async.}
const TooManyRequestsMessage* = "TOO_MANY_REQUESTS"
const TooManyRequestsMessage* = "Request rejected due to too many requests"
func isSuccess*(response: LightPushResponse): bool =
return response.statusCode == LightpushStatusCode.SUCCESS.uint32
func toPushResult*(response: LightPushResponse): WakuLightPushResult =
if isSuccess(response):
return ok(response.relayPeerCount.get(0))
else:
return err((response.statusCode.LightpushStatusCode, response.statusDesc))
func lightpushSuccessResult*(relayPeerCount: uint32): WakuLightPushResult =
return ok(relayPeerCount)
func lightpushResultInternalError*(msg: string): WakuLightPushResult =
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, some(msg)))
func lighpushErrorResult*(
statusCode: LightpushStatusCode, desc: Option[string]
): WakuLightPushResult =
return err((statusCode, desc))
func lighpushErrorResult*(
statusCode: LightpushStatusCode, desc: string
): WakuLightPushResult =
return err((statusCode, some(desc)))
func mapPubishingErrorToPushResult*(
publishOutcome: PublishOutcome
): WakuLightPushResult =
case publishOutcome
of NoTopicSpecified:
return err(
(LightpushStatusCode.INVALID_MESSAGE_ERROR, some("Empty topic, skipping publish"))
)
of DuplicateMessage:
return err(
(LightpushStatusCode.INVALID_MESSAGE_ERROR, some("Dropping already-seen message"))
)
of NoPeersToPublish:
return err(
(
LightpushStatusCode.NO_PEERS_TO_RELAY,
some("No peers for topic, skipping publish"),
)
)
of CannotGenerateMessageId:
return err(
(
LightpushStatusCode.INTERNAL_SERVER_ERROR,
some("Error generating message id, skipping publish"),
)
)
else:
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, none[string]()))

View File

@ -1,9 +1,17 @@
{.push raises: [].}
import std/options, results, stew/byteutils, chronicles, chronos, metrics, bearssl/rand
import
std/[options, strutils],
results,
stew/byteutils,
chronicles,
chronos,
metrics,
bearssl/rand
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_core/topics/sharding,
./common,
./rpc,
./rpc_codec,
@ -18,55 +26,90 @@ type WakuLightPush* = ref object of LPProtocol
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: RequestRateLimiter
sharding: Sharding
proc handleRequest*(
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
): Future[PushRPC] {.async.} =
let reqDecodeRes = PushRPC.decode(buffer)
var
isSuccess = false
pushResponseInfo = ""
requestId = ""
): Future[LightPushResponse] {.async.} =
let reqDecodeRes = LightpushRequest.decode(buffer)
var isSuccess = false
var pushResponse: LightpushResponse
if reqDecodeRes.isErr():
pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error
elif reqDecodeRes.get().request.isNone():
pushResponseInfo = emptyRequestBodyFailure
pushResponse = LightpushResponse(
requestId: "N/A", # due to decode failure we don't know requestId
statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
statusDesc: some(decodeRpcFailure & ": " & $reqDecodeRes.error),
)
else:
let pushRpcRequest = reqDecodeRes.get()
let pushRequest = reqDecodeRes.get()
requestId = pushRpcRequest.requestId
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
let msg = "Invalid content-topic:" & $error
error "lightpush request handling error", error = msg
return LightpushResponse(
requestId: pushRequest.requestId,
statusCode: LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32,
statusDesc: some(msg),
)
let
request = pushRpcRequest.request
wl.sharding.getShard(parsedTopic).valueOr:
let msg = "Autosharding error: " & error
error "lightpush request handling error", error = msg
return LightpushResponse(
requestId: pushRequest.requestId,
statusCode: LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32,
statusDesc: some(msg),
)
pubSubTopic = request.get().pubSubTopic
message = request.get().message
# ensure checking topic will not cause error at gossipsub level
if pubsubTopic.isEmptyOrWhitespace():
let msg = "topic must not be empty"
error "lightpush request handling error", error = msg
return LightPushResponse(
requestId: pushRequest.requestId,
statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
statusDesc: some(msg),
)
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])
notice "handling lightpush request",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
requestId = pushRequest.requestId,
pubsubTopic = pushRequest.pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex(),
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)
pushResponse = LightpushResponse(
requestId: pushRequest.requestId,
statusCode:
if isSuccess:
LightpushStatusCode.SUCCESS.uint32
else:
handleRes.error.code.uint32,
statusDesc:
if isSuccess:
none[string]()
else:
handleRes.error.desc,
)
if not isSuccess:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
error "failed to push message", error = pushResponseInfo
let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo))
let rpc = PushRPC(requestId: requestId, response: some(response))
return rpc
waku_lightpush_v3_errors.inc(
labelValues = [pushResponse.statusDesc.valueOr("unknown")]
)
error "failed to push message", error = pushResponse.statusDesc
return pushResponse
proc initProtocolHandler(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
var rpc: PushRPC
var rpc: LightpushResponse
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
let buffer = await conn.readLp(DefaultMaxRpcSize)
@ -80,13 +123,13 @@ proc initProtocolHandler(wl: WakuLightPush) =
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
rpc = static(
PushRPC(
LightpushResponse(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
response:
some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))),
statusCode: LightpushStatusCode.TOO_MANY_REQUESTS.uint32,
statusDesc: some(TooManyRequestsMessage),
)
)
@ -103,6 +146,7 @@ proc new*(
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
sharding: Sharding,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLightPush(
@ -110,6 +154,7 @@ proc new*(
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
sharding: sharding,
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)

View File

@ -2,9 +2,9 @@
import metrics
declarePublicGauge waku_lightpush_errors,
declarePublicGauge waku_lightpush_v3_errors,
"number of lightpush protocol errors", ["type"]
declarePublicGauge waku_lightpush_messages,
declarePublicGauge waku_lightpush_v3_messages,
"number of lightpush messages received", ["type"]
# Error types (metric label values)

View File

@ -4,15 +4,13 @@ import std/options
import ../waku_core
type
PushRequest* = object
pubSubTopic*: string
LightpushRequest* = object
requestId*: string
pubSubTopic*: Option[PubsubTopic]
message*: WakuMessage
PushResponse* = object
isSuccess*: bool
info*: Option[string]
PushRPC* = object
LightPushResponse* = object
requestId*: string
request*: Option[PushRequest]
response*: Option[PushResponse]
statusCode*: uint32
statusDesc*: Option[string]
relayPeerCount*: Option[uint32]

View File

@ -5,73 +5,19 @@ import ../common/protobuf, ../waku_core, ./rpc
const DefaultMaxRpcSize* = -1
proc encode*(rpc: PushRequest): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.pubSubTopic)
pb.write3(2, rpc.message.encode())
pb.finish3()
pb
proc decode*(T: type PushRequest, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushRequest()
var pubSubTopic: PubsubTopic
if not ?pb.getField(1, pubSubTopic):
return err(ProtobufError.missingRequiredField("pubsub_topic"))
else:
rpc.pubSubTopic = pubSubTopic
var messageBuf: seq[byte]
if not ?pb.getField(2, messageBuf):
return err(ProtobufError.missingRequiredField("message"))
else:
rpc.message = ?WakuMessage.decode(messageBuf)
ok(rpc)
proc encode*(rpc: PushResponse): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, uint64(rpc.isSuccess))
pb.write3(2, rpc.info)
pb.finish3()
pb
proc decode*(T: type PushResponse, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushResponse()
var isSuccess: uint64
if not ?pb.getField(1, isSuccess):
return err(ProtobufError.missingRequiredField("is_success"))
else:
rpc.isSuccess = bool(isSuccess)
var info: string
if not ?pb.getField(2, info):
rpc.info = none(string)
else:
rpc.info = some(info)
ok(rpc)
proc encode*(rpc: PushRPC): ProtoBuffer =
proc encode*(rpc: LightpushRequest): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.requestId)
pb.write3(2, rpc.request.map(encode))
pb.write3(3, rpc.response.map(encode))
pb.write3(20, rpc.pubSubTopic)
pb.write3(21, rpc.message.encode())
pb.finish3()
pb
return pb
proc decode*(T: type PushRPC, buffer: seq[byte]): ProtobufResult[T] =
proc decode*(T: type LightpushRequest, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushRPC()
var rpc = LightpushRequest()
var requestId: string
if not ?pb.getField(1, requestId):
@ -79,18 +25,57 @@ proc decode*(T: type PushRPC, buffer: seq[byte]): ProtobufResult[T] =
else:
rpc.requestId = requestId
var requestBuffer: seq[byte]
if not ?pb.getField(2, requestBuffer):
rpc.request = none(PushRequest)
var pubSubTopic: PubsubTopic
if not ?pb.getField(20, pubSubTopic):
rpc.pubSubTopic = none(PubsubTopic)
else:
let request = ?PushRequest.decode(requestBuffer)
rpc.request = some(request)
rpc.pubSubTopic = some(pubSubTopic)
var responseBuffer: seq[byte]
if not ?pb.getField(3, responseBuffer):
rpc.response = none(PushResponse)
var messageBuf: seq[byte]
if not ?pb.getField(21, messageBuf):
return err(ProtobufError.missingRequiredField("message"))
else:
let response = ?PushResponse.decode(responseBuffer)
rpc.response = some(response)
rpc.message = ?WakuMessage.decode(messageBuf)
ok(rpc)
return ok(rpc)
proc encode*(rpc: LightPushResponse): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.requestId)
pb.write3(10, rpc.statusCode)
pb.write3(11, rpc.statusDesc)
pb.write3(12, rpc.relayPeerCount)
pb.finish3()
return pb
proc decode*(T: type LightPushResponse, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = LightPushResponse()
var requestId: string
if not ?pb.getField(1, requestId):
return err(ProtobufError.missingRequiredField("request_id"))
else:
rpc.requestId = requestId
var statusCode: uint32
if not ?pb.getField(10, statusCode):
return err(ProtobufError.missingRequiredField("status_code"))
else:
rpc.statusCode = statusCode
var statusDesc: string
if not ?pb.getField(11, statusDesc):
rpc.statusDesc = none(string)
else:
rpc.statusDesc = some(statusDesc)
var relayPeerCount: uint32
if not ?pb.getField(12, relayPeerCount):
rpc.relayPeerCount = none(uint32)
else:
rpc.relayPeerCount = some(relayPeerCount)
return ok(rpc)

View File

@ -20,8 +20,8 @@ import
../utils/requests
proc handleSelfLightPushRequest*(
self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[string]] {.async.} =
self: WakuLightPush, pubSubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
## Handles the lightpush requests made by the node to itself.
## Normally used in REST-lightpush requests
## On success, returns the msg_hash of the published message.
@ -30,30 +30,14 @@ proc handleSelfLightPushRequest*(
# provide self peerId as now this node is used directly, thus there is no light client sender peer.
let selfPeerId = self.peerManager.switch.peerInfo.peerId
let req = PushRequest(pubSubTopic: pubSubTopic, message: message)
let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req))
let req = LightpushRequest(
requestId: generateRequestId(self.rng), pubSubTopic: pubSubTopic, message: message
)
let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer)
let response = await self.handleRequest(selfPeerId, req.encode().buffer)
if respRpc.response.isNone():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)
let response = respRpc.response.get()
if not response.isSuccess:
if response.info.isSome():
return err(response.info.get())
else:
return err("unknown failure")
let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex()
notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
self_peer_id = selfPeerId,
msg_hash = msg_hash_hex_str
return ok(msg_hash_hex_str)
return response.toPushResult()
except Exception:
return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())
return lightPushResultInternalError(
"exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg()
)

View File

@ -0,0 +1,5 @@
import
./waku_lightpush_legacy/
[protocol, common, rpc, rpc_codec, callbacks, self_req_handler]
export protocol, common, rpc, rpc_codec, callbacks, self_req_handler

View File

@ -0,0 +1,62 @@
{.push raises: [].}
import
../waku_core,
../waku_relay,
./common,
./protocol_metrics,
../waku_rln_relay,
../waku_rln_relay/protocol_types
import std/times, libp2p/peerid, stew/byteutils
proc checkAndGenerateRLNProof*(
rlnPeer: Option[WakuRLNRelay], message: WakuMessage
): Result[WakuMessage, string] =
# check if the message already has RLN proof
if message.proof.len > 0:
return ok(message)
if rlnPeer.isNone():
notice "Publishing message without RLN proof"
return ok(message)
# generate and append RLN proof
let
time = getTime().toUnix()
senderEpochTime = float64(time)
var msgWithProof = message
rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime).isOkOr:
return err(error)
return ok(msgWithProof)
proc getNilPushHandler*(): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
proc getRelayPushHandler*(
wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
# append RLN proof
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
if msgWithProof.isErr():
return err(msgWithProof.error)
(await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr:
return err(error)
let publishResult = await wakuRelay.publish(pubsubTopic, msgWithProof.value)
if publishResult.isErr():
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers",
msg_hash = msgHash, reason = $publishResult.error
# for legacy lightpush we do not detail the reason towards clients. All error during publish result in not-published-to-any-peer
# this let client of the legacy protocol to react as they did so far.
return err(protocol_metrics.notPublishedAnyPeer)
return ok()

View File

@ -0,0 +1,111 @@
{.push raises: [].}
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import
../waku_core/peers,
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
../waku_core,
./common,
./protocol_metrics,
./rpc,
./rpc_codec
logScope:
topics = "waku lightpush legacy client"
type WakuLegacyLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
publishObservers: seq[PublishObserver]
proc new*(
T: type WakuLegacyLightPushClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
): T =
WakuLegacyLightPushClient(peerManager: peerManager, rng: rng)
proc addPublishObserver*(wl: WakuLegacyLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
proc sendPushRequest(
wl: WakuLegacyLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let connOpt = await wl.peerManager.dialPeer(peer, WakuLegacyLightPushCodec)
if connOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
let connection = connOpt.get()
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
await connection.writeLP(rpc.encode().buffer)
var buffer: seq[byte]
try:
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
return err("Exception reading: " & getCurrentExceptionMsg())
let decodeRespRes = PushRPC.decode(buffer)
if decodeRespRes.isErr():
error "failed to decode response"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure)
let pushResponseRes = decodeRespRes.get()
if pushResponseRes.response.isNone():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)
let response = pushResponseRes.response.get()
if not response.isSuccess:
if response.info.isSome():
return err(response.info.get())
else:
return err("unknown failure")
return ok()
proc publish*(
wl: WakuLegacyLightPushClient,
pubSubTopic: PubsubTopic,
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[string]] {.async, gcsafe.} =
## On success, returns the msg_hash of the published message
let msg_hash_hex_str = computeMessageHash(pubsubTopic, message).to0xHex()
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msg_hash_hex_str
return ok(msg_hash_hex_str)
proc publishToAny*(
wl: WakuLegacyLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead we get it from peer manager
info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
let peer = wl.peerManager.selectPeer(WakuLegacyLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLegacyLightPushCodec")
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok()

View File

@ -0,0 +1,15 @@
{.push raises: [].}
import results, chronos, libp2p/peerid
import ../waku_core
from ../waku_core/codecs import WakuLegacyLightPushCodec
export WakuLegacyLightPushCodec
type WakuLightPushResult*[T] = Result[T, string]
type PushMessageHandler* = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.}
const TooManyRequestsMessage* = "TOO_MANY_REQUESTS"

View File

@ -0,0 +1,113 @@
{.push raises: [].}
import std/options, results, stew/byteutils, chronicles, chronos, metrics, bearssl/rand
import
../node/peer_manager/peer_manager,
../waku_core,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
logScope:
topics = "waku lightpush legacy"
type WakuLegacyLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: RequestRateLimiter
proc handleRequest*(
wl: WakuLegacyLightPush, peerId: PeerId, buffer: seq[byte]
): Future[PushRPC] {.async.} =
let reqDecodeRes = PushRPC.decode(buffer)
var
isSuccess = false
pushResponseInfo = ""
requestId = ""
if reqDecodeRes.isErr():
pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error
elif reqDecodeRes.get().request.isNone():
pushResponseInfo = emptyRequestBodyFailure
else:
let pushRpcRequest = reqDecodeRes.get()
requestId = pushRpcRequest.requestId
let
request = pushRpcRequest.request
pubSubTopic = request.get().pubSubTopic
message = request.get().message
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
notice "handling lightpush request",
peer_id = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)
if not isSuccess:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
error "failed to push message", error = pushResponseInfo
let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo))
let rpc = PushRPC(requestId: requestId, response: some(response))
return rpc
proc initProtocolHandler(wl: WakuLegacyLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
var rpc: PushRPC
wl.requestRateLimiter.checkUsageLimit(WakuLegacyLightPushCodec, conn):
let buffer = await conn.readLp(DefaultMaxRpcSize)
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuLegacyLightPushCodec, "in"]
)
rpc = await handleRequest(wl, conn.peerId, buffer)
do:
debug "lightpush request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
rpc = static(
PushRPC(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
response:
some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))),
)
)
await conn.writeLp(rpc.encode().buffer)
## For lightpush might not worth to measure outgoing trafic as it is only
## small respones about success/failure
wl.handler = handle
wl.codec = WakuLegacyLightPushCodec
proc new*(
T: type WakuLegacyLightPush,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLegacyLightPush(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLegacyLightPushCodec, rateLimitSetting)
return wl

View File

@ -0,0 +1,19 @@
{.push raises: [].}
import metrics
declarePublicGauge waku_lightpush_errors,
"number of lightpush protocol errors", ["type"]
declarePublicGauge waku_lightpush_messages,
"number of lightpush messages received", ["type"]
# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
peerNotFoundFailure* = "peer_not_found_failure"
emptyRequestBodyFailure* = "empty_request_body_failure"
emptyResponseBodyFailure* = "empty_response_body_failure"
messagePushFailure* = "message_push_failure"
requestLimitReachedFailure* = "request_limit_reached_failure"
notPublishedAnyPeer* = "not_published_to_any_peer"

View File

@ -0,0 +1,18 @@
{.push raises: [].}
import std/options
import ../waku_core
type
PushRequest* = object
pubSubTopic*: string
message*: WakuMessage
PushResponse* = object
isSuccess*: bool
info*: Option[string]
PushRPC* = object
requestId*: string
request*: Option[PushRequest]
response*: Option[PushResponse]

View File

@ -0,0 +1,96 @@
{.push raises: [].}
import std/options
import ../common/protobuf, ../waku_core, ./rpc
const DefaultMaxRpcSize* = -1
proc encode*(rpc: PushRequest): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.pubSubTopic)
pb.write3(2, rpc.message.encode())
pb.finish3()
pb
proc decode*(T: type PushRequest, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushRequest()
var pubSubTopic: PubsubTopic
if not ?pb.getField(1, pubSubTopic):
return err(ProtobufError.missingRequiredField("pubsub_topic"))
else:
rpc.pubSubTopic = pubSubTopic
var messageBuf: seq[byte]
if not ?pb.getField(2, messageBuf):
return err(ProtobufError.missingRequiredField("message"))
else:
rpc.message = ?WakuMessage.decode(messageBuf)
ok(rpc)
proc encode*(rpc: PushResponse): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, uint64(rpc.isSuccess))
pb.write3(2, rpc.info)
pb.finish3()
pb
proc decode*(T: type PushResponse, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushResponse()
var isSuccess: uint64
if not ?pb.getField(1, isSuccess):
return err(ProtobufError.missingRequiredField("is_success"))
else:
rpc.isSuccess = bool(isSuccess)
var info: string
if not ?pb.getField(2, info):
rpc.info = none(string)
else:
rpc.info = some(info)
ok(rpc)
proc encode*(rpc: PushRPC): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, rpc.requestId)
pb.write3(2, rpc.request.map(encode))
pb.write3(3, rpc.response.map(encode))
pb.finish3()
pb
proc decode*(T: type PushRPC, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushRPC()
var requestId: string
if not ?pb.getField(1, requestId):
return err(ProtobufError.missingRequiredField("request_id"))
else:
rpc.requestId = requestId
var requestBuffer: seq[byte]
if not ?pb.getField(2, requestBuffer):
rpc.request = none(PushRequest)
else:
let request = ?PushRequest.decode(requestBuffer)
rpc.request = some(request)
var responseBuffer: seq[byte]
if not ?pb.getField(3, responseBuffer):
rpc.response = none(PushResponse)
else:
let response = ?PushResponse.decode(responseBuffer)
rpc.response = some(response)
ok(rpc)

View File

@ -0,0 +1,59 @@
{.push raises: [].}
## Notice that the REST /lightpush requests normally assume that the node
## is acting as a lightpush-client that will trigger the service provider node
## to relay the message.
## In this module, we allow that a lightpush service node (full node) can be
## triggered directly through the REST /lightpush endpoint.
## The typical use case for that is when using `nwaku-compose`,
## which spawn a full service Waku node
## that could be used also as a lightpush client, helping testing and development.
import results, chronos, chronicles, std/options, metrics, stew/byteutils
import
../waku_core,
./protocol,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../utils/requests
proc handleSelfLightPushRequest*(
self: WakuLegacyLightPush, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[string]] {.async.} =
## Handles the lightpush requests made by the node to itself.
## Normally used in REST-lightpush requests
## On success, returns the msg_hash of the published message.
try:
# provide self peerId as now this node is used directly, thus there is no light client sender peer.
let selfPeerId = self.peerManager.switch.peerInfo.peerId
let req = PushRequest(pubSubTopic: pubSubTopic, message: message)
let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req))
let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer)
if respRpc.response.isNone():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)
let response = respRpc.response.get()
if not response.isSuccess:
if response.info.isSome():
return err(response.info.get())
else:
return err("unknown failure")
let msg_hash_hex_str = computeMessageHash(pubSubTopic, message).to0xHex()
notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
self_peer_id = selfPeerId,
msg_hash = msg_hash_hex_str
return ok(msg_hash_hex_str)
except Exception:
return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())

View File

@ -5,7 +5,7 @@
{.push raises: [].}
import
std/strformat,
std/[strformat, strutils],
stew/byteutils,
results,
sequtils,
@ -13,7 +13,6 @@ import
chronicles,
metrics,
libp2p/multihash,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
@ -136,6 +135,13 @@ type
onTopicHealthChange*: TopicHealthChangeHandler
topicHealthLoopHandle*: Future[void]
# predefinition for more detailed results from publishing new message
type PublishOutcome* {.pure.} = enum
NoTopicSpecified
DuplicateMessage
NoPeersToPublish
CannotGenerateMessageId
proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
@ -514,7 +520,10 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
): Future[Result[int, PublishOutcome]] {.async.} =
if pubsubTopic.isEmptyOrWhitespace():
return err(NoTopicSpecified)
let data = message.encode().buffer
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
@ -522,11 +531,13 @@ proc publish*(
let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)
if relayedPeerCount > 0:
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
if relayedPeerCount <= 0:
return err(NoPeersToPublish)
return relayedPeerCount
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok(relayedPeerCount)
proc getNumConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic