mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 23:13:09 +00:00
Update submodules (#594)
This commit is contained in:
parent
2fe53c334c
commit
9e0b73bd18
@ -1,5 +1,3 @@
|
||||
{.push raises: [Defect, Exception].}
|
||||
|
||||
import
|
||||
std/[tables, times, strutils, hashes, sequtils],
|
||||
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
|
||||
@ -61,7 +59,7 @@ proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
|
||||
|
||||
return false
|
||||
|
||||
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage =
|
||||
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage {.raises: [Defect, KeyError]} =
|
||||
# Translates a Matterbridge API JSON response to a Waku v2 message
|
||||
let msgFields = jsonNode.getFields()
|
||||
|
||||
@ -89,7 +87,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
|
||||
|
||||
await cmb.nodev2.publish(DefaultTopic, msg)
|
||||
|
||||
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} =
|
||||
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
|
||||
if cmb.seen.containsOrAdd(msg.payload.hash()):
|
||||
# This is a duplicate message. Return.
|
||||
chat2_mb_dropped.inc(labelValues = ["duplicate"])
|
||||
@ -111,7 +109,7 @@ proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe.} =
|
||||
try:
|
||||
cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload),
|
||||
username = chat2Msg[].nick)
|
||||
except OSError, IOError:
|
||||
except OSError, IOError, TimeoutError:
|
||||
chat2_mb_dropped.inc(labelValues = ["duplicate"])
|
||||
error "Matterbridge host unreachable. Dropping message."
|
||||
|
||||
@ -172,7 +170,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
|
||||
debug "Start polling Matterbridge"
|
||||
|
||||
# Start Matterbridge polling (@TODO: use streaming interface)
|
||||
proc mbHandler(jsonNode: JsonNode) {.gcsafe.} =
|
||||
proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} =
|
||||
trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode
|
||||
waitFor cmb.toChat2(jsonNode)
|
||||
|
||||
@ -188,7 +186,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
|
||||
|
||||
# Bridging
|
||||
# Handle messages on Waku v2 and bridge to Matterbridge
|
||||
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
|
||||
@ -211,7 +209,7 @@ when isMainModule:
|
||||
relay_api,
|
||||
store_api]
|
||||
|
||||
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) =
|
||||
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} =
|
||||
installDebugApiHandlers(node, rpcServer)
|
||||
|
||||
# Install enabled API handlers:
|
||||
|
||||
@ -5,7 +5,7 @@ import
|
||||
testutils/unittests, stew/shims/net as stewNet,
|
||||
json_rpc/[rpcserver, rpcclient],
|
||||
eth/[keys, rlp], eth/common/eth_types,
|
||||
libp2p/[standard_setup, switch, multiaddress],
|
||||
libp2p/[builders, switch, multiaddress],
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
|
||||
@ -5,7 +5,7 @@ import
|
||||
testutils/unittests, stew/shims/net as stewNet,
|
||||
json_rpc/[rpcserver, rpcclient],
|
||||
eth/[keys, rlp], eth/common/eth_types,
|
||||
libp2p/[standard_setup, switch, multiaddress],
|
||||
libp2p/[builders, switch, multiaddress],
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
|
||||
@ -401,19 +401,19 @@ procSuite "WakuNode":
|
||||
$(peerInfo.addrs[0][1].tryGet()) == "/tcp/60002"
|
||||
|
||||
# Now test some common corner cases
|
||||
expect ValueError:
|
||||
expect LPError:
|
||||
# gibberish
|
||||
discard parsePeerInfo("/p2p/$UCH GIBBER!SH")
|
||||
|
||||
expect ValueError:
|
||||
expect LPError:
|
||||
# leading whitespace
|
||||
discard parsePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
|
||||
|
||||
expect ValueError:
|
||||
expect LPError:
|
||||
# trailing whitespace
|
||||
discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ")
|
||||
|
||||
expect ValueError:
|
||||
expect LPError:
|
||||
# invalid IP address
|
||||
discard parsePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
|
||||
|
||||
|
||||
@ -5,12 +5,12 @@ const
|
||||
|
||||
import random
|
||||
import chronos
|
||||
import libp2p/[standard_setup,
|
||||
import libp2p/[builders,
|
||||
protocols/pubsub/pubsub,
|
||||
protocols/secure/secure]
|
||||
import ../../waku/v2/protocol/waku_relay
|
||||
|
||||
export standard_setup
|
||||
export builders
|
||||
|
||||
randomize()
|
||||
|
||||
|
||||
2
vendor/news
vendored
2
vendor/news
vendored
@ -1 +1 @@
|
||||
Subproject commit e1d63564a2a411f264e75694b8f7c66e50c3a4cb
|
||||
Subproject commit b888b22585c6bda0cd7d3f4ee2d75f743b9dd5b1
|
||||
2
vendor/nim-bearssl
vendored
2
vendor/nim-bearssl
vendored
@ -1 +1 @@
|
||||
Subproject commit 0a7401ad466d70bab31c5d6dc82d1d584e4ebd1f
|
||||
Subproject commit dc62f4fccd2d40c884009ae8f2b14bb6a86a55cf
|
||||
2
vendor/nim-chronicles
vendored
2
vendor/nim-chronicles
vendored
@ -1 +1 @@
|
||||
Subproject commit 8b1419b4a37a3a8995a9a0a992b4705427056d98
|
||||
Subproject commit 63ce43a86a40a4c73d1b3b8317278d47ec55a458
|
||||
2
vendor/nim-chronos
vendored
2
vendor/nim-chronos
vendored
@ -1 +1 @@
|
||||
Subproject commit c206d2bc191712e4e5f89ecd87df7ef014bbb484
|
||||
Subproject commit 252e5d0d502c51c7bb20eeebb7a3129220b12ff8
|
||||
2
vendor/nim-confutils
vendored
2
vendor/nim-confutils
vendored
@ -1 +1 @@
|
||||
Subproject commit f091a70a5bf95ec772c8b4d9978e81b8ae89af0c
|
||||
Subproject commit ab4ba1cbfdccdb8c0398894ffc25169bc61faeed
|
||||
2
vendor/nim-eth
vendored
2
vendor/nim-eth
vendored
@ -1 +1 @@
|
||||
Subproject commit 8c27f291f535f261274be9f69de1216981ac93e4
|
||||
Subproject commit 0ad571ab27c46a32256c8568a32ef1d6ac34b733
|
||||
2
vendor/nim-faststreams
vendored
2
vendor/nim-faststreams
vendored
@ -1 +1 @@
|
||||
Subproject commit 99e89eb1e5c973ea5162342384408434b7f5715a
|
||||
Subproject commit 5eb7fd0c90d3f03b6778688a5893fdd2715e9fe2
|
||||
2
vendor/nim-http-utils
vendored
2
vendor/nim-http-utils
vendored
@ -1 +1 @@
|
||||
Subproject commit 613ad40f00ab3d0ee839f9db9c4d25e5e0248dee
|
||||
Subproject commit 8b492c74b56c62bcee991a6899d413938a3accc5
|
||||
2
vendor/nim-json-rpc
vendored
2
vendor/nim-json-rpc
vendored
@ -1 +1 @@
|
||||
Subproject commit 4eb39203ebd391c77d16a1c387dc8a6b7d90bc69
|
||||
Subproject commit 22c342bcc11515c69d59b53819bd38ab813d9b93
|
||||
2
vendor/nim-json-serialization
vendored
2
vendor/nim-json-serialization
vendored
@ -1 +1 @@
|
||||
Subproject commit fe8a82ca76150b60a950d5aa4e5baa382441ada4
|
||||
Subproject commit 652099a95960be7790e2f4b4c925d0dd703cc9aa
|
||||
2
vendor/nim-libbacktrace
vendored
2
vendor/nim-libbacktrace
vendored
@ -1 +1 @@
|
||||
Subproject commit 829a65ca3d99c18230598de5d9fc7659f321586c
|
||||
Subproject commit b70db54e073988f334904cddbfc840c9698ba74e
|
||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit df497660bcf7aa23005f22aa7daced15b5668e3a
|
||||
Subproject commit 3da656687be63ccbf5d659af55d159130d325038
|
||||
2
vendor/nim-metrics
vendored
2
vendor/nim-metrics
vendored
@ -1 +1 @@
|
||||
Subproject commit 16ec7aeccc2020666d6b7c63288153f3917222fb
|
||||
Subproject commit 7349a421cff02c27df9f7622801459d6b5d65dcd
|
||||
2
vendor/nim-secp256k1
vendored
2
vendor/nim-secp256k1
vendored
@ -1 +1 @@
|
||||
Subproject commit 67ebdfa8014e1c6011f14d7fc5eb341684eb3b32
|
||||
Subproject commit d790c42206fab4b8008eaa91181ca8c8c68a0105
|
||||
2
vendor/nim-serialization
vendored
2
vendor/nim-serialization
vendored
@ -1 +1 @@
|
||||
Subproject commit f9a1121b8733eb75e624ab59f8d79e707f15f76f
|
||||
Subproject commit 5213d397f9d85c69279961256e19a859cd32df30
|
||||
2
vendor/nim-stew
vendored
2
vendor/nim-stew
vendored
@ -1 +1 @@
|
||||
Subproject commit ee78822e057ac5f39804ecb6ac1096734be13ef8
|
||||
Subproject commit 70680e2af2f3b0ead9ea49969731910e0898fbbe
|
||||
2
vendor/nim-stint
vendored
2
vendor/nim-stint
vendored
@ -1 +1 @@
|
||||
Subproject commit bae3fc6ee4f36a33849c9dafe5a52b9b6cdf672a
|
||||
Subproject commit ca897811df3e9aa6106916c30203a442bf3fd8d4
|
||||
2
vendor/nim-unittest2
vendored
2
vendor/nim-unittest2
vendored
@ -1 +1 @@
|
||||
Subproject commit e788deab3d59ff8a4fe103aeb5d82d3d82fcac7d
|
||||
Subproject commit 91d4eaa4ccb4bfddf179fe2ee4247ae000e2587f
|
||||
2
vendor/nim-web3
vendored
2
vendor/nim-web3
vendored
@ -1 +1 @@
|
||||
Subproject commit 21b465fcd58460e6018dcb1048254f2514696778
|
||||
Subproject commit b985323d6418a738aa2b9e0b819efe169f00b078
|
||||
2
vendor/nimbus-build-system
vendored
2
vendor/nimbus-build-system
vendored
@ -1 +1 @@
|
||||
Subproject commit 2c1e94d595b80a2d247875fe813242bbe6a0ade7
|
||||
Subproject commit 77747657f65a5fe26c281445b6ee9a1d6e72b1eb
|
||||
@ -171,7 +171,7 @@ proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T =
|
||||
of bloomFilterKey:
|
||||
let bloom = rlp.read(seq[byte])
|
||||
if bloom.len != bloomSize:
|
||||
raise newException(UselessPeerError, "Bloomfilter size mismatch")
|
||||
raise newException(RlpTypeMismatch, "Bloomfilter size mismatch")
|
||||
var bloomFilter: Bloom
|
||||
bloomFilter.bytesCopy(bloom)
|
||||
result.bloomFilter = some(bloomFilter)
|
||||
@ -210,8 +210,9 @@ proc allowed*(msg: Message, config: WakuConfig): bool =
|
||||
|
||||
return true
|
||||
|
||||
proc run(peer: Peer) {.gcsafe, async.}
|
||||
proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
|
||||
proc run(peer: Peer) {.gcsafe, async, raises: [Defect].}
|
||||
proc run(node: EthereumNode, network: WakuNetwork)
|
||||
{.gcsafe, async, raises: [Defect].}
|
||||
|
||||
proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
|
||||
new(network.queue)
|
||||
@ -389,7 +390,7 @@ p2pProtocol Waku(version = wakuVersion,
|
||||
|
||||
# 'Runner' calls ---------------------------------------------------------------
|
||||
|
||||
proc processQueue(peer: Peer) =
|
||||
proc processQueue(peer: Peer) {.raises: [Defect].} =
|
||||
# Send to peer all valid and previously not send envelopes in the queue.
|
||||
var
|
||||
envelopes: seq[Envelope] = @[]
|
||||
@ -426,7 +427,7 @@ proc processQueue(peer: Peer) =
|
||||
# gets dropped
|
||||
traceAsyncErrors peer.messages(envelopes)
|
||||
|
||||
proc run(peer: Peer) {.async.} =
|
||||
proc run(peer: Peer) {.async, raises: [Defect].} =
|
||||
while peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
peer.processQueue()
|
||||
await sleepAsync(messageInterval)
|
||||
@ -444,7 +445,7 @@ proc pruneReceived(node: EthereumNode) {.raises: [].} =
|
||||
# the received sets.
|
||||
peer.received = intersection(peer.received, wakuNet.queue.itemHashes)
|
||||
|
||||
proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
|
||||
proc run(node: EthereumNode, network: WakuNetwork) {.async, raises: [Defect].} =
|
||||
while true:
|
||||
# prune message queue every second
|
||||
# TTL unit is in seconds, so this should be sufficient?
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
{.push raises: [Exception, Defect].}
|
||||
{.push raises: [Defect, CatchableError].}
|
||||
|
||||
import
|
||||
std/[options, sequtils, sets],
|
||||
chronicles,
|
||||
json_rpc/rpcserver,
|
||||
libp2p/[peerinfo, switch],
|
||||
../../protocol/waku_store/[waku_store_types, waku_store],
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import
|
||||
chronicles,
|
||||
json_rpc/rpcserver,
|
||||
../wakunode2
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
{.push raises: [Exception, Defect].}
|
||||
{.push raises: [Defect, CatchableError].}
|
||||
|
||||
import
|
||||
std/[tables,sequtils],
|
||||
|
||||
@ -25,8 +25,8 @@ type
|
||||
connected*: bool
|
||||
|
||||
WakuKeyPair* = object
|
||||
seckey*: PrivateKey
|
||||
pubkey*: PublicKey
|
||||
seckey*: keys.PrivateKey
|
||||
pubkey*: keys.PublicKey
|
||||
|
||||
TopicCache* = TableRef[string, seq[WakuMessage]]
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
{.push raises: [Exception, Defect].}
|
||||
{.push raises: [Defect, CatchableError].}
|
||||
|
||||
import
|
||||
std/[tables,sequtils],
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
{.push raises: [Exception, Defect].}
|
||||
{.push raises: [Defect, CatchableError].}
|
||||
|
||||
import
|
||||
std/[tables,sequtils],
|
||||
@ -19,7 +19,7 @@ const maxCache* = 30 # Max number of messages cached per topic @TODO make this c
|
||||
|
||||
proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: TopicCache) =
|
||||
|
||||
proc topicHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
proc topicHandler(topic: string, data: seq[byte]) {.async, raises: [Defect].} =
|
||||
trace "Topic handler triggered", topic=topic
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
{.push raises: [Exception, Defect].}
|
||||
{.push raises: [Defect, CatchableError].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
chronicles,
|
||||
json_rpc/rpcserver,
|
||||
../../protocol/waku_store/waku_store_types,
|
||||
../wakunode2,
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
{.push raises: [Defect, Exception].}
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[options, sets, sequtils, times],
|
||||
@ -34,7 +34,9 @@ proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo =
|
||||
|
||||
proc insertOrReplace(ps: PeerStorage,
|
||||
peerId: PeerID,
|
||||
storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64 = 0) =
|
||||
storedInfo: StoredInfo,
|
||||
connectedness: Connectedness,
|
||||
disconnectTime: int64 = 0) {.raises: [Defect, Exception]} =
|
||||
# Insert peer entry into persistent storage, or replace existing entry with updated info
|
||||
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
|
||||
if res.isErr:
|
||||
@ -75,7 +77,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
||||
|
||||
return none(Connection)
|
||||
|
||||
proc loadFromStorage(pm: PeerManager) =
|
||||
proc loadFromStorage(pm: PeerManager) {.raises: [Defect, Exception]} =
|
||||
# Load peers from storage, if available
|
||||
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
|
||||
if peerId == pm.switch.peerInfo.peerId:
|
||||
@ -110,7 +112,7 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||
return
|
||||
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager {.raises: [Defect, Exception]} =
|
||||
let pm = PeerManager(switch: switch,
|
||||
peerStore: WakuPeerStore.new(),
|
||||
storage: storage)
|
||||
@ -160,7 +162,7 @@ proc hasPeers*(pm: PeerManager, proto: string): bool =
|
||||
# Returns `true` if manager has any peers for the specified protocol
|
||||
pm.peers.anyIt(it.protos.contains(proto))
|
||||
|
||||
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
|
||||
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) {.raises: [Defect, Exception]} =
|
||||
# Adds peer to manager for the specified protocol
|
||||
|
||||
if peerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
libp2p/standard_setup,
|
||||
libp2p/builders,
|
||||
libp2p/peerstore
|
||||
|
||||
export peerstore, standard_setup
|
||||
export peerstore, builders
|
||||
|
||||
type
|
||||
Connectedness* = enum
|
||||
|
||||
@ -16,5 +16,5 @@ type
|
||||
|
||||
# MessageStore interface
|
||||
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base, raises: [Defect, Exception].} = discard
|
||||
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
sqlite3_abi,
|
||||
chronos, metrics,
|
||||
@ -73,8 +75,8 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
||||
|
||||
ok()
|
||||
|
||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
|
||||
## Retreives all messages from the storage.
|
||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] {.raises: [Defect, Exception].} =
|
||||
## Retrieves all messages from the storage.
|
||||
##
|
||||
## **Example:**
|
||||
##
|
||||
@ -86,7 +88,7 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
||||
## if res.isErr:
|
||||
## echo "error"
|
||||
var gotMessages = false
|
||||
proc msg(s: ptr sqlite3_stmt) =
|
||||
proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} =
|
||||
gotMessages = true
|
||||
let
|
||||
timestamp = sqlite3_column_int64(s, 0)
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
os,
|
||||
sqlite3_abi,
|
||||
@ -8,8 +10,6 @@ import
|
||||
libp2p/stream/connection,
|
||||
stew/results, metrics
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import
|
||||
std/[options, tables, strutils, sequtils],
|
||||
chronos, chronicles, metrics, stew/shims/net as stewNet,
|
||||
chronos, chronicles, metrics,
|
||||
metrics/chronos_httpserver,
|
||||
stew/shims/net as stewNet,
|
||||
# TODO: Why do we need eth keys?
|
||||
eth/keys,
|
||||
web3,
|
||||
@ -11,7 +13,7 @@ import
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/standard_setup,
|
||||
libp2p/builders,
|
||||
../protocol/[waku_relay, waku_message, message_notifier],
|
||||
../protocol/waku_store/waku_store,
|
||||
../protocol/waku_swap/waku_swap,
|
||||
@ -656,7 +658,7 @@ when isMainModule:
|
||||
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) =
|
||||
info "Starting metrics HTTP server", serverIp, serverPort
|
||||
|
||||
metrics.startHttpServer($serverIp, serverPort)
|
||||
startMetricsHttpServer($serverIp, serverPort)
|
||||
|
||||
info "Metrics HTTP server started", serverIp, serverPort
|
||||
|
||||
|
||||
@ -2,9 +2,10 @@
|
||||
##
|
||||
## See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-relay.md
|
||||
## for spec.
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils, sets],
|
||||
std/[tables, sets],
|
||||
chronos, chronicles, metrics,
|
||||
libp2p/protocols/pubsub/[pubsub, gossipsub],
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
@ -28,13 +29,20 @@ method init*(w: WakuRelay) =
|
||||
##
|
||||
|
||||
debug "Incoming WakuRelay connection"
|
||||
await w.handleConn(conn, proto)
|
||||
try:
|
||||
await w.handleConn(conn, proto)
|
||||
except CancelledError:
|
||||
# This is top-level procedure which will work as separate task, so it
|
||||
# do not need to propogate CancelledError.
|
||||
trace "Unexpected cancellation in relay handler", conn
|
||||
except CatchableError as exc:
|
||||
trace "WakuRelay handler leaks an error", exc = exc.msg, conn
|
||||
|
||||
# XXX: Handler hijack GossipSub here?
|
||||
w.handler = handler
|
||||
w.codec = WakuRelayCodec
|
||||
|
||||
method initPubSub*(w: WakuRelay) =
|
||||
method initPubSub*(w: WakuRelay) {.raises: [Defect, InitializationError].} =
|
||||
debug "initWakuRelay"
|
||||
|
||||
# after discussions with @sinkingsugar, this is essentially what is needed for
|
||||
|
||||
@ -2,6 +2,8 @@
|
||||
## See spec for more details:
|
||||
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, times, sequtils, algorithm, options],
|
||||
bearssl,
|
||||
@ -338,8 +340,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
||||
(result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo)
|
||||
|
||||
|
||||
method init*(ws: WakuStore) =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
var message = await conn.readLp(64*1024)
|
||||
var res = HistoryRPC.init(message)
|
||||
if res.isErr:
|
||||
@ -369,7 +371,7 @@ method init*(ws: WakuStore) =
|
||||
await conn.writeLp(HistoryRPC(requestId: value.requestId,
|
||||
response: response).encode().buffer)
|
||||
|
||||
ws.handler = handle
|
||||
ws.handler = handler
|
||||
ws.codec = WakuStoreCodec
|
||||
|
||||
if ws.store.isNil:
|
||||
@ -388,7 +390,7 @@ method init*(ws: WakuStore) =
|
||||
|
||||
|
||||
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
|
||||
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
|
||||
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T {.raises: [Defect, Exception]} =
|
||||
debug "init"
|
||||
new result
|
||||
result.rng = rng
|
||||
@ -398,7 +400,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte
|
||||
result.init()
|
||||
|
||||
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
||||
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
|
||||
proc setPeer*(ws: WakuStore, peer: PeerInfo) {.raises: [Defect, Exception]} =
|
||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||
waku_store_peers.inc()
|
||||
|
||||
@ -526,7 +528,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
|
||||
lastSeenTime = max(lastSeenTime - offset, 0)
|
||||
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
|
||||
|
||||
proc handler(response: HistoryResponse) {.gcsafe.} =
|
||||
proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
|
||||
for msg in response.messages:
|
||||
let index = msg.computeIndex()
|
||||
ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user