mirror of https://github.com/waku-org/nwaku.git
chore: generic change to reduce the number of compilation warnings (#2696)
This commit is contained in:
parent
f0f94412a0
commit
78132dc12e
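Note: the hunks below apply a handful of recurring warning fixes across the tree: swapping `stew/shims/net` for `std/net`, deleting unused imports, moving regex literals from `re` to the `re2` API, marking non-overriding methods `{.base.}`, and tightening `{.async, raises: [...].}` pragmas into chronos's `{.async: (raises: []).}` form with explicit try/except handling. A minimal, hypothetical sketch of that last pattern (the proc name and body are illustrative, not from this commit):

import chronos

proc mountExample() {.async: (raises: []).} =
  # stand-in for the mount*/start() calls patched in the hunks below
  try:
    await sleepAsync(10.milliseconds)
  except CatchableError:
    echo "failed to start protocol: ", getCurrentExceptionMsg()

waitFor mountExample()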
@@ -4,8 +4,7 @@ else:
   {.push raises: [].}

 import
-  std/[options, strutils, os, sequtils],
-  stew/shims/net as stewNet,
+  std/[options, strutils, os, sequtils, net],
   chronicles,
   chronos,
   metrics,

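Note: the `stew/shims/net` -> `std/net` swap above repeats throughout this commit; the stew shim backfilled IP-address helpers that current Nim standard libraries ship directly. A small hedged sketch of the replacement module in isolation (the sample address is illustrative):

import std/net

# std/net supplies the IP address types the stew shim used to provide
let ip = parseIpAddress("192.0.2.1")
echo ip.family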
@@ -31,7 +31,7 @@ import
   ../testlib/wakucore,
   ../testlib/wakunode

-procSuite "WakuNode - Store":
+procSuite "WakuNode - Store Legacy":
   ## Fixtures
   let timeOrigin = now()
   let msgListA =

@@ -9,7 +9,11 @@ import
   libp2p/crypto/secp,
   libp2p/multiaddress,
   libp2p/switch
-import ../testlib/common, ../testlib/wakucore, ../testlib/wakunode
+import
+  ../testlib/common,
+  ../testlib/wakucore,
+  ../testlib/wakunode,
+  ../../waku/node/waku_metrics

 include ../../waku/factory/waku

@@ -3,7 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import std/strutils, stew/shims/net
+import std/[strutils, net]
 import ../../../envvar_serialization

 export net, envvar_serialization

@@ -1,4 +1,4 @@
-import std/[times, strutils, strformat], stew/results, chronos
+import std/[times, strutils], stew/results, chronos

 include db_postgres

@@ -3,7 +3,7 @@ import std/strutils, regex, stew/results
 proc validateDbUrl*(dbUrl: string): Result[string, string] =
   ## dbUrl mimics SQLAlchemy Database URL schema
   ## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
-  let regex = re"^\w+:\/\/.+:.+@[\w*-.]+:[0-9]+\/[\w*-.]+$"
+  let regex = re2"^\w+:\/\/.+:.+@[\w*-.]+:[0-9]+\/[\w*-.]+$"
   let dbUrl = dbUrl.strip()
   if "sqlite" in dbUrl or dbUrl == "" or dbUrl == "none" or dbUrl.match(regex):
     return ok(dbUrl)

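Note: the `re` -> `re2` change tracks nim-regex's `Regex2` API, which replaces the deprecated `re` constructor while leaving the pattern text untouched. A minimal hedged example of the same migration (pattern and inputs are illustrative):

import regex

let pattern = re2"^\w+$" # formerly: re"^\w+$"
echo "waku".match(pattern)      # true
echo "waku node".match(pattern) # false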
@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/options,
+  std/[options, net],
   stew/results,
-  stew/shims/net,
   eth/keys as eth_keys,
   eth/p2p/discoveryv5/enr,
   libp2p/crypto/crypto as libp2p_crypto

@@ -1,10 +1,6 @@
 ## This code has been copied and addapted from `status-im/nimbu-eth2` project.
 ## Link: https://github.com/status-im/nimbus-eth2/blob/c585b0a5b1ae4d55af38ad7f4715ad455e791552/beacon_chain/nimbus_binary_common.nim
-import
-  std/[strutils, typetraits],
-  chronicles,
-  chronicles/log_output,
-  chronicles/topics_registry
+import std/typetraits, chronicles, chronicles/log_output, chronicles/topics_registry

 export chronicles.LogLevel

@@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import std/[options, strutils]
-import chronicles, eth/net/nat, stew/results, stew/shims/net, nativesockets
+import std/[options, strutils, net]
+import chronicles, eth/net/nat, stew/results, nativesockets

 logScope:
   topics = "nat"

@@ -3,8 +3,6 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import std/sequtils
-
 proc flatten*[T](a: seq[seq[T]]): seq[T] =
   var aFlat = newSeq[T](0)
   for subseq in a:

@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/[sequtils, strutils, options, sets],
+  std/[sequtils, strutils, options, sets, net],
   stew/results,
-  stew/shims/net,
   chronos,
   chronicles,
   metrics,

@@ -9,8 +9,7 @@ else:
 ## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459

 import
-  std/options,
-  stew/shims/net,
+  std/[options, net],
   chronicles,
   chronos,
   metrics,

@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/options,
+  std/[options, net],
   stew/results,
-  stew/shims/net,
   chronicles,
   libp2p/crypto/crypto,
   libp2p/builders,

@@ -764,8 +764,6 @@ proc readValue*(
   except CatchableError:
     raise newException(SerializationError, getCurrentExceptionMsg())

-{.push warning[ProveInit]: off.}
-
 proc load*(T: type WakuNodeConf, version = ""): ConfResult[T] =
   try:
     let conf = WakuNodeConf.load(
@@ -789,5 +787,3 @@ proc defaultWakuNodeConf*(): ConfResult[WakuNodeConf] =
     return ok(conf)
   except CatchableError:
     return err("exception in defaultWakuNodeConf: " & getCurrentExceptionMsg())
-
-{.pop.}

@@ -4,9 +4,8 @@ import
   libp2p/crypto/crypto,
   libp2p/multiaddress,
   libp2p/nameresolving/dnsresolver,
-  std/[options, sequtils],
-  stew/results,
-  stew/shims/net
+  std/[options, sequtils, net],
+  stew/results
 import
   ./external_config,
   ../common/utils/nat,

@@ -4,14 +4,12 @@ else:
   {.push raises: [].}

 import
-  std/math,
   chronicles,
   chronos,
   metrics,
   stew/byteutils,
   stew/endians2,
   libp2p/protocols/pubsub/gossipsub,
-  libp2p/protocols/pubsub/rpc/messages,
   libp2p/protocols/pubsub/errors,
   nimcrypto/sha2,
   secp256k1

@@ -4,7 +4,7 @@ else:
   {.push raises: [].}

 import
-  std/[options, strutils, sequtils],
+  std/[options, sequtils],
   stew/results,
   chronicles,
   chronos,
@@ -21,20 +21,10 @@ import
   ../../waku/common/logging,
   ../../waku/waku_core,
   ../../waku/waku_node,
-  ../../waku/node/waku_metrics,
   ../../waku/node/peer_manager,
   ../../waku/node/health_monitor,
   ../../waku/waku_api/message_cache,
   ../../waku/waku_api/handlers,
-  ../../waku/waku_api/rest/server,
-  ../../waku/waku_api/rest/debug/handlers as rest_debug_api,
-  ../../waku/waku_api/rest/relay/handlers as rest_relay_api,
-  ../../waku/waku_api/rest/filter/handlers as rest_filter_api,
-  ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
-  ../../waku/waku_api/rest/store/handlers as rest_store_api,
-  ../../waku/waku_api/rest/legacy_store/handlers as rest_legacy_store_api,
-  ../../waku/waku_api/rest/health/handlers as rest_health_api,
-  ../../waku/waku_api/rest/admin/handlers as rest_admin_api,
   ../../waku/waku_archive,
   ../../waku/discovery/waku_dnsdisc,
   ../../waku/discovery/waku_discv5,

@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/[options, sequtils, strutils],
+  std/[options, sequtils, strutils, net],
   stew/results,
-  stew/shims/net,
   libp2p/[multiaddress, multicodec]
 import ../../waku/waku_core/peers
 import ../waku_enr

@@ -4,13 +4,12 @@ else:
   {.push raises: [].}

 import
-  std/[hashes, options, sugar, tables, strutils, sequtils, os],
+  std/[hashes, options, sugar, tables, strutils, sequtils, os, net],
   chronos,
   chronicles,
   metrics,
   stew/results,
   stew/byteutils,
-  stew/shims/net as stewNet,
   eth/keys,
   nimcrypto,
   bearssl/rand,
@@ -435,7 +434,7 @@ proc mountFilter*(
       filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
     maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
     maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
-) {.async, raises: [Defect, LPError].} =
+) {.async: (raises: []).} =
   ## Mounting filter v2 protocol

   info "mounting filter protocol"
@@ -444,9 +443,15 @@ proc mountFilter*(
   )

   if node.started:
-    await node.wakuFilter.start()
+    try:
+      await node.wakuFilter.start()
+    except CatchableError:
+      error "failed to start wakuFilter", error = getCurrentExceptionMsg()

-  node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
+  try:
+    node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
+  except LPError:
+    error "failed to mount wakuFilter", error = getCurrentExceptionMsg()

 proc filterHandleMessage*(
     node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage
@@ -457,7 +462,7 @@ proc filterHandleMessage*(

   await node.wakuFilter.handleMessage(pubsubTopic, message)

-proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
+proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} =
   ## Mounting both filter
   ## Giving option for application level to choose btw own push message handling or
   ## rely on node provided cache. - This only applies for v2 filter client
@@ -466,16 +471,22 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
   node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)

   if node.started:
-    await node.wakuFilterClient.start()
+    try:
+      await node.wakuFilterClient.start()
+    except CatchableError:
+      error "failed to start wakuFilterClient", error = getCurrentExceptionMsg()

-  node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
+  try:
+    node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
+  except LPError:
+    error "failed to mount wakuFilterClient", error = getCurrentExceptionMsg()

 proc filterSubscribe*(
     node: WakuNode,
     pubsubTopic: Option[PubsubTopic],
     contentTopics: ContentTopic | seq[ContentTopic],
     peer: RemotePeerInfo | string,
-): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
   if node.wakuFilterClient.isNil():
     error "cannot register filter subscription to topic",
@@ -528,22 +539,29 @@ proc filterSubscribe*(
       let content = topics.mapIt($it)
       node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)

-  let finished = await allFinished(futures)
-
   var subRes: FilterSubscribeResult = FilterSubscribeResult.ok()
-  for fut in finished:
-    let res = fut.read()
+  try:
+    let finished = await allFinished(futures)

-    if res.isErr():
-      error "failed filter subscription", error = res.error
-      waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
-      subRes = FilterSubscribeResult.err(res.error)
+    for fut in finished:
+      let res = fut.read()

-  for pubsub, topics in topicMap.pairs:
-    info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics
+      if res.isErr():
+        error "failed filter subscription", error = res.error
+        waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
+        subRes = FilterSubscribeResult.err(res.error)

-    # Purpose is to update Waku Metadata
-    node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: $pubsub))
+    for pubsub, topics in topicMap.pairs:
+      info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics
+
+      # Purpose is to update Waku Metadata
+      node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: $pubsub))
+  except CatchableError:
+    let errMsg = "exception in filterSubscribe: " & getCurrentExceptionMsg()
+    error "exception in filterSubscribe", error = getCurrentExceptionMsg()
+    waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
+    subRes =
+      FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg))

   # return the last error or ok
   return subRes
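Note: `allFinished` waits for every future without failing itself, but `fut.read()` re-raises the exception stored in a failed future; that is why the collection loop above moves inside try/except once the proc is declared `{.async: (raises: []).}`. A self-contained hedged sketch with a hypothetical `mayFail` proc:

import chronos

proc mayFail(i: int): Future[int] {.async.} =
  if i mod 2 == 0:
    raise newException(ValueError, "even input rejected")
  return i

proc collect() {.async: (raises: []).} =
  let futures = @[mayFail(1), mayFail(2), mayFail(3)]
  try:
    let finished = await allFinished(futures)
    for fut in finished:
      echo "result: ", fut.read() # read() re-raises a failed future's error
  except CatchableError:
    echo "collection failed: ", getCurrentExceptionMsg()

waitFor collect()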
@@ -553,7 +571,7 @@ proc filterUnsubscribe*(
     pubsubTopic: Option[PubsubTopic],
     contentTopics: ContentTopic | seq[ContentTopic],
     peer: RemotePeerInfo | string,
-): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   ## Unsubscribe from a content filter V2".

   let remotePeerRes = parsePeerInfo(peer)
@@ -600,29 +618,36 @@ proc filterUnsubscribe*(
       let content = topics.mapIt($it)
       node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)

-  let finished = await allFinished(futures)
-
   var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok()
-  for fut in finished:
-    let res = fut.read()
+  try:
+    let finished = await allFinished(futures)

-    if res.isErr():
-      error "failed filter unsubscription", error = res.error
-      waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
-      unsubRes = FilterSubscribeResult.err(res.error)
+    for fut in finished:
+      let res = fut.read()

-  for pubsub, topics in topicMap.pairs:
-    info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics
+      if res.isErr():
+        error "failed filter unsubscription", error = res.error
+        waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
+        unsubRes = FilterSubscribeResult.err(res.error)

-    # Purpose is to update Waku Metadata
-    node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: $pubsub))
+    for pubsub, topics in topicMap.pairs:
+      info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics
+
+      # Purpose is to update Waku Metadata
+      node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: $pubsub))
+  except CatchableError:
+    let errMsg = "exception in filterUnsubscribe: " & getCurrentExceptionMsg()
+    error "exception in filterUnsubscribe", error = getCurrentExceptionMsg()
+    waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
+    unsubRes =
+      FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg))

   # return the last error or ok
   return unsubRes

 proc filterUnsubscribeAll*(
     node: WakuNode, peer: RemotePeerInfo | string
-): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   ## Unsubscribe from a content filter V2".

   let remotePeerRes = parsePeerInfo(peer)
@@ -1055,15 +1080,21 @@ proc mountRlnRelay*(

 ## Waku peer-exchange

-proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
+proc mountPeerExchange*(node: WakuNode) {.async: (raises: []).} =
   info "mounting waku peer exchange"

   node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager)

   if node.started:
-    await node.wakuPeerExchange.start()
+    try:
+      await node.wakuPeerExchange.start()
+    except CatchableError:
+      error "failed to start wakuPeerExchange", error = getCurrentExceptionMsg()

-  node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
+  try:
+    node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
+  except LPError:
+    error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg()

 proc fetchPeerExchangePeers*(
     node: Wakunode, amount: uint64
@@ -1110,21 +1141,25 @@ proc setPeerExchangePeer*(

 ## Other protocols

-proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
+proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
   info "mounting libp2p ping protocol"

   try:
     node.libp2pPing = Ping.new(rng = node.rng)
   except Exception as e:
     # This is necessary as `Ping.new*` does not have explicit `raises` requirement
     # @TODO: remove exception handling once explicit `raises` in ping module
-    raise newException(LPError, "Failed to initialize ping protocol")
+    error "failed to create ping", error = getCurrentExceptionMsg()

   if node.started:
     # Node has started already. Let's start ping too.
-    await node.libp2pPing.start()
+    try:
+      await node.libp2pPing.start()
+    except CatchableError:
+      error "failed to start libp2pPing", error = getCurrentExceptionMsg()

-  node.switch.mount(node.libp2pPing)
+  try:
+    node.switch.mount(node.libp2pPing)
+  except LPError:
+    error "failed to mount libp2pPing", error = getCurrentExceptionMsg()

 # TODO: Move this logic to PeerManager
 proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
@@ -1153,15 +1188,21 @@ proc startKeepalive*(node: WakuNode) =

   asyncSpawn node.keepaliveLoop(defaultKeepalive)

-proc mountRendezvous*(node: WakuNode) {.async, raises: [Defect, LPError].} =
+proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
   info "mounting rendezvous discovery protocol"

   node.rendezvous = RendezVous.new(node.switch)

   if node.started:
-    await node.rendezvous.start()
+    try:
+      await node.rendezvous.start()
+    except CatchableError:
+      error "failed to start rendezvous", error = getCurrentExceptionMsg()

-  node.switch.mount(node.rendezvous)
+  try:
+    node.switch.mount(node.rendezvous)
+  except LPError:
+    error "failed to mount rendezvous", error = getCurrentExceptionMsg()

 proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
   let inputStr = $inputMultiAdd

@@ -5,7 +5,7 @@ else:
   {.push raises: [].}

 import
-  std/[options, math],
+  std/options,
   chronos,
   chronicles,
   eth/keys,

@@ -5,9 +5,7 @@ else:

 import metrics

-proc parseCollectorIntoF64(
-    collector: SimpleCollector
-): float64 {.gcsafe, raises: [Defect].} =
+proc parseCollectorIntoF64(collector: SimpleCollector): float64 {.gcsafe, raises: [].} =
   {.gcsafe.}:
     var total = 0.float64
     for metrics in collector.metrics:

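Note: `raises: [Defect]` can shrink to `raises: []` because current Nim excludes defects from exception-effect tracking, so listing `Defect` is redundant and draws warnings. A small hedged illustration:

{.push raises: [].}

proc scale(x: int): int {.raises: [].} =
  # defects such as integer overflow are not tracked by the effect system,
  # so they never appear in a raises list
  x * 2

{.pop.}

echo scale(21)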
@@ -1,4 +1,4 @@
-import std/tables, stew/objects, stew/templateutils
+import std/tables, stew/templateutils

 template keepItIf*[A, B](tableParam: var Table[A, B], itPredicate: untyped) =
   bind evalTemplateParamOnce

@@ -3,7 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import chronos, chronicles, std/[options, sequtils], stew/results
+import chronos, std/[options, sequtils], stew/results
 import ../discovery/waku_discv5, ../waku_relay, ../waku_core, ./message_cache

 ### Discovery

@@ -5,7 +5,6 @@ else:

 import
   std/[strformat, sequtils, tables],
-  stew/byteutils,
   chronicles,
   json_serialization,
   presto/route,

@@ -6,7 +6,6 @@ else:
 import net, tables
 import presto
 import
-  ../../waku/common/utils/nat,
   ../../waku/waku_node,
   ../../waku/discovery/waku_discv5,
   ../../waku/factory/external_config,

@@ -62,7 +62,9 @@ func decodeRequestBody[T](

   return ok(requestResult.get())

-proc getStatusDesc(protocolClientRes: filter_protocol_type.FilterSubscribeResult): string =
+proc getStatusDesc(
+    protocolClientRes: filter_protocol_type.FilterSubscribeResult
+): string =
   ## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double
   if protocolClientRes.isOk:
     return "OK"
@@ -84,8 +86,7 @@ proc convertResponse(
 ): T =
   ## Properly convert filter protocol's response to rest response
   return FilterSubscriptionResponse(
-    requestId: requestId,
-    statusDesc: getStatusDesc(protocolClientRes),
+    requestId: requestId, statusDesc: getStatusDesc(protocolClientRes)
   )

 proc convertResponse(
@@ -94,10 +95,8 @@ proc convertResponse(
     protocolClientRes: filter_protocol_type.FilterSubscribeError,
 ): T =
   ## Properly convert filter protocol's response to rest response in case of error
-  return FilterSubscriptionResponse(
-    requestId: requestId,
-    statusDesc: $protocolClientRes,
-  )
+  return
+    FilterSubscriptionResponse(requestId: requestId, statusDesc: $protocolClientRes)

 proc convertErrorKindToHttpStatus(
     kind: filter_protocol_type.FilterSubscribeErrorKind
@@ -117,8 +116,6 @@ proc convertErrorKindToHttpStatus(
     return Http404
   of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
     return Http503
-  else:
-    return Http500

 proc makeRestResponse(
     requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult

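Note: the deleted `else` branch was dead code, since the `case` covers every `FilterSubscribeErrorKind` member, and an unreachable branch is exactly the kind of compiler noise this commit removes. A generic hedged illustration (types and values are invented for the example):

type StatusKind = enum
  skOk, skBadRequest, skUnavailable

proc toCode(kind: StatusKind): int =
  case kind
  of skOk: 200
  of skBadRequest: 400
  of skUnavailable: 503
  # an `else` here would be unreachable: every enum member is covered

echo toCode(skUnavailable)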
@@ -3,12 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import
-  chronicles,
-  json_serialization,
-  json_serialization/std/options,
-  std/tables,
-  std/sequtils
+import chronicles, json_serialization, json_serialization/std/options
 import ../../../waku_node, ../serdes

 #### Serialization and deserialization

@@ -16,7 +16,6 @@ import
   ../../waku/node/peer_manager,
   ../../../waku_node,
   ../../waku/waku_lightpush/common,
-  ../../waku/waku_lightpush/self_req_handler,
   ../../handlers,
   ../serdes,
   ../responses,

@@ -8,10 +8,9 @@ import
   chronicles,
   json_serialization,
   json_serialization/std/options,
-  presto/[route, client, common]
+  presto/[route, client]

-import
-  ../../../common/base64, ../../../waku_core, ../relay/types as relay_types, ../serdes
+import ../../../waku_core, ../relay/types as relay_types, ../serdes

 export relay_types

@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/[options, strutils, re],
+  std/[options, strutils, re, net],
   stew/results,
-  stew/shims/net,
   chronicles,
   chronos,
   chronos/apps/http/httpserver

@@ -3,9 +3,9 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

+import std/net
 import
   stew/results,
-  stew/shims/net,
   chronicles,
   chronos,
   chronos/apps/http/httpserver,

@@ -1,6 +1,6 @@
 {.push raises: [].}

-import std/[tables, strutils, os], stew/results, chronicles, chronos
+import std/strutils, stew/results, chronicles, chronos
 import
   ../../../common/databases/common,
   ../../../../migrations/message_store_postgres/pg_migration_manager,

@@ -4,8 +4,7 @@ else:
   {.push raises: [].}

 import chronos, stew/results
-import
-  ../../driver, ../../../common/databases/db_postgres, ../../../common/error_handling
+import ../../../common/databases/db_postgres, ../../../common/error_handling

 ## Simple query to validate that the postgres is working and attending requests
 const HealthCheckQuery = "SELECT version();"

@@ -19,7 +19,7 @@ proc new*(
   if retPolicy == "" or retPolicy == "none":
     return ok(none(RetentionPolicy))

-  const StoreMessageRetentionPolicyRegex = re"^\w+:\d*\.?\d+((g|m)b)?$"
+  const StoreMessageRetentionPolicyRegex = re2"^\w+:\d*\.?\d+((g|m)b)?$"
   if not retPolicy.match(StoreMessageRetentionPolicyRegex):
     return err("invalid 'store message retention policy' format: " & retPolicy)

@@ -4,12 +4,12 @@ else:
   {.push raises: [].}

 import
-  std/[options, sequtils, strutils, uri],
+  std/[options, sequtils, strutils, uri, net],
   stew/results,
-  stew/shims/net,
   chronos,
   eth/keys,
   eth/p2p/discoveryv5/enr,
+  eth/net/utils,
   libp2p/crypto/crypto,
   libp2p/crypto/secp,
   libp2p/errors,

@@ -4,11 +4,7 @@ else:
   {.push raises: [].}

 import
-  std/[options, bitops, sequtils],
-  stew/results,
-  stew/shims/net,
-  eth/keys,
-  libp2p/crypto/crypto
+  std/[options, bitops, sequtils, net], stew/results, eth/keys, libp2p/crypto/crypto
 import ../common/enr

 const CapabilitiesEnrField* = "waku2"

@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/[options, sequtils],
+  std/[options, sequtils, net],
   stew/[endians2, results],
-  stew/shims/net,
   eth/keys,
   libp2p/[multiaddress, multicodec],
   libp2p/crypto/crypto

@@ -4,9 +4,8 @@ else:
   {.push raises: [].}

 import
-  std/[options, bitops, sequtils],
+  std/[options, bitops, sequtils, net],
   stew/[endians2, results],
-  stew/shims/net,
   chronicles,
   eth/keys,
   libp2p/[multiaddress, multicodec],

@@ -26,22 +26,44 @@ proc sendSubscribeRequest(
     wfc: WakuFilterClient,
     servicePeer: RemotePeerInfo,
     filterSubscribeRequest: FilterSubscribeRequest,
-): Future[FilterSubscribeResult] {.async.} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   trace "Sending filter subscribe request",
     peerId = servicePeer.peerId, filterSubscribeRequest

-  let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
-  if connOpt.isNone():
-    trace "Failed to dial filter service peer", servicePeer
-    waku_filter_errors.inc(labelValues = [dialFailure])
-    return err(FilterSubscribeError.peerDialFailure($servicePeer))
+  var connOpt: Option[Connection]
+  try:
+    connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec)
+    if connOpt.isNone():
+      trace "Failed to dial filter service peer", servicePeer
+      waku_filter_errors.inc(labelValues = [dialFailure])
+      return err(FilterSubscribeError.peerDialFailure($servicePeer))
+  except CatchableError:
+    let errMsg = "failed to dialPeer: " & getCurrentExceptionMsg()
+    trace "failed to dialPeer", error = getCurrentExceptionMsg()
+    waku_filter_errors.inc(labelValues = [errMsg])
+    return err(FilterSubscribeError.badResponse(errMsg))

   let connection = connOpt.get()

-  # TODO: this can raise an exception
-  await connection.writeLP(filterSubscribeRequest.encode().buffer)
+  try:
+    await connection.writeLP(filterSubscribeRequest.encode().buffer)
+  except CatchableError:
+    let errMsg =
+      "exception in waku_filter_v2 client writeLP: " & getCurrentExceptionMsg()
+    trace "exception in waku_filter_v2 client writeLP", error = getCurrentExceptionMsg()
+    waku_filter_errors.inc(labelValues = [errMsg])
+    return err(FilterSubscribeError.badResponse(errMsg))

-  let respBuf = await connection.readLp(DefaultMaxSubscribeResponseSize)
+  var respBuf: seq[byte]
+  try:
+    respBuf = await connection.readLp(DefaultMaxSubscribeResponseSize)
+  except CatchableError:
+    let errMsg =
+      "exception in waku_filter_v2 client readLp: " & getCurrentExceptionMsg()
+    trace "exception in waku_filter_v2 client readLp", error = getCurrentExceptionMsg()
+    waku_filter_errors.inc(labelValues = [errMsg])
+    return err(FilterSubscribeError.badResponse(errMsg))
+
   let respDecodeRes = FilterSubscribeResponse.decode(respBuf)
   if respDecodeRes.isErr():
     trace "Failed to decode filter subscribe response", servicePeer
@@ -80,7 +102,7 @@ proc subscribe*(
     servicePeer: RemotePeerInfo,
     pubsubTopic: PubsubTopic,
     contentTopics: ContentTopic | seq[ContentTopic],
-): Future[FilterSubscribeResult] {.async.} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   var contentTopicSeq: seq[ContentTopic]
   when contentTopics is seq[ContentTopic]:
     contentTopicSeq = contentTopics
@@ -99,7 +121,7 @@ proc unsubscribe*(
     servicePeer: RemotePeerInfo,
     pubsubTopic: PubsubTopic,
     contentTopics: ContentTopic | seq[ContentTopic],
-): Future[FilterSubscribeResult] {.async.} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   var contentTopicSeq: seq[ContentTopic]
   when contentTopics is seq[ContentTopic]:
     contentTopicSeq = contentTopics
@@ -115,7 +137,7 @@ proc unsubscribe*(

 proc unsubscribeAll*(
     wfc: WakuFilterClient, servicePeer: RemotePeerInfo
-): Future[FilterSubscribeResult] {.async.} =
+): Future[FilterSubscribeResult] {.async: (raises: []).} =
   let requestId = generateRequestId(wfc.rng)
   let filterSubscribeRequest =
     FilterSubscribeRequest.unsubscribeAll(requestId = requestId)

@@ -216,8 +216,7 @@ proc handleMessage*(
 ) {.async.} =
   let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

-  debug "handling message",
-    pubsubTopic = pubsubTopic, msg_hash = msgHash
+  debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash

   let handleMessageStartTime = Moment.now()

@@ -312,13 +311,13 @@ proc startMaintainingSubscriptions(wf: WakuFilter, interval: Duration) =

   wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)

-method start*(wf: WakuFilter) {.async.} =
+method start*(wf: WakuFilter) {.async, base.} =
   debug "starting filter protocol"
   wf.startMaintainingSubscriptions(MaintainSubscriptionsInterval)

   await procCall LPProtocol(wf).start()

-method stop*(wf: WakuFilter) {.async.} =
+method stop*(wf: WakuFilter) {.async, base.} =
   debug "stopping filter protocol"
   if not wf.maintenanceTask.isNil():
     wf.maintenanceTask.clearTimer()

@@ -12,7 +12,7 @@ else:
 ## which spawn a full service Waku node
 ## that could be used also as a lightpush client, helping testing and development.

-import stew/results, chronos, chronicles, std/options, metrics
+import stew/results, chronos, std/options, metrics
 import
   ../waku_core,
   ./protocol,

@@ -180,11 +180,11 @@ proc addValidator*(
 ) {.gcsafe.} =
   w.wakuValidators.add((handler, errorMessage))

-method start*(w: WakuRelay) {.async.} =
+method start*(w: WakuRelay) {.async, base.} =
   debug "start"
   await procCall GossipSub(w).start()

-method stop*(w: WakuRelay) {.async.} =
+method stop*(w: WakuRelay) {.async, base.} =
   debug "stop"
   await procCall GossipSub(w).stop()

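Note: `{.base.}` marks a method as the root of its dynamic-dispatch chain; Nim warns ("use the base pragma") when a method overrides nothing, which is what the `start`/`stop` changes above address. A hedged standalone example (types are invented):

type Transport = ref object of RootObj
type QuicTransport = ref object of Transport

method describe(t: Transport): string {.base.} =
  # dispatch root: marked base so the compiler knows nothing is overridden
  "generic transport"

method describe(t: QuicTransport): string =
  "quic transport"

echo describe(QuicTransport()) # dynamic dispatch selects the override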
@@ -163,7 +163,7 @@ when defined(rln_v2):
       start: MembershipIndex,
       rateCommitments = newSeq[RateCommitment](),
       toRemoveIndices = newSeq[MembershipIndex](),
-  ): Future[void] {.async: (raises: [Exception]).} =
+  ): Future[void] {.async: (raises: [Exception]), base.} =
     initializedGuard(g)

     # convert the rateCommitment struct to a leaf value
@@ -197,7 +197,7 @@ else:
       start: MembershipIndex,
       idCommitments = newSeq[IDCommitment](),
       toRemoveIndices = newSeq[MembershipIndex](),
-  ): Future[void] {.async: (raises: [Exception]).} =
+  ): Future[void] {.async: (raises: [Exception]), base.} =
     initializedGuard(g)

     waku_rln_membership_insertion_duration_seconds.nanosecondTime:

@@ -1,4 +1,4 @@
-import ../group_manager_base, ../../constants, ../../rln, std/sequtils
+import ../group_manager_base, ../../rln, std/sequtils, ../../constants

 export group_manager_base

@@ -4,11 +4,10 @@ else:
   {.push raises: [].}

 import
-  std/[algorithm, sequtils, strutils, tables, times, os, deques],
+  std/[sequtils, tables, times, deques],
   chronicles,
   options,
   chronos,
-  chronos/ratelimit,
   stint,
   web3,
   json,
@@ -33,8 +32,7 @@ import
   ../common/error_handling,
   ../waku_relay, # for WakuRelayHandler
   ../waku_core,
-  ../waku_keystore,
-  ../utils/collector
+  ../waku_keystore

 logScope:
   topics = "waku rln_relay"

@@ -3,7 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import std/options, stew/arrayops, nimcrypto/hash
+import std/options, stew/arrayops
 import ../common/[protobuf, paging], ../waku_core, ./common

 const DefaultMaxRpcSize* = -1

@@ -13,7 +13,7 @@
 ## stored by that local store node.
 ##

-import stew/results, chronos, chronicles
+import stew/results, chronos
 import ./protocol, ./common

 proc handleSelfStoreRequest*(