mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-14 17:04:53 +00:00
deploy: d201a37330b1b215e535aa23f808c00d4f6804ac
This commit is contained in:
parent
686e30daa5
commit
611c10af04
@ -1,76 +1,85 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, tables, sets],
|
||||
testutils/unittests, chronos, chronicles,
|
||||
std/options,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/switch,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multistream,
|
||||
libp2p/crypto/crypto
|
||||
import
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
|
||||
../test_helpers, ./utils
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_lightpush,
|
||||
../test_helpers
|
||||
|
||||
procSuite "Waku Light Push":
|
||||
|
||||
# NOTE See test_wakunode for light push request success
|
||||
const
|
||||
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
|
||||
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
|
||||
# TODO: Extend lightpush protocol test coverage
|
||||
procSuite "Waku Lightpush":
|
||||
|
||||
asyncTest "handle light push request success":
|
||||
# TODO: Move here the test case at test_wakunode: light push request success
|
||||
discard
|
||||
|
||||
asyncTest "handle light push request fail":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.new(key)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
listenSwitch = newStandardSwitch(some(key))
|
||||
await listenSwitch.start()
|
||||
|
||||
var responseRequestIdFuture = newFuture[string]()
|
||||
var completionFut = newFuture[bool]()
|
||||
let dialSwitch = newStandardSwitch()
|
||||
await dialSwitch.start()
|
||||
|
||||
proc handle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||
|
||||
proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||
# TODO Success return here
|
||||
debug "handle push req"
|
||||
check:
|
||||
1 == 0
|
||||
responseRequestIdFuture.complete(requestId)
|
||||
|
||||
# FIXME Unclear how we want to use subscriptions, if at all
|
||||
let
|
||||
proto = WakuLightPush.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
wm = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
rpc = PushRequest(pubSubTopic: defaultTopic, message: wm)
|
||||
peerManager = PeerManager.new(dialSwitch)
|
||||
rng = crypto.newRng()
|
||||
proto = WakuLightPush.init(peerManager, rng, requestHandler)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||
dialSwitch.mount(proto)
|
||||
|
||||
|
||||
# TODO Can possibly get rid of this if it isn't dynamic
|
||||
proc requestHandle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||
proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||
debug "push request handler"
|
||||
# TODO: Also relay message
|
||||
# TODO: Here we want to send back response with is_success true
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuLightPush.init(PeerManager.new(listenSwitch), crypto.newRng(), requestHandle)
|
||||
peerManager2 = PeerManager.new(listenSwitch)
|
||||
rng2 = crypto.newRng()
|
||||
proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2)
|
||||
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
proc handler(response: PushResponse) {.gcsafe, closure.} =
|
||||
debug "push response handler, expecting false"
|
||||
check:
|
||||
response.isSuccess == false
|
||||
debug "Additional info", info=response.info
|
||||
completionFut.complete(true)
|
||||
|
||||
await proto.request(rpc, handler)
|
||||
await sleepAsync(2.seconds)
|
||||
## Given
|
||||
let
|
||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DEFAULT_CONTENT_TOPIC)
|
||||
rpc = PushRequest(message: msg, pubSubTopic: DEFAULT_PUBSUB_TOPIC)
|
||||
|
||||
## When
|
||||
let res = await proto.request(rpc)
|
||||
|
||||
## Then
|
||||
check res.isOk()
|
||||
let response = res.get()
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
not response.isSuccess
|
||||
|
||||
## Cleanup
|
||||
await allFutures(listenSwitch.stop(), dialSwitch.stop())
|
@ -19,7 +19,7 @@ import
|
||||
../../waku/v2/protocol/[waku_relay, waku_message],
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
|
||||
../../waku/v2/protocol/waku_lightpush,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/utils/peers,
|
||||
../../waku/v2/utils/time,
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az201-59:
|
||||
# Libtool was configured on host fv-az397-247:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -20,7 +20,7 @@ import
|
||||
../protocol/waku_store,
|
||||
../protocol/waku_swap/waku_swap,
|
||||
../protocol/waku_filter/waku_filter,
|
||||
../protocol/waku_lightpush/waku_lightpush,
|
||||
../protocol/waku_lightpush,
|
||||
../protocol/waku_rln_relay/[waku_rln_relay_types],
|
||||
../utils/[peers, requests, wakuswitch, wakuenr],
|
||||
./peer_manager/peer_manager,
|
||||
|
@ -6,7 +6,7 @@ import
|
||||
../protocol/waku_store,
|
||||
../protocol/waku_swap/waku_swap,
|
||||
../protocol/waku_filter/waku_filter,
|
||||
../protocol/waku_lightpush/waku_lightpush,
|
||||
../protocol/waku_lightpush,
|
||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||
./peer_manager/peer_manager,
|
||||
./discv5/waku_discv5
|
||||
|
9
waku/v2/protocol/waku_lightpush.nim
Normal file
9
waku/v2/protocol/waku_lightpush.nim
Normal file
@ -0,0 +1,9 @@
|
||||
import
|
||||
./waku_lightpush/protocol,
|
||||
./waku_lightpush/rpc,
|
||||
./waku_lightpush/rpc_codec
|
||||
|
||||
export
|
||||
protocol,
|
||||
rpc,
|
||||
rpc_codec
|
156
waku/v2/protocol/waku_lightpush/protocol.nim
Normal file
156
waku/v2/protocol/waku_lightpush/protocol.nim
Normal file
@ -0,0 +1,156 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
metrics,
|
||||
bearssl,
|
||||
libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
../waku_message,
|
||||
../waku_relay,
|
||||
../../node/peer_manager/peer_manager,
|
||||
../../utils/requests,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "wakulightpush"
|
||||
|
||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||
|
||||
|
||||
const
|
||||
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
|
||||
const
|
||||
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
|
||||
type
|
||||
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
|
||||
|
||||
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
|
||||
|
||||
WakuLightPushResult*[T] = Result[T, string]
|
||||
|
||||
WakuLightPush* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
requestHandler*: PushRequestHandler
|
||||
relayReference*: WakuRelay
|
||||
|
||||
|
||||
proc init*(wl: WakuLightPush) =
|
||||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
let message = await conn.readLp(MaxRpcSize.int)
|
||||
let res = PushRPC.init(message)
|
||||
if res.isErr():
|
||||
error "failed to decode rpc"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
let rpc = res.get()
|
||||
|
||||
if rpc.request != PushRequest():
|
||||
info "lightpush push request"
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
|
||||
let
|
||||
pubSubTopic = rpc.request.pubSubTopic
|
||||
message = rpc.request.message
|
||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
||||
|
||||
var response: PushResponse
|
||||
if not wl.relayReference.isNil():
|
||||
let data = message.encode().buffer
|
||||
|
||||
# Assumimng success, should probably be extended to check for network, peers, etc
|
||||
discard wl.relayReference.publish(pubSubTopic, data)
|
||||
response = PushResponse(is_success: true, info: "Totally.")
|
||||
else:
|
||||
debug "No relay protocol present, unsuccesssful push"
|
||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
||||
|
||||
|
||||
let rpc = PushRPC(requestId: rpc.requestId, response: response)
|
||||
await conn.writeLp(rpc.encode().buffer)
|
||||
|
||||
if rpc.response != PushResponse():
|
||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||
if rpc.response.isSuccess:
|
||||
info "lightpush message success"
|
||||
else:
|
||||
info "lightpush message failure", info=rpc.response.info
|
||||
|
||||
wl.handler = handle
|
||||
wl.codec = WakuLightPushCodec
|
||||
|
||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
||||
debug "init"
|
||||
let rng = crypto.newRng()
|
||||
let wl = WakuLightPush(rng: rng,
|
||||
peerManager: peerManager,
|
||||
requestHandler: handler,
|
||||
relayReference: relay)
|
||||
wl.init()
|
||||
|
||||
return wl
|
||||
|
||||
|
||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||
waku_lightpush_peers.inc()
|
||||
|
||||
|
||||
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||
if connOpt.isNone():
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
return err(dialFailure)
|
||||
|
||||
let connection = connOpt.get()
|
||||
|
||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
||||
await connection.writeLP(rpc.encode().buffer)
|
||||
|
||||
var message = await connection.readLp(MaxRpcSize.int)
|
||||
let res = PushRPC.init(message)
|
||||
|
||||
if res.isErr():
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return err(decodeRpcFailure)
|
||||
|
||||
let rpcRes = res.get()
|
||||
if rpcRes.response == PushResponse():
|
||||
return err("empty response body")
|
||||
|
||||
return ok(rpcRes.response)
|
||||
|
||||
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
return err(dialFailure)
|
||||
|
||||
return await wl.request(req, peerOpt.get())
|
||||
|
||||
proc request*(wl: WakuLightPush, req: PushRequest, handler: PushResponseHandler) {.async, gcsafe,
|
||||
deprecated: "Use the no-callback request() procedure".} =
|
||||
let res = await wl.request(req)
|
||||
if res.isErr():
|
||||
return
|
||||
|
||||
handler(res.get())
|
18
waku/v2/protocol/waku_lightpush/rpc.nim
Normal file
18
waku/v2/protocol/waku_lightpush/rpc.nim
Normal file
@ -0,0 +1,18 @@
|
||||
{.push raises: [Defec].}
|
||||
|
||||
import
|
||||
../waku_message
|
||||
|
||||
type
|
||||
PushRequest* = object
|
||||
pubSubTopic*: string
|
||||
message*: WakuMessage
|
||||
|
||||
PushResponse* = object
|
||||
isSuccess*: bool
|
||||
info*: string
|
||||
|
||||
PushRPC* = object
|
||||
requestId*: string
|
||||
request*: PushRequest
|
||||
response*: PushResponse
|
86
waku/v2/protocol/waku_lightpush/rpc_codec.nim
Normal file
86
waku/v2/protocol/waku_lightpush/rpc_codec.nim
Normal file
@ -0,0 +1,86 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
|
||||
import
|
||||
libp2p/protobuf/minprotobuf
|
||||
import
|
||||
../waku_message,
|
||||
../../utils/protobuf,
|
||||
./rpc
|
||||
|
||||
|
||||
proc encode*(rpc: PushRequest): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
output.write3(1, rpc.pubSubTopic)
|
||||
output.write3(2, rpc.message.encode())
|
||||
output.finish3()
|
||||
|
||||
return output
|
||||
|
||||
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var rpc = PushRequest()
|
||||
|
||||
var pubSubTopic: string
|
||||
discard ?pb.getField(1, pubSubTopic)
|
||||
rpc.pubSubTopic = pubSubTopic
|
||||
|
||||
var buf: seq[byte]
|
||||
discard ?pb.getField(2, buf)
|
||||
rpc.message = ?WakuMessage.init(buf)
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
|
||||
proc encode*(rpc: PushResponse): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
output.write3(1, uint64(rpc.isSuccess))
|
||||
output.write3(2, rpc.info)
|
||||
output.finish3()
|
||||
|
||||
return output
|
||||
|
||||
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var rpc = PushResponse(isSuccess: false, info: "")
|
||||
|
||||
var isSuccess: uint64
|
||||
if ?pb.getField(1, isSuccess):
|
||||
rpc.isSuccess = bool(isSuccess)
|
||||
|
||||
var info: string
|
||||
discard ?pb.getField(2, info)
|
||||
rpc.info = info
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
|
||||
proc encode*(rpc: PushRPC): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
output.write3(1, rpc.requestId)
|
||||
output.write3(2, rpc.request.encode())
|
||||
output.write3(3, rpc.response.encode())
|
||||
output.finish3()
|
||||
|
||||
return output
|
||||
|
||||
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var rpc = PushRPC()
|
||||
|
||||
var requestId: string
|
||||
discard ?pb.getField(1, requestId)
|
||||
rpc.requestId = requestId
|
||||
|
||||
var requestBuffer: seq[byte]
|
||||
discard ?pb.getField(2, requestBuffer)
|
||||
rpc.request = ?PushRequest.init(requestBuffer)
|
||||
|
||||
var pushBuffer: seq[byte]
|
||||
discard ?pb.getField(3, pushBuffer)
|
||||
rpc.response = ?PushResponse.init(pushBuffer)
|
||||
|
||||
return ok(rpc)
|
@ -1,202 +0,0 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, options],
|
||||
bearssl,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
libp2p/protocols/pubsub/pubsubpeer,
|
||||
libp2p/protocols/pubsub/floodsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto,
|
||||
waku_lightpush_types,
|
||||
../../utils/requests,
|
||||
../../utils/protobuf,
|
||||
../../node/peer_manager/peer_manager,
|
||||
../waku_relay
|
||||
|
||||
export waku_lightpush_types
|
||||
|
||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||
|
||||
logScope:
|
||||
topics = "wakulightpush"
|
||||
|
||||
const
|
||||
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
# Encoding and decoding -------------------------------------------------------
|
||||
proc encode*(rpc: PushRequest): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
|
||||
output.write3(1, rpc.pubSubTopic)
|
||||
output.write3(2, rpc.message.encode())
|
||||
|
||||
output.finish3()
|
||||
|
||||
return output
|
||||
|
||||
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
#var rpc = PushRequest(pubSubTopic: "", message: WakuMessage())
|
||||
var rpc = PushRequest()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var pubSubTopic: string
|
||||
discard ? pb.getField(1, pubSubTopic)
|
||||
rpc.pubSubTopic = pubSubTopic
|
||||
|
||||
var buf: seq[byte]
|
||||
discard ? pb.getField(2, buf)
|
||||
rpc.message = ? WakuMessage.init(buf)
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
proc encode*(rpc: PushResponse): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
|
||||
output.write3(1, uint64(rpc.isSuccess))
|
||||
output.write3(2, rpc.info)
|
||||
|
||||
output.finish3()
|
||||
|
||||
return output
|
||||
|
||||
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = PushResponse(isSuccess: false, info: "")
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var isSuccess: uint64
|
||||
if ? pb.getField(1, isSuccess):
|
||||
rpc.isSuccess = bool(isSuccess)
|
||||
|
||||
var info: string
|
||||
discard ? pb.getField(2, info)
|
||||
rpc.info = info
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
proc encode*(rpc: PushRPC): ProtoBuffer =
|
||||
var output = initProtoBuffer()
|
||||
|
||||
output.write3(1, rpc.requestId)
|
||||
output.write3(2, rpc.request.encode())
|
||||
output.write3(3, rpc.response.encode())
|
||||
|
||||
output.finish3()
|
||||
|
||||
return output
|
||||
|
||||
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = PushRPC()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
discard ? pb.getField(1, rpc.requestId)
|
||||
|
||||
var requestBuffer: seq[byte]
|
||||
discard ? pb.getField(2, requestBuffer)
|
||||
|
||||
rpc.request = ? PushRequest.init(requestBuffer)
|
||||
|
||||
var pushBuffer: seq[byte]
|
||||
discard ? pb.getField(3, pushBuffer)
|
||||
|
||||
rpc.response = ? PushResponse.init(pushBuffer)
|
||||
|
||||
return ok(rpc)
|
||||
|
||||
# Protocol -------------------------------------------------------
|
||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
||||
debug "init"
|
||||
let rng = crypto.newRng()
|
||||
var wl = WakuLightPush(rng: rng,
|
||||
peerManager: peerManager,
|
||||
requestHandler: handler,
|
||||
relayReference: relay)
|
||||
wl.init()
|
||||
|
||||
return wl
|
||||
|
||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||
waku_lightpush_peers.inc()
|
||||
|
||||
method init*(wlp: WakuLightPush) =
|
||||
debug "init"
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var message = await conn.readLp(MaxRpcSize.int)
|
||||
var res = PushRPC.init(message)
|
||||
if res.isErr:
|
||||
error "failed to decode rpc"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
info "lightpush message received"
|
||||
|
||||
let value = res.value
|
||||
if value.request != PushRequest():
|
||||
info "lightpush push request"
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
let
|
||||
pubSubTopic = value.request.pubSubTopic
|
||||
message = value.request.message
|
||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
||||
var response: PushResponse
|
||||
if wlp.relayReference != nil:
|
||||
let wakuRelay = wlp.relayReference
|
||||
let data = message.encode().buffer
|
||||
# XXX Assumes success, should probably be extended to check for network, peers, etc
|
||||
discard wakuRelay.publish(pubSubTopic, data)
|
||||
response = PushResponse(is_success: true, info: "Totally.")
|
||||
else:
|
||||
debug "No relay protocol present, unsuccesssful push"
|
||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
||||
await conn.writeLp(PushRPC(requestId: value.requestId,
|
||||
response: response).encode().buffer)
|
||||
#wlp.requestHandler(value.requestId, value.request)
|
||||
if value.response != PushResponse():
|
||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||
if value.response.isSuccess:
|
||||
info "lightpush message success"
|
||||
else:
|
||||
info "lightpush message failure", info=value.response.info
|
||||
|
||||
wlp.handler = handle
|
||||
wlp.codec = WakuLightPushCodec
|
||||
|
||||
proc request*(w: WakuLightPush, request: PushRequest, handler: PushResponseHandler) {.async, gcsafe.} =
|
||||
let peerOpt = w.peerManager.selectPeer(WakuLightPushCodec)
|
||||
|
||||
if peerOpt.isNone():
|
||||
error "no suitable remote peers"
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
return
|
||||
|
||||
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuLightPushCodec)
|
||||
|
||||
if connOpt.isNone():
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||
return
|
||||
|
||||
await connOpt.get().writeLP(PushRPC(requestId: generateRequestId(w.rng),
|
||||
request: request).encode().buffer)
|
||||
|
||||
var message = await connOpt.get().readLp(64*1024)
|
||||
let response = PushRPC.init(message)
|
||||
|
||||
if response.isErr:
|
||||
error "failed to decode response"
|
||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
handler(response.value.response)
|
@ -1,36 +0,0 @@
|
||||
import
|
||||
std/[tables],
|
||||
bearssl,
|
||||
libp2p/protocols/protocol,
|
||||
../../node/peer_manager/peer_manager,
|
||||
../waku_message,
|
||||
../waku_relay
|
||||
|
||||
export waku_message
|
||||
|
||||
const
|
||||
MaxRpcSize* = MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
||||
|
||||
type
|
||||
PushRequest* = object
|
||||
pubSubTopic*: string
|
||||
message*: WakuMessage
|
||||
|
||||
PushResponse* = object
|
||||
isSuccess*: bool
|
||||
info*: string
|
||||
|
||||
PushRPC* = object
|
||||
requestId*: string
|
||||
request*: PushRequest
|
||||
response*: PushResponse
|
||||
|
||||
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
|
||||
|
||||
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
|
||||
|
||||
WakuLightPush* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
requestHandler*: PushRequestHandler
|
||||
relayReference*: WakuRelay
|
Loading…
x
Reference in New Issue
Block a user