deploy: c570cea29f5bd777a8acc92419197ef348d87d05

jm-clius 2021-06-09 14:59:52 +00:00
parent 2bedad68af
commit 3103788ff0
21 changed files with 73 additions and 55 deletions

View File

@ -1,5 +1,3 @@
{.push raises: [Defect, Exception].}
import
std/[tables, times, strutils, hashes, sequtils],
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
@ -61,7 +59,7 @@ proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
return false
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage =
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage {.raises: [Defect, KeyError]} =
# Translates a Matterbridge API JSON response to a Waku v2 message
let msgFields = jsonNode.getFields()
@ -89,7 +87,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
await cmb.nodev2.publish(DefaultTopic, msg)
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} =
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
@ -111,7 +109,7 @@ proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} =
try:
cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload),
username = chat2Msg[].nick)
except OSError, IOError:
except OSError, IOError, TimeoutError:
chat2_mb_dropped.inc(labelValues = ["duplicate"])
error "Matterbridge host unreachable. Dropping message."
@ -172,7 +170,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
debug "Start polling Matterbridge"
# Start Matterbridge polling (@TODO: use streaming interface)
proc mbHandler(jsonNode: JsonNode) {.gcsafe.} =
proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} =
trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode
waitFor cmb.toChat2(jsonNode)
@ -188,7 +186,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
# Bridging
# Handle messages on Waku v2 and bridge to Matterbridge
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
let msg = WakuMessage.init(data)
if msg.isOk():
trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
@ -211,7 +209,7 @@ when isMainModule:
relay_api,
store_api]
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) =
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} =
installDebugApiHandlers(node, rpcServer)
# Install enabled API handlers:
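Note: the hunks above drop the file-wide {.push raises: [Defect, Exception].} in favour of explicit raises lists on individual procs, a pattern repeated across most files in this commit. A minimal standalone sketch of how Nim's exception effect checking enforces such a list (illustrative only, not code from this repo; assumes the default effect tracking of the compiler used here):

import std/strutils

{.push raises: [Defect].}          # file-wide default: no catchable error may escape

proc parsePort(s: string): int {.raises: [Defect, ValueError].} =
  # parseInt can raise ValueError, so it must appear in the list above;
  # leaving it out is a compile-time error under the pushed default.
  parseInt(s)

{.pop.}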

View File

@ -5,7 +5,7 @@ import
testutils/unittests, stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
eth/[keys, rlp], eth/common/eth_types,
libp2p/[standard_setup, switch, multiaddress],
libp2p/[builders, switch, multiaddress],
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,

View File

@ -5,7 +5,7 @@ import
testutils/unittests, stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
eth/[keys, rlp], eth/common/eth_types,
libp2p/[standard_setup, switch, multiaddress],
libp2p/[builders, switch, multiaddress],
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,

View File

@ -401,19 +401,19 @@ procSuite "WakuNode":
$(peerInfo.addrs[0][1].tryGet()) == "/tcp/60002"
# Now test some common corner cases
expect ValueError:
expect LPError:
# gibberish
discard parsePeerInfo("/p2p/$UCH GIBBER!SH")
expect ValueError:
expect LPError:
# leading whitespace
discard parsePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
expect ValueError:
expect LPError:
# trailing whitespace
discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ")
expect ValueError:
expect LPError:
# invalid IP address
discard parsePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")

View File

@ -5,12 +5,12 @@ const
import random
import chronos
import libp2p/[standard_setup,
import libp2p/[builders,
protocols/pubsub/pubsub,
protocols/secure/secure]
import ../../waku/v2/protocol/waku_relay
export standard_setup
export builders
randomize()
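Note: the import and export changes above reflect nim-libp2p folding the old standard_setup module into builders. A rough sketch of what a caller looks like after the rename, assuming the newStandardSwitch convenience constructor is still exported under the same name (as the one-line export swap suggests):

import libp2p/builders             # previously: import libp2p/standard_setup

let switch = newStandardSwitch()   # same helper, new module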

View File

@ -171,7 +171,7 @@ proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T =
of bloomFilterKey:
let bloom = rlp.read(seq[byte])
if bloom.len != bloomSize:
raise newException(UselessPeerError, "Bloomfilter size mismatch")
raise newException(RlpTypeMismatch, "Bloomfilter size mismatch")
var bloomFilter: Bloom
bloomFilter.bytesCopy(bloom)
result.bloomFilter = some(bloomFilter)
@ -210,8 +210,9 @@ proc allowed*(msg: Message, config: WakuConfig): bool =
return true
proc run(peer: Peer) {.gcsafe, async.}
proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
proc run(peer: Peer) {.gcsafe, async, raises: [Defect].}
proc run(node: EthereumNode, network: WakuNetwork)
{.gcsafe, async, raises: [Defect].}
proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
new(network.queue)
@ -389,7 +390,7 @@ p2pProtocol Waku(version = wakuVersion,
# 'Runner' calls ---------------------------------------------------------------
proc processQueue(peer: Peer) =
proc processQueue(peer: Peer) {.raises: [Defect].} =
# Send to peer all valid and previously not sent envelopes in the queue.
var
envelopes: seq[Envelope] = @[]
@ -426,7 +427,7 @@ proc processQueue(peer: Peer) =
# gets dropped
traceAsyncErrors peer.messages(envelopes)
proc run(peer: Peer) {.async.} =
proc run(peer: Peer) {.async, raises: [Defect].} =
while peer.connectionState notin {Disconnecting, Disconnected}:
peer.processQueue()
await sleepAsync(messageInterval)
@ -444,7 +445,7 @@ proc pruneReceived(node: EthereumNode) {.raises: [].} =
# the received sets.
peer.received = intersection(peer.received, wakuNet.queue.itemHashes)
proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
proc run(node: EthereumNode, network: WakuNetwork) {.async, raises: [Defect].} =
while true:
# prune message queue every second
# TTL unit is in seconds, so this should be sufficient?

View File

@ -1,7 +1,8 @@
{.push raises: [Exception, Defect].}
{.push raises: [Defect, CatchableError].}
import
std/[options, sequtils, sets],
chronicles,
json_rpc/rpcserver,
libp2p/[peerinfo, switch],
../../protocol/waku_store/[waku_store_types, waku_store],
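Note: several RPC handler modules in this commit change {.push raises: [Exception, Defect].} to {.push raises: [Defect, CatchableError].}. CatchableError is the base type of all recoverable exceptions, so the new list expresses the same bound more precisely than the root Exception type. A small illustrative sketch, not taken from the repo:

{.push raises: [Defect, CatchableError].}

proc doubleOrFail(x: int): int =
  if x < 0:
    # ValueError derives from CatchableError, so raising it here is allowed
    raise newException(ValueError, "negative input")
  x * 2

{.pop.}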

View File

@ -1,4 +1,5 @@
import
chronicles,
json_rpc/rpcserver,
../wakunode2

View File

@ -1,4 +1,4 @@
{.push raises: [Exception, Defect].}
{.push raises: [Defect, CatchableError].}
import
std/[tables,sequtils],

View File

@ -25,8 +25,8 @@ type
connected*: bool
WakuKeyPair* = object
seckey*: PrivateKey
pubkey*: PublicKey
seckey*: keys.PrivateKey
pubkey*: keys.PublicKey
TopicCache* = TableRef[string, seq[WakuMessage]]
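Note: the qualification above appears to resolve an ambiguity: both eth/keys and libp2p's crypto module declare PrivateKey and PublicKey, so once both are imported the bare names no longer bind uniquely. A minimal sketch of the same pattern, assuming the repo's vendored nim-eth and nim-libp2p are on the path (ExampleKeyPair is hypothetical):

import eth/keys
import libp2p/crypto/crypto

type
  ExampleKeyPair = object       # hypothetical, mirroring WakuKeyPair above
    seckey: keys.PrivateKey     # the eth/keys type, not crypto.PrivateKey
    pubkey: keys.PublicKey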

View File

@ -1,4 +1,4 @@
{.push raises: [Exception, Defect].}
{.push raises: [Defect, CatchableError].}
import
std/[tables,sequtils],

View File

@ -1,4 +1,4 @@
{.push raises: [Exception, Defect].}
{.push raises: [Defect, CatchableError].}
import
std/[tables,sequtils],
@ -19,7 +19,7 @@ const maxCache* = 30 # Max number of messages cached per topic @TODO make this c
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
proc topicHandler(topic: string, data: seq[byte]) {.async.} =
proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} =
trace "Topic handler triggered", topic=topic
let msg = WakuMessage.init(data)
if msg.isOk():

View File

@ -1,7 +1,8 @@
{.push raises: [Exception, Defect].}
{.push raises: [Defect, CatchableError].}
import
std/options,
chronicles,
json_rpc/rpcserver,
../../protocol/waku_store/waku_store_types,
../wakunode2,

View File

@ -1,4 +1,4 @@
{.push raises: [Defect, Exception].}
{.push raises: [Defect].}
import
std/[options, sets, sequtils, times],
@ -34,7 +34,9 @@ proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo =
proc insertOrReplace(ps: PeerStorage,
peerId: PeerID,
storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64 = 0) =
storedInfo: StoredInfo,
connectedness: Connectedness,
disconnectTime: int64 = 0) {.raises: [Defect, Exception]} =
# Insert peer entry into persistent storage, or replace existing entry with updated info
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
if res.isErr:
@ -75,7 +77,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
return none(Connection)
proc loadFromStorage(pm: PeerManager) =
proc loadFromStorage(pm: PeerManager) {.raises: [Defect, Exception]} =
# Load peers from storage, if available
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
if peerId == pm.switch.peerInfo.peerId:
@ -110,7 +112,7 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
return
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager {.raises: [Defect, Exception]} =
let pm = PeerManager(switch: switch,
peerStore: WakuPeerStore.new(),
storage: storage)
@ -160,7 +162,7 @@ proc hasPeers*(pm: PeerManager, proto: string): bool =
# Returns `true` if manager has any peers for the specified protocol
pm.peers.anyIt(it.protos.contains(proto))
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) {.raises: [Defect, Exception]} =
# Adds peer to manager for the specified protocol
if peerInfo.peerId == pm.switch.peerInfo.peerId:

View File

@ -1,10 +1,10 @@
{.push raises: [Defect].}
import
libp2p/standard_setup,
libp2p/builders,
libp2p/peerstore
export peerstore, standard_setup
export peerstore, builders
type
Connectedness* = enum

View File

@ -16,5 +16,5 @@ type
# MessageStore interface
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base, raises: [Defect, Exception].} = discard

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import
sqlite3_abi,
chronos, metrics,
@ -73,8 +75,8 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
ok()
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
## Retreives all messages from the storage.
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] {.raises: [Defect, Exception].} =
## Retrieves all messages from the storage.
##
## **Example:**
##
@ -86,7 +88,7 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
## if res.isErr:
## echo "error"
var gotMessages = false
proc msg(s: ptr sqlite3_stmt) =
proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} =
gotMessages = true
let
timestamp = sqlite3_column_int64(s, 0)

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import
os,
sqlite3_abi,
@ -8,8 +10,6 @@ import
libp2p/stream/connection,
stew/results, metrics
{.push raises: [Defect].}
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
#

View File

@ -1,6 +1,8 @@
import
std/[options, tables, strutils, sequtils],
chronos, chronicles, metrics, stew/shims/net as stewNet,
chronos, chronicles, metrics,
metrics/chronos_httpserver,
stew/shims/net as stewNet,
# TODO: Why do we need eth keys?
eth/keys,
web3,
@ -11,7 +13,7 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/standard_setup,
libp2p/builders,
../protocol/[waku_relay, waku_message, message_notifier],
../protocol/waku_store/waku_store,
../protocol/waku_swap/waku_swap,
@ -656,7 +658,7 @@ when isMainModule:
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) =
info "Starting metrics HTTP server", serverIp, serverPort
metrics.startHttpServer($serverIp, serverPort)
startMetricsHttpServer($serverIp, serverPort)
info "Metrics HTTP server started", serverIp, serverPort

View File

@ -2,9 +2,10 @@
##
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
## for spec.
{.push raises: [Defect].}
import
std/[tables, sequtils, sets],
std/[tables, sets],
chronos, chronicles, metrics,
libp2p/protocols/pubsub/[pubsub, gossipsub],
libp2p/protocols/pubsub/rpc/messages,
@ -28,13 +29,20 @@ method init*(w: WakuRelay) =
##
debug "Incoming WakuRelay connection"
await w.handleConn(conn, proto)
try:
await w.handleConn(conn, proto)
except CancelledError:
# This is a top-level procedure which will run as a separate task, so it
# does not need to propagate CancelledError.
trace "Unexpected cancellation in relay handler", conn
except CatchableError as exc:
trace "WakuRelay handler leaks an error", exc = exc.msg, conn
# XXX: Handler hijack GossipSub here?
w.handler = handler
w.codec = WakuRelayCodec
method initPubSub*(w: WakuRelay) =
method initPubSub*(w: WakuRelay) {.raises: [Defect, InitializationError].} =
debug "initWakuRelay"
# after discussions with @sinkingsugar, this is essentially what is needed for
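Note: the try/except added to the relay handler above follows the usual pattern for detached Chronos tasks: cancellation is expected and only traced, while any other CatchableError is logged instead of escaping the task. A standalone sketch of the pattern (worker and topLevelHandler are hypothetical stand-ins, not code from this repo):

import chronos, chronicles

proc worker() {.async.} =              # stand-in for handleConn
  await sleepAsync(10.milliseconds)

proc topLevelHandler() {.async.} =
  try:
    await worker()
  except CancelledError:
    trace "handler cancelled"          # expected during shutdown, not an error
  except CatchableError as exc:
    trace "handler failed", exc = exc.msg

when isMainModule:
  waitFor topLevelHandler()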

View File

@ -2,6 +2,8 @@
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
{.push raises: [Defect].}
import
std/[tables, times, sequtils, algorithm, options],
bearssl,
@ -338,8 +340,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
(result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo)
method init*(ws: WakuStore) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
proc handler(conn: Connection, proto: string) {.async.} =
var message = await conn.readLp(64*1024)
var res = HistoryRPC.init(message)
if res.isErr:
@ -369,7 +371,7 @@ method init*(ws: WakuStore) =
await conn.writeLp(HistoryRPC(requestId: value.requestId,
response: response).encode().buffer)
ws.handler = handle
ws.handler = handler
ws.codec = WakuStoreCodec
if ws.store.isNil:
@ -388,7 +390,7 @@ method init*(ws: WakuStore) =
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T {.raises: [Defect, Exception]} =
debug "init"
new result
result.rng = rng
@ -398,7 +400,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte
result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
proc setPeer*(ws: WakuStore, peer: PeerInfo) {.raises: [Defect, Exception]} =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
@ -526,7 +528,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
proc handler(response: HistoryResponse) {.gcsafe.} =
proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
for msg in response.messages:
let index = msg.computeIndex()
ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic))