General refactoring: `nim-waku` (#671)

* General Track 1 refactoring
This commit is contained in:
Hanno Cornelius 2021-07-16 17:13:36 +02:00 committed by GitHub
parent a41b9ac80b
commit 13cf7380bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 189 additions and 115 deletions

View File

@ -4,9 +4,11 @@
when not(compileOption("threads")): when not(compileOption("threads")):
{.fatal: "Please, compile this program with the --threads:on option!".} {.fatal: "Please, compile this program with the --threads:on option!".}
{.push raises: [Defect].}
import std/[tables, strformat, strutils, times, httpclient, json, sequtils, random, options] import std/[tables, strformat, strutils, times, httpclient, json, sequtils, random, options]
import confutils, chronicles, chronos, stew/shims/net as stewNet, import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, endians2], eth/keys, bearssl, stew/[byteutils, endians2, results],
nimcrypto/pbkdf2 nimcrypto/pbkdf2
import libp2p/[switch, # manage transports, a single entry point for dialing and listening import libp2p/[switch, # manage transports, a single entry point for dialing and listening
crypto/crypto, # cryptographic functions crypto/crypto, # cryptographic functions
@ -19,10 +21,6 @@ import libp2p/[switch, # manage transports, a single entry poi
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
muxers/muxer] # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection muxers/muxer] # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
import ../../waku/v2/node/[wakunode2, waku_payload], import ../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/utils/peers, ../../waku/v2/utils/peers,
../../waku/common/utils/nat, ../../waku/common/utils/nat,
./config_chat2 ./config_chat2
@ -61,10 +59,13 @@ type
## chat2 protobufs ## ## chat2 protobufs ##
##################### #####################
type Chat2Message* = object type
timestamp*: int64 SelectResult*[T] = Result[T, string]
nick*: string
payload*: seq[byte] Chat2Message* = object
timestamp*: int64
nick*: string
payload*: seq[byte]
proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] =
var msg = Chat2Message() var msg = Chat2Message()
@ -119,7 +120,7 @@ proc showChatPrompt(c: Chat) =
except IOError: except IOError:
discard discard
proc printReceivedMessage(c: Chat, msg: WakuMessage) {.raises: [Defect].} = proc printReceivedMessage(c: Chat, msg: WakuMessage) =
when PayloadV1: when PayloadV1:
# Use Waku v1 payload encoding/encryption # Use Waku v1 payload encoding/encryption
let let
@ -156,16 +157,29 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) {.raises: [Defect].} =
trace "Printing message", topic=DefaultTopic, chatLine, trace "Printing message", topic=DefaultTopic, chatLine,
contentTopic = msg.contentTopic contentTopic = msg.contentTopic
proc selectRandomNode(fleetStr: string): string = proc selectRandomNode(fleetStr: string): SelectResult[string] =
randomize() randomize()
let var
fleet: string
nodes: seq[tuple[key: string, val: JsonNode]]
randNode: string
try:
# Get latest fleet addresses # Get latest fleet addresses
fleet = newHttpClient().getContent("https://fleets.status.im") fleet = newHttpClient().getContent("https://fleets.status.im")
# Select the JSONObject corresponding to the selected wakuv2 fleet and convert to seq of key-val pairs # Select the JSONObject corresponding to the selected wakuv2 fleet and convert to seq of key-val pairs
nodes = toSeq(fleet.parseJson(){"fleets", "wakuv2." & fleetStr, "waku"}.pairs()) nodes = toSeq(fleet.parseJson(){"fleets", "wakuv2." & fleetStr, "waku"}.pairs())
# Select a random node from the selected fleet, convert to string and return if nodes.len < 1:
return nodes[rand(nodes.len - 1)].val.getStr() return err("Empty fleet nodes list")
# Select a random node from the selected fleet, convert to string and return
randNode = nodes[rand(nodes.len - 1)].val.getStr()
except Exception: # @TODO: HttpClient raises generic Exception
return err("Failed to select random node")
ok(randNode)
proc readNick(transp: StreamTransport): Future[string] {.async.} = proc readNick(transp: StreamTransport): Future[string] {.async.} =
# Chat prompt # Chat prompt
@ -286,7 +300,7 @@ proc readWriteLoop(c: Chat) {.async.} =
asyncSpawn c.writeAndPrint() # execute the async function but does not block asyncSpawn c.writeAndPrint() # execute the async function but does not block
asyncSpawn c.readAndPrint() asyncSpawn c.readAndPrint()
proc readInput(wfd: AsyncFD) {.thread.} = proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} =
## This procedure performs reading from `stdin` and sends data over ## This procedure performs reading from `stdin` and sends data over
## pipe to main thread. ## pipe to main thread.
let transp = fromPipe(wfd) let transp = fromPipe(wfd)
@ -295,6 +309,7 @@ proc readInput(wfd: AsyncFD) {.thread.} =
let line = stdin.readLine() let line = stdin.readLine()
discard waitFor transp.write(line & "\r\n") discard waitFor transp.write(line & "\r\n")
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let transp = fromPipe(rfd) let transp = fromPipe(rfd)
@ -335,9 +350,13 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let randNode = selectRandomNode($conf.fleet) let randNode = selectRandomNode($conf.fleet)
echo "Connecting to " & randNode if randNode.isOk():
echo "Connecting to " & randNode.get()
await connectToNodes(chat, @[randNode]) await connectToNodes(chat, @[randNode.get()])
else:
echo "Couldn't select a random node to connect to. Check --fleet configuration."
echo randNode.error()
let peerInfo = node.peerInfo let peerInfo = node.peerInfo
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
@ -356,12 +375,17 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
elif conf.fleet != Fleet.none: elif conf.fleet != Fleet.none:
echo "Store enabled, but no store nodes configured. Choosing one at random from " & $conf.fleet & " fleet..." echo "Store enabled, but no store nodes configured. Choosing one at random from " & $conf.fleet & " fleet..."
storenode = some(selectRandomNode($conf.fleet)) let selectNode = selectRandomNode($conf.fleet)
echo "Connecting to storenode: " & storenode.get() if selectNode.isOk:
storenode = some(selectNode.get())
else:
echo "Couldn't select a random store node to connect to. Check --fleet configuration."
echo selectNode.error()
if storenode.isSome(): if storenode.isSome():
# We have a viable storenode. Let's query it for historical messages. # We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & storenode.get()
node.wakuStore.setPeer(parsePeerInfo(storenode.get())) node.wakuStore.setPeer(parsePeerInfo(storenode.get()))
@ -387,7 +411,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
node.wakuFilter.setPeer(parsePeerInfo(conf.filternode)) node.wakuFilter.setPeer(parsePeerInfo(conf.filternode))
proc filterHandler(msg: WakuMessage) {.gcsafe, raises: [Defect].} = proc filterHandler(msg: WakuMessage) {.gcsafe.} =
trace "Hit filter handler", contentTopic=msg.contentTopic trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg) chat.printReceivedMessage(msg)

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import import
std/[tables, times, strutils, hashes, sequtils], std/[tables, times, strutils, hashes, sequtils],
chronos, confutils, chronicles, chronicles/topics_registry, chronos, confutils, chronicles, chronicles/topics_registry,
@ -8,7 +10,6 @@ import
../../../waku/common/utils/matterbridge_client, ../../../waku/common/utils/matterbridge_client,
# Waku v2 imports # Waku v2 imports
libp2p/crypto/crypto, libp2p/crypto/crypto,
../../../waku/v2/protocol/waku_filter/waku_filter_types,
../../../waku/v2/node/wakunode2, ../../../waku/v2/node/wakunode2,
# Chat 2 imports # Chat 2 imports
../chat2, ../chat2,
@ -107,19 +108,21 @@ proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises:
assert chat2Msg.isOk assert chat2Msg.isOk
try: let postRes = cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload),
cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick)
username = chat2Msg[].nick)
except OSError, IOError, TimeoutError: if postRes.isErr() or (postRes[] == false):
chat2_mb_dropped.inc(labelValues = ["duplicate"]) chat2_mb_dropped.inc(labelValues = ["duplicate"])
error "Matterbridge host unreachable. Dropping message." error "Matterbridge host unreachable. Dropping message."
proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} = proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} =
while cmb.running: while cmb.running:
try: let getRes = cmb.mbClient.getMessages()
for jsonNode in cmb.mbClient.getMessages():
if getRes.isOk():
for jsonNode in getRes[]:
handler(jsonNode) handler(jsonNode)
except OSError, IOError: else:
error "Matterbridge host unreachable. Sleeping before retrying." error "Matterbridge host unreachable. Sleeping before retrying."
await sleepAsync(chronos.seconds(10)) await sleepAsync(chronos.seconds(10))
@ -136,20 +139,20 @@ proc new*(T: type Chat2MatterBridge,
nodev2Key: crypto.PrivateKey, nodev2Key: crypto.PrivateKey,
nodev2BindIp: ValidIpAddress, nodev2BindPort: Port, nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
contentTopic: string): T = contentTopic: string): T
{.raises: [Defect, ValueError, KeyError, LPError].} =
# Setup Matterbridge # Setup Matterbridge
let let
mbClient = MatterbridgeClient.new(mbHostUri, mbGateway) mbClient = MatterbridgeClient.new(mbHostUri, mbGateway)
# Let's verify the Matterbridge configuration before continuing # Let's verify the Matterbridge configuration before continuing
try: let clientHealth = mbClient.isHealthy()
if mbClient.isHealthy():
info "Reached Matterbridge host", host=mbClient.host if clientHealth.isOk() and clientHealth[]:
else: info "Reached Matterbridge host", host=mbClient.host
raise newException(ValueError, "Matterbridge client not healthy") else:
except OSError, IOError: raise newException(ValueError, "Matterbridge client not reachable/healthy")
raise newException(ValueError, "Matterbridge host unreachable")
# Setup Waku v2 node # Setup Waku v2 node
let let
@ -202,6 +205,7 @@ proc stop*(cmb: Chat2MatterBridge) {.async.} =
await cmb.nodev2.stop() await cmb.nodev2.stop()
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule: when isMainModule:
import import
../../../waku/common/utils/nat, ../../../waku/common/utils/nat,

View File

@ -4,7 +4,7 @@ import
std/[unittest, tables, strutils, os, sequtils], std/[unittest, tables, strutils, os, sequtils],
chronicles, chronicles,
stew/results, stew/results,
../../waku/v2/node/storage/migration/[migration_types, migration_utils] ../../waku/v2/node/storage/migration/migration_utils
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
const MIGRATION_PATH = sourceDir / "../../waku/v2/node/storage/migration/migrations_scripts/message" const MIGRATION_PATH = sourceDir / "../../waku/v2/node/storage/migration/migrations_scripts/message"

View File

@ -19,19 +19,19 @@ procSuite "Namespacing utils":
ns.encoding == "proto" ns.encoding == "proto"
# Invalid cases # Invalid cases
expect ValueError: expect CatchableError:
# Topic should be namespaced # Topic should be namespaced
discard NamespacedTopic.fromString("this-is-not-namespaced").tryGet() discard NamespacedTopic.fromString("this-is-not-namespaced").tryGet()
expect ValueError: expect CatchableError:
# Topic should start with '/' # Topic should start with '/'
discard NamespacedTopic.fromString("waku/2/default-waku/proto").tryGet() discard NamespacedTopic.fromString("waku/2/default-waku/proto").tryGet()
expect ValueError: expect CatchableError:
# Topic has too few parts # Topic has too few parts
discard NamespacedTopic.fromString("/waku/2/default-waku").tryGet() discard NamespacedTopic.fromString("/waku/2/default-waku").tryGet()
expect ValueError: expect CatchableError:
# Topic has too many parts # Topic has too many parts
discard NamespacedTopic.fromString("/waku/2/default-waku/proto/2").tryGet() discard NamespacedTopic.fromString("/waku/2/default-waku/proto/2").tryGet()

View File

@ -14,10 +14,6 @@ import
../../waku/v2/node/wakunode2, ../../waku/v2/node/wakunode2,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/storage/peer/waku_peer_storage,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../test_helpers ../test_helpers
procSuite "Peer Manager": procSuite "Peer Manager":

View File

@ -82,8 +82,8 @@ procSuite "WakuBridge":
toV2ContentTopic([byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]) == ContentTopic("/waku/1/1a2b3c4d/rlp") toV2ContentTopic([byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]) == ContentTopic("/waku/1/1a2b3c4d/rlp")
# Invalid cases # Invalid cases
expect ValueError: expect LPError:
# Content topic not namespaced # Content topic not namespaced
discard toV1Topic(ContentTopic("this-is-my-content")) discard toV1Topic(ContentTopic("this-is-my-content"))

View File

@ -1,5 +1,8 @@
{.push raises: [Defect].}
import import
std/[httpclient, json, uri, options] std/[httpclient, json, uri, options],
stew/results
const const
# Resource locators # Resource locators
@ -9,6 +12,8 @@ const
health* = "/api/health" health* = "/api/health"
type type
MatterbridgeResult[T] = Result[T, string]
MatterbridgeClient* = ref object of RootObj MatterbridgeClient* = ref object of RootObj
hostClient*: HttpClient hostClient*: HttpClient
host*: Uri host*: Uri
@ -16,7 +21,9 @@ type
proc new*(T: type MatterbridgeClient, proc new*(T: type MatterbridgeClient,
hostUri: string, hostUri: string,
gateway = "gateway1"): MatterbridgeClient = gateway = "gateway1"): MatterbridgeClient
{.raises: [Defect, KeyError].} =
let mbClient = MatterbridgeClient() let mbClient = MatterbridgeClient()
mbClient.hostClient = newHttpClient() mbClient.hostClient = newHttpClient()
@ -27,29 +34,46 @@ proc new*(T: type MatterbridgeClient,
return mbClient return mbClient
proc getMessages*(mb: MatterbridgeClient): seq[JsonNode] = proc getMessages*(mb: MatterbridgeClient): MatterbridgeResult[seq[JsonNode]] =
let response = mb.hostClient.get($(mb.host / messages)) var
assert response.status == "200 OK" response: Response
msgs: seq[JsonNode]
return parseJson(response.body()).getElems() try:
response = mb.hostClient.get($(mb.host / messages))
proc postMessage*(mb: MatterbridgeClient, msg: JsonNode) = msgs = parseJson(response.body()).getElems()
let response = mb.hostClient.request($(mb.host / message), except Exception as e:
httpMethod = HttpPost, return err("failed to get messages: " & e.msg)
body = $msg)
assert response.status == "200 OK" assert response.status == "200 OK"
# @TODO: better error-handling here ok(msgs)
proc postMessage*(mb: MatterbridgeClient, text: string, username: string) = proc postMessage*(mb: MatterbridgeClient, msg: JsonNode): MatterbridgeResult[bool] =
var response: Response
try:
response = mb.hostClient.request($(mb.host / message),
httpMethod = HttpPost,
body = $msg)
except Exception as e:
return err("post request failed: " & e.msg)
ok(response.status == "200 OK")
proc postMessage*(mb: MatterbridgeClient, text: string, username: string): MatterbridgeResult[bool] =
let jsonNode = %* {"text": text, let jsonNode = %* {"text": text,
"username": username, "username": username,
"gateway": mb.gateway} "gateway": mb.gateway}
mb.postMessage(jsonNode) return mb.postMessage(jsonNode)
proc isHealthy*(mb: MatterbridgeClient): bool = proc isHealthy*(mb: MatterbridgeClient): MatterbridgeResult[bool] =
let response = mb.hostClient.get($(mb.host / health)) var
response: Response
healthOk: bool
try:
response = mb.hostClient.get($(mb.host / health))
healthOk = response.body == "OK"
except Exception as e:
return err("failed to get health: " & e.msg)
return response.status == "200 OK" and response.body == "OK" ok(response.status == "200 OK" and healthOk)

View File

@ -1,13 +1,24 @@
{.push raises: [Defect].}
import import
std/[strutils, options], std/[strutils, options],
chronicles, stew/shims/net as stewNet, chronicles, stew/shims/net as stewNet,
eth/net/nat eth/net/nat
proc setupNat*(natConf, clientId: string, tcpPort, udpPort: Port): logScope:
tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], topics = "nat"
udpPort: Option[Port]] {.gcsafe.} =
var nat: NatStrategy proc setupNat*(natConf, clientId: string, tcpPort, udpPort: Port):
tuple[ip: Option[ValidIpAddress],
tcpPort: Option[Port],
udpPort: Option[Port]] {.gcsafe.} =
var
endpoint: tuple[ip: Option[ValidIpAddress],
tcpPort: Option[Port],
udpPort: Option[Port]]
nat: NatStrategy
case natConf.toLowerAscii: case natConf.toLowerAscii:
of "any": of "any":
nat = NatAny nat = NatAny
@ -21,10 +32,10 @@ proc setupNat*(natConf, clientId: string, tcpPort, udpPort: Port):
if natConf.startsWith("extip:"): if natConf.startsWith("extip:"):
try: try:
# any required port redirection is assumed to be done by hand # any required port redirection is assumed to be done by hand
result.ip = some(ValidIpAddress.init(natConf[6..^1])) endpoint.ip = some(ValidIpAddress.init(natConf[6..^1]))
nat = NatNone nat = NatNone
except ValueError: except ValueError:
error "nor a valid IP address", address = natConf[6..^1] error "not a valid IP address", address = natConf[6..^1]
quit QuitFailure quit QuitFailure
else: else:
error "not a valid NAT mechanism", value = natConf error "not a valid NAT mechanism", value = natConf
@ -33,14 +44,23 @@ proc setupNat*(natConf, clientId: string, tcpPort, udpPort: Port):
if nat != NatNone: if nat != NatNone:
let extIp = getExternalIP(nat) let extIp = getExternalIP(nat)
if extIP.isSome: if extIP.isSome:
result.ip = some(ValidIpAddress.init extIp.get) endpoint.ip = some(ValidIpAddress.init extIp.get)
# TODO redirectPorts in considered a gcsafety violation # TODO redirectPorts in considered a gcsafety violation
# because it obtains the address of a non-gcsafe proc? # because it obtains the address of a non-gcsafe proc?
let extPorts = ({.gcsafe.}: var extPorts: Option[(Port, Port)]
redirectPorts(tcpPort = tcpPort, try:
udpPort = udpPort, extPorts = ({.gcsafe.}:
description = clientId)) redirectPorts(tcpPort = tcpPort,
udpPort = udpPort,
description = clientId))
except Exception:
# @TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now.
error "unable to determine external ports"
extPorts = none((Port, Port))
if extPorts.isSome: if extPorts.isSome:
let (extTcpPort, extUdpPort) = extPorts.get() let (extTcpPort, extUdpPort) = extPorts.get()
result.tcpPort = some(extTcpPort) endpoint.tcpPort = some(extTcpPort)
result.udpPort = some(extUdpPort) endpoint.udpPort = some(extUdpPort)
return endpoint

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import import
std/[tables, hashes, sequtils], std/[tables, hashes, sequtils],
chronos, confutils, chronicles, chronicles/topics_registry, chronos, confutils, chronicles, chronicles/topics_registry,
@ -11,7 +13,6 @@ import
# Waku v2 imports # Waku v2 imports
libp2p/crypto/crypto, libp2p/crypto/crypto,
../v2/utils/namespacing, ../v2/utils/namespacing,
../v2/protocol/waku_filter/waku_filter_types,
../v2/node/wakunode2, ../v2/node/wakunode2,
# Common cli config # Common cli config
./config_bridge ./config_bridge
@ -75,7 +76,7 @@ proc toV2ContentTopic*(v1Topic: waku_protocol.Topic): ContentTopic =
return ContentTopic($namespacedTopic) return ContentTopic($namespacedTopic)
proc toV1Topic*(contentTopic: ContentTopic): waku_protocol.Topic {.raises: [ValueError, Defect]} = proc toV1Topic*(contentTopic: ContentTopic): waku_protocol.Topic {.raises: [Defect, LPError, ValueError]} =
## Extracts the 4-byte array v1 topic from a content topic ## Extracts the 4-byte array v1 topic from a content topic
## with format `/waku/1/<v1-topic-bytes-as-hex>/proto` ## with format `/waku/1/<v1-topic-bytes-as-hex>/proto`
@ -105,7 +106,7 @@ proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
await bridge.nodev2.publish(bridge.nodev2PubsubTopic, msg) await bridge.nodev2.publish(bridge.nodev2PubsubTopic, msg)
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [ValueError, Defect].} = proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [Defect, LPError, ValueError].} =
if bridge.seen.containsOrAdd(msg.encode().buffer.hash()): if bridge.seen.containsOrAdd(msg.encode().buffer.hash()):
# This is a duplicate message. Return # This is a duplicate message. Return
trace "Already seen. Dropping.", msg=msg trace "Already seen. Dropping.", msg=msg
@ -139,7 +140,8 @@ proc new*(T: type WakuBridge,
nodev2BindIp: ValidIpAddress, nodev2BindPort: Port, nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
# Bridge configuration # Bridge configuration
nodev2PubsubTopic: wakunode2.Topic): T = nodev2PubsubTopic: wakunode2.Topic): T
{.raises: [Defect, LPError].} =
# Setup Waku v1 node # Setup Waku v1 node
var var
@ -164,8 +166,8 @@ proc new*(T: type WakuBridge,
# Setup Waku v2 node # Setup Waku v2 node
let let
nodev2 = WakuNode.new(nodev2Key, nodev2 = WakuNode.new(nodev2Key,
nodev2BindIp, nodev2BindPort, nodev2BindIp, nodev2BindPort,
nodev2ExtIp, nodev2ExtPort) nodev2ExtIp, nodev2ExtPort)
return WakuBridge(nodev1: nodev1, nodev2: nodev2, nodev2PubsubTopic: nodev2PubsubTopic) return WakuBridge(nodev1: nodev1, nodev2: nodev2, nodev2PubsubTopic: nodev2PubsubTopic)
@ -216,6 +218,7 @@ proc start*(bridge: WakuBridge) {.async.} =
proc stop*(bridge: WakuBridge) {.async.} = proc stop*(bridge: WakuBridge) {.async.} =
await bridge.nodev2.stop() await bridge.nodev2.stop()
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule: when isMainModule:
import import
eth/p2p/whispernodes, eth/p2p/whispernodes,

View File

@ -5,10 +5,6 @@ import
chronicles, chronicles,
json_rpc/rpcserver, json_rpc/rpcserver,
libp2p/[peerinfo, switch], libp2p/[peerinfo, switch],
../../protocol/waku_store/[waku_store_types, waku_store],
../../protocol/waku_swap/[waku_swap_types, waku_swap],
../../protocol/waku_filter/[waku_filter_types, waku_filter],
../../protocol/waku_relay,
../wakunode2, ../wakunode2,
../peer_manager/peer_manager, ../peer_manager/peer_manager,
./jsonrpc_types ./jsonrpc_types

View File

@ -4,7 +4,6 @@ import
std/[tables,sequtils], std/[tables,sequtils],
json_rpc/rpcserver, json_rpc/rpcserver,
eth/[common, rlp, keys, p2p], eth/[common, rlp, keys, p2p],
../../protocol/waku_filter/waku_filter_types,
../wakunode2, ../wakunode2,
./jsonrpc_types ./jsonrpc_types

View File

@ -6,8 +6,7 @@ import
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsub,
eth/[common, rlp, keys, p2p], eth/[common, rlp, keys, p2p],
../wakunode2, ../wakunode2,
./jsonrpc_types, ./jsonrpc_utils, ./jsonrpc_types, ./jsonrpc_utils
../../protocol/waku_message
export jsonrpc_types export jsonrpc_types

View File

@ -4,7 +4,6 @@ import
std/options, std/options,
chronicles, chronicles,
json_rpc/rpcserver, json_rpc/rpcserver,
../../protocol/waku_store/waku_store_types,
../wakunode2, ../wakunode2,
./jsonrpc_types, ./jsonrpc_utils ./jsonrpc_types, ./jsonrpc_utils

View File

@ -6,7 +6,7 @@ import
./waku_peer_store, ./waku_peer_store,
../storage/peer/peer_storage ../storage/peer/peer_storage
export waku_peer_store export waku_peer_store, peer_storage
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]

View File

@ -1,18 +1,14 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[os, algorithm, tables, strutils], std/[tables, strutils],
chronos, metrics, chronicles,
sqlite3_abi, sqlite3_abi,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
stew/[byteutils, results], stew/[byteutils, results],
./message_store, ./message_store,
../sqlite, ../sqlite,
../../../protocol/waku_message, ../../../protocol/waku_message,
../../../utils/pagination ../../../utils/pagination
export sqlite export sqlite
const TABLE_TITLE = "Message" const TABLE_TITLE = "Message"

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import tables, stew/results, strutils, os import tables, stew/results, strutils, os
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]

View File

@ -1,9 +1,16 @@
{.push raises: [Defect].}
import import
std/[os, algorithm, tables, strutils], std/[os, algorithm, tables, strutils],
chronicles, chronicles,
stew/results, stew/results,
migration_types migration_types
export migration_types
logScope:
topics = "migration_utils"
proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts] = proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts] =
## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4 ## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4
var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]()) var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]())

View File

@ -2,7 +2,6 @@
import import
stew/results, stew/results,
chronos,
../../peer_manager/waku_peer_store ../../peer_manager/waku_peer_store
## This module defines a peer storage interface. Implementations of ## This module defines a peer storage interface. Implementations of

View File

@ -3,7 +3,6 @@
import import
std/sets, std/sets,
sqlite3_abi, sqlite3_abi,
chronos, metrics,
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
stew/results, stew/results,
./peer_storage, ./peer_storage,

View File

@ -3,18 +3,17 @@
import import
os, os,
sqlite3_abi, sqlite3_abi,
chronos, chronicles, metrics, chronicles,
stew/results, stew/results,
libp2p/crypto/crypto, migration/migration_utils
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
migration/[migration_types,migration_utils]
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
# #
# Most of it is a direct copy, the only unique functions being `get` and `put`. # Most of it is a direct copy, the only unique functions being `get` and `put`.
logScope:
topics = "sqlite"
type type
DatabaseResult*[T] = Result[T, string] DatabaseResult*[T] = Result[T, string]
@ -233,7 +232,7 @@ proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[b
ok(true) ok(true)
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration_types.USER_VERSION): DatabaseResult[bool] = proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration_utils.USER_VERSION): DatabaseResult[bool] =
## compares the user_version of the db with the targetVersion ## compares the user_version of the db with the targetVersion
## runs migration scripts if the user_version is outdated (does not support down migration) ## runs migration scripts if the user_version is outdated (does not support down migration)
## path points to the directory holding the migrations scripts ## path points to the directory holding the migrations scripts

View File

@ -1,3 +1,5 @@
{.push raises: [Defect].}
import import
std/options, std/options,
eth/keys, eth/keys,

View File

@ -6,7 +6,6 @@ import
metrics/chronos_httpserver, metrics/chronos_httpserver,
stew/shims/net as stewNet, stew/shims/net as stewNet,
eth/keys, eth/keys,
web3,
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/ping, libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
@ -19,11 +18,18 @@ import
../protocol/waku_rln_relay/waku_rln_relay_types, ../protocol/waku_rln_relay/waku_rln_relay_types,
../utils/peers, ../utils/peers,
../utils/requests, ../utils/requests,
./storage/message/message_store,
./storage/peer/peer_storage,
./storage/migration/migration_types, ./storage/migration/migration_types,
./peer_manager/peer_manager ./peer_manager/peer_manager
export
builders,
waku_relay, waku_message,
waku_store,
waku_swap,
waku_filter,
waku_lightpush,
waku_rln_relay_types
when defined(rln): when defined(rln):
import ../protocol/waku_rln_relay/[rln, waku_rln_relay_utils] import ../protocol/waku_rln_relay/[rln, waku_rln_relay_utils]

View File

@ -18,7 +18,7 @@ import
../../utils/requests, ../../utils/requests,
../../node/peer_manager/peer_manager ../../node/peer_manager/peer_manager
export waku_store_types export waku_store_types, message_store
declarePublicGauge waku_store_messages, "number of historical messages", ["type"] declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
declarePublicGauge waku_store_peers, "number of store peers" declarePublicGauge waku_store_peers, "number of store peers"