Light push protocol initial cut (#503)

* Init lightpush

* Lightpush types

* light push encode and decode

* lightpush protocol more

* light push test draft

* lightpush compiles

* lightpush fix tests etc

* cleanup

* pubsubTopic -> pubSubTopic
This commit is contained in:
Oskar Thorén 2021-04-23 14:56:04 +08:00 committed by GitHub
parent f1a6027158
commit d7de633793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 276 additions and 0 deletions

View File

@ -0,0 +1,76 @@
{.used.}
import
std/[options, tables, sets],
testutils/unittests, chronos, chronicles,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/multistream,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../test_helpers, ./utils
procSuite "Waku Light Push":
asyncTest "handle light push request":
const defaultTopic = "/waku/2/default-waku/proto"
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
var responseRequestIdFuture = newFuture[string]()
var completionFut = newFuture[bool]()
proc handle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
# TODO Success return here
debug "handle push req"
check:
1 == 0
responseRequestIdFuture.complete(requestId)
# FIXME Unclear how we want to use subscriptions, if at all
let
proto = WakuLightPush.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
wm = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
rpc = PushRequest(pubSubTopic: defaultTopic, message: wm)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo)
# TODO Can possibly get rid of this if it isn't dynamic
proc requestHandle(requestId: string, msg: PushRequest) {.gcsafe, closure.} =
debug "push request handler"
# TODO: Also relay message
# TODO: Here we want to send back response with is_success true
discard
let
proto2 = WakuLightPush.init(PeerManager.new(listenSwitch), crypto.newRng(), requestHandle)
listenSwitch.mount(proto2)
# FIXME Don't think this will be hit yet
proc handler(response: PushResponse) {.gcsafe, closure.} =
debug "push response handler, expecting false"
check:
response.isSuccess == false
completionFut.complete(true)
await proto.request(rpc, handler)
await sleepAsync(2.seconds)
check:
(await completionFut.withTimeout(5.seconds)) == true

View File

@ -0,0 +1 @@
# Waku Light Push

View File

@ -0,0 +1,167 @@
import
std/[tables, sequtils, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/crypto/crypto,
../message_notifier,
waku_lightpush_types,
../../utils/requests,
../../node/peer_manager/peer_manager
export waku_lightpush_types
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
logScope:
topics = "wakulightpush"
const
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-alpha1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
# Encoding and decoding -------------------------------------------------------
proc encode*(rpc: PushRequest): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rpc.pubSubTopic)
result.write(2, rpc.message.encode())
proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
#var rpc = PushRequest(pubSubTopic: "", message: WakuMessage())
var rpc = PushRequest()
let pb = initProtoBuffer(buffer)
var pubSubTopic: string
discard ? pb.getField(1, pubSubTopic)
rpc.pubSubTopic = pubSubTopic
var buf: seq[byte]
discard ? pb.getField(2, buf)
rpc.message = ? WakuMessage.init(buf)
ok(rpc)
proc encode*(rpc: PushResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, uint64(rpc.isSuccess))
result.write(2, rpc.info)
proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
var rpc = PushResponse(isSuccess: false, info: "")
let pb = initProtoBuffer(buffer)
var isSuccess: uint64
if ? pb.getField(1, isSuccess):
rpc.isSuccess = bool(isSuccess)
var info: string
discard ? pb.getField(2, info)
rpc.info = info
ok(rpc)
proc encode*(rpc: PushRPC): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rpc.requestId)
result.write(2, rpc.request.encode())
result.write(3, rpc.response.encode())
proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = PushRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var requestBuffer: seq[byte]
discard ? pb.getField(2, requestBuffer)
rpc.request = ? PushRequest.init(requestBuffer)
var pushBuffer: seq[byte]
discard ? pb.getField(3, pushBuffer)
rpc.response = ? PushResponse.init(pushBuffer)
ok(rpc)
# Protocol -------------------------------------------------------
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler): T =
new result
result.rng = crypto.newRng()
result.peerManager = peerManager
result.requestHandler = handler
result.init()
proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) =
wlp.peerManager.addPeer(peer, WakuLightPushCodec)
waku_lightpush_peers.inc()
method init*(wlp: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var res = PushRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return
info "lightpush message received"
let value = res.value
if value.request != PushRequest():
info "lightpush push request"
# TODO Relay messages here
var response = PushResponse(is_success: false, info: "NYI")
await conn.writeLp(PushRPC(requestId: value.requestId,
response: response).encode().buffer)
#wlp.requestHandler(value.requestId, value.request)
if value.response != PushResponse():
if value.response.isSuccess:
info "lightpush message success"
else:
info "lightpush message failure", info=value.response.info
wlp.handler = handle
wlp.codec = WakuLightPushCodec
proc request*(w: WakuLightPush, request: PushRequest, handler: PushResponseHandler) {.async, gcsafe.} =
let peerOpt = w.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_lightpush_errors.inc(labelValues = [dialFailure])
return
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuLightPushCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_lightpush_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(PushRPC(requestId: generateRequestId(w.rng),
request: request).encode().buffer)
var message = await connOpt.get().readLp(64*1024)
let response = PushRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return
handler(response.value.response)

View File

@ -0,0 +1,32 @@
import
std/[tables],
bearssl,
libp2p/peerinfo,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager,
../waku_message
export waku_message
type
PushRequest* = object
pubSubTopic*: string
message*: WakuMessage
PushResponse* = object
isSuccess*: bool
info*: string
PushRPC* = object
requestId*: string
request*: PushRequest
response*: PushResponse
PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.}
PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.}
WakuLightPush* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
requestHandler*: PushRequestHandler