mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-14 16:07:27 +00:00
deploy: 7e7bba4a983c3321f036b73a72f9621d0baaa14d
This commit is contained in:
parent
c270da77a6
commit
d40bdda16e
@ -4,7 +4,6 @@ import
|
||||
# ./v2/test_waku,
|
||||
./v2/test_wakunode,
|
||||
./v2/test_wakunode_relay,
|
||||
./v2/test_wakunode_lightpush,
|
||||
# Waku Store
|
||||
./v2/test_message_store_queue_index,
|
||||
./v2/test_message_store_queue_pagination,
|
||||
@ -17,6 +16,9 @@ import
|
||||
# TODO: Re-enable store resume test cases (#1282)
|
||||
# ./v2/test_waku_store_resume,
|
||||
./v2/test_wakunode_store,
|
||||
# Waku LightPush
|
||||
./v2/test_waku_lightpush,
|
||||
./v2/test_wakunode_lightpush,
|
||||
# Waku Filter
|
||||
./v2/test_waku_filter,
|
||||
./v2/test_wakunode_filter,
|
||||
|
@ -5,78 +5,119 @@ import
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/switch,
|
||||
libp2p/crypto/crypto
|
||||
import
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_lightpush,
|
||||
../test_helpers,
|
||||
./testlib/common
|
||||
../../waku/v2/protocol/waku_lightpush/client,
|
||||
./testlib/common,
|
||||
./testlib/switch
|
||||
|
||||
|
||||
# TODO: Extend lightpush protocol test coverage
|
||||
procSuite "Waku Lightpush":
|
||||
proc newTestWakuLightpushNode(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
rng = crypto.newRng()
|
||||
proto = WakuLightPush.new(peerManager, rng, handler)
|
||||
|
||||
asyncTest "handle light push request success":
|
||||
# TODO: Move here the test case at test_wakunode: light push request success
|
||||
discard
|
||||
await proto.start()
|
||||
switch.mount(proto)
|
||||
|
||||
asyncTest "handle light push request fail":
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
listenSwitch = newStandardSwitch(some(key))
|
||||
await listenSwitch.start()
|
||||
return proto
|
||||
|
||||
let dialSwitch = newStandardSwitch()
|
||||
await dialSwitch.start()
|
||||
proc newTestWakuLightpushClient(switch: Switch): WakuLightPushClient =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
rng = crypto.newRng()
|
||||
WakuLightPushClient.new(peerManager, rng)
|
||||
|
||||
|
||||
proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||
# TODO Success return here
|
||||
debug "handle push req"
|
||||
check:
|
||||
1 == 0
|
||||
suite "Waku Lightpush":
|
||||
|
||||
# FIXME Unclear how we want to use subscriptions, if at all
|
||||
let
|
||||
peerManager = PeerManager.new(dialSwitch)
|
||||
rng = crypto.newRng()
|
||||
proto = WakuLightPush.init(peerManager, rng, requestHandler)
|
||||
|
||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||
waitFor proto.start()
|
||||
dialSwitch.mount(proto)
|
||||
|
||||
|
||||
# TODO Can possibly get rid of this if it isn't dynamic
|
||||
proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||
debug "push request handler"
|
||||
# TODO: Also relay message
|
||||
# TODO: Here we want to send back response with is_success true
|
||||
discard
|
||||
|
||||
let
|
||||
peerManager2 = PeerManager.new(listenSwitch)
|
||||
rng2 = crypto.newRng()
|
||||
proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2)
|
||||
waitFor proto2.start()
|
||||
listenSwitch.mount(proto2)
|
||||
asyncTest "push message to pubsub topic is successful":
|
||||
## Setup
|
||||
let
|
||||
serverSwitch = newTestSwitch()
|
||||
clientSwitch = newTestSwitch()
|
||||
|
||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||
|
||||
## Given
|
||||
let handlerFuture = newFuture[(string, WakuMessage)]()
|
||||
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete((pubsubTopic, message))
|
||||
return ok()
|
||||
|
||||
let
|
||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
|
||||
rpc = PushRequest(message: msg, pubSubTopic: DefaultPubsubTopic)
|
||||
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||
client = newTestWakuLightpushClient(clientSwitch)
|
||||
|
||||
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
let
|
||||
topic = DefaultPubsubTopic
|
||||
message = fakeWakuMessage()
|
||||
|
||||
## When
|
||||
let res = await proto.request(rpc)
|
||||
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
||||
let requestRes = await client.request(rpc, serverPeerId)
|
||||
|
||||
require await handlerFuture.withTimeout(100.millis)
|
||||
|
||||
## Then
|
||||
check res.isOk()
|
||||
let response = res.get()
|
||||
check:
|
||||
not response.isSuccess
|
||||
requestRes.isOk()
|
||||
handlerFuture.finished()
|
||||
|
||||
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
|
||||
check:
|
||||
handledMessagePubsubTopic == topic
|
||||
handledMessage == message
|
||||
|
||||
## Cleanup
|
||||
await allFutures(listenSwitch.stop(), dialSwitch.stop())
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
|
||||
asyncTest "push message to pubsub topic should fail":
|
||||
## Setup
|
||||
let
|
||||
serverSwitch = newTestSwitch()
|
||||
clientSwitch = newTestSwitch()
|
||||
|
||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||
|
||||
## Given
|
||||
let error = "test_failure"
|
||||
|
||||
let handlerFuture = newFuture[void]()
|
||||
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete()
|
||||
return err(error)
|
||||
|
||||
let
|
||||
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||
client = newTestWakuLightpushClient(clientSwitch)
|
||||
|
||||
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
let
|
||||
topic = DefaultPubsubTopic
|
||||
message = fakeWakuMessage()
|
||||
|
||||
## When
|
||||
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
||||
let requestRes = await client.request(rpc, serverPeerId)
|
||||
|
||||
require await handlerFuture.withTimeout(100.millis)
|
||||
|
||||
## Then
|
||||
check:
|
||||
requestRes.isErr()
|
||||
handlerFuture.finished()
|
||||
|
||||
let requestError = requestRes.error
|
||||
check:
|
||||
requestError == error
|
||||
|
||||
## Cleanup
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
|
@ -59,7 +59,7 @@ procSuite "WakuNode - Lightpush":
|
||||
## When
|
||||
let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message)
|
||||
|
||||
require (await completionFutRelay.withTimeout(5.seconds)) == true
|
||||
require await completionFutRelay.withTimeout(5.seconds)
|
||||
|
||||
## Then
|
||||
check lightpushRes.isOk()
|
||||
|
@ -1,5 +1,6 @@
|
||||
import
|
||||
std/times
|
||||
std/times,
|
||||
stew/byteutils
|
||||
import
|
||||
../../../waku/v2/protocol/waku_message,
|
||||
../../../waku/v2/utils/time
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az208-754:
|
||||
# Libtool was configured on host fv-az204-961:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -9,8 +9,8 @@ import
|
||||
metrics/chronos_httpserver
|
||||
import
|
||||
../protocol/waku_filter,
|
||||
../protocol/waku_store/protocol_metrics,
|
||||
../protocol/waku_lightpush,
|
||||
../protocol/waku_store/protocol_metrics as store_metrics,
|
||||
../protocol/waku_lightpush/protocol_metrics as lightpush_metrics,
|
||||
../protocol/waku_swap/waku_swap,
|
||||
../protocol/waku_peer_exchange,
|
||||
../utils/collector,
|
||||
|
@ -361,21 +361,16 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf
|
||||
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||
## `contentTopic` field for light node functionality. This field may be also
|
||||
## be omitted.
|
||||
##
|
||||
## Status: Implemented.
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||
# @TODO improved error handling
|
||||
# TODO: Improve error handling
|
||||
return
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
trace "publish", topic=topic, contentTopic=message.contentTopic
|
||||
var publishingMessage = message
|
||||
|
||||
let data = message.encode().buffer
|
||||
|
||||
discard await wakuRelay.publish(topic, data)
|
||||
discard await node.wakuRelay.publish(topic, data)
|
||||
|
||||
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
|
||||
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||
@ -421,7 +416,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
|
||||
|
||||
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||
|
||||
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||
proc info*(node: WakuNode): WakuInfo =
|
||||
## Returns information about the Node, such as what multiaddress it can be reached at.
|
||||
##
|
||||
@ -594,16 +589,23 @@ proc mountRelay*(node: WakuNode,
|
||||
|
||||
info "relay mounted successfully"
|
||||
|
||||
proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
proc mountLightPush*(node: WakuNode) {.async.} =
|
||||
info "mounting light push"
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
debug "mounting lightpush without relay"
|
||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
|
||||
var pushHandler: PushMessageHandler
|
||||
if node.wakuRelay.isNil():
|
||||
debug "mounting lightpush without relay (nil)"
|
||||
# TODO: Remove after using waku lightpush client
|
||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
return err("no waku relay found")
|
||||
else:
|
||||
debug "mounting lightpush with relay"
|
||||
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
|
||||
|
||||
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
|
||||
return ok()
|
||||
|
||||
debug "mounting lightpush with relay"
|
||||
node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler)
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start lightpush too.
|
||||
await node.wakuLightPush.start()
|
||||
|
79
waku/v2/protocol/waku_lightpush/client.nim
Normal file
79
waku/v2/protocol/waku_lightpush/client.nim
Normal file
@ -0,0 +1,79 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
metrics,
|
||||
bearssl/rand
|
||||
import
|
||||
../../node/peer_manager/peer_manager,
|
||||
../../utils/requests,
|
||||
./protocol,
|
||||
./protocol_metrics,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "wakulightpush.client"
|
||||
|
||||
|
||||
type WakuLightPushClient* = ref object
|
||||
peerManager*: PeerManager
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
|
||||
|
||||
proc new*(T: type WakuLightPushClient,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext): T =
|
||||
WakuLightPushClient(peerManager: peerManager, rng: rng)
|
||||
|
||||
|
||||
proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||
if connOpt.isNone():
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
return err(dialFailure)
|
||||
let connection = connOpt.get()
|
||||
|
||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
||||
await connection.writeLP(rpc.encode().buffer)
|
||||
|
||||
var message = await connection.readLp(MaxRpcSize.int)
|
||||
let decodeRespRes = PushRPC.init(message)
|
||||
if decodeRespRes.isErr():
|
||||
error "failed to decode response"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return err(decodeRpcFailure)
|
||||
|
||||
let pushResponseRes = decodeRespRes.get()
|
||||
if pushResponseRes.response == PushResponse():
|
||||
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
|
||||
return err(emptyResponseBodyFailure)
|
||||
|
||||
let response = pushResponseRes.response
|
||||
if not response.isSuccess:
|
||||
if response.info != "":
|
||||
return err(response.info)
|
||||
else:
|
||||
return err("unknown failure")
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
### Set lightpush peer and send push requests
|
||||
|
||||
proc setPeer*(wl: WakuLightPushClient, peer: RemotePeerInfo) =
|
||||
wl.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||
waku_lightpush_peers.inc()
|
||||
|
||||
proc request*(wl: WakuLightPushClient, req: PushRequest): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
error "no suitable remote peers"
|
||||
waku_lightpush_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
return err(peerNotFoundFailure)
|
||||
|
||||
return await wl.request(req, peerOpt.get())
|
@ -6,116 +6,86 @@ import
|
||||
chronicles,
|
||||
chronos,
|
||||
metrics,
|
||||
bearssl/rand,
|
||||
libp2p/crypto/crypto
|
||||
|
||||
bearssl/rand
|
||||
import
|
||||
../waku_message,
|
||||
../waku_relay,
|
||||
../../node/peer_manager/peer_manager,
|
||||
../../utils/requests,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
./rpc_codec,
|
||||
./protocol_metrics
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "wakulightpush"
|
||||
|
||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||
|
||||
|
||||
const
|
||||
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
|
||||
const
|
||||
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
|
||||
|
||||
type
|
||||
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
|
||||
|
||||
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
|
||||
|
||||
WakuLightPushResult*[T] = Result[T, string]
|
||||
|
||||
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
|
||||
|
||||
WakuLightPush* = ref object of LPProtocol
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
requestHandler*: PushRequestHandler
|
||||
relayReference*: WakuRelay
|
||||
|
||||
|
||||
proc init*(wl: WakuLightPush) =
|
||||
pushHandler*: PushMessageHandler
|
||||
|
||||
proc initProtocolHandler*(wl: WakuLightPush) =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
let message = await conn.readLp(MaxRpcSize.int)
|
||||
let res = PushRPC.init(message)
|
||||
if res.isErr():
|
||||
let buffer = await conn.readLp(MaxRpcSize.int)
|
||||
let reqDecodeRes = PushRPC.init(buffer)
|
||||
if reqDecodeRes.isErr():
|
||||
error "failed to decode rpc"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
let rpc = res.get()
|
||||
let req = reqDecodeRes.get()
|
||||
if req.request == PushRequest():
|
||||
error "invalid lightpush rpc received", error=emptyRequestBodyFailure
|
||||
waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure])
|
||||
return
|
||||
|
||||
if rpc.request != PushRequest():
|
||||
info "lightpush push request"
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
let
|
||||
pubSubTopic = req.request.pubSubTopic
|
||||
message = req.request.message
|
||||
debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic
|
||||
|
||||
let
|
||||
pubSubTopic = rpc.request.pubSubTopic
|
||||
message = rpc.request.message
|
||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
||||
var response: PushResponse
|
||||
let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message)
|
||||
if handleRes.isOk():
|
||||
response = PushResponse(is_success: true, info: "OK")
|
||||
else:
|
||||
response = PushResponse(is_success: false, info: handleRes.error)
|
||||
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
|
||||
error "pushed message handling failed", error=handleRes.error
|
||||
|
||||
var response: PushResponse
|
||||
if not wl.relayReference.isNil():
|
||||
let data = message.encode().buffer
|
||||
|
||||
# Assumimng success, should probably be extended to check for network, peers, etc
|
||||
discard wl.relayReference.publish(pubSubTopic, data)
|
||||
response = PushResponse(is_success: true, info: "Totally.")
|
||||
else:
|
||||
debug "No relay protocol present, unsuccesssful push"
|
||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
||||
|
||||
|
||||
let rpc = PushRPC(requestId: rpc.requestId, response: response)
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
if rpc.response != PushResponse():
|
||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||
if rpc.response.isSuccess:
|
||||
info "lightpush message success"
|
||||
else:
|
||||
info "lightpush message failure", info=rpc.response.info
|
||||
let rpc = PushRPC(requestId: req.requestId, response: response)
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
wl.handler = handle
|
||||
wl.codec = WakuLightPushCodec
|
||||
|
||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
||||
debug "init"
|
||||
let rng = crypto.newRng()
|
||||
let wl = WakuLightPush(rng: rng,
|
||||
peerManager: peerManager,
|
||||
requestHandler: handler,
|
||||
relayReference: relay)
|
||||
wl.init()
|
||||
|
||||
proc new*(T: type WakuLightPush,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
pushHandler: PushMessageHandler): T =
|
||||
let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler)
|
||||
wl.initProtocolHandler()
|
||||
return wl
|
||||
|
||||
|
||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) {.
|
||||
deprecated: "Use 'WakuLightPushClient.setPeer()' instead" .} =
|
||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||
waku_lightpush_peers.inc()
|
||||
|
||||
|
||||
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
|
||||
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
|
||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||
if connOpt.isNone():
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
@ -139,7 +109,8 @@ proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[
|
||||
|
||||
return ok(rpcRes.response)
|
||||
|
||||
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
|
||||
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
|
||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
|
18
waku/v2/protocol/waku_lightpush/protocol_metrics.nim
Normal file
18
waku/v2/protocol/waku_lightpush/protocol_metrics.nim
Normal file
@ -0,0 +1,18 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import metrics
|
||||
|
||||
|
||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure* = "dial_failure"
|
||||
decodeRpcFailure* = "decode_rpc_failure"
|
||||
peerNotFoundFailure* = "peer_not_found_failure"
|
||||
emptyRequestBodyFailure* = "empty_request_body_failure"
|
||||
emptyResponseBodyFailure* = "empty_response_body_failure"
|
||||
messagePushFailure* = "message_push_failure"
|
@ -9,6 +9,9 @@ import
|
||||
./rpc
|
||||
|
||||
|
||||
const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
|
||||
proc encode*(rpc: PushRequest): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
output.write3(1, rpc.pubSubTopic)
|
||||
|
@ -49,7 +49,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
|
||||
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
|
||||
await connection.writeLP(rpc.encode().buffer)
|
||||
|
||||
var message = await connOpt.get().readLp(MaxRpcSize.int)
|
||||
var message = await connection.readLp(MaxRpcSize.int)
|
||||
let response = HistoryRPC.init(message)
|
||||
|
||||
if response.isErr():
|
||||
@ -114,6 +114,7 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo
|
||||
|
||||
proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) =
|
||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||
waku_store_peers.inc()
|
||||
|
||||
proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
||||
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||
|
Loading…
x
Reference in New Issue
Block a user