feat(lightpush): add waku lightpush protocol client

This commit is contained in:
Lorenzo Delgado 2022-10-25 14:55:31 +02:00 committed by GitHub
parent c99b0a4778
commit 7e7bba4a98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 263 additions and 145 deletions

View File

@ -4,7 +4,6 @@ import
# ./v2/test_waku,
./v2/test_wakunode,
./v2/test_wakunode_relay,
./v2/test_wakunode_lightpush,
# Waku Store
./v2/test_message_store_queue_index,
./v2/test_message_store_queue_pagination,
@ -17,6 +16,9 @@ import
# TODO: Re-enable store resume test cases (#1282)
# ./v2/test_waku_store_resume,
./v2/test_wakunode_store,
# Waku LightPush
./v2/test_waku_lightpush,
./v2/test_wakunode_lightpush,
# Waku Filter
./v2/test_waku_filter,
./v2/test_wakunode_filter,

View File

@ -5,78 +5,119 @@ import
testutils/unittests,
chronicles,
chronos,
libp2p/switch,
libp2p/crypto/crypto
import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_lightpush,
../test_helpers,
./testlib/common
../../waku/v2/protocol/waku_lightpush/client,
./testlib/common,
./testlib/switch
# TODO: Extend lightpush protocol test coverage
procSuite "Waku Lightpush":
proc newTestWakuLightpushNode(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuLightPush.new(peerManager, rng, handler)
asyncTest "handle light push request success":
# TODO: Move here the test case at test_wakunode: light push request success
discard
await proto.start()
switch.mount(proto)
asyncTest "handle light push request fail":
let
key = PrivateKey.random(ECDSA, rng[]).get()
listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
return proto
let dialSwitch = newStandardSwitch()
await dialSwitch.start()
proc newTestWakuLightpushClient(switch: Switch): WakuLightPushClient =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
WakuLightPushClient.new(peerManager, rng)
proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
# TODO Success return here
debug "handle push req"
check:
1 == 0
suite "Waku Lightpush":
# FIXME Unclear how we want to use subscriptions, if at all
let
peerManager = PeerManager.new(dialSwitch)
rng = crypto.newRng()
proto = WakuLightPush.init(peerManager, rng, requestHandler)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
waitFor proto.start()
dialSwitch.mount(proto)
# TODO Can possibly get rid of this if it isn't dynamic
proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
debug "push request handler"
# TODO: Also relay message
# TODO: Here we want to send back response with is_success true
discard
let
peerManager2 = PeerManager.new(listenSwitch)
rng2 = crypto.newRng()
proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2)
waitFor proto2.start()
listenSwitch.mount(proto2)
asyncTest "push message to pubsub topic is successful":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
let handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
rpc = PushRequest(message: msg, pubSubTopic: DefaultPubsubTopic)
server = await newTestWakuLightpushNode(serverSwitch, handler)
client = newTestWakuLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let
topic = DefaultPubsubTopic
message = fakeWakuMessage()
## When
let res = await proto.request(rpc)
let rpc = PushRequest(pubSubTopic: topic, message: message)
let requestRes = await client.request(rpc, serverPeerId)
require await handlerFuture.withTimeout(100.millis)
## Then
check res.isOk()
let response = res.get()
check:
not response.isSuccess
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == topic
handledMessage == message
## Cleanup
await allFutures(listenSwitch.stop(), dialSwitch.stop())
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "push message to pubsub topic should fail":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
let error = "test_failure"
let handlerFuture = newFuture[void]()
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete()
return err(error)
let
server = await newTestWakuLightpushNode(serverSwitch, handler)
client = newTestWakuLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let
topic = DefaultPubsubTopic
message = fakeWakuMessage()
## When
let rpc = PushRequest(pubSubTopic: topic, message: message)
let requestRes = await client.request(rpc, serverPeerId)
require await handlerFuture.withTimeout(100.millis)
## Then
check:
requestRes.isErr()
handlerFuture.finished()
let requestError = requestRes.error
check:
requestError == error
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

View File

@ -59,7 +59,7 @@ procSuite "WakuNode - Lightpush":
## When
let lightpushRes = await lightNode.lightpush(DefaultPubsubTopic, message)
require (await completionFutRelay.withTimeout(5.seconds)) == true
require await completionFutRelay.withTimeout(5.seconds)
## Then
check lightpushRes.isOk()

View File

@ -1,5 +1,6 @@
import
std/times
std/times,
stew/byteutils
import
../../../waku/v2/protocol/waku_message,
../../../waku/v2/utils/time

View File

@ -9,8 +9,8 @@ import
metrics/chronos_httpserver
import
../protocol/waku_filter,
../protocol/waku_store/protocol_metrics,
../protocol/waku_lightpush,
../protocol/waku_store/protocol_metrics as store_metrics,
../protocol/waku_lightpush/protocol_metrics as lightpush_metrics,
../protocol/waku_swap/waku_swap,
../protocol/waku_peer_exchange,
../utils/collector,

View File

@ -361,21 +361,16 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
##
## Status: Implemented.
if node.wakuRelay.isNil:
if node.wakuRelay.isNil():
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
# @TODO improved error handling
# TODO: Improve error handling
return
let wakuRelay = node.wakuRelay
trace "publish", topic=topic, contentTopic=message.contentTopic
var publishingMessage = message
let data = message.encode().buffer
discard await wakuRelay.publish(topic, data)
discard await node.wakuRelay.publish(topic, data)
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
@ -421,7 +416,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
##
@ -594,16 +589,23 @@ proc mountRelay*(node: WakuNode,
info "relay mounted successfully"
proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
proc mountLightPush*(node: WakuNode) {.async.} =
info "mounting light push"
if node.wakuRelay.isNil:
debug "mounting lightpush without relay"
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
var pushHandler: PushMessageHandler
if node.wakuRelay.isNil():
debug "mounting lightpush without relay (nil)"
# TODO: Remove after using waku lightpush client
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
else:
debug "mounting lightpush with relay"
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
return ok()
debug "mounting lightpush with relay"
node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler)
if node.started:
# Node has started already. Let's start lightpush too.
await node.wakuLightPush.start()

View File

@ -0,0 +1,79 @@
{.push raises: [Defect].}
import
std/options,
stew/results,
chronicles,
chronos,
metrics,
bearssl/rand
import
../../node/peer_manager/peer_manager,
../../utils/requests,
./protocol,
./protocol_metrics,
./rpc,
./rpc_codec
logScope:
topics = "wakulightpush.client"
type WakuLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
proc new*(T: type WakuLightPushClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext): T =
WakuLightPushClient(peerManager: peerManager, rng: rng)
proc request*(wl: WakuLightPushClient, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
if connOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
let connection = connOpt.get()
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
await connection.writeLP(rpc.encode().buffer)
var message = await connection.readLp(MaxRpcSize.int)
let decodeRespRes = PushRPC.init(message)
if decodeRespRes.isErr():
error "failed to decode response"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure)
let pushResponseRes = decodeRespRes.get()
if pushResponseRes.response == PushResponse():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)
let response = pushResponseRes.response
if not response.isSuccess:
if response.info != "":
return err(response.info)
else:
return err("unknown failure")
return ok()
### Set lightpush peer and send push requests
proc setPeer*(wl: WakuLightPushClient, peer: RemotePeerInfo) =
wl.peerManager.addPeer(peer, WakuLightPushCodec)
waku_lightpush_peers.inc()
proc request*(wl: WakuLightPushClient, req: PushRequest): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_lightpush_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wl.request(req, peerOpt.get())

View File

@ -6,116 +6,86 @@ import
chronicles,
chronos,
metrics,
bearssl/rand,
libp2p/crypto/crypto
bearssl/rand
import
../waku_message,
../waku_relay,
../../node/peer_manager/peer_manager,
../../utils/requests,
./rpc,
./rpc_codec
./rpc_codec,
./protocol_metrics
logScope:
topics = "wakulightpush"
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
const
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
const
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
type
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
WakuLightPushResult*[T] = Result[T, string]
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
requestHandler*: PushRequestHandler
relayReference*: WakuRelay
proc init*(wl: WakuLightPush) =
pushHandler*: PushMessageHandler
proc initProtocolHandler*(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let message = await conn.readLp(MaxRpcSize.int)
let res = PushRPC.init(message)
if res.isErr():
let buffer = await conn.readLp(MaxRpcSize.int)
let reqDecodeRes = PushRPC.init(buffer)
if reqDecodeRes.isErr():
error "failed to decode rpc"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return
let rpc = res.get()
let req = reqDecodeRes.get()
if req.request == PushRequest():
error "invalid lightpush rpc received", error=emptyRequestBodyFailure
waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure])
return
if rpc.request != PushRequest():
info "lightpush push request"
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
let
pubSubTopic = req.request.pubSubTopic
message = req.request.message
debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic
let
pubSubTopic = rpc.request.pubSubTopic
message = rpc.request.message
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
var response: PushResponse
let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message)
if handleRes.isOk():
response = PushResponse(is_success: true, info: "OK")
else:
response = PushResponse(is_success: false, info: handleRes.error)
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
error "pushed message handling failed", error=handleRes.error
var response: PushResponse
if not wl.relayReference.isNil():
let data = message.encode().buffer
# Assumimng success, should probably be extended to check for network, peers, etc
discard wl.relayReference.publish(pubSubTopic, data)
response = PushResponse(is_success: true, info: "Totally.")
else:
debug "No relay protocol present, unsuccesssful push"
response = PushResponse(is_success: false, info: "No relay protocol")
let rpc = PushRPC(requestId: rpc.requestId, response: response)
await conn.writeLp(rpc.encode().buffer)
if rpc.response != PushResponse():
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
if rpc.response.isSuccess:
info "lightpush message success"
else:
info "lightpush message failure", info=rpc.response.info
let rpc = PushRPC(requestId: req.requestId, response: response)
await conn.writeLp(rpc.encode().buffer)
wl.handler = handle
wl.codec = WakuLightPushCodec
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
debug "init"
let rng = crypto.newRng()
let wl = WakuLightPush(rng: rng,
peerManager: peerManager,
requestHandler: handler,
relayReference: relay)
wl.init()
proc new*(T: type WakuLightPush,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler): T =
let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler)
wl.initProtocolHandler()
return wl
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) {.
deprecated: "Use 'WakuLightPushClient.setPeer()' instead" .} =
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
waku_lightpush_peers.inc()
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
if connOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])
@ -139,7 +109,8 @@ proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[
return ok(rpcRes.response)
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe,
deprecated: "Use 'WakuLightPushClient.request()' instead" .} =
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])

View File

@ -0,0 +1,18 @@
{.push raises: [Defect].}
import metrics
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
peerNotFoundFailure* = "peer_not_found_failure"
emptyRequestBodyFailure* = "empty_request_body_failure"
emptyResponseBodyFailure* = "empty_response_body_failure"
messagePushFailure* = "message_push_failure"

View File

@ -9,6 +9,9 @@ import
./rpc
const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
proc encode*(rpc: PushRequest): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.pubSubTopic)

View File

@ -49,7 +49,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
await connection.writeLP(rpc.encode().buffer)
var message = await connOpt.get().readLp(MaxRpcSize.int)
var message = await connection.readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr():
@ -114,6 +114,7 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo
proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.