From 97d3119c337a439c250c117ccd5bc20347701760 Mon Sep 17 00:00:00 2001 From: Oskar Thoren Date: Fri, 22 May 2020 15:35:31 +0800 Subject: [PATCH] Subscribe RPC and quicksim WIP; hacky in-line handler --- waku/node/v2/quicksim.nim | 6 +++++- waku/node/v2/rpc/wakucallsigs.nim | 2 ++ waku/node/v2/rpc/wakurpc.nim | 13 ++++++++++++- waku/protocol/v2/waku_protocol.nim | 8 ++++++++ 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/waku/node/v2/quicksim.nim b/waku/node/v2/quicksim.nim index 326ad6822..4c0f67de3 100644 --- a/waku/node/v2/quicksim.nim +++ b/waku/node/v2/quicksim.nim @@ -34,7 +34,11 @@ waitFor node2.connect("localhost", Port(8548)) let version = waitFor node1.wakuVersion() -let res = waitFor node1.wakuPost("hello world") +proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + debug "Hit handler", topic=topic, data=data + +# TODO: Implement handler logic +let res1 = waitFor node2.wakuSubscribe("foobar") let res2 = waitFor node1.wakuPublish("foobar", "hello world") info "Version is", version diff --git a/waku/node/v2/rpc/wakucallsigs.nim b/waku/node/v2/rpc/wakucallsigs.nim index 5d67e41f4..bf54dad35 100644 --- a/waku/node/v2/rpc/wakucallsigs.nim +++ b/waku/node/v2/rpc/wakucallsigs.nim @@ -1,6 +1,8 @@ # Alpha - Currently implemented in v2 proc waku_version(): string proc waku_publish(topic: string, message: string): bool +proc waku_subscribe(topic: string): bool +#proc waku_subscribe(topic: string, handler: Topichandler): bool # NYI proc waku_info(): WakuInfo diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 8e35c4c22..11704cffd 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -4,7 +4,8 @@ import ../../../protocol/v2/waku_protocol, nimcrypto/[sysrand, hmac, sha2, pbkdf2], ../../v1/rpc/[rpc_types, hexstrings, key_storage], - ../waku_types + ../waku_types, + libp2p/protocols/pubsub/pubsub from stew/byteutils import hexToSeqByte, hexToByteArray @@ -34,6 +35,16 @@ proc setupWakuRPC*(wakuProto: WakuProto, rpcsrv: RpcServer) = return true #if not result: # raise newException(ValueError, "Message could not be posted") + + # TODO: Handler / Identifier logic + rpcsrv.rpc("waku_subscribe") do(topic: string) -> bool: + let wakuSub = cast[WakuSub](wakuProto.switch.pubSub.get()) + + # XXX: Hacky in-line handler + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + info "Hit subscribe handler", topic=topic, data=data + + discard wakuSub.subscribe(topic, handler) return true #if not result: # raise newException(ValueError, "Message could not be posted") diff --git a/waku/protocol/v2/waku_protocol.nim b/waku/protocol/v2/waku_protocol.nim index f05929b0e..28d016d88 100644 --- a/waku/protocol/v2/waku_protocol.nim +++ b/waku/protocol/v2/waku_protocol.nim @@ -47,6 +47,14 @@ method initPubSub*(w: WakuSub) = procCall FloodSub(w).initPubSub() w.init() +method subscribe*(w: WakuSub, + topic: string, + handler: TopicHandler) {.async.} = + debug "subscribe", topic=topic + # XXX: Pubsub really + await procCall FloodSub(w).subscribe(topic, handler) + +# Subscribing a peer to a specified topic method subscribeTopic*(w: WakuSub, topic: string, subscribe: bool,