refactor(waku-lightpush): waku lightpush protocol code reorganization

This commit is contained in:
Lorenzo Delgado 2022-08-03 01:47:42 +02:00 committed by Lorenzo Delgado
parent 1575312a0c
commit d201a37330
10 changed files with 320 additions and 280 deletions

View File

@ -1,76 +1,85 @@
{.used.}
import
std/[options, tables, sets],
testutils/unittests, chronos, chronicles,
std/options,
testutils/unittests,
chronicles,
chronos,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/crypto/crypto
import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../test_helpers, ./utils
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_lightpush,
../test_helpers
procSuite "Waku Light Push":
# NOTE See test_wakunode for light push request success
const
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
# TODO: Extend lightpush protocol test coverage
procSuite "Waku Lightpush":
asyncTest "handle light push request success":
# TODO: Move here the test case at test_wakunode: light push request success
discard
asyncTest "handle light push request fail":
const defaultTopic = "/waku/2/default-waku/proto"
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch()
await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
var responseRequestIdFuture = newFuture[string]()
var completionFut = newFuture[bool]()
let dialSwitch = newStandardSwitch()
await dialSwitch.start()
proc handle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
# TODO Success return here
debug "handle push req"
check:
1 == 0
responseRequestIdFuture.complete(requestId)
# FIXME Unclear how we want to use subscriptions, if at all
let
proto = WakuLightPush.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
wm = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
rpc = PushRequest(pubSubTopic: defaultTopic, message: wm)
peerManager = PeerManager.new(dialSwitch)
rng = crypto.newRng()
proto = WakuLightPush.init(peerManager, rng, requestHandler)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
dialSwitch.mount(proto)
# TODO Can possibly get rid of this if it isn't dynamic
proc requestHandle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
debug "push request handler"
# TODO: Also relay message
# TODO: Here we want to send back response with is_success true
discard
let
proto2 = WakuLightPush.init(PeerManager.new(listenSwitch), crypto.newRng(), requestHandle)
peerManager2 = PeerManager.new(listenSwitch)
rng2 = crypto.newRng()
proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2)
listenSwitch.mount(proto2)
proc handler(response: PushResponse) {.gcsafe, closure.} =
debug "push response handler, expecting false"
check:
response.isSuccess == false
debug "Additional info", info=response.info
completionFut.complete(true)
await proto.request(rpc, handler)
await sleepAsync(2.seconds)
## Given
let
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DEFAULT_CONTENT_TOPIC)
rpc = PushRequest(message: msg, pubSubTopic: DEFAULT_PUBSUB_TOPIC)
## When
let res = await proto.request(rpc)
## Then
check res.isOk()
let response = res.get()
check:
(await completionFut.withTimeout(5.seconds)) == true
not response.isSuccess
## Cleanup
await allFutures(listenSwitch.stop(), dialSwitch.stop())

View File

@ -19,7 +19,7 @@ import
../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,

View File

@ -20,7 +20,7 @@ import
../protocol/waku_store,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_lightpush/waku_lightpush,
../protocol/waku_lightpush,
../protocol/waku_rln_relay/[waku_rln_relay_types],
../utils/[peers, requests, wakuswitch, wakuenr],
./peer_manager/peer_manager,

View File

@ -6,7 +6,7 @@ import
../protocol/waku_store,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_lightpush/waku_lightpush,
../protocol/waku_lightpush,
../protocol/waku_rln_relay/waku_rln_relay_types,
./peer_manager/peer_manager,
./discv5/waku_discv5

View File

@ -0,0 +1,9 @@
import
./waku_lightpush/protocol,
./waku_lightpush/rpc,
./waku_lightpush/rpc_codec
export
protocol,
rpc,
rpc_codec

View File

@ -0,0 +1,156 @@
{.push raises: [Defect].}
import
std/options,
stew/results,
chronicles,
chronos,
metrics,
bearssl,
libp2p/crypto/crypto
import
../waku_message,
../waku_relay,
../../node/peer_manager/peer_manager,
../../utils/requests,
./rpc,
./rpc_codec
logScope:
topics = "wakulightpush"
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
const
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
const
MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
type
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
WakuLightPushResult*[T] = Result[T, string]
WakuLightPush* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
requestHandler*: PushRequestHandler
relayReference*: WakuRelay
proc init*(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let message = await conn.readLp(MaxRpcSize.int)
let res = PushRPC.init(message)
if res.isErr():
error "failed to decode rpc"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return
let rpc = res.get()
if rpc.request != PushRequest():
info "lightpush push request"
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
let
pubSubTopic = rpc.request.pubSubTopic
message = rpc.request.message
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
var response: PushResponse
if not wl.relayReference.isNil():
let data = message.encode().buffer
# Assumimng success, should probably be extended to check for network, peers, etc
discard wl.relayReference.publish(pubSubTopic, data)
response = PushResponse(is_success: true, info: "Totally.")
else:
debug "No relay protocol present, unsuccesssful push"
response = PushResponse(is_success: false, info: "No relay protocol")
let rpc = PushRPC(requestId: rpc.requestId, response: response)
await conn.writeLp(rpc.encode().buffer)
if rpc.response != PushResponse():
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
if rpc.response.isSuccess:
info "lightpush message success"
else:
info "lightpush message failure", info=rpc.response.info
wl.handler = handle
wl.codec = WakuLightPushCodec
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
debug "init"
let rng = crypto.newRng()
let wl = WakuLightPush(rng: rng,
peerManager: peerManager,
requestHandler: handler,
relayReference: relay)
wl.init()
return wl
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
waku_lightpush_peers.inc()
proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
if connOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
let connection = connOpt.get()
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
await connection.writeLP(rpc.encode().buffer)
var message = await connection.readLp(MaxRpcSize.int)
let res = PushRPC.init(message)
if res.isErr():
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure)
let rpcRes = res.get()
if rpcRes.response == PushResponse():
return err("empty response body")
return ok(rpcRes.response)
proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} =
let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
waku_lightpush_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
return await wl.request(req, peerOpt.get())
proc request*(wl: WakuLightPush, req: PushRequest, handler: PushResponseHandler) {.async, gcsafe,
deprecated: "Use the no-callback request() procedure".} =
let res = await wl.request(req)
if res.isErr():
return
handler(res.get())

View File

@ -0,0 +1,18 @@
{.push raises: [Defec].}
import
../waku_message
type
PushRequest* = object
pubSubTopic*: string
message*: WakuMessage
PushResponse* = object
isSuccess*: bool
info*: string
PushRPC* = object
requestId*: string
request*: PushRequest
response*: PushResponse

View File

@ -0,0 +1,86 @@
{.push raises: [Defect].}
import
libp2p/protobuf/minprotobuf
import
../waku_message,
../../utils/protobuf,
./rpc
proc encode*(rpc: PushRequest): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.pubSubTopic)
output.write3(2, rpc.message.encode())
output.finish3()
return output
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushRequest()
var pubSubTopic: string
discard ?pb.getField(1, pubSubTopic)
rpc.pubSubTopic = pubSubTopic
var buf: seq[byte]
discard ?pb.getField(2, buf)
rpc.message = ?WakuMessage.init(buf)
return ok(rpc)
proc encode*(rpc: PushResponse): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, uint64(rpc.isSuccess))
output.write3(2, rpc.info)
output.finish3()
return output
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushResponse(isSuccess: false, info: "")
var isSuccess: uint64
if ?pb.getField(1, isSuccess):
rpc.isSuccess = bool(isSuccess)
var info: string
discard ?pb.getField(2, info)
rpc.info = info
return ok(rpc)
proc encode*(rpc: PushRPC): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.requestId)
output.write3(2, rpc.request.encode())
output.write3(3, rpc.response.encode())
output.finish3()
return output
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushRPC()
var requestId: string
discard ?pb.getField(1, requestId)
rpc.requestId = requestId
var requestBuffer: seq[byte]
discard ?pb.getField(2, requestBuffer)
rpc.request = ?PushRequest.init(requestBuffer)
var pushBuffer: seq[byte]
discard ?pb.getField(3, pushBuffer)
rpc.response = ?PushResponse.init(pushBuffer)
return ok(rpc)

View File

