Refactor: Waku protocol -> Waku relay protocol (#104)

* Refactor: Waku protocol -> Waku relay protocol

Including updating protocol string.

Addresses https://github.com/status-im/nim-waku/issues/101

* Refactor: WakuSub -> WakuRelay
This commit is contained in:
Oskar Thorén 2020-08-26 19:28:24 +08:00 committed by GitHub
parent 28f21f0920
commit 893134b536
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 43 deletions

View File

@ -16,7 +16,7 @@ import utils,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/floodsub
import ../../waku/protocol/v2/waku_protocol2
import ../../waku/protocol/v2/waku_relay
import ../test_helpers
@ -26,12 +26,12 @@ const
# TODO: Start with floodsub here, then move other logic here
# XXX: If I cast to WakuSub here I get a SIGSEGV
# XXX: If I cast to WakuRelay here I get a SIGSEGV
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# turn things deterministic
# this is for testing purposes only
var ceil = 15
let fsub = cast[WakuSub](sender.pubSub.get())
let fsub = cast[WakuRelay](sender.pubSub.get())
while not fsub.floodsub.hasKey(key) or
not fsub.floodsub[key].anyIt(it.peerInfo.id == receiver.peerInfo.id):
await sleepAsync(100.millis)

View File

@ -50,8 +50,9 @@ task wakusim, "Build Waku simulation tools":
buildBinary "quicksim", "waku/node/v1/", "-d:chronicles_log_level=INFO"
buildBinary "start_network", "waku/node/v1/", "-d:chronicles_log_level=DEBUG"
# TODO Also build Waku store and filter protocols here
task protocol2, "Build the experimental Waku protocol":
buildBinary "waku_protocol2", "waku/protocol/v2/", "-d:chronicles_log_level=TRACE"
buildBinary "waku_relay", "waku/protocol/v2/", "-d:chronicles_log_level=TRACE"
task wakunode2, "Build Experimental Waku cli":
buildBinary "wakunode2", "waku/node/v2/", "-d:chronicles_log_level=TRACE"

View File

@ -2,4 +2,4 @@
-d:"chronicles_runtime_filtering=on"
-d:nimDebugDlOpen
# Results in empty output for some reason
#-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuSub:TRACE"
#-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuRelay:TRACE"

View File

@ -1,7 +1,7 @@
import
json_rpc/rpcserver, options,
eth/[common, rlp, keys, p2p],
../../../protocol/v2/waku_protocol2,
../../../protocol/v2/waku_relay,
nimcrypto/[sysrand, hmac, sha2],
../waku_types
@ -13,18 +13,19 @@ import
# Where is the equivalent in Waku/2?
# TODO: Extend to get access to protocol state and keys
#proc setupWakuRPC*(node: EthereumNode, keys: KeyStorage, rpcsrv: RpcServer) =
proc setupWakuRPC*(wakuProto: WakuProto, rpcsrv: RpcServer) =
# TODO This should probably take node, not wakuRelayProto
proc setupWakuRPC*(wakuRelayProto: WakuRelayProto, rpcsrv: RpcServer) =
# Seems easy enough, lets try to get this first
rpcsrv.rpc("waku_version") do() -> string:
## Returns string of the current Waku protocol version.
result = WakuSubCodec
result = WakuRelayCodec
# TODO: Implement symkey etc logic
rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool:
# Assumes someone subscribing on this topic
#let wakuSub = wakuProto.switch.pubsub
let wakuSub = cast[WakuSub](wakuProto.switch.pubSub.get())
#let wakuSub = wakuRelayProto.switch.pubsub
let wakuSub = cast[WakuRelay](wakuRelayProto.switch.pubSub.get())
# XXX also future return type
discard wakuSub.publish(topic, message)
return true
@ -33,7 +34,7 @@ proc setupWakuRPC*(wakuProto: WakuProto, rpcsrv: RpcServer) =
# TODO: Handler / Identifier logic
rpcsrv.rpc("waku_subscribe") do(topic: string) -> bool:
let wakuSub = cast[WakuSub](wakuProto.switch.pubSub.get())
let wakuSub = cast[WakuRelay](wakuRelayProto.switch.pubSub.get())
# XXX: Hacky in-line handler
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =

View File

@ -12,7 +12,7 @@ import
libp2p/muxers/[muxer, mplex/mplex, mplex/types],
libp2p/protocols/[identify, secure/secure],
libp2p/protocols/pubsub/[pubsub, gossipsub],
../../protocol/v2/waku_protocol2
../../protocol/v2/waku_relay
import
libp2p/protocols/secure/noise,
@ -66,7 +66,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
of SecureProtocol.Secio:
secureManagerInstances &= newSecio(rng, seckey).Secure
let pubSub = PubSub newPubSub(WakuSub, peerInfo, triggerSelf)
let pubSub = PubSub newPubSub(WakuRelay, peerInfo, triggerSelf)
result = newSwitch(peerInfo,
transports,

View File

@ -4,7 +4,7 @@ import libp2p/multiaddress,
libp2p/peerinfo,
standard_setup
type WakuProto* = ref object of LPProtocol
type WakuRelayProto* = ref object of LPProtocol
switch*: Switch
conn*: Connection
connected*: bool

View File

@ -12,7 +12,7 @@ import
stew/shims/net as stewNet,
rpc/wakurpc,
standard_setup,
../../protocol/v2/waku_protocol2,
../../protocol/v2/waku_relay,
# TODO: Pull out standard switch from tests
waku_types
@ -68,7 +68,7 @@ proc initAddress(T: type MultiAddress, str: string): T =
template tcpEndPoint(address, port): auto =
MultiAddress.init(address, tcpProtocol, port)
proc dialPeer(p: WakuProto, address: string) {.async.} =
proc dialPeer(p: WakuRelayProto, address: string) {.async.} =
info "dialPeer", address = address
# XXX: This turns ipfs into p2p, not quite sure why
let multiAddr = MultiAddress.initAddress(address)
@ -77,12 +77,12 @@ proc dialPeer(p: WakuProto, address: string) {.async.} =
let remotePeer = PeerInfo.init(parts[^1], [multiAddr])
info "Dialing peer", multiAddr
p.conn = await p.switch.dial(remotePeer, WakuSubCodec)
p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
info "Post switch dial"
# Isn't there just one p instance? Why connected here?
p.connected = true
proc connectToNodes(p: WakuProto, nodes: openArray[string]) =
proc connectToNodes(p: WakuRelayProto, nodes: openArray[string]) =
for nodeId in nodes:
info "connectToNodes", node = nodeId
# XXX: This seems...brittle
@ -136,16 +136,16 @@ proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress],
if extPorts.isSome:
(result.tcpPort, result.udpPort) = extPorts.get()
proc newWakuProto(switch: Switch): WakuProto =
var wakuproto = WakuProto(switch: switch, codec: WakuSubCodec)
proc newWakuRelayProto(switch: Switch): WakuRelayProto =
var wakuRelayProto = WakuRelayProto(switch: switch, codec: WakuRelayCodec)
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp(1024))
await conn.writeLp("Hello!")
await conn.close()
wakuproto.handler = handle
return wakuproto
wakuRelayProto.handler = handle
return wakuRelayProto
# TODO Consider removing unused arguments
proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch,
@ -187,16 +187,17 @@ proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} =
proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
node.libp2pTransportLoops = await node.switch.start()
let wakuProto = newWakuProto(node.switch)
node.switch.mount(wakuProto)
wakuProto.started = true
# TODO Mount Waku Store and Waku Filter here
let wakuRelayProto = newWakuRelayProto(node.switch)
node.switch.mount(wakuRelayProto)
wakuRelayProto.started = true
# TODO Move out into separate proc
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
Port(conf.rpcPort + conf.portsShift))
var rpcServer = newRpcHttpServer([ta])
setupWakuRPC(wakuProto, rpcServer)
setupWakuRPC(wakuRelayProto, rpcServer)
rpcServer.start()
info "rpcServer started", ta=ta
@ -210,7 +211,7 @@ proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
# XXX: So doing this _after_ other setup
# Optionally direct connect with a set of nodes
if conf.staticnodes.len > 0: connectToNodes(wakuProto, conf.staticnodes)
if conf.staticnodes.len > 0: connectToNodes(wakuRelayProto, conf.staticnodes)
# TODO Move out into separate proc
when defined(insecure):

