mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 00:43:06 +00:00
feat(lightpush): add waku lightpush protocol client
This commit is contained in:
parent
a16c80f58c
commit
8999f7ccd1
@ -4,7 +4,6 @@ import
|
|||||||
# ./v2/test_waku,
|
# ./v2/test_waku,
|
||||||
./v2/test_wakunode,
|
./v2/test_wakunode,
|
||||||
./v2/test_wakunode_relay,
|
./v2/test_wakunode_relay,
|
||||||
./v2/test_wakunode_lightpush,
|
|
||||||
# Waku Store
|
# Waku Store
|
||||||
./v2/test_message_store_queue_index,
|
./v2/test_message_store_queue_index,
|
||||||
./v2/test_message_store_queue_pagination,
|
./v2/test_message_store_queue_pagination,
|
||||||
@ -17,6 +16,9 @@ import
|
|||||||
# TODO: Re-enable store resume test cases (#1282)
|
# TODO: Re-enable store resume test cases (#1282)
|
||||||
# ./v2/test_waku_store_resume,
|
# ./v2/test_waku_store_resume,
|
||||||
./v2/test_wakunode_store,
|
./v2/test_wakunode_store,
|
||||||
|
# Waku LightPush
|
||||||
|
./v2/test_waku_lightpush,
|
||||||
|
./v2/test_wakunode_lightpush,
|
||||||
# Waku Filter
|
# Waku Filter
|
||||||
./v2/test_waku_filter,
|
./v2/test_waku_filter,
|
||||||
./v2/test_wakunode_filter,
|
./v2/test_wakunode_filter,
|
||||||
|
|||||||
@ -5,78 +5,119 @@ import
|
|||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
chronicles,
|
chronicles,
|
||||||
chronos,
|
chronos,
|
||||||
libp2p/switch,
|
|
||||||
libp2p/crypto/crypto
|
libp2p/crypto/crypto
|
||||||
import
|
import
|
||||||
../../waku/v2/node/peer_manager/peer_manager,
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../waku/v2/protocol/waku_message,
|
../../waku/v2/protocol/waku_message,
|
||||||
../../waku/v2/protocol/waku_lightpush,
|
../../waku/v2/protocol/waku_lightpush,
|
||||||
../test_helpers,
|
../../waku/v2/protocol/waku_lightpush/client,
|
||||||
./testlib/common
|
./testlib/common,
|
||||||
|
./testlib/switch
|
||||||
|
|
||||||
|
|
||||||
# TODO: Extend lightpush protocol test coverage
|
proc newTestWakuLightpushNode(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} =
|
||||||
procSuite "Waku Lightpush":
|
|
||||||
|
|
||||||
asyncTest "handle light push request success":
|
|
||||||
# TODO: Move here the test case at test_wakunode: light push request success
|
|
||||||
discard
|
|
||||||
|
|
||||||
asyncTest "handle light push request fail":
|
|
||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
peerManager = PeerManager.new(switch)
|
||||||
listenSwitch = newStandardSwitch(some(key))
|
|
||||||
await listenSwitch.start()
|
|
||||||
|
|
||||||
let dialSwitch = newStandardSwitch()
|
|
||||||
await dialSwitch.start()
|
|
||||||
|
|
||||||
|
|
||||||
proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
|
||||||
# TODO Success return here
|
|
||||||
debug "handle push req"
|
|
||||||
check:
|
|
||||||
1 == 0
|
|
||||||
|
|
||||||
# FIXME Unclear how we want to use subscriptions, if at all
|
|
||||||
let
|
|
||||||
peerManager = PeerManager.new(dialSwitch)
|
|
||||||
rng = crypto.newRng()
|
rng = crypto.newRng()
|
||||||
proto = WakuLightPush.init(peerManager, rng, requestHandler)
|
proto = WakuLightPush.new(peerManager, rng, handler)
|
||||||
|
|
||||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
await proto.start()
|
||||||
waitFor proto.start()
|
switch.mount(proto)
|
||||||
dialSwitch.mount(proto)
|
|
||||||
|
|
||||||
|
return proto
|
||||||
|
|
||||||
# TODO Can possibly get rid of this if it isn't dynamic
|
proc newTestWakuLightpushClient(switch: Switch): WakuLightPushClient =
|
||||||
proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
|
||||||
debug "push request handler"
|
|
||||||
# TODO: Also relay message
|
|
||||||
# TODO: Here we want to send back response with is_success true
|
|
||||||
discard
|
|
||||||
|
|
||||||
let
|
let
|
||||||
peerManager2 = PeerManager.new(listenSwitch)
|
peerManager = PeerManager.new(switch)
|
||||||
rng2 = crypto.newRng()
|
rng = crypto.newRng()
|
||||||
proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2)
|
WakuLightPushClient.new(peerManager, rng)
|
||||||
waitFor proto2.start()
|
|
||||||
listenSwitch.mount(proto2)
|
|
||||||
|
|
||||||
|
|
||||||
|
suite "Waku Lightpush":
|
||||||
|
|
||||||
|
asyncTest "push message to pubsub topic is successful":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitch = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
|
let handlerFuture = newFuture[(string, WakuMessage)]()
|
||||||
|
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||||
|
handlerFuture.complete((pubsubTopic, message))
|
||||||
|
return ok()
|
||||||
|
|
||||||
let
|
let
|
||||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
|
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||||
rpc = PushRequest(message: msg, pubSubTopic: DefaultPubsubTopic)
|
client = newTestWakuLightpushClient(clientSwitch)
|
||||||
|
|
||||||
|
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||||
|
|
||||||
|
let
|
||||||
|
topic = DefaultPubsubTopic
|
||||||
|
message = fakeWakuMessage()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = await proto.request(rpc)
|
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
||||||
|
let requestRes = await client.request(rpc, serverPeerId)
|
||||||
|
|
||||||
|
require await handlerFuture.withTimeout(100.millis)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check res.isOk()
|
|
||||||
let response = res.get()
|
|
||||||
check:
|
check:
|
||||||
not response.isSuccess
|
requestRes.isOk()
|
||||||
|
handlerFuture.finished()
|
||||||
|
|
||||||
|
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
|
||||||
|
check:
|
||||||
|
handledMessagePubsubTopic == topic
|
||||||
|
handledMessage == message
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
await allFutures(listenSwitch.stop(), dialSwitch.stop())
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
|
||||||
|
asyncTest "push message to pubsub topic should fail":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitch = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let error = "test_failure"
|
||||||
|
|
||||||
|
let handlerFuture = newFuture[void]()
|
||||||
|
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||||
|
handlerFuture.complete()
|
||||||
|
return err(error)
|
||||||
|
|
||||||
|
let
|
||||||
|
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||||
|
client = newTestWakuLightpushClient(clientSwitch)
|
||||||
|
|
||||||
|
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||||
|
|
||||||
|
let
|
||||||
|
topic = DefaultPubsubTopic
|
||||||
|
message = fakeWakuMessage()
|
||||||
|
|
||||||
|
## When
|
||||||
|
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
||||||
|
let requestRes = await client.request(rpc, serverPeerId)
|
||||||
|
|
||||||
|
require await handlerFuture.withTimeout(100.millis)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
requestRes.isErr()
|
||||||
|
handlerFuture.finished()
|
||||||
|
|
||||||
|
let requestError = requestRes.error
|
||||||
|
check:
|
||||||
|
requestError == error
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
|||||||
@ -59,7 +59,7 @@ procSuite "WakuNode - Lightpush":
|
|||||||
## When
|
## When
|
||||||
let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message)
|
let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message)
|
||||||
|
|
||||||
require (await completionFutRelay.withTimeout(5.seconds)) == true
|
require await completionFutRelay.withTimeout(5.seconds)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check lightpushRes.isOk()
|
check lightpushRes.isOk()
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import
|
import
|
||||||
std/times
|
std/times,
|
||||||
|
stew/byteutils
|
||||||
import
|
import
|
||||||
../../../waku/v2/protocol/waku_message,
|
../../../waku/v2/protocol/waku_message,
|
||||||
../../../waku/v2/utils/time
|
../../../waku/v2/utils/time
|
||||||
|
|||||||
@ -9,8 +9,8 @@ import
|
|||||||
metrics/chronos_httpserver
|
metrics/chronos_httpserver
|
||||||
import
|
import
|
||||||
../protocol/waku_filter,
|
../protocol/waku_filter,
|
||||||
../protocol/waku_store/protocol_metrics,
|
../protocol/waku_store/protocol_metrics as store_metrics,
|
||||||
../protocol/waku_lightpush,
|
../protocol/waku_lightpush/protocol_metrics as lightpush_metrics,
|
||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_peer_exchange,
|
../protocol/waku_peer_exchange,
|
||||||
../utils/collector,
|
../utils/collector,
|
||||||
|
|||||||
@ -361,21 +361,16 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf
|
|||||||
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||||
## `contentTopic` field for light node functionality. This field may be also
|
## `contentTopic` field for light node functionality. This field may be also
|
||||||
## be omitted.
|
## be omitted.
|
||||||
##
|
|
||||||
## Status: Implemented.
|
|
||||||
|
|
||||||
if node.wakuRelay.isNil:
|
if node.wakuRelay.isNil():
|
||||||
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||||
# @TODO improved error handling
|
# TODO: Improve error handling
|
||||||
return
|
return
|
||||||
|
|
||||||
let wakuRelay = node.wakuRelay
|
|
||||||
trace "publish", topic=topic, contentTopic=message.contentTopic
|
trace "publish", topic=topic, contentTopic=message.contentTopic
|
||||||
var publishingMessage = message
|
|
||||||
|
|
||||||
let data = message.encode().buffer
|
let data = message.encode().buffer
|
||||||
|
discard await node.wakuRelay.publish(topic, data)
|
||||||
discard await wakuRelay.publish(topic, data)
|
|
||||||
|
|
||||||
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
|
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
|
||||||
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||||
@ -421,7 +416,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
|
|||||||
|
|
||||||
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||||
|
|
||||||
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
|
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||||
proc info*(node: WakuNode): WakuInfo =
|
proc info*(node: WakuNode): WakuInfo =
|
||||||
## Returns information about the Node, such as what multiaddress it can be reached at.
|
## Returns information about the Node, such as what multiaddress it can be reached at.
|
||||||
##
|
##
|
||||||
@ -594,15 +589,22 @@ proc mountRelay*(node: WakuNode,
|
|||||||
|
|
||||||
info "relay mounted successfully"
|
info "relay mounted successfully"
|
||||||
|
|
||||||
proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
proc mountLightPush*(node: WakuNode) {.async.} =
|
||||||
info "mounting light push"
|
info "mounting light push"
|
||||||
|
|
||||||
if node.wakuRelay.isNil:
|
var pushHandler: PushMessageHandler
|
||||||
debug "mounting lightpush without relay"
|
if node.wakuRelay.isNil():
|
||||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
|
debug "mounting lightpush without relay (nil)"
|
||||||
|
# TODO: Remove after using waku lightpush client
|
||||||
|
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||||
|
return err("no waku relay found")
|
||||||
else:
|
else:
|
||||||
|
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||||
|
discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
|
||||||
|
return ok()
|
||||||
|
|
||||||
debug "mounting lightpush with relay"
|
debug "mounting lightpush with relay"
|
||||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
|
node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler)
|
||||||
|
|
||||||
if node.started:
|
if node.started:
|
||||||
# Node has started already. Let's start lightpush too.
|
# Node has started already. Let's start lightpush too.
|
||||||
|
|||||||
79
waku/v2/protocol/waku_lightpush/client.nim
Normal file
79
waku/v2/protocol/waku_lightpush/client.nim
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/options,
|
||||||
|
stew/results,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
metrics,
|
||||||
|
bearssl/rand
|
||||||
|
import
|
||||||
|
../../node/peer_manager/peer_manager,
|
||||||
|
../../utils/requests,
|
||||||
|
./protocol,
|
||||||
|
./protocol_metrics,
|
||||||
|
./rpc,
|
||||||
|
./rpc_codec
|
||||||
|
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakulightpush.client"
|
||||||
|
|
||||||
|
|
||||||
|
type WakuLightPushClient* = ref object
|
||||||
|
peerManager*: PeerManager
|
||||||
|
rng*: ref rand.HmacDrbgContext
|
||||||
|
|
||||||
|
|
||||||
|
proc new*(T: type WakuLightPushClient,
|
||||||
|
peerManager: PeerManager,
|
||||||
|
rng: ref rand.HmacDrbgContext): T =
|
||||||
|
WakuLightPushClient(peerManager: peerManager, rng: rng)
|
||||||
|
|
||||||
|
|
||||||
|
proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
|
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||||
|
if connOpt.isNone():
|
||||||
|
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||||
|
return err(dialFailure)
|
||||||
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
||||||
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
|
var message = await connection.readLp(MaxRpcSize.int)
|
||||||
|
let decodeRespRes = PushRPC.init(message)
|
||||||
|
if decodeRespRes.isErr():
|
||||||
|
error "failed to decode response"
|
||||||
|
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
return err(decodeRpcFailure)
|
||||||
|
|
||||||
|
let pushResponseRes = decodeRespRes.get()
|
||||||
|
if pushResponseRes.response == PushResponse():
|
||||||
|
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
|
||||||
|
return err(emptyResponseBodyFailure)
|
||||||
|
|
||||||
|
let response = pushResponseRes.response
|
||||||
|
if not response.isSuccess:
|
||||||
|
if response.info != "":
|
||||||
|
return err(response.info)
|
||||||
|
else:
|
||||||
|
return err("unknown failure")
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
|
||||||
|
### Set lightpush peer and send push requests
|
||||||
|
|
||||||
|
proc setPeer*(wl: WakuLightPushClient, peer: RemotePeerInfo) =
|
||||||
|
wl.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||||
|
waku_lightpush_peers.inc()
|
||||||
|
|
||||||
|
proc request*(wl: WakuLightPushClient, req: PushRequest): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||||
|
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
error "no suitable remote peers"
|
||||||
|
waku_lightpush_errors.inc(labelValues = [peerNotFoundFailure])
|
||||||
|
return err(peerNotFoundFailure)
|
||||||
|
|
||||||
|
return await wl.request(req, peerOpt.get())
|
||||||
@ -6,116 +6,86 @@ import
|
|||||||
chronicles,
|
chronicles,
|
||||||
chronos,
|
chronos,
|
||||||
metrics,
|
metrics,
|
||||||
bearssl/rand,
|
bearssl/rand
|
||||||
libp2p/crypto/crypto
|
|
||||||
|
|
||||||
import
|
import
|
||||||
../waku_message,
|
../waku_message,
|
||||||
../waku_relay,
|
../waku_relay,
|
||||||
../../node/peer_manager/peer_manager,
|
../../node/peer_manager/peer_manager,
|
||||||
../../utils/requests,
|
../../utils/requests,
|
||||||
./rpc,
|
./rpc,
|
||||||
./rpc_codec
|
./rpc_codec,
|
||||||
|
./protocol_metrics
|
||||||
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakulightpush"
|
topics = "wakulightpush"
|
||||||
|
|
||||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
|
||||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
|
||||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
|
||||||
|
|
||||||
|
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||||
const
|
|
||||||
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
|
||||||
|
|
||||||
const
|
|
||||||
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
|
||||||
|
|
||||||
# Error types (metric label values)
|
|
||||||
const
|
|
||||||
dialFailure = "dial_failure"
|
|
||||||
decodeRpcFailure = "decode_rpc_failure"
|
|
||||||
|
|
||||||
|
|
||||||
type
|
type
|
||||||
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
|
|
||||||
|
|
||||||
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
|
|
||||||
|
|
||||||
WakuLightPushResult*[T] = Result[T, string]
|
WakuLightPushResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
|
||||||
|
|
||||||
WakuLightPush* = ref object of LPProtocol
|
WakuLightPush* = ref object of LPProtocol
|
||||||
rng*: ref rand.HmacDrbgContext
|
rng*: ref rand.HmacDrbgContext
|
||||||
peerManager*: PeerManager
|
peerManager*: PeerManager
|
||||||
requestHandler*: PushRequestHandler
|
pushHandler*: PushMessageHandler
|
||||||
relayReference*: WakuRelay
|
|
||||||
|
|
||||||
|
|
||||||
proc init*(wl: WakuLightPush) =
|
|
||||||
|
|
||||||
|
proc initProtocolHandler*(wl: WakuLightPush) =
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
let message = await conn.readLp(MaxRpcSize.int)
|
let buffer = await conn.readLp(MaxRpcSize.int)
|
||||||
let res = PushRPC.init(message)
|
let reqDecodeRes = PushRPC.init(buffer)
|
||||||
if res.isErr():
|
if reqDecodeRes.isErr():
|
||||||
error "failed to decode rpc"
|
error "failed to decode rpc"
|
||||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
return
|
return
|
||||||
|
|
||||||
let rpc = res.get()
|
let req = reqDecodeRes.get()
|
||||||
|
if req.request == PushRequest():
|
||||||
|
error "invalid lightpush rpc received", error=emptyRequestBodyFailure
|
||||||
|
waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure])
|
||||||
|
return
|
||||||
|
|
||||||
if rpc.request != PushRequest():
|
|
||||||
info "lightpush push request"
|
|
||||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||||
|
|
||||||
let
|
let
|
||||||
pubSubTopic = rpc.request.pubSubTopic
|
pubSubTopic = req.request.pubSubTopic
|
||||||
message = rpc.request.message
|
message = req.request.message
|
||||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic
|
||||||
|
|
||||||
var response: PushResponse
|
var response: PushResponse
|
||||||
if not wl.relayReference.isNil():
|
let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message)
|
||||||
let data = message.encode().buffer
|
if handleRes.isOk():
|
||||||
|
response = PushResponse(is_success: true, info: "OK")
|
||||||
# Assumimng success, should probably be extended to check for network, peers, etc
|
|
||||||
discard wl.relayReference.publish(pubSubTopic, data)
|
|
||||||
response = PushResponse(is_success: true, info: "Totally.")
|
|
||||||
else:
|
else:
|
||||||
debug "No relay protocol present, unsuccesssful push"
|
response = PushResponse(is_success: false, info: handleRes.error)
|
||||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
|
||||||
|
error "pushed message handling failed", error=handleRes.error
|
||||||
|
|
||||||
|
let rpc = PushRPC(requestId: req.requestId, response: response)
|
||||||
let rpc = PushRPC(requestId: rpc.requestId, response: response)
|
|
||||||
await conn.writeLp(rpc.encode().buffer)
|
await conn.writeLp(rpc.encode().buffer)
|
||||||
|
|
||||||
if rpc.response != PushResponse():
|
|
||||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
|
||||||
if rpc.response.isSuccess:
|
|
||||||
info "lightpush message success"
|
|
||||||
else:
|
|
||||||
info "lightpush message failure", info=rpc.response.info
|
|
||||||
|
|
||||||
wl.handler = handle
|
wl.handler = handle
|
||||||
wl.codec = WakuLightPushCodec
|
wl.codec = WakuLightPushCodec
|
||||||
|
|
||||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
proc new*(T: type WakuLightPush,
|
||||||
debug "init"
|
peerManager: PeerManager,
|
||||||
let rng = crypto.newRng()
|
rng: ref rand.HmacDrbgContext,
|
||||||
let wl = WakuLightPush(rng: rng,
|
pushHandler: PushMessageHandler): T =
|
||||||
peerManager: peerManager,
|
let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler)
|
||||||
requestHandler: handler,
|
wl.initProtocolHandler()
|
||||||
relayReference: relay)
|
|
||||||
wl.init()
|
|
||||||
|
|
||||||
return wl
|
return wl
|
||||||
|
|
||||||
|
|
||||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) {.
|
||||||
|
deprecated: "Use 'WakuLightPushClient.setPeer()' instead" .} =
|
||||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||||
waku_lightpush_peers.inc()
|
waku_lightpush_peers.inc()
|
||||||
|
|
||||||
|
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
|
||||||
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
|
||||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||||
if connOpt.isNone():
|
if connOpt.isNone():
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||||
@ -139,7 +109,8 @@ proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[
|
|||||||
|
|
||||||
return ok(rpcRes.response)
|
return ok(rpcRes.response)
|
||||||
|
|
||||||
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
|
||||||
|
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
|
||||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
||||||
if peerOpt.isNone():
|
if peerOpt.isNone():
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||||
|
|||||||
18
waku/v2/protocol/waku_lightpush/protocol_metrics.nim
Normal file
18
waku/v2/protocol/waku_lightpush/protocol_metrics.nim
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import metrics
|
||||||
|
|
||||||
|
|
||||||
|
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||||
|
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||||
|
|
||||||
|
|
||||||
|
# Error types (metric label values)
|
||||||
|
const
|
||||||
|
dialFailure* = "dial_failure"
|
||||||
|
decodeRpcFailure* = "decode_rpc_failure"
|
||||||
|
peerNotFoundFailure* = "peer_not_found_failure"
|
||||||
|
emptyRequestBodyFailure* = "empty_request_body_failure"
|
||||||
|
emptyResponseBodyFailure* = "empty_response_body_failure"
|
||||||
|
messagePushFailure* = "message_push_failure"
|
||||||
@ -9,6 +9,9 @@ import
|
|||||||
./rpc
|
./rpc
|
||||||
|
|
||||||
|
|
||||||
|
const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
||||||
|
|
||||||
|
|
||||||
proc encode*(rpc: PushRequest): ProtoBuffer =
|
proc encode*(rpc: PushRequest): ProtoBuffer =
|
||||||
var output = initProtoBuffer()
|
var output = initProtoBuffer()
|
||||||
output.write3(1, rpc.pubSubTopic)
|
output.write3(1, rpc.pubSubTopic)
|
||||||
|
|||||||
@ -49,7 +49,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
|
|||||||
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
|
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
|
||||||
await connection.writeLP(rpc.encode().buffer)
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
var message = await connOpt.get().readLp(MaxRpcSize.int)
|
var message = await connection.readLp(MaxRpcSize.int)
|
||||||
let response = HistoryRPC.init(message)
|
let response = HistoryRPC.init(message)
|
||||||
|
|
||||||
if response.isErr():
|
if response.isErr():
|
||||||
@ -114,6 +114,7 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo
|
|||||||
|
|
||||||
proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) =
|
proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) =
|
||||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||||
|
waku_store_peers.inc()
|
||||||
|
|
||||||
proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
||||||
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user