@ -1,202 +0,0 @@
{.push raises: [Defect].}
import
std/[tables, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/crypto/crypto,
waku_lightpush_types,
../../utils/requests,
../../utils/protobuf,
../../node/peer_manager/peer_manager,
../waku_relay
export waku_lightpush_types
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
logScope:
topics = "wakulightpush"
const
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
# Encoding and decoding -------------------------------------------------------
proc encode*(rpc: PushRequest): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.pubSubTopic)
output.write3(2, rpc.message.encode())
output.finish3()
return output
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
#var rpc = PushRequest(pubSubTopic: "", message: WakuMessage())
var rpc = PushRequest()
let pb = initProtoBuffer(buffer)
var pubSubTopic: string
discard ? pb.getField(1, pubSubTopic)
rpc.pubSubTopic = pubSubTopic
var buf: seq[byte]
discard ? pb.getField(2, buf)
rpc.message = ? WakuMessage.init(buf)
return ok(rpc)
proc encode*(rpc: PushResponse): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, uint64(rpc.isSuccess))
output.write3(2, rpc.info)
output.finish3()
return output
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
var rpc = PushResponse(isSuccess: false, info: "")
let pb = initProtoBuffer(buffer)
var isSuccess: uint64
if ? pb.getField(1, isSuccess):
rpc.isSuccess = bool(isSuccess)
var info: string
discard ? pb.getField(2, info)
rpc.info = info
return ok(rpc)
proc encode*(rpc: PushRPC): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.requestId)
output.write3(2, rpc.request.encode())
output.write3(3, rpc.response.encode())
output.finish3()
return output
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = PushRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var requestBuffer: seq[byte]
discard ? pb.getField(2, requestBuffer)
rpc.request = ? PushRequest.init(requestBuffer)
var pushBuffer: seq[byte]
discard ? pb.getField(3, pushBuffer)
rpc.response = ? PushResponse.init(pushBuffer)
return ok(rpc)
# Protocol -------------------------------------------------------
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
debug "init"
let rng = crypto.newRng()
var wl = WakuLightPush(rng: rng,
peerManager: peerManager,
requestHandler: handler,
relayReference: relay)
wl.init()
return wl
proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) =
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
waku_lightpush_peers.inc()
method init*(wlp: WakuLightPush) =
debug "init"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(MaxRpcSize.int)
var res = PushRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return
info "lightpush message received"
let value = res.value
if value.request != PushRequest():
info "lightpush push request"
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
let
pubSubTopic = value.request.pubSubTopic
message = value.request.message
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
var response: PushResponse
if wlp.relayReference != nil:
let wakuRelay = wlp.relayReference
let data = message.encode().buffer
# XXX Assumes success, should probably be extended to check for network, peers, etc
discard wakuRelay.publish(pubSubTopic, data)
response = PushResponse(is_success: true, info: "Totally.")
else:
debug "No relay protocol present, unsuccesssful push"
response = PushResponse(is_success: false, info: "No relay protocol")
await conn.writeLp(PushRPC(requestId: value.requestId,
response: response).encode().buffer)
#wlp.requestHandler(value.requestId, value.request)
if value.response != PushResponse():
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
if value.response.isSuccess:
info "lightpush message success"
else:
info "lightpush message failure", info=value.response.info
wlp.handler = handle
wlp.codec = WakuLightPushCodec
proc request*(w: WakuLightPush, request: PushRequest, handler: PushResponseHandler) {.async, gcsafe.} =
let peerOpt = w.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_lightpush_errors.inc(labelValues = [dialFailure])
return
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuLightPushCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_lightpush_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(PushRPC(requestId: generateRequestId(w.rng),
request: request).encode().buffer)
var message = await connOpt.get().readLp(64*1024)
let response = PushRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return
handler(response.value.response)

View File

@ -1,36 +0,0 @@
import
std/[tables],
bearssl,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager,
../waku_message,
../waku_relay
export waku_message
const
MaxRpcSize* = MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
type
PushRequest* = object
pubSubTopic*: string
message*: WakuMessage
PushResponse* = object
isSuccess*: bool
info*: string
PushRPC* = object
requestId*: string
request*: PushRequest
response*: PushResponse
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
WakuLightPush* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
requestHandler*: PushRequestHandler
relayReference*: WakuRelay