View File

@ -20,19 +20,19 @@ declarePublicGauge connected_peers, "number of peers in the pool" # XXX
declarePublicGauge total_messages, "number of messages received"
logScope:
topic = "WakuSub"
topic = "WakuRelay"
const WakuSubCodec* = "/wakusub/2.0.0-alpha1"
const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
type
WakuSub* = ref object of GossipSub
WakuRelay* = ref object of GossipSub
# XXX: just playing
text*: string
gossip_enabled*: bool
filters: Filters
method init(w: WakuSub) =
method init(w: WakuRelay) =
debug "init"
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
@ -40,7 +40,7 @@ method init(w: WakuSub) =
## e.g. ``/wakusub/0.0.1``, etc...
##
debug "Incoming WakuSub connection"
debug "Incoming WakuRelay connection"
# XXX: Increment connectedPeers counter, unclear if this is the right place tho
# Where is the disconnect event?
connected_peers.inc()
@ -49,10 +49,10 @@ method init(w: WakuSub) =
# XXX: Handler hijack GossipSub here?
w.handler = handler
w.filters = initTable[string, Filter]()
w.codec = WakuSubCodec
w.codec = WakuRelayCodec
method initPubSub*(w: WakuSub) =
debug "initWakuSub"
method initPubSub*(w: WakuRelay) =
debug "initWakuRelay"
w.text = "Foobar"
debug "w.text", text = w.text
@ -66,7 +66,7 @@ method initPubSub*(w: WakuSub) =
w.init()
method subscribe*(w: WakuSub,
method subscribe*(w: WakuRelay,
topic: string,
handler: TopicHandler) {.async.} =
debug "subscribe", topic=topic
@ -80,7 +80,7 @@ method subscribe*(w: WakuSub,
# Subscribing a peer to a specified topic
method subscribeTopic*(w: WakuSub,
method subscribeTopic*(w: WakuRelay,
topic: string,
subscribe: bool,
peerId: string) {.async, gcsafe.} =
@ -107,11 +107,11 @@ method subscribeTopic*(w: WakuSub,
# TODO: Fix decrement connected peers here or somewhere else
method handleDisconnect*(w: WakuSub, peer: PubSubPeer) {.async.} =
method handleDisconnect*(w: WakuRelay, peer: PubSubPeer) {.async.} =
debug "handleDisconnect (NYI)"
#connected_peers.dec()
method rpcHandler*(w: WakuSub,
method rpcHandler*(w: WakuRelay,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
debug "rpcHandler"
@ -129,7 +129,7 @@ method rpcHandler*(w: WakuSub,
for msg in rpcs.messages:
w.filters.notify(msg)
method publish*(w: WakuSub,
method publish*(w: WakuRelay,
topic: string,
data: seq[byte]): Future[int] {.async.} =
debug "publish", topic=topic
@ -139,7 +139,7 @@ method publish*(w: WakuSub,
else:
return await procCall FloodSub(w).publish(topic, data)
method unsubscribe*(w: WakuSub,
method unsubscribe*(w: WakuRelay,
topics: seq[TopicPair]) {.async.} =
debug "unsubscribe"
if w.gossip_enabled:
@ -148,14 +148,14 @@ method unsubscribe*(w: WakuSub,
await procCall FloodSub(w).unsubscribe(topics)
# GossipSub specific methods
method start*(w: WakuSub) {.async.} =
method start*(w: WakuRelay) {.async.} =
debug "start"
if w.gossip_enabled:
await procCall GossipSub(w).start()
else:
await procCall FloodSub(w).start()
method stop*(w: WakuSub) {.async.} =
method stop*(w: WakuRelay) {.async.} =
debug "stop"
if w.gossip_enabled:
await procCall GossipSub(w).stop()