From ddcfc3fdd63e9fef3e712a6cbf1b46b6ffeb1a0a Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Mon, 31 Aug 2020 05:32:41 +0200 Subject: [PATCH] V2cleanup (#115) * Cleanup of warnings + imports, etc. * Remove connected_peers metric as there is already libp2p_pubsub_peers --- tests/v2/test_waku.nim | 6 ++-- tests/v2/test_waku_store.nim | 29 +++++++++--------- tests/v2/test_wakunode.nim | 22 ++++++-------- tests/v2/utils.nim | 1 + waku/node/v2/config.nim | 2 +- waku/node/v2/rpc/wakurpc.nim | 6 ++-- waku/node/v2/standard_setup.nim | 2 +- waku/node/v2/waku_types.nim | 2 -- waku/node/v2/wakunode2.nim | 36 +++++++++------------- waku/protocol/v2/filter.nim | 7 ++--- waku/protocol/v2/waku_relay.nim | 54 ++++++++++++--------------------- waku/protocol/v2/waku_store.nim | 33 ++++++++------------ 12 files changed, 82 insertions(+), 118 deletions(-) diff --git a/tests/v2/test_waku.nim b/tests/v2/test_waku.nim index 9f7c427e4..200c20cef 100644 --- a/tests/v2/test_waku.nim +++ b/tests/v2/test_waku.nim @@ -6,6 +6,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) +{.used.} import unittest, options, tables, sets, sequtils import chronos, chronicles @@ -91,7 +92,7 @@ procSuite "FloodSub": let msg = message() discard await nodes[0].publish("foobar", msg) - let result = await completionFut.wait(5.seconds) + check: await completionFut.wait(5.seconds) await allFuturesThrowing( nodes[0].stop(), @@ -101,6 +102,3 @@ procSuite "FloodSub": for fut in nodesFut: let res = fut.read() await allFuturesThrowing(res) - - check: - result == true diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 42ad2bf4f..37c02901d 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -1,19 +1,18 @@ -import unittest, options, tables, sets, sequtils -import chronos, chronicles -import utils, - libp2p/errors, - libp2p/switch, - libp2p/protobuf/minprotobuf, - libp2p/stream/[bufferstream, connection], - libp2p/crypto/crypto, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/rpc/message, - libp2p/multistream, - libp2p/transports/transport, - libp2p/transports/tcptransport -import ../../waku/protocol/v2/[waku_relay, waku_store, filter] +{.used.} -import ../test_helpers +import + std/[unittest, options, tables, sets], + chronos, chronicles, + libp2p/switch, + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/protocols/pubsub/rpc/message, + libp2p/multistream, + libp2p/transports/transport, + libp2p/transports/tcptransport, + ../../waku/protocol/v2/[waku_store, filter], + ../test_helpers, ./utils procSuite "Waku Store": diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 53a24afa0..fb206ba48 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -1,16 +1,14 @@ -import unittest +{.used.} -import confutils, chronicles, chronos, os - -import stew/shims/net as stewNet -import libp2p/crypto/crypto -import libp2p/crypto/secp -import eth/keys -import json_rpc/[rpcclient, rpcserver] - -import ../../waku/node/v2/[config, wakunode2, waku_types] - -import ../test_helpers +import + std/[unittest, os], + confutils, chronicles, chronos, stew/shims/net as stewNet, + json_rpc/[rpcclient, rpcserver], + libp2p/crypto/crypto, + libp2p/crypto/secp, + eth/keys, + ../../waku/node/v2/[config, wakunode2, waku_types], + ../test_helpers procSuite "WakuNode": asyncTest "Message published with content filter is retrievable": diff --git a/tests/v2/utils.nim b/tests/v2/utils.nim index 3058f99e1..8a05cf972 100644 --- a/tests/v2/utils.nim +++ b/tests/v2/utils.nim @@ -12,4 +12,5 @@ proc subscribeNodes*(nodes: seq[Switch]) {.async.} = for node in nodes: if dialer.peerInfo.peerId != node.peerInfo.peerId: dials.add(dialer.connect(node.peerInfo)) + # TODO: Hmm, does this make sense? await allFutures(dials) diff --git a/waku/node/v2/config.nim b/waku/node/v2/config.nim index d8d74504a..cb5f4c3fa 100644 --- a/waku/node/v2/config.nim +++ b/waku/node/v2/config.nim @@ -1,5 +1,5 @@ import - strutils, + std/strutils, confutils, confutils/defs, confutils/std/net, chronicles, chronos, libp2p/crypto/crypto, diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 2e24cca57..d37e12c9c 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -1,9 +1,9 @@ import - json_rpc/rpcserver, options, + std/options, + json_rpc/rpcserver, + nimcrypto/[sysrand, hmac, sha2], eth/[common, rlp, keys, p2p], ../../../protocol/v2/waku_relay, - nimcrypto/[sysrand, hmac, sha2], - ../wakunode2, ../waku_types # Instead of using rlpx waku_protocol here, lets do mock waku2_protocol diff --git a/waku/node/v2/standard_setup.nim b/waku/node/v2/standard_setup.nim index c009230bd..bd67f6bbc 100644 --- a/waku/node/v2/standard_setup.nim +++ b/waku/node/v2/standard_setup.nim @@ -11,7 +11,7 @@ import libp2p/transports/[transport, tcptransport], libp2p/muxers/[muxer, mplex/mplex, mplex/types], libp2p/protocols/[identify, secure/secure], - libp2p/protocols/pubsub/[pubsub, gossipsub], + libp2p/protocols/pubsub/pubsub, ../../protocol/v2/waku_relay import diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 21834e5d8..8c0a10db7 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -2,8 +2,6 @@ import chronos, libp2p/multiaddress, libp2p/crypto/crypto, - libp2p/protocols/protocol, - libp2p/protocols/pubsub/pubsub, libp2p/peerinfo, standard_setup diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 0b785b90c..141b19a14 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -1,19 +1,17 @@ import - confutils, config, strutils, chronos, json_rpc/rpcserver, metrics, sequtils, - chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird. - eth/[keys, p2p], eth/net/nat, - eth/p2p/[discovery, enode], + std/[strutils, options], + chronos, confutils, json_rpc/rpcserver, metrics, stew/shims/net as stewNet, + # TODO: Why do we need eth keys? + eth/keys, eth/net/nat, + # eth/[keys, p2p], eth/net/nat, eth/p2p/[discovery, enode], libp2p/multiaddress, libp2p/crypto/crypto, libp2p/protocols/protocol, # NOTE For TopicHandler, solve with exports? libp2p/protocols/pubsub/pubsub, libp2p/peerinfo, - stew/shims/net as stewNet, - rpc/wakurpc, - standard_setup, ../../protocol/v2/waku_relay, - waku_types + ./waku_types, ./config, ./standard_setup, ./rpc/wakurpc # key and crypto modules different type @@ -34,12 +32,6 @@ type const clientId = "Nimbus waku node" -proc setBootNodes(nodes: openArray[string]): seq[ENode] = - result = newSeqOfCap[ENode](nodes.len) - for nodeId in nodes: - # TODO: something more user friendly than an expect - result.add(ENode.fromString(nodeId).expect("correct node")) - # NOTE Any difference here in Waku vs Eth2? # E.g. Devp2p/Libp2p support, etc. #func asLibp2pKey*(key: keys.PublicKey): PublicKey = @@ -216,7 +208,7 @@ proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} = ## Public API ## -method init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} = +proc init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} = ## Creates and starts a Waku node. ## let node = await createWakuNode(conf) @@ -228,7 +220,7 @@ method init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} = type ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message) -method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = +proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and some data. ## @@ -239,7 +231,7 @@ method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = # XXX Consider awaiting here discard wakuRelay.subscribe(topic, handler) -method subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFilterHandler) = +proc subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFilterHandler) = echo "NYI" ## Subscribes to a ContentFilter. Triggers handler when receiving messages on ## this content filter. ContentFilter is a method that takes some content @@ -249,21 +241,21 @@ method subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFil ## Status: Not yet implemented. ## TODO Implement as wrapper around `waku_filter` and `subscribe` above. -method unsubscribe*(w: WakuNode, topic: Topic) = +proc unsubscribe*(w: WakuNode, topic: Topic) = echo "NYI" ## Unsubscribe from a topic. ## ## Status: Not yet implemented. ## TODO Implement. -method unsubscribe*(w: WakuNode, contentFilter: ContentFilter) = +proc unsubscribe*(w: WakuNode, contentFilter: ContentFilter) = echo "NYI" ## Unsubscribe from a content filter. ## ## Status: Not yet implemented. ## TODO Implement. -method publish*(w: WakuNode, topic: Topic, message: Message) = +proc publish*(w: WakuNode, topic: Topic, message: Message) = ## Publish a `Message` to a PubSub topic. ## ## Status: Partially implemented. @@ -273,7 +265,7 @@ method publish*(w: WakuNode, topic: Topic, message: Message) = # XXX Consider awaiting here discard wakuSub.publish(topic, message) -method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) = +proc publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) = ## Publish a `Message` to a PubSub topic with a specific content filter. ## Currently this means a `contentTopic`. ## @@ -288,7 +280,7 @@ method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message discard wakuSub.publish(topic, message) -method query*(w: WakuNode, query: HistoryQuery): HistoryResponse = +proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse = ## Queries for historical messages. ## ## Status: Not yet implemented. diff --git a/waku/protocol/v2/filter.nim b/waku/protocol/v2/filter.nim index d39aa961f..7e36b4d56 100644 --- a/waku/protocol/v2/filter.nim +++ b/waku/protocol/v2/filter.nim @@ -1,7 +1,6 @@ -import libp2p/protocols/pubsub/rpc/messages - -import - tables +import + std/tables, + libp2p/protocols/pubsub/rpc/messages type diff --git a/waku/protocol/v2/waku_relay.nim b/waku/protocol/v2/waku_relay.nim index 987e10e46..5656fcb88 100644 --- a/waku/protocol/v2/waku_relay.nim +++ b/waku/protocol/v2/waku_relay.nim @@ -3,20 +3,17 @@ ## This file should eventually correspond to waku_protocol as RLPx subprotocol. ## Instead, it should likely be on top of GossipSub with a similar interface. -import strutils -import chronos, chronicles -import ./filter -import tables -import libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/pubsub/rpc/[messages], - libp2p/stream/connection +import + std/[strutils, tables], + chronos, chronicles, metrics, + libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/pubsubpeer, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/[messages], + libp2p/stream/connection, + ./filter -import metrics - -declarePublicGauge connected_peers, "number of peers in the pool" # XXX declarePublicGauge total_messages, "number of messages received" logScope: @@ -28,7 +25,7 @@ type WakuRelay* = ref object of GossipSub # XXX: just playing text*: string - gossip_enabled*: bool + gossipEnabled*: bool filters: Filters @@ -41,9 +38,6 @@ method init(w: WakuRelay) = ## debug "Incoming WakuRelay connection" - # XXX: Increment connectedPeers counter, unclear if this is the right place tho - # Where is the disconnect event? - connected_peers.inc() await w.handleConn(conn, proto) # XXX: Handler hijack GossipSub here? @@ -57,9 +51,9 @@ method initPubSub*(w: WakuRelay) = debug "w.text", text = w.text # Not using GossipSub - w.gossip_enabled = false + w.gossipEnabled = false - if w.gossip_enabled: + if w.gossipEnabled: procCall GossipSub(w).initPubSub() else: procCall FloodSub(w).initPubSub() @@ -73,7 +67,7 @@ method subscribe*(w: WakuRelay, # XXX: Pubsub really # XXX: This is what is called, I think - if w.gossip_enabled: + if w.gossipEnabled: await procCall GossipSub(w).subscribe(topic, handler) else: await procCall FloodSub(w).subscribe(topic, handler) @@ -92,7 +86,7 @@ method subscribeTopic*(w: WakuRelay, debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId - if w.gossip_enabled: + if w.gossipEnabled: await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId) else: await procCall FloodSub(w).subscribeTopic(topic, subscribe, peerId) @@ -103,14 +97,6 @@ method subscribeTopic*(w: WakuRelay, info "about to call subscribe" await w.subscribe(topic, handler) - - - -# TODO: Fix decrement connected peers here or somewhere else -method handleDisconnect*(w: WakuRelay, peer: PubSubPeer) {.async.} = - debug "handleDisconnect (NYI)" - #connected_peers.dec() - method rpcHandler*(w: WakuRelay, peer: PubSubPeer, rpcMsgs: seq[RPCMsg]) {.async.} = @@ -119,7 +105,7 @@ method rpcHandler*(w: WakuRelay, # XXX: Right place? total_messages.inc() - if w.gossip_enabled: + if w.gossipEnabled: await procCall GossipSub(w).rpcHandler(peer, rpcMsgs) else: await procCall FloodSub(w).rpcHandler(peer, rpcMsgs) @@ -134,7 +120,7 @@ method publish*(w: WakuRelay, data: seq[byte]): Future[int] {.async.} = debug "publish", topic=topic - if w.gossip_enabled: + if w.gossipEnabled: return await procCall GossipSub(w).publish(topic, data) else: return await procCall FloodSub(w).publish(topic, data) @@ -142,7 +128,7 @@ method publish*(w: WakuRelay, method unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) {.async.} = debug "unsubscribe" - if w.gossip_enabled: + if w.gossipEnabled: await procCall GossipSub(w).unsubscribe(topics) else: await procCall FloodSub(w).unsubscribe(topics) @@ -150,14 +136,14 @@ method unsubscribe*(w: WakuRelay, # GossipSub specific methods method start*(w: WakuRelay) {.async.} = debug "start" - if w.gossip_enabled: + if w.gossipEnabled: await procCall GossipSub(w).start() else: await procCall FloodSub(w).start() method stop*(w: WakuRelay) {.async.} = debug "stop" - if w.gossip_enabled: + if w.gossipEnabled: await procCall GossipSub(w).stop() else: await procCall FloodSub(w).stop() diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index d60f4638c..66c3bff28 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -1,18 +1,11 @@ -import chronos, chronicles -import ./filter -import tables -import libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/pubsub/rpc/[messages, protobuf], - libp2p/protocols/protocol, - libp2p/protobuf/minprotobuf, - libp2p/stream/connection - -import metrics - -import stew/results +import + std/tables, + chronos, chronicles, metrics, stew/results, + libp2p/protocols/pubsub/rpc/[messages, protobuf], + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection, + ./filter const WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2" @@ -33,7 +26,7 @@ type WakuStore* = ref object of LPProtocol messages*: seq[Message] -method init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = +proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() let pb = initProtoBuffer(buffer) @@ -77,7 +70,7 @@ proc init*(T: type StoreRPC, buffer: seq[byte]): ProtoResult[T] = ok(rpc) -method encode*(query: HistoryQuery): ProtoBuffer = +proc encode*(query: HistoryQuery): ProtoBuffer = result = initProtoBuffer() result.write(1, query.uuid) @@ -85,7 +78,7 @@ method encode*(query: HistoryQuery): ProtoBuffer = for topic in query.topics: result.write(2, topic) -method encode*(response: HistoryResponse): ProtoBuffer = +proc encode*(response: HistoryResponse): ProtoBuffer = result = initProtoBuffer() result.write(1, response.uuid) @@ -93,7 +86,7 @@ method encode*(response: HistoryResponse): ProtoBuffer = for msg in response.messages: result.write(2, msg.encodeMessage()) -method encode*(response: StoreRPC): ProtoBuffer = +proc encode*(response: StoreRPC): ProtoBuffer = result = initProtoBuffer() for query in response.query: @@ -110,7 +103,7 @@ proc query(w: WakuStore, query: HistoryQuery): HistoryResponse = result.messages.insert(msg) break -method init*(T: type WakuStore): T = +proc init*(T: type WakuStore): T = var ws = WakuStore() proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =