mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-05 22:39:33 +00:00
refactor(waku-lightpush): waku lightpush protocol code reorganization
This commit is contained in:
parent
34460f5b23
commit
dcfe87fb65
@ -1,76 +1,85 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, tables, sets],
|
std/options,
|
||||||
testutils/unittests, chronos, chronicles,
|
testutils/unittests,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
libp2p/switch,
|
libp2p/switch,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/crypto/crypto
|
||||||
libp2p/stream/[bufferstream, connection],
|
import
|
||||||
libp2p/crypto/crypto,
|
|
||||||
libp2p/multistream,
|
|
||||||
../../waku/v2/node/peer_manager/peer_manager,
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
|
../../waku/v2/protocol/waku_message,
|
||||||
../test_helpers, ./utils
|
../../waku/v2/protocol/waku_lightpush,
|
||||||
|
../test_helpers
|
||||||
|
|
||||||
procSuite "Waku Light Push":
|
|
||||||
|
|
||||||
# NOTE See test_wakunode for light push request success
|
const
|
||||||
|
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
|
||||||
|
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Extend lightpush protocol test coverage
|
||||||
|
procSuite "Waku Lightpush":
|
||||||
|
|
||||||
|
asyncTest "handle light push request success":
|
||||||
|
# TODO: Move here the test case at test_wakunode: light push request success
|
||||||
|
discard
|
||||||
|
|
||||||
asyncTest "handle light push request fail":
|
asyncTest "handle light push request fail":
|
||||||
const defaultTopic = "/waku/2/default-waku/proto"
|
|
||||||
|
|
||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.new(key)
|
listenSwitch = newStandardSwitch(some(key))
|
||||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
||||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
|
||||||
|
|
||||||
var dialSwitch = newStandardSwitch()
|
|
||||||
await dialSwitch.start()
|
|
||||||
|
|
||||||
var listenSwitch = newStandardSwitch(some(key))
|
|
||||||
await listenSwitch.start()
|
await listenSwitch.start()
|
||||||
|
|
||||||
var responseRequestIdFuture = newFuture[string]()
|
let dialSwitch = newStandardSwitch()
|
||||||
var completionFut = newFuture[bool]()
|
await dialSwitch.start()
|
||||||
|
|
||||||
proc handle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
|
||||||
|
proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||||
# TODO Success return here
|
# TODO Success return here
|
||||||
debug "handle push req"
|
debug "handle push req"
|
||||||
check:
|
check:
|
||||||
1 == 0
|
1 == 0
|
||||||
responseRequestIdFuture.complete(requestId)
|
|
||||||
|
|
||||||
# FIXME Unclear how we want to use subscriptions, if at all
|
# FIXME Unclear how we want to use subscriptions, if at all
|
||||||
let
|
let
|
||||||
proto = WakuLightPush.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
peerManager = PeerManager.new(dialSwitch)
|
||||||
wm = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
rng = crypto.newRng()
|
||||||
rpc = PushRequest(pubSubTopic: defaultTopic, message: wm)
|
proto = WakuLightPush.init(peerManager, rng, requestHandler)
|
||||||
|
|
||||||
dialSwitch.mount(proto)
|
|
||||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
dialSwitch.mount(proto)
|
||||||
|
|
||||||
|
|
||||||
# TODO Can possibly get rid of this if it isn't dynamic
|
# TODO Can possibly get rid of this if it isn't dynamic
|
||||||
proc requestHandle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
|
||||||
debug "push request handler"
|
debug "push request handler"
|
||||||
# TODO: Also relay message
|
# TODO: Also relay message
|
||||||
# TODO: Here we want to send back response with is_success true
|
# TODO: Here we want to send back response with is_success true
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let
|
let
|
||||||
proto2 = WakuLightPush.init(PeerManager.new(listenSwitch), crypto.newRng(), requestHandle)
|
peerManager2 = PeerManager.new(listenSwitch)
|
||||||
|
rng2 = crypto.newRng()
|
||||||
|
proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2)
|
||||||
|
|
||||||
listenSwitch.mount(proto2)
|
listenSwitch.mount(proto2)
|
||||||
|
|
||||||
proc handler(response: PushResponse) {.gcsafe, closure.} =
|
|
||||||
debug "push response handler, expecting false"
|
|
||||||
check:
|
|
||||||
response.isSuccess == false
|
|
||||||
debug "Additional info", info=response.info
|
|
||||||
completionFut.complete(true)
|
|
||||||
|
|
||||||
await proto.request(rpc, handler)
|
## Given
|
||||||
await sleepAsync(2.seconds)
|
let
|
||||||
|
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DEFAULT_CONTENT_TOPIC)
|
||||||
|
rpc = PushRequest(message: msg, pubSubTopic: DEFAULT_PUBSUB_TOPIC)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await proto.request(rpc)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check res.isOk()
|
||||||
|
let response = res.get()
|
||||||
check:
|
check:
|
||||||
(await completionFut.withTimeout(5.seconds)) == true
|
not response.isSuccess
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(listenSwitch.stop(), dialSwitch.stop())
|
||||||
@ -19,7 +19,7 @@ import
|
|||||||
../../waku/v2/protocol/[waku_relay, waku_message],
|
../../waku/v2/protocol/[waku_relay, waku_message],
|
||||||
../../waku/v2/protocol/waku_store,
|
../../waku/v2/protocol/waku_store,
|
||||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||||
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
|
../../waku/v2/protocol/waku_lightpush,
|
||||||
../../waku/v2/node/peer_manager/peer_manager,
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../waku/v2/utils/peers,
|
../../waku/v2/utils/peers,
|
||||||
../../waku/v2/utils/time,
|
../../waku/v2/utils/time,
|
||||||
|
|||||||
@ -20,7 +20,7 @@ import
|
|||||||
../protocol/waku_store,
|
../protocol/waku_store,
|
||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_filter/waku_filter,
|
../protocol/waku_filter/waku_filter,
|
||||||
../protocol/waku_lightpush/waku_lightpush,
|
../protocol/waku_lightpush,
|
||||||
../protocol/waku_rln_relay/[waku_rln_relay_types],
|
../protocol/waku_rln_relay/[waku_rln_relay_types],
|
||||||
../utils/[peers, requests, wakuswitch, wakuenr],
|
../utils/[peers, requests, wakuswitch, wakuenr],
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
|
|||||||
@ -6,7 +6,7 @@ import
|
|||||||
../protocol/waku_store,
|
../protocol/waku_store,
|
||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_filter/waku_filter,
|
../protocol/waku_filter/waku_filter,
|
||||||
../protocol/waku_lightpush/waku_lightpush,
|
../protocol/waku_lightpush,
|
||||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
./discv5/waku_discv5
|
./discv5/waku_discv5
|
||||||
|
|||||||
9
waku/v2/protocol/waku_lightpush.nim
Normal file
9
waku/v2/protocol/waku_lightpush.nim
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
import
|
||||||
|
./waku_lightpush/protocol,
|
||||||
|
./waku_lightpush/rpc,
|
||||||
|
./waku_lightpush/rpc_codec
|
||||||
|
|
||||||
|
export
|
||||||
|
protocol,
|
||||||
|
rpc,
|
||||||
|
rpc_codec
|
||||||
156
waku/v2/protocol/waku_lightpush/protocol.nim
Normal file
156
waku/v2/protocol/waku_lightpush/protocol.nim
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/options,
|
||||||
|
stew/results,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
metrics,
|
||||||
|
bearssl,
|
||||||
|
libp2p/crypto/crypto
|
||||||
|
|
||||||
|
import
|
||||||
|
../waku_message,
|
||||||
|
../waku_relay,
|
||||||
|
../../node/peer_manager/peer_manager,
|
||||||
|
../../utils/requests,
|
||||||
|
./rpc,
|
||||||
|
./rpc_codec
|
||||||
|
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakulightpush"
|
||||||
|
|
||||||
|
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||||
|
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||||
|
|
||||||
|
|
||||||
|
const
|
||||||
|
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||||
|
|
||||||
|
const
|
||||||
|
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
|
||||||
|
|
||||||
|
# Error types (metric label values)
|
||||||
|
const
|
||||||
|
dialFailure = "dial_failure"
|
||||||
|
decodeRpcFailure = "decode_rpc_failure"
|
||||||
|
|
||||||
|
|
||||||
|
type
|
||||||
|
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
|
||||||
|
|
||||||
|
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
|
||||||
|
|
||||||
|
WakuLightPushResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
WakuLightPush* = ref object of LPProtocol
|
||||||
|
rng*: ref BrHmacDrbgContext
|
||||||
|
peerManager*: PeerManager
|
||||||
|
requestHandler*: PushRequestHandler
|
||||||
|
relayReference*: WakuRelay
|
||||||
|
|
||||||
|
|
||||||
|
proc init*(wl: WakuLightPush) =
|
||||||
|
|
||||||
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
|
let message = await conn.readLp(MaxRpcSize.int)
|
||||||
|
let res = PushRPC.init(message)
|
||||||
|
if res.isErr():
|
||||||
|
error "failed to decode rpc"
|
||||||
|
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
return
|
||||||
|
|
||||||
|
let rpc = res.get()
|
||||||
|
|
||||||
|
if rpc.request != PushRequest():
|
||||||
|
info "lightpush push request"
|
||||||
|
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||||
|
|
||||||
|
let
|
||||||
|
pubSubTopic = rpc.request.pubSubTopic
|
||||||
|
message = rpc.request.message
|
||||||
|
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
||||||
|
|
||||||
|
var response: PushResponse
|
||||||
|
if not wl.relayReference.isNil():
|
||||||
|
let data = message.encode().buffer
|
||||||
|
|
||||||
|
# Assumimng success, should probably be extended to check for network, peers, etc
|
||||||
|
discard wl.relayReference.publish(pubSubTopic, data)
|
||||||
|
response = PushResponse(is_success: true, info: "Totally.")
|
||||||
|
else:
|
||||||
|
debug "No relay protocol present, unsuccesssful push"
|
||||||
|
response = PushResponse(is_success: false, info: "No relay protocol")
|
||||||
|
|
||||||
|
|
||||||
|
let rpc = PushRPC(requestId: rpc.requestId, response: response)
|
||||||
|
await conn.writeLp(rpc.encode().buffer)
|
||||||
|
|
||||||
|
if rpc.response != PushResponse():
|
||||||
|
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||||
|
if rpc.response.isSuccess:
|
||||||
|
info "lightpush message success"
|
||||||
|
else:
|
||||||
|
info "lightpush message failure", info=rpc.response.info
|
||||||
|
|
||||||
|
wl.handler = handle
|
||||||
|
wl.codec = WakuLightPushCodec
|
||||||
|
|
||||||
|
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
||||||
|
debug "init"
|
||||||
|
let rng = crypto.newRng()
|
||||||
|
let wl = WakuLightPush(rng: rng,
|
||||||
|
peerManager: peerManager,
|
||||||
|
requestHandler: handler,
|
||||||
|
relayReference: relay)
|
||||||
|
wl.init()
|
||||||
|
|
||||||
|
return wl
|
||||||
|
|
||||||
|
|
||||||
|
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
||||||
|
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
||||||
|
waku_lightpush_peers.inc()
|
||||||
|
|
||||||
|
|
||||||
|
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||||
|
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
|
||||||
|
if connOpt.isNone():
|
||||||
|
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||||
|
return err(dialFailure)
|
||||||
|
|
||||||
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
|
||||||
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
|
var message = await connection.readLp(MaxRpcSize.int)
|
||||||
|
let res = PushRPC.init(message)
|
||||||
|
|
||||||
|
if res.isErr():
|
||||||
|
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
return err(decodeRpcFailure)
|
||||||
|
|
||||||
|
let rpcRes = res.get()
|
||||||
|
if rpcRes.response == PushResponse():
|
||||||
|
return err("empty response body")
|
||||||
|
|
||||||
|
return ok(rpcRes.response)
|
||||||
|
|
||||||
|
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
|
||||||
|
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
||||||
|
return err(dialFailure)
|
||||||
|
|
||||||
|
return await wl.request(req, peerOpt.get())
|
||||||
|
|
||||||
|
proc request*(wl: WakuLightPush, req: PushRequest, handler: PushResponseHandler) {.async, gcsafe,
|
||||||
|
deprecated: "Use the no-callback request() procedure".} =
|
||||||
|
let res = await wl.request(req)
|
||||||
|
if res.isErr():
|
||||||
|
return
|
||||||
|
|
||||||
|
handler(res.get())
|
||||||
18
waku/v2/protocol/waku_lightpush/rpc.nim
Normal file
18
waku/v2/protocol/waku_lightpush/rpc.nim
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
{.push raises: [Defec].}
|
||||||
|
|
||||||
|
import
|
||||||
|
../waku_message
|
||||||
|
|
||||||
|
type
|
||||||
|
PushRequest* = object
|
||||||
|
pubSubTopic*: string
|
||||||
|
message*: WakuMessage
|
||||||
|
|
||||||
|
PushResponse* = object
|
||||||
|
isSuccess*: bool
|
||||||
|
info*: string
|
||||||
|
|
||||||
|
PushRPC* = object
|
||||||
|
requestId*: string
|
||||||
|
request*: PushRequest
|
||||||
|
response*: PushResponse
|
||||||
86
waku/v2/protocol/waku_lightpush/rpc_codec.nim
Normal file
86
waku/v2/protocol/waku_lightpush/rpc_codec.nim
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
|
||||||
|
import
|
||||||
|
libp2p/protobuf/minprotobuf
|
||||||
|
import
|
||||||
|
../waku_message,
|
||||||
|
../../utils/protobuf,
|
||||||
|
./rpc
|
||||||
|
|
||||||
|
|
||||||
|
proc encode*(rpc: PushRequest): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
output.write3(1, rpc.pubSubTopic)
|
||||||
|
output.write3(2, rpc.message.encode())
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PushRequest()
|
||||||
|
|
||||||
|
var pubSubTopic: string
|
||||||
|
discard ?pb.getField(1, pubSubTopic)
|
||||||
|
rpc.pubSubTopic = pubSubTopic
|
||||||
|
|
||||||
|
var buf: seq[byte]
|
||||||
|
discard ?pb.getField(2, buf)
|
||||||
|
rpc.message = ?WakuMessage.init(buf)
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
|
|
||||||
|
|
||||||
|
proc encode*(rpc: PushResponse): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
output.write3(1, uint64(rpc.isSuccess))
|
||||||
|
output.write3(2, rpc.info)
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PushResponse(isSuccess: false, info: "")
|
||||||
|
|
||||||
|
var isSuccess: uint64
|
||||||
|
if ?pb.getField(1, isSuccess):
|
||||||
|
rpc.isSuccess = bool(isSuccess)
|
||||||
|
|
||||||
|
var info: string
|
||||||
|
discard ?pb.getField(2, info)
|
||||||
|
rpc.info = info
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
|
|
||||||
|
|
||||||
|
proc encode*(rpc: PushRPC): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
output.write3(1, rpc.requestId)
|
||||||
|
output.write3(2, rpc.request.encode())
|
||||||
|
output.write3(3, rpc.response.encode())
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PushRPC()
|
||||||
|
|
||||||
|
var requestId: string
|
||||||
|
discard ?pb.getField(1, requestId)
|
||||||
|
rpc.requestId = requestId
|
||||||
|
|
||||||
|
var requestBuffer: seq[byte]
|
||||||
|
discard ?pb.getField(2, requestBuffer)
|
||||||
|
rpc.request = ?PushRequest.init(requestBuffer)
|
||||||
|
|
||||||
|
var pushBuffer: seq[byte]
|
||||||
|
discard ?pb.getField(3, pushBuffer)
|
||||||
|
rpc.response = ?PushResponse.init(pushBuffer)
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
@ -1,202 +0,0 @@
|
|||||||
{.push raises: [Defect].}
|
|
||||||
|
|
||||||
import
|
|
||||||
std/[tables, options],
|
|
||||||
bearssl,
|
|
||||||
chronos, chronicles, metrics, stew/results,
|
|
||||||
libp2p/protocols/pubsub/pubsubpeer,
|
|
||||||
libp2p/protocols/pubsub/floodsub,
|
|
||||||
libp2p/protocols/pubsub/gossipsub,
|
|
||||||
libp2p/protocols/protocol,
|
|
||||||
libp2p/protobuf/minprotobuf,
|
|
||||||
libp2p/stream/connection,
|
|
||||||
libp2p/crypto/crypto,
|
|
||||||
waku_lightpush_types,
|
|
||||||
../../utils/requests,
|
|
||||||
../../utils/protobuf,
|
|
||||||
../../node/peer_manager/peer_manager,
|
|
||||||
../waku_relay
|
|
||||||
|
|
||||||
export waku_lightpush_types
|
|
||||||
|
|
||||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
|
||||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
|
||||||
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "wakulightpush"
|
|
||||||
|
|
||||||
const
|
|
||||||
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
|
||||||
|
|
||||||
# Error types (metric label values)
|
|
||||||
const
|
|
||||||
dialFailure = "dial_failure"
|
|
||||||
decodeRpcFailure = "decode_rpc_failure"
|
|
||||||
|
|
||||||
# Encoding and decoding -------------------------------------------------------
|
|
||||||
proc encode*(rpc: PushRequest): ProtoBuffer =
|
|
||||||
var output = initProtoBuffer()
|
|
||||||
|
|
||||||
output.write3(1, rpc.pubSubTopic)
|
|
||||||
output.write3(2, rpc.message.encode())
|
|
||||||
|
|
||||||
output.finish3()
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
|
|
||||||
#var rpc = PushRequest(pubSubTopic: "", message: WakuMessage())
|
|
||||||
var rpc = PushRequest()
|
|
||||||
let pb = initProtoBuffer(buffer)
|
|
||||||
|
|
||||||
var pubSubTopic: string
|
|
||||||
discard ? pb.getField(1, pubSubTopic)
|
|
||||||
rpc.pubSubTopic = pubSubTopic
|
|
||||||
|
|
||||||
var buf: seq[byte]
|
|
||||||
discard ? pb.getField(2, buf)
|
|
||||||
rpc.message = ? WakuMessage.init(buf)
|
|
||||||
|
|
||||||
return ok(rpc)
|
|
||||||
|
|
||||||
proc encode*(rpc: PushResponse): ProtoBuffer =
|
|
||||||
var output = initProtoBuffer()
|
|
||||||
|
|
||||||
output.write3(1, uint64(rpc.isSuccess))
|
|
||||||
output.write3(2, rpc.info)
|
|
||||||
|
|
||||||
output.finish3()
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
|
|
||||||
var rpc = PushResponse(isSuccess: false, info: "")
|
|
||||||
let pb = initProtoBuffer(buffer)
|
|
||||||
|
|
||||||
var isSuccess: uint64
|
|
||||||
if ? pb.getField(1, isSuccess):
|
|
||||||
rpc.isSuccess = bool(isSuccess)
|
|
||||||
|
|
||||||
var info: string
|
|
||||||
discard ? pb.getField(2, info)
|
|
||||||
rpc.info = info
|
|
||||||
|
|
||||||
return ok(rpc)
|
|
||||||
|
|
||||||
proc encode*(rpc: PushRPC): ProtoBuffer =
|
|
||||||
var output = initProtoBuffer()
|
|
||||||
|
|
||||||
output.write3(1, rpc.requestId)
|
|
||||||
output.write3(2, rpc.request.encode())
|
|
||||||
output.write3(3, rpc.response.encode())
|
|
||||||
|
|
||||||
output.finish3()
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
|
|
||||||
var rpc = PushRPC()
|
|
||||||
let pb = initProtoBuffer(buffer)
|
|
||||||
|
|
||||||
discard ? pb.getField(1, rpc.requestId)
|
|
||||||
|
|
||||||
var requestBuffer: seq[byte]
|
|
||||||
discard ? pb.getField(2, requestBuffer)
|
|
||||||
|
|
||||||
rpc.request = ? PushRequest.init(requestBuffer)
|
|
||||||
|
|
||||||
var pushBuffer: seq[byte]
|
|
||||||
discard ? pb.getField(3, pushBuffer)
|
|
||||||
|
|
||||||
rpc.response = ? PushResponse.init(pushBuffer)
|
|
||||||
|
|
||||||
return ok(rpc)
|
|
||||||
|
|
||||||
# Protocol -------------------------------------------------------
|
|
||||||
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
|
|
||||||
debug "init"
|
|
||||||
let rng = crypto.newRng()
|
|
||||||
var wl = WakuLightPush(rng: rng,
|
|
||||||
peerManager: peerManager,
|
|
||||||
requestHandler: handler,
|
|
||||||
relayReference: relay)
|
|
||||||
wl.init()
|
|
||||||
|
|
||||||
return wl
|
|
||||||
|
|
||||||
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
|
|
||||||
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
|
|
||||||
waku_lightpush_peers.inc()
|
|
||||||
|
|
||||||
method init*(wlp: WakuLightPush) =
|
|
||||||
debug "init"
|
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
|
||||||
var message = await conn.readLp(MaxRpcSize.int)
|
|
||||||
var res = PushRPC.init(message)
|
|
||||||
if res.isErr:
|
|
||||||
error "failed to decode rpc"
|
|
||||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
|
||||||
return
|
|
||||||
|
|
||||||
info "lightpush message received"
|
|
||||||
|
|
||||||
let value = res.value
|
|
||||||
if value.request != PushRequest():
|
|
||||||
info "lightpush push request"
|
|
||||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
|
||||||
let
|
|
||||||
pubSubTopic = value.request.pubSubTopic
|
|
||||||
message = value.request.message
|
|
||||||
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
|
|
||||||
var response: PushResponse
|
|
||||||
if wlp.relayReference != nil:
|
|
||||||
let wakuRelay = wlp.relayReference
|
|
||||||
let data = message.encode().buffer
|
|
||||||
# XXX Assumes success, should probably be extended to check for network, peers, etc
|
|
||||||
discard wakuRelay.publish(pubSubTopic, data)
|
|
||||||
response = PushResponse(is_success: true, info: "Totally.")
|
|
||||||
else:
|
|
||||||
debug "No relay protocol present, unsuccesssful push"
|
|
||||||
response = PushResponse(is_success: false, info: "No relay protocol")
|
|
||||||
await conn.writeLp(PushRPC(requestId: value.requestId,
|
|
||||||
response: response).encode().buffer)
|
|
||||||
#wlp.requestHandler(value.requestId, value.request)
|
|
||||||
if value.response != PushResponse():
|
|
||||||
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
|
||||||
if value.response.isSuccess:
|
|
||||||
info "lightpush message success"
|
|
||||||
else:
|
|
||||||
info "lightpush message failure", info=value.response.info
|
|
||||||
|
|
||||||
wlp.handler = handle
|
|
||||||
wlp.codec = WakuLightPushCodec
|
|
||||||
|
|
||||||
proc request*(w: WakuLightPush, request: PushRequest, handler: PushResponseHandler) {.async, gcsafe.} =
|
|
||||||
let peerOpt = w.peerManager.selectPeer(WakuLightPushCodec)
|
|
||||||
|
|
||||||
if peerOpt.isNone():
|
|
||||||
error "no suitable remote peers"
|
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
|
||||||
return
|
|
||||||
|
|
||||||
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuLightPushCodec)
|
|
||||||
|
|
||||||
if connOpt.isNone():
|
|
||||||
# @TODO more sophisticated error handling here
|
|
||||||
error "failed to connect to remote peer"
|
|
||||||
waku_lightpush_errors.inc(labelValues = [dialFailure])
|
|
||||||
return
|
|
||||||
|
|
||||||
await connOpt.get().writeLP(PushRPC(requestId: generateRequestId(w.rng),
|
|
||||||
request: request).encode().buffer)
|
|
||||||
|
|
||||||
var message = await connOpt.get().readLp(64*1024)
|
|
||||||
let response = PushRPC.init(message)
|
|
||||||
|
|
||||||
if response.isErr:
|
|
||||||
error "failed to decode response"
|
|
||||||
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
|
|
||||||
return
|
|
||||||
|
|
||||||
handler(response.value.response)
|
|
||||||
@ -1,36 +0,0 @@
|
|||||||
import
|
|
||||||
std/[tables],
|
|
||||||
bearssl,
|
|
||||||
libp2p/protocols/protocol,
|
|
||||||
../../node/peer_manager/peer_manager,
|
|
||||||
../waku_message,
|
|
||||||
../waku_relay
|
|
||||||
|
|
||||||
export waku_message
|
|
||||||
|
|
||||||
const
|
|
||||||
MaxRpcSize* = MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
|
|
||||||
|
|
||||||
type
|
|
||||||
PushRequest* = object
|
|
||||||
pubSubTopic*: string
|
|
||||||
message*: WakuMessage
|
|
||||||
|
|
||||||
PushResponse* = object
|
|
||||||
isSuccess*: bool
|
|
||||||
info*: string
|
|
||||||
|
|
||||||
PushRPC* = object
|
|
||||||
requestId*: string
|
|
||||||
request*: PushRequest
|
|
||||||
response*: PushResponse
|
|
||||||
|
|
||||||
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
|
|
||||||
|
|
||||||
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
|
|
||||||
|
|
||||||
WakuLightPush* = ref object of LPProtocol
|
|
||||||
rng*: ref BrHmacDrbgContext
|
|
||||||
peerManager*: PeerManager
|
|
||||||
requestHandler*: PushRequestHandler
|
|
||||||
relayReference*: WakuRelay
|
|
||||||
Loading…
x
Reference in New Issue
Block a user