diff --git a/examples/v2/matterbridge/chat2bridge.nim b/examples/v2/matterbridge/chat2bridge.nim index f44e0ae07..1c67c27c3 100644 --- a/examples/v2/matterbridge/chat2bridge.nim +++ b/examples/v2/matterbridge/chat2bridge.nim @@ -1,5 +1,3 @@ -{.push raises: [Defect, Exception].} - import std/[tables, times, strutils, hashes, sequtils], chronos, confutils, chronicles, chronicles/topics_registry, metrics, @@ -61,7 +59,7 @@ proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool = return false -proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage = +proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage {.raises: [Defect, KeyError]} = # Translates a Matterbridge API JSON response to a Waku v2 message let msgFields = jsonNode.getFields() @@ -89,7 +87,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} = await cmb.nodev2.publish(DefaultTopic, msg) -proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} = +proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} = if cmb.seen.containsOrAdd(msg.payload.hash()): # This is a duplicate message. Return. chat2_mb_dropped.inc(labelValues = ["duplicate"]) @@ -111,7 +109,7 @@ proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} = try: cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick) - except OSError, IOError: + except OSError, IOError, TimeoutError: chat2_mb_dropped.inc(labelValues = ["duplicate"]) error "Matterbridge host unreachable. Dropping message." @@ -172,7 +170,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = debug "Start polling Matterbridge" # Start Matterbridge polling (@TODO: use streaming interface) - proc mbHandler(jsonNode: JsonNode) {.gcsafe.} = + proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} = trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode waitFor cmb.toChat2(jsonNode) @@ -188,7 +186,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = # Bridging # Handle messages on Waku v2 and bridge to Matterbridge - proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = + proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} = let msg = WakuMessage.init(data) if msg.isOk(): trace "Bridging message from Chat2 to Matterbridge", msg=msg[] @@ -211,7 +209,7 @@ when isMainModule: relay_api, store_api] - proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) = + proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} = installDebugApiHandlers(node, rpcServer) # Install enabled API handlers: diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 77bd1666f..54cb7316f 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -5,7 +5,7 @@ import testutils/unittests, stew/shims/net as stewNet, json_rpc/[rpcserver, rpcclient], eth/[keys, rlp], eth/common/eth_types, - libp2p/[standard_setup, switch, multiaddress], + libp2p/[builders, switch, multiaddress], libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index d315dff09..5aab1c316 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -5,7 +5,7 @@ import testutils/unittests, stew/shims/net as stewNet, json_rpc/[rpcserver, rpcclient], eth/[keys, rlp], eth/common/eth_types, - libp2p/[standard_setup, switch, multiaddress], + libp2p/[builders, switch, multiaddress], libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 92974eff1..28b87dfe1 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -401,19 +401,19 @@ procSuite "WakuNode": $(peerInfo.addrs[0][1].tryGet()) == "/tcp/60002" # Now test some common corner cases - expect ValueError: + expect LPError: # gibberish discard parsePeerInfo("/p2p/$UCH GIBBER!SH") - expect ValueError: + expect LPError: # leading whitespace discard parsePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") - expect ValueError: + expect LPError: # trailing whitespace discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ") - expect ValueError: + expect LPError: # invalid IP address discard parsePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") diff --git a/tests/v2/utils.nim b/tests/v2/utils.nim index 9d220f536..d4fb9a8da 100644 --- a/tests/v2/utils.nim +++ b/tests/v2/utils.nim @@ -5,12 +5,12 @@ const import random import chronos -import libp2p/[standard_setup, +import libp2p/[builders, protocols/pubsub/pubsub, protocols/secure/secure] import ../../waku/v2/protocol/waku_relay -export standard_setup +export builders randomize() diff --git a/waku/v1/protocol/waku_protocol.nim b/waku/v1/protocol/waku_protocol.nim index 20e14ddfc..7fd07b777 100644 --- a/waku/v1/protocol/waku_protocol.nim +++ b/waku/v1/protocol/waku_protocol.nim @@ -171,7 +171,7 @@ proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T = of bloomFilterKey: let bloom = rlp.read(seq[byte]) if bloom.len != bloomSize: - raise newException(UselessPeerError, "Bloomfilter size mismatch") + raise newException(RlpTypeMismatch, "Bloomfilter size mismatch") var bloomFilter: Bloom bloomFilter.bytesCopy(bloom) result.bloomFilter = some(bloomFilter) @@ -210,8 +210,9 @@ proc allowed*(msg: Message, config: WakuConfig): bool = return true -proc run(peer: Peer) {.gcsafe, async.} -proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.} +proc run(peer: Peer) {.gcsafe, async, raises: [Defect].} +proc run(node: EthereumNode, network: WakuNetwork) + {.gcsafe, async, raises: [Defect].} proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} = new(network.queue) @@ -389,7 +390,7 @@ p2pProtocol Waku(version = wakuVersion, # 'Runner' calls --------------------------------------------------------------- -proc processQueue(peer: Peer) = +proc processQueue(peer: Peer) {.raises: [Defect].} = # Send to peer all valid and previously not send envelopes in the queue. var envelopes: seq[Envelope] = @[] @@ -426,7 +427,7 @@ proc processQueue(peer: Peer) = # gets dropped traceAsyncErrors peer.messages(envelopes) -proc run(peer: Peer) {.async.} = +proc run(peer: Peer) {.async, raises: [Defect].} = while peer.connectionState notin {Disconnecting, Disconnected}: peer.processQueue() await sleepAsync(messageInterval) @@ -444,7 +445,7 @@ proc pruneReceived(node: EthereumNode) {.raises: [].} = # the received sets. peer.received = intersection(peer.received, wakuNet.queue.itemHashes) -proc run(node: EthereumNode, network: WakuNetwork) {.async.} = +proc run(node: EthereumNode, network: WakuNetwork) {.async, raises: [Defect].} = while true: # prune message queue every second # TTL unit is in seconds, so this should be sufficient? diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index 5ae06e758..6bdb00d9e 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -1,7 +1,8 @@ -{.push raises: [Exception, Defect].} +{.push raises: [Defect, CatchableError].} import std/[options, sequtils, sets], + chronicles, json_rpc/rpcserver, libp2p/[peerinfo, switch], ../../protocol/waku_store/[waku_store_types, waku_store], diff --git a/waku/v2/node/jsonrpc/debug_api.nim b/waku/v2/node/jsonrpc/debug_api.nim index 3eeb9d215..9dcbf388c 100644 --- a/waku/v2/node/jsonrpc/debug_api.nim +++ b/waku/v2/node/jsonrpc/debug_api.nim @@ -1,4 +1,5 @@ import + chronicles, json_rpc/rpcserver, ../wakunode2 diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 98067a1de..15e46d31c 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -1,4 +1,4 @@ -{.push raises: [Exception, Defect].} +{.push raises: [Defect, CatchableError].} import std/[tables,sequtils], diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index 80aadbb43..add4198eb 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -25,8 +25,8 @@ type connected*: bool WakuKeyPair* = object - seckey*: PrivateKey - pubkey*: PublicKey + seckey*: keys.PrivateKey + pubkey*: keys.PublicKey TopicCache* = TableRef[string, seq[WakuMessage]] diff --git a/waku/v2/node/jsonrpc/private_api.nim b/waku/v2/node/jsonrpc/private_api.nim index 7a4e15e7a..0568780ed 100644 --- a/waku/v2/node/jsonrpc/private_api.nim +++ b/waku/v2/node/jsonrpc/private_api.nim @@ -1,4 +1,4 @@ -{.push raises: [Exception, Defect].} +{.push raises: [Defect, CatchableError].} import std/[tables,sequtils], diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index e6a404c12..265e2da33 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -1,4 +1,4 @@ -{.push raises: [Exception, Defect].} +{.push raises: [Defect, CatchableError].} import std/[tables,sequtils], @@ -19,7 +19,7 @@ const maxCache* = 30 # Max number of messages cached per topic @TODO make this c proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) = - proc topicHandler(topic: string, data: seq[byte]) {.async.} = + proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} = trace "Topic handler triggered", topic=topic let msg = WakuMessage.init(data) if msg.isOk(): diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 8537a55f4..536bb8832 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -1,7 +1,8 @@ -{.push raises: [Exception, Defect].} +{.push raises: [Defect, CatchableError].} import std/options, + chronicles, json_rpc/rpcserver, ../../protocol/waku_store/waku_store_types, ../wakunode2, diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index c8d0df636..e831cd0c8 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -1,4 +1,4 @@ -{.push raises: [Defect, Exception].} +{.push raises: [Defect].} import std/[options, sets, sequtils, times], @@ -34,7 +34,9 @@ proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo = proc insertOrReplace(ps: PeerStorage, peerId: PeerID, - storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64 = 0) = + storedInfo: StoredInfo, + connectedness: Connectedness, + disconnectTime: int64 = 0) {.raises: [Defect, Exception]} = # Insert peer entry into persistent storage, or replace existing entry with updated info let res = ps.put(peerId, storedInfo, connectedness, disconnectTime) if res.isErr: @@ -75,7 +77,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, return none(Connection) -proc loadFromStorage(pm: PeerManager) = +proc loadFromStorage(pm: PeerManager) {.raises: [Defect, Exception]} = # Load peers from storage, if available proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) = if peerId == pm.switch.peerInfo.peerId: @@ -110,7 +112,7 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix) return -proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager = +proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager {.raises: [Defect, Exception]} = let pm = PeerManager(switch: switch, peerStore: WakuPeerStore.new(), storage: storage) @@ -160,7 +162,7 @@ proc hasPeers*(pm: PeerManager, proto: string): bool = # Returns `true` if manager has any peers for the specified protocol pm.peers.anyIt(it.protos.contains(proto)) -proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) = +proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) {.raises: [Defect, Exception]} = # Adds peer to manager for the specified protocol if peerInfo.peerId == pm.switch.peerInfo.peerId: diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index f7822d3d2..017112ff5 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -1,10 +1,10 @@ {.push raises: [Defect].} import - libp2p/standard_setup, + libp2p/builders, libp2p/peerstore -export peerstore, standard_setup +export peerstore, builders type Connectedness* = enum diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 84fe20eb3..33c8c351c 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -16,5 +16,5 @@ type # MessageStore interface method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard -method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard +method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base, raises: [Defect, Exception].} = discard diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 78cf0c8fb..505a84b38 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -1,3 +1,5 @@ +{.push raises: [Defect].} + import sqlite3_abi, chronos, metrics, @@ -73,8 +75,8 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop ok() -method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] = - ## Retreives all messages from the storage. +method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] {.raises: [Defect, Exception].} = + ## Retrieves all messages from the storage. ## ## **Example:** ## @@ -86,7 +88,7 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto ## if res.isErr: ## echo "error" var gotMessages = false - proc msg(s: ptr sqlite3_stmt) = + proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} = gotMessages = true let timestamp = sqlite3_column_int64(s, 0) diff --git a/waku/v2/node/storage/sqlite.nim b/waku/v2/node/storage/sqlite.nim index 930e3957c..32d46cdad 100644 --- a/waku/v2/node/storage/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -1,3 +1,5 @@ +{.push raises: [Defect].} + import os, sqlite3_abi, @@ -8,8 +10,6 @@ import libp2p/stream/connection, stew/results, metrics -{.push raises: [Defect].} - # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim # diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 4bd4f798c..84e78e418 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -1,6 +1,8 @@ import std/[options, tables, strutils, sequtils], - chronos, chronicles, metrics, stew/shims/net as stewNet, + chronos, chronicles, metrics, + metrics/chronos_httpserver, + stew/shims/net as stewNet, # TODO: Why do we need eth keys? eth/keys, web3, @@ -11,7 +13,7 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, - libp2p/standard_setup, + libp2p/builders, ../protocol/[waku_relay, waku_message, message_notifier], ../protocol/waku_store/waku_store, ../protocol/waku_swap/waku_swap, @@ -656,7 +658,7 @@ when isMainModule: proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) = info "Starting metrics HTTP server", serverIp, serverPort - metrics.startHttpServer($serverIp, serverPort) + startMetricsHttpServer($serverIp, serverPort) info "Metrics HTTP server started", serverIp, serverPort diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index 283a30df6..0057150f0 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -2,9 +2,10 @@ ## ## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md ## for spec. +{.push raises: [Defect].} import - std/[tables, sequtils, sets], + std/[tables, sets], chronos, chronicles, metrics, libp2p/protocols/pubsub/[pubsub, gossipsub], libp2p/protocols/pubsub/rpc/messages, @@ -28,13 +29,20 @@ method init*(w: WakuRelay) = ## debug "Incoming WakuRelay connection" - await w.handleConn(conn, proto) + try: + await w.handleConn(conn, proto) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in relay handler", conn + except CatchableError as exc: + trace "WakuRelay handler leaks an error", exc = exc.msg, conn # XXX: Handler hijack GossipSub here? w.handler = handler w.codec = WakuRelayCodec -method initPubSub*(w: WakuRelay) = +method initPubSub*(w: WakuRelay) {.raises: [Defect, InitializationError].} = debug "initWakuRelay" # after discussions with @sinkingsugar, this is essentially what is needed for diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 6083af6b3..0415f1ce9 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -2,6 +2,8 @@ ## See spec for more details: ## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md +{.push raises: [Defect].} + import std/[tables, times, sequtils, algorithm, options], bearssl, @@ -338,8 +340,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = (result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo) -method init*(ws: WakuStore) = - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = +proc init*(ws: WakuStore) {.raises: [Defect, Exception]} = + proc handler(conn: Connection, proto: string) {.async.} = var message = await conn.readLp(64*1024) var res = HistoryRPC.init(message) if res.isErr: @@ -369,7 +371,7 @@ method init*(ws: WakuStore) = await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer) - ws.handler = handle + ws.handler = handler ws.codec = WakuStoreCodec if ws.store.isNil: @@ -388,7 +390,7 @@ method init*(ws: WakuStore) = proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, - store: MessageStore = nil, wakuSwap: WakuSwap = nil): T = + store: MessageStore = nil, wakuSwap: WakuSwap = nil): T {.raises: [Defect, Exception]} = debug "init" new result result.rng = rng @@ -398,7 +400,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte result.init() # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY -proc setPeer*(ws: WakuStore, peer: PeerInfo) = +proc setPeer*(ws: WakuStore, peer: PeerInfo) {.raises: [Defect, Exception]} = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() @@ -526,7 +528,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] lastSeenTime = max(lastSeenTime - offset, 0) debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime - proc handler(response: HistoryResponse) {.gcsafe.} = + proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} = for msg in response.messages: let index = msg.computeIndex() ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic))