mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
Merge branch 'master' into release/v0.37
This commit is contained in:
commit
ff93643ae9
4
.gitmodules
vendored
4
.gitmodules
vendored
@ -184,7 +184,3 @@
|
||||
url = https://github.com/waku-org/waku-rlnv2-contract.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/mix"]
|
||||
path = vendor/mix
|
||||
url = https://github.com/vacp2p/mix/
|
||||
branch = main
|
||||
|
||||
@ -317,27 +317,19 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
if conf.logLevel != LogLevel.NONE:
|
||||
setLogLevel(conf.logLevel)
|
||||
|
||||
let natRes = setupNat(
|
||||
let (extIp, extTcpPort, extUdpPort) = setupNat(
|
||||
conf.nat,
|
||||
clientId,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift),
|
||||
)
|
||||
|
||||
if natRes.isErr():
|
||||
raise newException(ValueError, "setupNat error " & natRes.error)
|
||||
|
||||
let (extIp, extTcpPort, extUdpPort) = natRes.get()
|
||||
).valueOr:
|
||||
raise newException(ValueError, "setupNat error " & error)
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let node = block:
|
||||
var builder = WakuNodeBuilder.init()
|
||||
|
||||
@ -126,23 +126,20 @@ proc toMatterbridge(
|
||||
|
||||
assert chat2Msg.isOk
|
||||
|
||||
let postRes = cmb.mbClient.postMessage(
|
||||
text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick
|
||||
)
|
||||
|
||||
if postRes.isErr() or (postRes[] == false):
|
||||
if not cmb.mbClient
|
||||
.postMessage(text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick)
|
||||
.containsValue(true):
|
||||
chat2_mb_dropped.inc(labelValues = ["duplicate"])
|
||||
error "Matterbridge host unreachable. Dropping message."
|
||||
|
||||
proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} =
|
||||
while cmb.running:
|
||||
if (let getRes = cmb.mbClient.getMessages(); getRes.isOk()):
|
||||
for jsonNode in getRes[]:
|
||||
await handler(jsonNode)
|
||||
else:
|
||||
let msg = cmb.mbClient.getMessages().valueOr:
|
||||
error "Matterbridge host unreachable. Sleeping before retrying."
|
||||
await sleepAsync(chronos.seconds(10))
|
||||
|
||||
continue
|
||||
for jsonNode in msg:
|
||||
await handler(jsonNode)
|
||||
await sleepAsync(cmb.pollPeriod)
|
||||
|
||||
##############
|
||||
@ -243,7 +240,7 @@ proc stop*(cmb: Chat2MatterBridge) {.async: (raises: [Exception]).} =
|
||||
{.pop.}
|
||||
# @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||
when isMainModule:
|
||||
import waku/common/utils/nat, waku/waku_api/message_cache
|
||||
import waku/common/utils/nat, waku/rest_api/message_cache
|
||||
|
||||
let
|
||||
rng = newRng()
|
||||
@ -252,25 +249,21 @@ when isMainModule:
|
||||
if conf.logLevel != LogLevel.NONE:
|
||||
setLogLevel(conf.logLevel)
|
||||
|
||||
let natRes = setupNat(
|
||||
let (nodev2ExtIp, nodev2ExtPort, _) = setupNat(
|
||||
conf.nat,
|
||||
clientId,
|
||||
Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift),
|
||||
)
|
||||
if natRes.isErr():
|
||||
error "Error in setupNat", error = natRes.error
|
||||
).valueOr:
|
||||
raise newException(ValueError, "setupNat error " & error)
|
||||
|
||||
# Load address configuration
|
||||
let
|
||||
(nodev2ExtIp, nodev2ExtPort, _) = natRes.get()
|
||||
## The following heuristic assumes that, in absence of manual
|
||||
## config, the external port is the same as the bind port.
|
||||
extPort =
|
||||
if nodev2ExtIp.isSome() and nodev2ExtPort.isNone():
|
||||
some(Port(uint16(conf.libp2pTcpPort) + conf.portsShift))
|
||||
else:
|
||||
nodev2ExtPort
|
||||
## The following heuristic assumes that, in absence of manual
|
||||
## config, the external port is the same as the bind port.
|
||||
let extPort =
|
||||
if nodev2ExtIp.isSome() and nodev2ExtPort.isNone():
|
||||
some(Port(uint16(conf.libp2pTcpPort) + conf.portsShift))
|
||||
else:
|
||||
nodev2ExtPort
|
||||
|
||||
let bridge = Chat2Matterbridge.new(
|
||||
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
|
||||
|
||||
@ -29,8 +29,8 @@ import
|
||||
peerid, # Implement how peers interact
|
||||
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
|
||||
nameresolving/dnsresolver,
|
||||
protocols/mix/curve25519,
|
||||
] # define DNS resolution
|
||||
import mix/curve25519
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
@ -175,18 +175,16 @@ proc startMetricsServer(
|
||||
): Result[MetricsHttpServerRef, string] =
|
||||
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort
|
||||
|
||||
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||
if metricsServerRes.isErr():
|
||||
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||
let server = MetricsHttpServerRef.new($serverIp, serverPort).valueOr:
|
||||
return err("metrics HTTP server start failed: " & $error)
|
||||
|
||||
let server = metricsServerRes.value
|
||||
try:
|
||||
waitFor server.start()
|
||||
except CatchableError:
|
||||
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
|
||||
ok(metricsServerRes.value)
|
||||
ok(server)
|
||||
|
||||
proc publish(c: Chat, line: string) {.async.} =
|
||||
# First create a Chat2Message protobuf with this line of text
|
||||
@ -333,57 +331,56 @@ proc maintainSubscription(
|
||||
const maxFailedServiceNodeSwitches = 10
|
||||
var noFailedSubscribes = 0
|
||||
var noFailedServiceNodeSwitches = 0
|
||||
const RetryWaitMs = 2.seconds # Quick retry interval
|
||||
const SubscriptionMaintenanceMs = 30.seconds # Subscription maintenance interval
|
||||
while true:
|
||||
info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer)
|
||||
# First use filter-ping to check if we have an active subscription
|
||||
let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer)
|
||||
if pingRes.isErr():
|
||||
# No subscription found. Let's subscribe.
|
||||
error "ping failed.", err = pingRes.error
|
||||
trace "no subscription found. Sending subscribe request"
|
||||
let pingErr = (await wakuNode.wakuFilterClient.ping(actualFilterPeer)).errorOr:
|
||||
await sleepAsync(SubscriptionMaintenanceMs)
|
||||
info "subscription is live."
|
||||
continue
|
||||
|
||||
let subscribeRes = await wakuNode.filterSubscribe(
|
||||
# No subscription found. Let's subscribe.
|
||||
error "ping failed.", error = pingErr
|
||||
trace "no subscription found. Sending subscribe request"
|
||||
|
||||
let subscribeErr = (
|
||||
await wakuNode.filterSubscribe(
|
||||
some(filterPubsubTopic), filterContentTopic, actualFilterPeer
|
||||
)
|
||||
).errorOr:
|
||||
await sleepAsync(SubscriptionMaintenanceMs)
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes -= 1
|
||||
notice "subscribe request successful."
|
||||
continue
|
||||
|
||||
if subscribeRes.isErr():
|
||||
noFailedSubscribes += 1
|
||||
error "Subscribe request failed.",
|
||||
err = subscribeRes.error,
|
||||
peer = actualFilterPeer,
|
||||
failCount = noFailedSubscribes
|
||||
noFailedSubscribes += 1
|
||||
error "Subscribe request failed.",
|
||||
error = subscribeErr, peer = actualFilterPeer, failCount = noFailedSubscribes
|
||||
|
||||
# TODO: disconnet from failed actualFilterPeer
|
||||
# asyncSpawn(wakuNode.peerManager.switch.disconnect(p))
|
||||
# wakunode.peerManager.peerStore.delete(actualFilterPeer)
|
||||
# TODO: disconnet from failed actualFilterPeer
|
||||
# asyncSpawn(wakuNode.peerManager.switch.disconnect(p))
|
||||
# wakunode.peerManager.peerStore.delete(actualFilterPeer)
|
||||
|
||||
if noFailedSubscribes < maxFailedSubscribes:
|
||||
await sleepAsync(2000) # Wait a bit before retrying
|
||||
continue
|
||||
elif not preventPeerSwitch:
|
||||
let peerOpt = selectRandomServicePeer(
|
||||
wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
|
||||
)
|
||||
peerOpt.isOkOr:
|
||||
error "Failed to find new service peer. Exiting."
|
||||
noFailedServiceNodeSwitches += 1
|
||||
break
|
||||
if noFailedSubscribes < maxFailedSubscribes:
|
||||
await sleepAsync(RetryWaitMs) # Wait a bit before retrying
|
||||
elif not preventPeerSwitch:
|
||||
# try again with new peer without delay
|
||||
let actualFilterPeer = selectRandomServicePeer(
|
||||
wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
|
||||
).valueOr:
|
||||
error "Failed to find new service peer. Exiting."
|
||||
noFailedServiceNodeSwitches += 1
|
||||
break
|
||||
|
||||
actualFilterPeer = peerOpt.get()
|
||||
info "Found new peer for codec",
|
||||
codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
|
||||
info "Found new peer for codec",
|
||||
codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
|
||||
|
||||
noFailedSubscribes = 0
|
||||
continue # try again with new peer without delay
|
||||
else:
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes -= 1
|
||||
|
||||
notice "subscribe request successful."
|
||||
noFailedSubscribes = 0
|
||||
else:
|
||||
info "subscription is live."
|
||||
|
||||
await sleepAsync(30000) # Subscription maintenance interval
|
||||
await sleepAsync(SubscriptionMaintenanceMs)
|
||||
|
||||
{.pop.}
|
||||
# @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||
@ -401,17 +398,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
if conf.logLevel != LogLevel.NONE:
|
||||
setLogLevel(conf.logLevel)
|
||||
|
||||
let natRes = setupNat(
|
||||
let (extIp, extTcpPort, extUdpPort) = setupNat(
|
||||
conf.nat,
|
||||
clientId,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift),
|
||||
)
|
||||
|
||||
if natRes.isErr():
|
||||
raise newException(ValueError, "setupNat error " & natRes.error)
|
||||
|
||||
let (extIp, extTcpPort, extUdpPort) = natRes.get()
|
||||
).valueOr:
|
||||
raise newException(ValueError, "setupNat error " & error)
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
|
||||
@ -421,13 +414,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
error "failed to add sharded topics to ENR", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let node = block:
|
||||
var builder = WakuNodeBuilder.init()
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
libp2p/wire
|
||||
|
||||
import
|
||||
../../tools/confutils/cli_args,
|
||||
tools/confutils/cli_args,
|
||||
waku/[
|
||||
node/peer_manager,
|
||||
waku_lightpush/common,
|
||||
@ -59,7 +59,4 @@ proc logSelfPeers*(pm: PeerManager) =
|
||||
{allPeers(pm)}
|
||||
*------------------------------------------------------------------------------------------*""".fmt()
|
||||
|
||||
if printable.isErr():
|
||||
echo "Error while printing statistics: " & printable.error().msg
|
||||
else:
|
||||
echo printable.get()
|
||||
echo printable.valueOr("Error while printing statistics: " & error.msg)
|
||||
|
||||
@ -11,7 +11,7 @@ import
|
||||
confutils
|
||||
|
||||
import
|
||||
../../tools/confutils/cli_args,
|
||||
tools/confutils/cli_args,
|
||||
waku/[
|
||||
common/enr,
|
||||
common/logging,
|
||||
@ -49,13 +49,10 @@ when isMainModule:
|
||||
|
||||
const versionString = "version / git commit hash: " & waku_factory.git_version
|
||||
|
||||
let confRes = LiteProtocolTesterConf.load(version = versionString)
|
||||
if confRes.isErr():
|
||||
error "failure while loading the configuration", error = confRes.error
|
||||
let conf = LiteProtocolTesterConf.load(version = versionString).valueOr:
|
||||
error "failure while loading the configuration", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var conf = confRes.get()
|
||||
|
||||
## Logging setup
|
||||
logging.setupLog(conf.logLevel, conf.logFormat)
|
||||
|
||||
@ -187,7 +184,7 @@ when isMainModule:
|
||||
error "Service node not found in time via PX"
|
||||
quit(QuitFailure)
|
||||
|
||||
if futForServiceNode.read().isErr():
|
||||
futForServiceNode.read().isOkOr:
|
||||
error "Service node for test not found via PX"
|
||||
quit(QuitFailure)
|
||||
|
||||
|
||||
@ -89,10 +89,7 @@ proc reportSentMessages() =
|
||||
|{numMessagesToSend+failedToSendCount:>11} |{messagesSent:>11} |{failedToSendCount:>11} |
|
||||
*----------------------------------------*""".fmt()
|
||||
|
||||
if report.isErr:
|
||||
echo "Error while printing statistics"
|
||||
else:
|
||||
echo report.get()
|
||||
echo report.valueOr("Error while printing statistics")
|
||||
|
||||
echo "*--------------------------------------------------------------------------------------------------*"
|
||||
echo "| Failure cause | count |"
|
||||
|
||||
@ -54,64 +54,65 @@ proc maintainSubscription(
|
||||
var noFailedSubscribes = 0
|
||||
var noFailedServiceNodeSwitches = 0
|
||||
var isFirstPingOnNewPeer = true
|
||||
const RetryWaitMs = 2.seconds # Quick retry interval
|
||||
const SubscriptionMaintenanceMs = 30.seconds # Subscription maintenance interval
|
||||
while true:
|
||||
info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer)
|
||||
# First use filter-ping to check if we have an active subscription
|
||||
let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer)
|
||||
if pingRes.isErr():
|
||||
if isFirstPingOnNewPeer == false:
|
||||
# Very first ping expected to fail as we have not yet subscribed at all
|
||||
lpt_receiver_lost_subscription_count.inc()
|
||||
isFirstPingOnNewPeer = false
|
||||
# No subscription found. Let's subscribe.
|
||||
error "ping failed.", err = pingRes.error
|
||||
trace "no subscription found. Sending subscribe request"
|
||||
let pingErr = (await wakuNode.wakuFilterClient.ping(actualFilterPeer)).errorOr:
|
||||
await sleepAsync(SubscriptionMaintenanceMs)
|
||||
info "subscription is live."
|
||||
continue
|
||||
|
||||
let subscribeRes = await wakuNode.filterSubscribe(
|
||||
if isFirstPingOnNewPeer == false:
|
||||
# Very first ping expected to fail as we have not yet subscribed at all
|
||||
lpt_receiver_lost_subscription_count.inc()
|
||||
isFirstPingOnNewPeer = false
|
||||
# No subscription found. Let's subscribe.
|
||||
error "ping failed.", error = pingErr
|
||||
trace "no subscription found. Sending subscribe request"
|
||||
|
||||
let subscribeErr = (
|
||||
await wakuNode.filterSubscribe(
|
||||
some(filterPubsubTopic), filterContentTopic, actualFilterPeer
|
||||
)
|
||||
).errorOr:
|
||||
await sleepAsync(SubscriptionMaintenanceMs)
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes -= 1
|
||||
notice "subscribe request successful."
|
||||
continue
|
||||
|
||||
if subscribeRes.isErr():
|
||||
noFailedSubscribes += 1
|
||||
lpt_service_peer_failure_count.inc(
|
||||
labelValues = ["receiver", actualFilterPeer.getAgent()]
|
||||
)
|
||||
error "Subscribe request failed.",
|
||||
err = subscribeRes.error,
|
||||
peer = actualFilterPeer,
|
||||
failCount = noFailedSubscribes
|
||||
noFailedSubscribes += 1
|
||||
lpt_service_peer_failure_count.inc(
|
||||
labelValues = ["receiver", actualFilterPeer.getAgent()]
|
||||
)
|
||||
error "Subscribe request failed.",
|
||||
err = subscribeErr, peer = actualFilterPeer, failCount = noFailedSubscribes
|
||||
|
||||
# TODO: disconnet from failed actualFilterPeer
|
||||
# asyncSpawn(wakuNode.peerManager.switch.disconnect(p))
|
||||
# wakunode.peerManager.peerStore.delete(actualFilterPeer)
|
||||
# TODO: disconnet from failed actualFilterPeer
|
||||
# asyncSpawn(wakuNode.peerManager.switch.disconnect(p))
|
||||
# wakunode.peerManager.peerStore.delete(actualFilterPeer)
|
||||
|
||||
if noFailedSubscribes < maxFailedSubscribes:
|
||||
await sleepAsync(2.seconds) # Wait a bit before retrying
|
||||
continue
|
||||
elif not preventPeerSwitch:
|
||||
actualFilterPeer = selectRandomServicePeer(
|
||||
wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
|
||||
).valueOr:
|
||||
error "Failed to find new service peer. Exiting."
|
||||
noFailedServiceNodeSwitches += 1
|
||||
break
|
||||
if noFailedSubscribes < maxFailedSubscribes:
|
||||
await sleepAsync(RetryWaitMs) # Wait a bit before retrying
|
||||
elif not preventPeerSwitch:
|
||||
# try again with new peer without delay
|
||||
actualFilterPeer = selectRandomServicePeer(
|
||||
wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
|
||||
).valueOr:
|
||||
error "Failed to find new service peer. Exiting."
|
||||
noFailedServiceNodeSwitches += 1
|
||||
break
|
||||
|
||||
info "Found new peer for codec",
|
||||
codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
|
||||
info "Found new peer for codec",
|
||||
codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
|
||||
|
||||
noFailedSubscribes = 0
|
||||
lpt_change_service_peer_count.inc(labelValues = ["receiver"])
|
||||
isFirstPingOnNewPeer = true
|
||||
continue # try again with new peer without delay
|
||||
else:
|
||||
if noFailedSubscribes > 0:
|
||||
noFailedSubscribes -= 1
|
||||
|
||||
notice "subscribe request successful."
|
||||
noFailedSubscribes = 0
|
||||
lpt_change_service_peer_count.inc(labelValues = ["receiver"])
|
||||
isFirstPingOnNewPeer = true
|
||||
else:
|
||||
info "subscription is live."
|
||||
|
||||
await sleepAsync(30.seconds) # Subscription maintenance interval
|
||||
await sleepAsync(SubscriptionMaintenanceMs)
|
||||
|
||||
proc setupAndListen*(
|
||||
wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo
|
||||
|
||||
@ -11,7 +11,7 @@ import
|
||||
libp2p/wire
|
||||
|
||||
import
|
||||
../wakunode2/cli_args,
|
||||
tools/confutils/cli_args,
|
||||
waku/[
|
||||
common/enr,
|
||||
waku_node,
|
||||
@ -181,7 +181,7 @@ proc pxLookupServiceNode*(
|
||||
if not await futPeers.withTimeout(30.seconds):
|
||||
notice "Cannot get peers from PX", round = 5 - trialCount
|
||||
else:
|
||||
if futPeers.value().isErr():
|
||||
futPeers.value().isOkOr:
|
||||
info "PeerExchange reported error", error = futPeers.read().error
|
||||
return err()
|
||||
|
||||
|
||||
@ -8,6 +8,8 @@ import
|
||||
results,
|
||||
libp2p/peerid
|
||||
|
||||
from std/sugar import `=>`
|
||||
|
||||
import ./tester_message, ./lpt_metrics
|
||||
|
||||
type
|
||||
@ -114,12 +116,7 @@ proc addMessage*(
|
||||
if not self.contains(peerId):
|
||||
self[peerId] = Statistics.init()
|
||||
|
||||
let shortSenderId = block:
|
||||
let senderPeer = PeerId.init(msg.sender)
|
||||
if senderPeer.isErr():
|
||||
msg.sender
|
||||
else:
|
||||
senderPeer.get().shortLog()
|
||||
let shortSenderId = PeerId.init(msg.sender).map(p => p.shortLog()).valueOr(msg.sender)
|
||||
|
||||
discard catch:
|
||||
self[peerId].addMessage(shortSenderId, msg, msgHash)
|
||||
@ -220,10 +217,7 @@ proc echoStat*(self: Statistics, peerId: string) =
|
||||
| {self.missingIndices()} |
|
||||
*------------------------------------------------------------------------------------------*""".fmt()
|
||||
|
||||
if printable.isErr():
|
||||
echo "Error while printing statistics: " & printable.error().msg
|
||||
else:
|
||||
echo printable.get()
|
||||
echo printable.valueOr("Error while printing statistics: " & error.msg)
|
||||
|
||||
proc jsonStat*(self: Statistics): string =
|
||||
let minL, maxL, avgL = self.calcLatency()
|
||||
@ -243,20 +237,18 @@ proc jsonStat*(self: Statistics): string =
|
||||
}},
|
||||
"lostIndices": {self.missingIndices()}
|
||||
}}""".fmt()
|
||||
if json.isErr:
|
||||
return "{\"result:\": \"" & json.error.msg & "\"}"
|
||||
|
||||
return json.get()
|
||||
return json.valueOr("{\"result:\": \"" & error.msg & "\"}")
|
||||
|
||||
proc echoStats*(self: var PerPeerStatistics) =
|
||||
for peerId, stats in self.pairs:
|
||||
let peerLine = catch:
|
||||
"Receiver statistics from peer {peerId}".fmt()
|
||||
if peerLine.isErr:
|
||||
peerLine.isOkOr:
|
||||
echo "Error while printing statistics"
|
||||
else:
|
||||
echo peerLine.get()
|
||||
stats.echoStat(peerId)
|
||||
continue
|
||||
echo peerLine.get()
|
||||
stats.echoStat(peerId)
|
||||
|
||||
proc jsonStats*(self: PerPeerStatistics): string =
|
||||
try:
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
json_serialization/std/options,
|
||||
json_serialization/lexer
|
||||
|
||||
import ../../waku/waku_api/rest/serdes
|
||||
import waku/rest_api/endpoint/serdes
|
||||
|
||||
type ProtocolTesterMessage* = object
|
||||
sender*: string
|
||||
|
||||
@ -443,12 +443,8 @@ proc initAndStartApp(
|
||||
error "failed to add sharded topics to ENR", error = error
|
||||
return err("failed to add sharded topics to ENR: " & $error)
|
||||
|
||||
let recordRes = builder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
return err("cannot build record: " & $recordRes.error)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = builder.build().valueOr:
|
||||
return err("cannot build record: " & $error)
|
||||
|
||||
var nodeBuilder = WakuNodeBuilder.init()
|
||||
|
||||
@ -461,21 +457,15 @@ proc initAndStartApp(
|
||||
relayServiceRatio = "13.33:86.67",
|
||||
shardAware = true,
|
||||
)
|
||||
let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
|
||||
if res.isErr():
|
||||
return err("node building error" & $res.error)
|
||||
nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort).isOkOr:
|
||||
return err("node building error" & $error)
|
||||
|
||||
let nodeRes = nodeBuilder.build()
|
||||
let node =
|
||||
if nodeRes.isErr():
|
||||
return err("node building error" & $res.error)
|
||||
else:
|
||||
nodeRes.get()
|
||||
let node = nodeBuilder.build().valueOr:
|
||||
return err("node building error" & $error)
|
||||
|
||||
var discv5BootstrapEnrsRes = await getBootstrapFromDiscDns(conf)
|
||||
if discv5BootstrapEnrsRes.isErr():
|
||||
var discv5BootstrapEnrs = (await getBootstrapFromDiscDns(conf)).valueOr:
|
||||
error("failed discovering peers from DNS")
|
||||
var discv5BootstrapEnrs = discv5BootstrapEnrsRes.get()
|
||||
quit(QuitFailure)
|
||||
|
||||
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
||||
for enrUri in conf.bootstrapNodes:
|
||||
@ -553,12 +543,10 @@ proc subscribeAndHandleMessages(
|
||||
when isMainModule:
|
||||
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||
{.pop.}
|
||||
let confRes = NetworkMonitorConf.loadConfig()
|
||||
if confRes.isErr():
|
||||
error "could not load cli variables", err = confRes.error
|
||||
quit(1)
|
||||
var conf = NetworkMonitorConf.loadConfig().valueOr:
|
||||
error "could not load cli variables", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var conf = confRes.get()
|
||||
info "cli flags", conf = conf
|
||||
|
||||
if conf.clusterId == 1:
|
||||
@ -586,37 +574,30 @@ when isMainModule:
|
||||
|
||||
# start metrics server
|
||||
if conf.metricsServer:
|
||||
let res =
|
||||
startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
|
||||
if res.isErr():
|
||||
error "could not start metrics server", err = res.error
|
||||
quit(1)
|
||||
startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort)).isOkOr:
|
||||
error "could not start metrics server", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
# start rest server for custom metrics
|
||||
let res = startRestApiServer(conf, allPeersInfo, msgPerContentTopic)
|
||||
if res.isErr():
|
||||
error "could not start rest api server", err = res.error
|
||||
quit(1)
|
||||
startRestApiServer(conf, allPeersInfo, msgPerContentTopic).isOkOr:
|
||||
error "could not start rest api server", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
# create a rest client
|
||||
let clientRest =
|
||||
RestClientRef.new(url = "http://ip-api.com", connectTimeout = ctime.seconds(2))
|
||||
if clientRest.isErr():
|
||||
error "could not start rest api client", err = res.error
|
||||
quit(1)
|
||||
let restClient = clientRest.get()
|
||||
let restClient = RestClientRef.new(
|
||||
url = "http://ip-api.com", connectTimeout = ctime.seconds(2)
|
||||
).valueOr:
|
||||
error "could not start rest api client", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
# start waku node
|
||||
let nodeRes = waitFor initAndStartApp(conf)
|
||||
if nodeRes.isErr():
|
||||
error "could not start node"
|
||||
quit 1
|
||||
|
||||
let (node, discv5) = nodeRes.get()
|
||||
let (node, discv5) = (waitFor initAndStartApp(conf)).valueOr:
|
||||
error "could not start node", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
(waitFor node.mountRelay()).isOkOr:
|
||||
error "failed to mount waku relay protocol: ", err = error
|
||||
quit 1
|
||||
error "failed to mount waku relay protocol: ", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
waitFor node.mountLibp2pPing()
|
||||
|
||||
@ -640,12 +621,12 @@ when isMainModule:
|
||||
try:
|
||||
waitFor node.mountRlnRelay(rlnConf)
|
||||
except CatchableError:
|
||||
error "failed to setup RLN", err = getCurrentExceptionMsg()
|
||||
quit 1
|
||||
error "failed to setup RLN", error = getCurrentExceptionMsg()
|
||||
quit(QuitFailure)
|
||||
|
||||
node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
|
||||
error "failed to mount waku metadata protocol: ", err = error
|
||||
quit 1
|
||||
error "failed to mount waku metadata protocol: ", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
for shard in conf.shards:
|
||||
# Subscribe the node to the shards, to count messages
|
||||
|
||||
@ -181,13 +181,10 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
||||
protocols = conf.protocols,
|
||||
logLevel = conf.logLevel
|
||||
|
||||
let peerRes = parsePeerInfo(conf.address)
|
||||
if peerRes.isErr():
|
||||
error "Couldn't parse 'conf.address'", error = peerRes.error
|
||||
let peer = parsePeerInfo(conf.address).valueOr:
|
||||
error "Couldn't parse 'conf.address'", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let peer = peerRes.value
|
||||
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
bindIp = parseIpAddress("0.0.0.0")
|
||||
@ -225,13 +222,9 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
||||
error "could not initialize ENR with shards", error
|
||||
quit(QuitFailure)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
if isWss and
|
||||
(conf.websocketSecureKeyPath.len == 0 or conf.websocketSecureCertPath.len == 0):
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
common/logging,
|
||||
factory/waku,
|
||||
node/health_monitor,
|
||||
waku_api/rest/builder as rest_server_builder,
|
||||
rest_api/endpoint/builder as rest_server_builder,
|
||||
waku_core/message/default_values,
|
||||
]
|
||||
|
||||
|
||||
@ -2,7 +2,14 @@
|
||||
library 'status-jenkins-lib@v1.8.17'
|
||||
|
||||
pipeline {
|
||||
agent { label 'linux' }
|
||||
agent {
|
||||
docker {
|
||||
label 'linuxcontainer'
|
||||
image 'harbor.status.im/infra/ci-build-containers:linux-base-1.0.0'
|
||||
args '--volume=/var/run/docker.sock:/var/run/docker.sock ' +
|
||||
'--user jenkins'
|
||||
}
|
||||
}
|
||||
|
||||
options {
|
||||
timestamps()
|
||||
|
||||
@ -2,7 +2,14 @@
|
||||
library 'status-jenkins-lib@v1.8.17'
|
||||
|
||||
pipeline {
|
||||
agent { label 'linux' }
|
||||
agent {
|
||||
docker {
|
||||
label 'linuxcontainer'
|
||||
image 'harbor.status.im/infra/ci-build-containers:linux-base-1.0.0'
|
||||
args '--volume=/var/run/docker.sock:/var/run/docker.sock ' +
|
||||
'--user jenkins'
|
||||
}
|
||||
}
|
||||
|
||||
options {
|
||||
timestamps()
|
||||
|
||||
@ -62,13 +62,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
|
||||
"Building ENR with relay sharding failed"
|
||||
)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
@ -92,20 +88,18 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
|
||||
while true:
|
||||
notice "maintaining subscription"
|
||||
# First use filter-ping to check if we have an active subscription
|
||||
let pingRes = await node.wakuFilterClient.ping(filterPeer)
|
||||
if pingRes.isErr():
|
||||
if (await node.wakuFilterClient.ping(filterPeer)).isErr():
|
||||
# No subscription found. Let's subscribe.
|
||||
notice "no subscription found. Sending subscribe request"
|
||||
|
||||
let subscribeRes = await node.wakuFilterClient.subscribe(
|
||||
filterPeer, FilterPubsubTopic, @[FilterContentTopic]
|
||||
)
|
||||
|
||||
if subscribeRes.isErr():
|
||||
notice "subscribe request failed. Quitting.", err = subscribeRes.error
|
||||
(
|
||||
await node.wakuFilterClient.subscribe(
|
||||
filterPeer, FilterPubsubTopic, @[FilterContentTopic]
|
||||
)
|
||||
).isOkOr:
|
||||
notice "subscribe request failed. Quitting.", error = error
|
||||
break
|
||||
else:
|
||||
notice "subscribe request successful."
|
||||
notice "subscribe request successful."
|
||||
else:
|
||||
notice "subscription found."
|
||||
|
||||
|
||||
@ -7,14 +7,14 @@ import
|
||||
confutils,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/curve25519,
|
||||
libp2p/protocols/mix,
|
||||
libp2p/protocols/mix/curve25519,
|
||||
libp2p/multiaddress,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
metrics,
|
||||
metrics/chronos_httpserver
|
||||
|
||||
import mix, mix/mix_protocol, mix/curve25519
|
||||
|
||||
import
|
||||
waku/[
|
||||
common/logging,
|
||||
@ -119,7 +119,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
|
||||
conn = node.wakuMix.toConnection(
|
||||
MixDestination.init(dPeerId, pxPeerInfo.addrs[0]), # destination lightpush peer
|
||||
WakuLightPushCodec, # protocol codec which will be used over the mix connection
|
||||
Opt.some(MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1)))),
|
||||
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))),
|
||||
# mix parameters indicating we expect a single reply
|
||||
).valueOr:
|
||||
error "failed to create mix connection", error = error
|
||||
|
||||
@ -54,13 +54,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
"Building ENR with relay sharding failed"
|
||||
)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
|
||||
@ -49,13 +49,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
|
||||
@ -47,13 +47,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create enr record", error = recordRes.error
|
||||
quit(QuitFailure)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
var builder = WakuNodeBuilder.init()
|
||||
builder.withNodeKey(nodeKey)
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import ../../apps/wakunode2/cli_args
|
||||
import tools/confutils/cli_args
|
||||
import waku/[common/logging, factory/[waku, networks_config]]
|
||||
import
|
||||
std/[options, strutils, os, sequtils],
|
||||
@ -18,13 +18,10 @@ proc setup*(): Waku =
|
||||
const versionString = "version / git commit hash: " & waku.git_version
|
||||
let rng = crypto.newRng()
|
||||
|
||||
let confRes = WakuNodeConf.load(version = versionString)
|
||||
if confRes.isErr():
|
||||
error "failure while loading the configuration", error = $confRes.error
|
||||
let conf = WakuNodeConf.load(version = versionString).valueOr:
|
||||
error "failure while loading the configuration", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
var conf = confRes.get()
|
||||
|
||||
let twnNetworkConf = NetworkConf.TheWakuNetworkConf()
|
||||
if len(conf.shards) != 0:
|
||||
conf.pubsubTopics = conf.shards.mapIt(twnNetworkConf.pubsubTopics[it.uint16])
|
||||
|
||||
@ -95,61 +95,54 @@ proc sendResponse*(
|
||||
type SCPHandler* = proc(msg: WakuMessage): Future[void] {.async.}
|
||||
proc getSCPHandler(self: StealthCommitmentProtocol): SCPHandler =
|
||||
let handler = proc(msg: WakuMessage): Future[void] {.async.} =
|
||||
let decodedRes = WakuStealthCommitmentMsg.decode(msg.payload)
|
||||
if decodedRes.isErr():
|
||||
error "could not decode scp message"
|
||||
let decoded = decodedRes.get()
|
||||
let decoded = WakuStealthCommitmentMsg.decode(msg.payload).valueOr:
|
||||
error "could not decode scp message", error = error
|
||||
quit(QuitFailure)
|
||||
if decoded.request == false:
|
||||
# check if the generated stealth commitment belongs to the receiver
|
||||
# if not, continue
|
||||
let ephemeralPubKeyRes =
|
||||
deserialize(StealthCommitmentFFI.PublicKey, decoded.ephemeralPubKey.get())
|
||||
if ephemeralPubKeyRes.isErr():
|
||||
error "could not deserialize ephemeral public key: ",
|
||||
err = ephemeralPubKeyRes.error()
|
||||
let ephemeralPubKey = ephemeralPubKeyRes.get()
|
||||
let stealthCommitmentPrivateKeyRes = StealthCommitmentFFI.generateStealthPrivateKey(
|
||||
let ephemeralPubKey = deserialize(
|
||||
StealthCommitmentFFI.PublicKey, decoded.ephemeralPubKey.get()
|
||||
).valueOr:
|
||||
error "could not deserialize ephemeral public key: ", error = error
|
||||
quit(QuitFailure)
|
||||
let stealthCommitmentPrivateKey = StealthCommitmentFFI.generateStealthPrivateKey(
|
||||
ephemeralPubKey,
|
||||
self.spendingKeyPair.privateKey,
|
||||
self.viewingKeyPair.privateKey,
|
||||
decoded.viewTag.get(),
|
||||
)
|
||||
if stealthCommitmentPrivateKeyRes.isErr():
|
||||
info "received stealth commitment does not belong to the receiver: ",
|
||||
err = stealthCommitmentPrivateKeyRes.error()
|
||||
|
||||
let stealthCommitmentPrivateKey = stealthCommitmentPrivateKeyRes.get()
|
||||
).valueOr:
|
||||
error "received stealth commitment does not belong to the receiver: ",
|
||||
error = error
|
||||
quit(QuitFailure)
|
||||
info "received stealth commitment belongs to the receiver: ",
|
||||
stealthCommitmentPrivateKey,
|
||||
stealthCommitmentPubKey = decoded.stealthCommitment.get()
|
||||
return
|
||||
# send response
|
||||
# deseralize the keys
|
||||
let spendingKeyRes =
|
||||
deserialize(StealthCommitmentFFI.PublicKey, decoded.spendingPubKey.get())
|
||||
if spendingKeyRes.isErr():
|
||||
error "could not deserialize spending key: ", err = spendingKeyRes.error()
|
||||
let spendingKey = spendingKeyRes.get()
|
||||
let viewingKeyRes =
|
||||
(deserialize(StealthCommitmentFFI.PublicKey, decoded.viewingPubKey.get()))
|
||||
if viewingKeyRes.isErr():
|
||||
error "could not deserialize viewing key: ", err = viewingKeyRes.error()
|
||||
let viewingKey = viewingKeyRes.get()
|
||||
let spendingKey = deserialize(
|
||||
StealthCommitmentFFI.PublicKey, decoded.spendingPubKey.get()
|
||||
).valueOr:
|
||||
error "could not deserialize spending key: ", error = error
|
||||
quit(QuitFailure)
|
||||
let viewingKey = (
|
||||
deserialize(StealthCommitmentFFI.PublicKey, decoded.viewingPubKey.get())
|
||||
).valueOr:
|
||||
error "could not deserialize viewing key: ", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
info "received spending key", spendingKey
|
||||
info "received viewing key", viewingKey
|
||||
let ephemeralKeyPairRes = StealthCommitmentFFI.generateKeyPair()
|
||||
if ephemeralKeyPairRes.isErr():
|
||||
error "could not generate ephemeral key pair: ", err = ephemeralKeyPairRes.error()
|
||||
let ephemeralKeyPair = ephemeralKeyPairRes.get()
|
||||
let ephemeralKeyPair = StealthCommitmentFFI.generateKeyPair().valueOr:
|
||||
error "could not generate ephemeral key pair: ", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let stealthCommitmentRes = StealthCommitmentFFI.generateStealthCommitment(
|
||||
let stealthCommitment = StealthCommitmentFFI.generateStealthCommitment(
|
||||
spendingKey, viewingKey, ephemeralKeyPair.privateKey
|
||||
)
|
||||
if stealthCommitmentRes.isErr():
|
||||
error "could not generate stealth commitment: ",
|
||||
err = stealthCommitmentRes.error()
|
||||
let stealthCommitment = stealthCommitmentRes.get()
|
||||
).valueOr:
|
||||
error "could not generate stealth commitment: ", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
(
|
||||
await self.sendResponse(
|
||||
@ -157,7 +150,7 @@ proc getSCPHandler(self: StealthCommitmentProtocol): SCPHandler =
|
||||
stealthCommitment.viewTag,
|
||||
)
|
||||
).isOkOr:
|
||||
error "could not send response: ", err = $error
|
||||
error "could not send response: ", error = $error
|
||||
|
||||
return handler
|
||||
|
||||
|
||||
@ -96,18 +96,16 @@ proc sendRequestToWakuThread*(
|
||||
deallocShared(req)
|
||||
return err("Couldn't send a request to the waku thread: " & $req[])
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
if fireSyncRes.isErr():
|
||||
let fireSync = ctx.reqSignal.fireSync().valueOr:
|
||||
deallocShared(req)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
return err("failed fireSync: " & $error)
|
||||
|
||||
if fireSyncRes.get() == false:
|
||||
if not fireSync:
|
||||
deallocShared(req)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the Waku Thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
ctx.reqReceivedSignal.waitSync(timeout).isOkOr:
|
||||
deallocShared(req)
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
@ -176,9 +174,8 @@ proc wakuThreadBody(ctx: ptr WakuContext) {.thread.} =
|
||||
## Handle the request
|
||||
asyncSpawn WakuThreadRequest.process(request, addr waku)
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "could not fireSync back to requester thread", error = fireRes.error
|
||||
ctx.reqReceivedSignal.fireSync().isOkOr:
|
||||
error "could not fireSync back to requester thread", error = error
|
||||
|
||||
waitFor wakuRun(ctx)
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
../../../waku/discovery/waku_discv5,
|
||||
../../../waku/waku_core/peers,
|
||||
../../../waku/node/waku_node,
|
||||
../../../waku/node/api,
|
||||
../../../waku/node/kernel_api,
|
||||
../../alloc
|
||||
|
||||
type DiscoveryMsgType* = enum
|
||||
|
||||
@ -2,13 +2,15 @@ import std/[options, json, strutils, net]
|
||||
import chronos, chronicles, results, confutils, confutils/std/net
|
||||
|
||||
import
|
||||
../../../waku/node/peer_manager/peer_manager,
|
||||
../../../tools/confutils/cli_args,
|
||||
../../../waku/factory/waku,
|
||||
../../../waku/factory/node_factory,
|
||||
../../../waku/factory/networks_config,
|
||||
../../../waku/factory/app_callbacks,
|
||||
../../../waku/waku_api/rest/builder,
|
||||
waku/node/peer_manager/peer_manager,
|
||||
tools/confutils/cli_args,
|
||||
waku/factory/waku,
|
||||
waku/factory/node_factory,
|
||||
waku/factory/networks_config,
|
||||
waku/factory/app_callbacks,
|
||||
waku/rest_api/endpoint/builder
|
||||
|
||||
import
|
||||
../../alloc
|
||||
|
||||
type NodeLifecycleMsgType* = enum
|
||||
|
||||
@ -44,13 +44,11 @@ proc process*(
|
||||
let pingFuture = ping()
|
||||
let pingRTT: Duration =
|
||||
if self[].timeout == chronos.milliseconds(0): # No timeout expected
|
||||
(await pingFuture).valueOr:
|
||||
return err(error)
|
||||
?(await pingFuture)
|
||||
else:
|
||||
let timedOut = not (await pingFuture.withTimeout(self[].timeout))
|
||||
if timedOut:
|
||||
return err("ping timed out")
|
||||
pingFuture.read().valueOr:
|
||||
return err(error)
|
||||
?(pingFuture.read())
|
||||
|
||||
ok($(pingRTT.nanos))
|
||||
|
||||
@ -8,7 +8,7 @@ import
|
||||
../../../../waku/waku_core/subscription/push_handler,
|
||||
../../../../waku/node/peer_manager/peer_manager,
|
||||
../../../../waku/node/waku_node,
|
||||
../../../../waku/node/api,
|
||||
../../../../waku/node/kernel_api,
|
||||
../../../../waku/waku_core/topics/pubsub_topic,
|
||||
../../../../waku/waku_core/topics/content_topic,
|
||||
../../../alloc
|
||||
|
||||
@ -1,16 +1,18 @@
|
||||
import std/[net, sequtils, strutils]
|
||||
import chronicles, chronos, stew/byteutils, results
|
||||
import
|
||||
../../../../waku/waku_core/message/message,
|
||||
../../../../waku/factory/[validator_signed, waku],
|
||||
../../../../tools/confutils/cli_args,
|
||||
../../../../waku/waku_node,
|
||||
../../../../waku/waku_core/message,
|
||||
../../../../waku/waku_core/time, # Timestamp
|
||||
../../../../waku/waku_core/topics/pubsub_topic,
|
||||
../../../../waku/waku_core/topics,
|
||||
../../../../waku/waku_relay/protocol,
|
||||
../../../../waku/node/peer_manager,
|
||||
waku/waku_core/message/message,
|
||||
waku/factory/[validator_signed, waku],
|
||||
tools/confutils/cli_args,
|
||||
waku/waku_node,
|
||||
waku/waku_core/message,
|
||||
waku/waku_core/time, # Timestamp
|
||||
waku/waku_core/topics/pubsub_topic,
|
||||
waku/waku_core/topics,
|
||||
waku/waku_relay/protocol,
|
||||
waku/node/peer_manager
|
||||
|
||||
import
|
||||
../../../alloc
|
||||
|
||||
type RelayMsgType* = enum
|
||||
|
||||
@ -12,7 +12,7 @@ import
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/subscriptions,
|
||||
|
||||
@ -12,7 +12,7 @@ import
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
waku_lightpush_legacy,
|
||||
waku_lightpush_legacy/common,
|
||||
waku_lightpush_legacy/protocol_metrics,
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
waku/[
|
||||
common/paging,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_store_legacy,
|
||||
@ -446,7 +446,7 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
await otherServer.start()
|
||||
let otherServerRemotePeerInfo = otherServer.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# When making a history query to the first server node
|
||||
# When making a history query to the first server node
|
||||
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
||||
|
||||
# Then the response contains the messages
|
||||
|
||||
@ -12,7 +12,7 @@ import
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
waku_lightpush,
|
||||
waku_rln_relay,
|
||||
],
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
import
|
||||
waku/[
|
||||
waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
discovery/waku_discv5,
|
||||
waku_peer_exchange,
|
||||
node/peer_manager,
|
||||
|
||||
@ -17,7 +17,7 @@ import
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
discovery/waku_discv5,
|
||||
waku_filter_v2/common,
|
||||
waku_relay/protocol,
|
||||
|
||||
@ -17,7 +17,7 @@ import
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
common/error_handling,
|
||||
waku_rln_relay,
|
||||
waku_rln_relay/rln,
|
||||
@ -514,7 +514,7 @@ suite "Waku RlnRelay - End to End - OnChain":
|
||||
## Issues
|
||||
### TreeIndex
|
||||
For some reason the calls to `getWakuRlnConfigOnChain` need to be made with `treeIndex` = 0 and 1, in that order.
|
||||
But the registration needs to be made with 1 and 2.
|
||||
But the registration needs to be made with 1 and 2.
|
||||
#### Solutions
|
||||
Requires investigation
|
||||
### Monkeypatching
|
||||
|
||||
@ -16,7 +16,7 @@ import
|
||||
waku_core/topics/sharding,
|
||||
waku_store_legacy/common,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
common/paging,
|
||||
waku_core,
|
||||
waku_store/common,
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
waku/[
|
||||
common/paging,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_core/message/digest,
|
||||
@ -486,7 +486,7 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
await otherServer.start()
|
||||
let otherServerRemotePeerInfo = otherServer.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# When making a history query to the first server node
|
||||
# When making a history query to the first server node
|
||||
let queryResponse = await client.query(storeQuery, serverRemotePeerInfo)
|
||||
|
||||
# Then the response contains the messages
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import std/[sets, random], results, stew/byteutils, testutils/unittests
|
||||
import waku/waku_core, waku/waku_api/message_cache, ./testlib/wakucore
|
||||
import waku/waku_core, waku/rest_api/message_cache, ./testlib/wakucore
|
||||
|
||||
randomize()
|
||||
|
||||
|
||||
@ -22,7 +22,7 @@ import
|
||||
factory/conf_builder/conf_builder,
|
||||
factory/waku,
|
||||
node/waku_node,
|
||||
node/api,
|
||||
node/kernel_api,
|
||||
node/peer_manager,
|
||||
],
|
||||
../testlib/[wakucore, testasync, assertions, futures, wakunode, testutils],
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
import std/[options, sequtils, json], testutils/unittests, results, chronos
|
||||
|
||||
import
|
||||
waku/node/[peer_manager, waku_node, api],
|
||||
waku/node/[peer_manager, waku_node, kernel_api],
|
||||
waku/waku_core,
|
||||
waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
|
||||
../testlib/[wakucore, testasync, testutils, futures, sequtils, wakunode],
|
||||
|
||||
@ -506,7 +506,7 @@ suite "Waku Sync: reconciliation":
|
||||
let (_, deliveredHash) = await remoteNeeds.get()
|
||||
check deliveredHash in diffMsgHashes
|
||||
|
||||
asyncTest "sync 2 nodes, 40 msgs: 18 in-window diff, 20 out-window ignored":
|
||||
#[ asyncTest "sync 2 nodes, 40 msgs: 17 in-window diff, 20 out-window ignored":
|
||||
server = await newTestWakuRecon(
|
||||
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
|
||||
)
|
||||
@ -515,10 +515,10 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
const
|
||||
diffInWin = 18
|
||||
diffInWin = 17
|
||||
diffOutWin = 20
|
||||
stepOutNs = 100_000_000'u64
|
||||
outOffsetNs = 2_300_000_000'u64 # for 20 mesg they sent 2 seconds earlier
|
||||
outOffsetNs = 3_000_000_000'u64 # for 20 mesg they sent 2 seconds earlier
|
||||
|
||||
randomize()
|
||||
|
||||
@ -572,7 +572,7 @@ suite "Waku Sync: reconciliation":
|
||||
for _ in 0 ..< diffInWin:
|
||||
let (_, deliveredHashes) = await remoteNeeds.popFirst()
|
||||
check deliveredHashes in inWinHashes
|
||||
check deliveredHashes notin outWinHashes
|
||||
check deliveredHashes notin outWinHashes ]#
|
||||
|
||||
asyncTest "hash-fingerprint collision, same timestamp – stable sort":
|
||||
server = await newTestWakuRecon(
|
||||
|
||||
@ -14,12 +14,12 @@ import
|
||||
waku_node,
|
||||
waku_filter_v2/client,
|
||||
node/peer_manager,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/admin/types,
|
||||
waku_api/rest/admin/handlers as admin_api,
|
||||
waku_api/rest/admin/client as admin_api_client,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/admin/types,
|
||||
rest_api/endpoint/admin/handlers as admin_rest_interface,
|
||||
rest_api/endpoint/admin/client as admin_rest_client,
|
||||
waku_relay,
|
||||
waku_peer_exchange,
|
||||
],
|
||||
|
||||
@ -11,8 +11,8 @@ import
|
||||
waku/[
|
||||
waku_node,
|
||||
node/waku_node as waku_node2,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/debug/handlers as debug_api,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/debug/handlers as debug_rest_interface,
|
||||
],
|
||||
../testlib/common,
|
||||
../testlib/wakucore,
|
||||
|
||||
@ -12,11 +12,11 @@ import
|
||||
waku_node,
|
||||
node/waku_node as waku_node2,
|
||||
# TODO: Remove after moving `git_version` to the app code.
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/debug/handlers as debug_api,
|
||||
waku_api/rest/debug/client as debug_api_client,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/debug/handlers as debug_rest_interface,
|
||||
rest_api/endpoint/debug/client as debug_rest_client,
|
||||
],
|
||||
../testlib/common,
|
||||
../testlib/wakucore,
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import results, stew/byteutils, testutils/unittests, json_serialization
|
||||
import waku/waku_api/rest/serdes, waku/waku_api/rest/debug/types
|
||||
import waku/rest_api/endpoint/serdes, waku/rest_api/endpoint/debug/types
|
||||
|
||||
suite "Waku v2 REST API - Debug - serialization":
|
||||
suite "DebugWakuInfo - decode":
|
||||
|
||||
@ -9,21 +9,21 @@ import
|
||||
libp2p/crypto/crypto
|
||||
import
|
||||
waku/[
|
||||
waku_api/message_cache,
|
||||
rest_api/message_cache,
|
||||
waku_core,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/filter/types,
|
||||
waku_api/rest/filter/handlers as filter_api,
|
||||
waku_api/rest/filter/client as filter_api_client,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/filter/types,
|
||||
rest_api/endpoint/filter/handlers as filter_rest_interface,
|
||||
rest_api/endpoint/filter/client as filter_rest_client,
|
||||
waku_relay,
|
||||
waku_filter_v2/subscriptions,
|
||||
waku_filter_v2/common,
|
||||
waku_api/rest/relay/handlers as relay_api,
|
||||
waku_api/rest/relay/client as relay_api_client,
|
||||
rest_api/endpoint/relay/handlers as relay_rest_interface,
|
||||
rest_api/endpoint/relay/client as relay_rest_client,
|
||||
],
|
||||
../testlib/wakucore,
|
||||
../testlib/wakunode
|
||||
|
||||
@ -13,11 +13,11 @@ import
|
||||
waku_node,
|
||||
node/waku_node as waku_node2,
|
||||
# TODO: Remove after moving `git_version` to the app code.
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/health/handlers as health_api,
|
||||
waku_api/rest/health/client as health_api_client,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/health/handlers as health_rest_interface,
|
||||
rest_api/endpoint/health/client as health_rest_client,
|
||||
waku_rln_relay,
|
||||
node/health_monitor,
|
||||
],
|
||||
|
||||
@ -10,17 +10,17 @@ import
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_api/message_cache,
|
||||
rest_api/message_cache,
|
||||
waku_core,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
waku_lightpush/common,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/lightpush/types,
|
||||
waku_api/rest/lightpush/handlers as lightpush_api,
|
||||
waku_api/rest/lightpush/client as lightpush_api_client,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/lightpush/types,
|
||||
rest_api/endpoint/lightpush/handlers as lightpush_rest_interface,
|
||||
rest_api/endpoint/lightpush/client as lightpush_rest_client,
|
||||
waku_relay,
|
||||
common/rate_limit/setting,
|
||||
],
|
||||
|
||||
@ -10,17 +10,17 @@ import
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_api/message_cache,
|
||||
rest_api/message_cache,
|
||||
waku_core,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
waku_lightpush_legacy/common,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/legacy_lightpush/types,
|
||||
waku_api/rest/legacy_lightpush/handlers as lightpush_api,
|
||||
waku_api/rest/legacy_lightpush/client as lightpush_api_client,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/legacy_lightpush/types,
|
||||
rest_api/endpoint/legacy_lightpush/handlers as lightpush_rest_interface,
|
||||
rest_api/endpoint/legacy_lightpush/client as lightpush_rest_client,
|
||||
waku_relay,
|
||||
common/rate_limit/setting,
|
||||
],
|
||||
|
||||
@ -12,13 +12,13 @@ import
|
||||
common/base64,
|
||||
waku_core,
|
||||
waku_node,
|
||||
waku_api/message_cache,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/relay/types,
|
||||
waku_api/rest/relay/handlers as relay_api,
|
||||
waku_api/rest/relay/client as relay_api_client,
|
||||
rest_api/message_cache,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/relay/types,
|
||||
rest_api/endpoint/relay/handlers as relay_rest_interface,
|
||||
rest_api/endpoint/relay/client as relay_rest_client,
|
||||
waku_relay,
|
||||
waku_rln_relay,
|
||||
],
|
||||
@ -263,7 +263,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
|
||||
await node.mountRlnRelay(wakuRlnConfig)
|
||||
await node.start()
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
@ -514,7 +514,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
|
||||
await node.mountRlnRelay(wakuRlnConfig)
|
||||
await node.start()
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
@ -586,7 +586,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
await node.mountRlnRelay(wakuRlnConfig)
|
||||
await node.start()
|
||||
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
@ -648,7 +648,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
await node.mountRlnRelay(wakuRlnConfig)
|
||||
await node.start()
|
||||
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
@ -723,7 +723,7 @@ suite "Waku v2 Rest API - Relay":
|
||||
await node.mountRlnRelay(wakuRlnConfig)
|
||||
await node.start()
|
||||
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
# Registration is mandatory before sending messages with rln-relay
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
{.used.}
|
||||
|
||||
import results, stew/byteutils, unittest2, json_serialization
|
||||
import waku/[common/base64, waku_api/rest/serdes, waku_api/rest/relay/types, waku_core]
|
||||
import
|
||||
waku/
|
||||
[common/base64, rest_api/endpoint/serdes, rest_api/endpoint/relay/types, waku_core]
|
||||
|
||||
suite "Waku v2 Rest API - Relay - serialization":
|
||||
suite "RelayWakuMessage - decode":
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
{.used.}
|
||||
|
||||
import results, stew/byteutils, chronicles, unittest2, json_serialization
|
||||
import waku/waku_api/rest/serdes, waku/waku_api/rest/debug/types
|
||||
import waku/rest_api/endpoint/serdes, waku/rest_api/endpoint/debug/types
|
||||
|
||||
# TODO: Decouple this test suite from the `debug_api` module by defining
|
||||
# TODO: Decouple this test suite from the `debug_rest_interface` module by defining
|
||||
# private custom types for this test suite module
|
||||
suite "Waku v2 Rest API - Serdes":
|
||||
suite "decode":
|
||||
|
||||
@ -17,12 +17,12 @@ import
|
||||
waku_core/time,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
waku_api/rest/server,
|
||||
waku_api/rest/client,
|
||||
waku_api/rest/responses,
|
||||
waku_api/rest/store/handlers as store_api,
|
||||
waku_api/rest/store/client as store_api_client,
|
||||
waku_api/rest/store/types,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/client,
|
||||
rest_api/endpoint/responses,
|
||||
rest_api/endpoint/store/handlers as store_rest_interface,
|
||||
rest_api/endpoint/store/client as store_rest_client,
|
||||
rest_api/endpoint/store/types,
|
||||
waku_archive,
|
||||
waku_archive/driver/queue_driver,
|
||||
waku_archive/driver/sqlite_driver,
|
||||
@ -34,7 +34,7 @@ import
|
||||
../testlib/wakunode
|
||||
|
||||
logScope:
|
||||
topics = "waku node rest store_api test"
|
||||
topics = "waku node rest store_rest_interface test"
|
||||
|
||||
proc put(
|
||||
store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
@ -485,7 +485,7 @@ procSuite "Waku Rest API - Store v3":
|
||||
$response.contentType == $MIMETYPE_TEXT
|
||||
response.data.messages.len == 0
|
||||
response.data.statusDesc ==
|
||||
"Failed parsing remote peer info [MultiAddress.init [multiaddress: Invalid MultiAddress, must start with `/`]]"
|
||||
"Failed parsing remote peer info: MultiAddress.init [multiaddress: Invalid MultiAddress, must start with `/`]"
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
|
||||
@ -34,7 +34,9 @@ import
|
||||
|
||||
import ./envvar as confEnvvarDefs, ./envvar_net as confEnvvarNet
|
||||
|
||||
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet, ProtectedShard
|
||||
export
|
||||
confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet, ProtectedShard,
|
||||
DefaultMaxWakuMessageSizeStr
|
||||
|
||||
logScope:
|
||||
topics = "waku cli args"
|
||||
@ -725,12 +727,11 @@ proc parseCmdArg*(T: type ProtectedShard, p: string): T =
|
||||
raise newException(
|
||||
ValueError, "Invalid format for protected shard expected shard:publickey"
|
||||
)
|
||||
let publicKey = secp256k1.SkPublicKey.fromHex(elements[1])
|
||||
if publicKey.isErr:
|
||||
let publicKey = secp256k1.SkPublicKey.fromHex(elements[1]).valueOr:
|
||||
raise newException(ValueError, "Invalid public key")
|
||||
|
||||
if isNumber(elements[0]):
|
||||
return ProtectedShard(shard: uint16.parseCmdArg(elements[0]), key: publicKey.get())
|
||||
return ProtectedShard(shard: uint16.parseCmdArg(elements[0]), key: publicKey)
|
||||
|
||||
# TODO: Remove when removing protected-topic configuration
|
||||
let shard = RelayShard.parse(elements[0]).valueOr:
|
||||
@ -738,7 +739,7 @@ proc parseCmdArg*(T: type ProtectedShard, p: string): T =
|
||||
ValueError,
|
||||
"Invalid pubsub topic. Pubsub topics must be in the format /waku/2/rs/<cluster-id>/<shard-id>",
|
||||
)
|
||||
return ProtectedShard(shard: shard.shardId, key: publicKey.get())
|
||||
return ProtectedShard(shard: shard.shardId, key: publicKey)
|
||||
|
||||
proc completeCmdArg*(T: type ProtectedShard, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
@ -31,12 +31,10 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
|
||||
trace "configuration", conf = $conf
|
||||
|
||||
# 2. generate credentials
|
||||
let credentialRes = membershipKeyGen()
|
||||
if credentialRes.isErr():
|
||||
error "failure while generating credentials", error = credentialRes.error
|
||||
quit(1)
|
||||
let credential = membershipKeyGen().valueOr:
|
||||
error "failure while generating credentials", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let credential = credentialRes.get()
|
||||
info "credentials",
|
||||
idTrapdoor = credential.idTrapdoor.inHex(),
|
||||
idNullifier = credential.idNullifier.inHex(),
|
||||
@ -45,7 +43,7 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
|
||||
|
||||
if not conf.execute:
|
||||
info "not executing, exiting"
|
||||
quit(0)
|
||||
quit(QuitSuccess)
|
||||
|
||||
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
|
||||
## Action to be taken when an internal error occurs during the node run.
|
||||
@ -66,12 +64,12 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
|
||||
try:
|
||||
(waitFor groupManager.init()).isOkOr:
|
||||
error "failure while initializing OnchainGroupManager", error = $error
|
||||
quit(1)
|
||||
quit(QuitFailure)
|
||||
# handling the exception is required since waitFor raises an exception
|
||||
except Exception, CatchableError:
|
||||
error "failure while initializing OnchainGroupManager",
|
||||
error = getCurrentExceptionMsg()
|
||||
quit(1)
|
||||
quit(QuitFailure)
|
||||
|
||||
# 4. register on-chain
|
||||
try:
|
||||
@ -79,7 +77,7 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
|
||||
except Exception, CatchableError:
|
||||
error "failure while registering credentials on-chain",
|
||||
error = getCurrentExceptionMsg()
|
||||
quit(1)
|
||||
quit(QuitFailure)
|
||||
|
||||
info "Transaction hash", txHash = groupManager.registrationTxHash.get()
|
||||
|
||||
@ -99,11 +97,9 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
|
||||
userMessageLimit: conf.userMessageLimit,
|
||||
)
|
||||
|
||||
let persistRes =
|
||||
addMembershipCredentials(conf.credPath, keystoreCred, conf.credPassword, RLNAppInfo)
|
||||
if persistRes.isErr():
|
||||
error "failed to persist credentials", error = persistRes.error
|
||||
quit(1)
|
||||
addMembershipCredentials(conf.credPath, keystoreCred, conf.credPassword, RLNAppInfo).isOkOr:
|
||||
error "failed to persist credentials", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
info "credentials persisted", path = conf.credPath
|
||||
|
||||
@ -111,5 +107,5 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
waitFor groupManager.stop()
except CatchableError:
error "failure while stopping OnchainGroupManager", error = getCurrentExceptionMsg()
quit(0) # 0 because we already registered on-chain
quit(0)
quit(QuitSuccess) # 0 because we already registered on-chain
quit(QuitSuccess)
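The hunks above follow the pattern applied throughout this commit: explicit isErr()/get() handling is replaced with the valueOr and isOkOr templates from the results package, which inject `error` into the failure branch. A minimal, self-contained sketch of that idiom (parsePort is a hypothetical helper, not project code):

import std/strutils
import results

proc parsePort(s: string): Result[int, string] =
  ## Hypothetical helper: parse a TCP port or return a descriptive error.
  try:
    ok(s.parseInt())
  except ValueError:
    err("not a number: " & s)

# valueOr unwraps the value, or runs the block with `error` in scope.
let port = parsePort("8080").valueOr:
  quit("bad port: " & error, QuitFailure)
echo port

# isOkOr is the statement form, used when only the failure branch matters.
parsePort("oops").isOkOr:
  echo "ignoring invalid port: ", error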
1
vendor/mix
vendored
@ -1 +0,0 @@
Subproject commit 5e95337693ad0787baec1ee25293f454c2d105ca
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
Subproject commit aa8ce46f782240cb99a7222c474022b8cfd24e52
Subproject commit 0309685cd27d4bf763c8b3be86a76c33bcfe67ea
@ -24,14 +24,13 @@ requires "nim >= 2.2.4",
"stew",
"stint",
"metrics",
"libp2p >= 1.13.0",
"libp2p >= 1.14.2",
"web3",
"presto",
"regex",
"results",
"db_connector",
"minilru",
"https://github.com/vacp2p/mix#0.1.0"
"minilru"
### Helper functions
proc buildModule(filePath, params = "", lang = "c"): bool =
3
waku/api.nim
Normal file
@ -0,0 +1,3 @@
import ./api/[api, api_conf, entry_nodes]
export api, api_conf, entry_nodes
@ -63,9 +63,8 @@ proc openDbConn(connString: string): Result[DbConn, string] =
return err("exception opening new connection: " & getCurrentExceptionMsg())
if conn.status != CONNECTION_OK:
let checkRes = conn.check()
if checkRes.isErr():
return err("failed to connect to database: " & checkRes.error)
conn.check().isOkOr:
return err("failed to connect to database: " & error)
return err("unknown reason")
@ -174,8 +174,8 @@ proc runStmt*(
|
||||
let len = paramValues.len
|
||||
discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len)
|
||||
|
||||
if res.isErr():
|
||||
return err("failed prepare in runStmt: " & res.error.msg)
|
||||
res.isOkOr:
|
||||
return err("failed prepare in runStmt: " & error.msg)
|
||||
|
||||
pool.conns[connIndex].inclPreparedStmt(stmtName)
|
||||
|
||||
|
||||
@ -265,8 +265,7 @@ proc getPageSize*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
proc handler(s: RawStmtPtr) =
|
||||
size = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA page_size;", handler)
|
||||
if res.isErr():
|
||||
db.query("PRAGMA page_size;", handler).isOkOr:
|
||||
return err("failed to get page_size")
|
||||
|
||||
return ok(size)
|
||||
@ -277,8 +276,7 @@ proc getFreelistCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||
proc handler(s: RawStmtPtr) =
|
||||
count = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = db.query("PRAGMA freelist_count;", handler)
|
||||
if res.isErr():
|
||||
db.query("PRAGMA freelist_count;", handler).isOkOr:
|
||||
return err("failed to get freelist_count")
|
||||
|
||||
return ok(count)
|
||||
@ -289,8 +287,7 @@ proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =
proc handler(s: RawStmtPtr) =
count = sqlite3_column_int64(s, 0)
let res = db.query("PRAGMA page_count;", handler)
if res.isErr():
db.query("PRAGMA page_count;", handler).isOkOr:
return err("failed to get page_count")
return ok(count)
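The three PRAGMA helpers touched here (page_size, freelist_count, page_count) are combined by gatherSqlitePageStats below; the figures usually derived from them are the on-disk file size and the space a VACUUM could reclaim. Illustrative arithmetic only, with made-up sample values:

let
  pageSize = 4096'i64        # PRAGMA page_size
  pageCount = 25_000'i64     # PRAGMA page_count
  freelistCount = 1_200'i64  # PRAGMA freelist_count

echo "database file size: ", pageSize * pageCount, " bytes"
echo "reclaimable by VACUUM: ", pageSize * freelistCount, " bytes"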
@ -319,8 +316,7 @@ proc gatherSqlitePageStats*(db: SqliteDatabase): DatabaseResult[(int64, int64, i
|
||||
|
||||
proc vacuum*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
## The VACUUM command rebuilds the database file, repacking it into a minimal amount of disk space.
|
||||
let res = db.query("VACUUM;", NoopRowHandler)
|
||||
if res.isErr():
|
||||
db.query("VACUUM;", NoopRowHandler).isOkOr:
|
||||
return err("vacuum failed")
|
||||
|
||||
return ok()
|
||||
@ -339,8 +335,7 @@ proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
|
||||
proc handler(s: ptr sqlite3_stmt) =
|
||||
version = sqlite3_column_int64(s, 0)
|
||||
|
||||
let res = database.query("PRAGMA user_version;", handler)
|
||||
if res.isErr():
|
||||
database.query("PRAGMA user_version;", handler).isOkOr:
|
||||
return err("failed to get user_version")
|
||||
|
||||
ok(version)
|
||||
@ -354,8 +349,7 @@ proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[v
|
||||
##
|
||||
## For more info check: https://www.sqlite.org/pragma.html#pragma_user_version
|
||||
let query = "PRAGMA user_version=" & $version & ";"
|
||||
let res = database.query(query, NoopRowHandler)
|
||||
if res.isErr():
|
||||
database.query(query, NoopRowHandler).isOkOr:
|
||||
return err("failed to set user_version")
|
||||
|
||||
ok()
|
||||
@ -400,11 +394,9 @@ proc filterMigrationScripts(
if direction != "" and not script.toLower().endsWith("." & direction & ".sql"):
return false
let scriptVersionRes = getMigrationScriptVersion(script)
if scriptVersionRes.isErr():
let scriptVersion = getMigrationScriptVersion(script).valueOr:
return false
let scriptVersion = scriptVersionRes.value
return lowVersion < scriptVersion and scriptVersion <= highVersion
paths.filter(filterPredicate)
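filterMigrationScripts keeps only the scripts whose filename matches the requested direction and whose embedded version falls in (lowVersion, highVersion]. A simplified, self-contained re-implementation for illustration (not the project's code; file naming is assumed to be "<version>_<name>.<direction>.sql"):

import std/[strutils, sequtils, options]

proc scriptVersion(path: string): Option[int] =
  ## Extract the leading numeric version from e.g. "2_peers.up.sql".
  let name = path.rsplit('/', maxsplit = 1)[^1]
  try:
    some(name.split('_', maxsplit = 1)[0].parseInt())
  except ValueError:
    none(int)

proc selectScripts(paths: seq[string], low, high: int, direction = "up"): seq[string] =
  paths.filterIt(
    it.toLowerAscii().endsWith("." & direction & ".sql") and
      scriptVersion(it).map(proc(v: int): bool = low < v and v <= high).get(false)
  )

echo selectScripts(@["1_init.up.sql", "2_peers.up.sql", "2_peers.down.sql"], 1, 2)
# prints @["2_peers.up.sql"]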
@ -476,10 +468,9 @@ proc migrate*(
|
||||
for statement in script.breakIntoStatements():
|
||||
info "executing migration statement", statement = statement
|
||||
|
||||
let execRes = db.query(statement, NoopRowHandler)
|
||||
if execRes.isErr():
|
||||
db.query(statement, NoopRowHandler).isOkOr:
|
||||
error "failed to execute migration statement",
|
||||
statement = statement, error = execRes.error
|
||||
statement = statement, error = error
|
||||
return err("failed to execute migration statement")
|
||||
|
||||
info "migration statement executed succesfully", statement = statement
|
||||
@ -497,9 +488,8 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
|
||||
info "starting sqlite database vacuuming"
|
||||
|
||||
let resVacuum = db.vacuum()
|
||||
if resVacuum.isErr():
|
||||
return err("failed to execute vacuum: " & resVacuum.error)
|
||||
db.vacuum().isOkOr:
|
||||
return err("failed to execute vacuum: " & error)
|
||||
|
||||
info "finished sqlite database vacuuming"
|
||||
ok()
|
||||
|
||||
@ -65,11 +65,10 @@ func id*(record: TypedRecord): Option[RecordId] =
|
||||
if fieldOpt.isNone():
|
||||
return none(RecordId)
|
||||
|
||||
let fieldRes = toRecordId(fieldOpt.get())
|
||||
if fieldRes.isErr():
|
||||
let field = toRecordId(fieldOpt.get()).valueOr:
|
||||
return none(RecordId)
|
||||
|
||||
some(fieldRes.value)
|
||||
return some(field)
|
||||
|
||||
func secp256k1*(record: TypedRecord): Option[array[33, byte]] =
|
||||
record.tryGet("secp256k1", array[33, byte])
|
||||
|
||||
@ -393,12 +393,11 @@ proc addBootstrapNode*(bootstrapAddr: string, bootstrapEnrs: var seq[enr.Record]
|
||||
if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#':
|
||||
return
|
||||
|
||||
let enrRes = parseBootstrapAddress(bootstrapAddr)
|
||||
if enrRes.isErr():
|
||||
info "ignoring invalid bootstrap address", reason = enrRes.error
|
||||
let enr = parseBootstrapAddress(bootstrapAddr).valueOr:
|
||||
info "ignoring invalid bootstrap address", reason = error
|
||||
return
|
||||
|
||||
bootstrapEnrs.add(enrRes.value)
|
||||
bootstrapEnrs.add(enr)
|
||||
|
||||
proc setupDiscoveryV5*(
|
||||
myENR: enr.Record,
|
||||
|
||||
@ -1,5 +1,5 @@
import chronicles, std/options, results
import libp2p/crypto/crypto, libp2p/crypto/curve25519, mix/curve25519
import libp2p/crypto/crypto, libp2p/crypto/curve25519, libp2p/protocols/mix/curve25519
import ../waku_conf, waku/waku_mix
logScope:
@ -29,13 +29,9 @@ proc enrConfiguration*(
|
||||
).isOkOr:
|
||||
return err("could not initialize ENR with shards")
|
||||
|
||||
let recordRes = enrBuilder.build()
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create record", error = recordRes.error
|
||||
return err($recordRes.error)
|
||||
else:
|
||||
recordRes.get()
|
||||
let record = enrBuilder.build().valueOr:
|
||||
error "failed to create enr record", error = error
|
||||
return err($error)
|
||||
|
||||
return ok(record)
|
||||
|
||||
@ -70,16 +66,13 @@ proc networkConfiguration*(
|
||||
): Future[NetConfigResult] {.async.} =
|
||||
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
||||
## actually a supported transport for libp2p traffic.
|
||||
let natRes = setupNat(
|
||||
var (extIp, extTcpPort, _) = setupNat(
|
||||
conf.natStrategy.string,
|
||||
clientId,
|
||||
Port(uint16(conf.p2pTcpPort) + portsShift),
|
||||
Port(uint16(conf.p2pTcpPort) + portsShift),
|
||||
)
|
||||
if natRes.isErr():
|
||||
return err("failed to setup NAT: " & $natRes.error)
|
||||
|
||||
var (extIp, extTcpPort, _) = natRes.get()
|
||||
).valueOr:
|
||||
return err("failed to setup NAT: " & $error)
|
||||
|
||||
let
|
||||
discv5UdpPort =
|
||||
@ -101,12 +94,10 @@ proc networkConfiguration*(
|
||||
# Resolve and use DNS domain IP
|
||||
if conf.dns4DomainName.isSome() and extIp.isNone():
|
||||
try:
|
||||
let dnsRes = await dnsResolve(conf.dns4DomainName.get(), dnsAddrsNameServers)
|
||||
let dns = (await dnsResolve(conf.dns4DomainName.get(), dnsAddrsNameServers)).valueOr:
|
||||
return err($error) # Pass error down the stack
|
||||
|
||||
if dnsRes.isErr():
|
||||
return err($dnsRes.error) # Pass error down the stack
|
||||
|
||||
extIp = some(parseIpAddress(dnsRes.get()))
|
||||
extIp = some(parseIpAddress(dns))
|
||||
except CatchableError:
|
||||
return
|
||||
err("Could not update extIp to resolved DNS IP: " & getCurrentExceptionMsg())
|
||||
|
||||
@ -47,11 +47,10 @@ proc setupPeerStorage(): Result[Option[WakuPeerStorage], string] =
|
||||
|
||||
?peer_store_sqlite_migrations.migrate(db)
|
||||
|
||||
let res = WakuPeerStorage.new(db)
|
||||
if res.isErr():
|
||||
return err("failed to init peer store" & res.error)
|
||||
let res = WakuPeerStorage.new(db).valueOr:
|
||||
return err("failed to init peer store" & error)
|
||||
|
||||
ok(some(res.value))
|
||||
return ok(some(res))
|
||||
|
||||
## Init waku node instance
|
||||
|
||||
@ -167,16 +166,17 @@ proc setupProtocols(
|
||||
if conf.storeServiceConf.isSome():
|
||||
let storeServiceConf = conf.storeServiceConf.get()
|
||||
if storeServiceConf.supportV2:
|
||||
let archiveDriverRes = await legacy_driver.ArchiveDriver.new(
|
||||
storeServiceConf.dbUrl, storeServiceConf.dbVacuum, storeServiceConf.dbMigration,
|
||||
storeServiceConf.maxNumDbConnections, onFatalErrorAction,
|
||||
)
|
||||
if archiveDriverRes.isErr():
|
||||
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
|
||||
let archiveDriver = (
|
||||
await legacy_driver.ArchiveDriver.new(
|
||||
storeServiceConf.dbUrl, storeServiceConf.dbVacuum,
|
||||
storeServiceConf.dbMigration, storeServiceConf.maxNumDbConnections,
|
||||
onFatalErrorAction,
|
||||
)
|
||||
).valueOr:
|
||||
return err("failed to setup legacy archive driver: " & error)
|
||||
|
||||
let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
|
||||
if mountArcRes.isErr():
|
||||
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)
|
||||
node.mountLegacyArchive(archiveDriver).isOkOr:
|
||||
return err("failed to mount waku legacy archive protocol: " & error)
|
||||
|
||||
## For now we always mount the future archive driver but if the legacy one is mounted,
|
||||
## then the legacy will be in charge of performing the archiving.
|
||||
@ -189,11 +189,8 @@ proc setupProtocols(
|
||||
## So for now, we need to make sure that when legacy store is enabled and we use sqlite
|
||||
## that we migrate our db according to legacy store's schema to have the extra field
|
||||
|
||||
let engineRes = dburl.getDbEngine(storeServiceConf.dbUrl)
|
||||
if engineRes.isErr():
|
||||
return err("error getting db engine in setupProtocols: " & engineRes.error)
|
||||
|
||||
let engine = engineRes.get()
|
||||
let engine = dburl.getDbEngine(storeServiceConf.dbUrl).valueOr:
|
||||
return err("error getting db engine in setupProtocols: " & error)
|
||||
|
||||
let migrate =
|
||||
if engine == "sqlite" and storeServiceConf.supportV2:
|
||||
@ -201,20 +198,19 @@ proc setupProtocols(
|
||||
else:
|
||||
storeServiceConf.dbMigration
|
||||
|
||||
let archiveDriverRes = await driver.ArchiveDriver.new(
|
||||
storeServiceConf.dbUrl, storeServiceConf.dbVacuum, migrate,
|
||||
storeServiceConf.maxNumDbConnections, onFatalErrorAction,
|
||||
)
|
||||
if archiveDriverRes.isErr():
|
||||
return err("failed to setup archive driver: " & archiveDriverRes.error)
|
||||
let archiveDriver = (
|
||||
await driver.ArchiveDriver.new(
|
||||
storeServiceConf.dbUrl, storeServiceConf.dbVacuum, migrate,
|
||||
storeServiceConf.maxNumDbConnections, onFatalErrorAction,
|
||||
)
|
||||
).valueOr:
|
||||
return err("failed to setup archive driver: " & error)
|
||||
|
||||
let retPolicyRes = policy.RetentionPolicy.new(storeServiceConf.retentionPolicy)
|
||||
if retPolicyRes.isErr():
|
||||
return err("failed to create retention policy: " & retPolicyRes.error)
|
||||
let retPolicy = policy.RetentionPolicy.new(storeServiceConf.retentionPolicy).valueOr:
|
||||
return err("failed to create retention policy: " & error)
|
||||
|
||||
let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
|
||||
if mountArcRes.isErr():
|
||||
return err("failed to mount waku archive protocol: " & mountArcRes.error)
|
||||
node.mountArchive(archiveDriver, retPolicy).isOkOr:
|
||||
return err("failed to mount waku archive protocol: " & error)
|
||||
|
||||
if storeServiceConf.supportV2:
|
||||
# Store legacy setup
|
||||
|
||||
@ -28,9 +28,9 @@ import
|
||||
../node/health_monitor,
|
||||
../node/waku_metrics,
|
||||
../node/delivery_monitor/delivery_monitor,
|
||||
../waku_api/message_cache,
|
||||
../waku_api/rest/server,
|
||||
../waku_api/rest/builder as rest_server_builder,
|
||||
../rest_api/message_cache,
|
||||
../rest_api/endpoint/server,
|
||||
../rest_api/endpoint/builder as rest_server_builder,
|
||||
../waku_archive,
|
||||
../waku_relay/protocol,
|
||||
../discovery/waku_dnsdisc,
|
||||
@ -205,13 +205,11 @@ proc new*(
|
||||
if wakuConf.remoteStoreNode.isNone():
|
||||
return err("A storenode should be set when reliability mode is on")
|
||||
|
||||
let deliveryMonitorRes = DeliveryMonitor.new(
|
||||
let deliveryMonitor = DeliveryMonitor.new(
|
||||
node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
|
||||
node.wakuFilterClient,
|
||||
)
|
||||
if deliveryMonitorRes.isErr():
|
||||
return err("could not create delivery monitor: " & $deliveryMonitorRes.error)
|
||||
deliveryMonitor = deliveryMonitorRes.get()
|
||||
).valueOr:
|
||||
return err("could not create delivery monitor: " & $error)
|
||||
|
||||
var waku = Waku(
|
||||
version: git_version,
|
||||
@ -328,16 +326,14 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
|
||||
await sleepAsync(30.seconds)
|
||||
if waku.conf.dnsDiscoveryConf.isSome():
|
||||
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
|
||||
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
|
||||
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
|
||||
)
|
||||
if dynamicBootstrapNodesRes.isErr():
|
||||
error "Retrieving dynamic bootstrap nodes failed",
|
||||
error = dynamicBootstrapNodesRes.error
|
||||
waku[].dynamicBootstrapNodes = (
|
||||
await waku_dnsdisc.retrieveDynamicBootstrapNodes(
|
||||
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
|
||||
)
|
||||
).valueOr:
|
||||
error "Retrieving dynamic bootstrap nodes failed", error = error
|
||||
continue
|
||||
|
||||
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
||||
|
||||
if not waku[].wakuDiscv5.isNil():
|
||||
let dynamicBootstrapEnrs = waku[].dynamicBootstrapNodes
|
||||
.filterIt(it.hasUdpPort())
|
||||
|
||||
@ -9,7 +9,7 @@ import
|
||||
|
||||
import
|
||||
../waku_rln_relay/rln_relay,
|
||||
../waku_api/rest/builder,
|
||||
../rest_api/endpoint/builder,
|
||||
../discovery/waku_discv5,
|
||||
../node/waku_metrics,
|
||||
../common/logging,
|
||||
@ -38,7 +38,7 @@ type ProtectedShard* {.requiresInit.} = object
|
||||
|
||||
type DnsDiscoveryConf* {.requiresInit.} = object
|
||||
enrTreeUrl*: string
|
||||
# TODO: should probably only have one set of name servers (see dnsaddrs)
|
||||
# TODO: should probably only have one set of name servers (see dnsaddrs)
|
||||
nameServers*: seq[IpAddress]
|
||||
|
||||
type StoreSyncConf* {.requiresInit.} = object
|
||||
|
||||
@ -42,10 +42,9 @@ proc getTxAndTxReceipt(
|
||||
let receiptFuture = eligibilityManager.getMinedTransactionReceipt(txHash)
|
||||
await allFutures(txFuture, receiptFuture)
|
||||
let tx = txFuture.read()
|
||||
let txReceipt = receiptFuture.read()
|
||||
if txReceipt.isErr():
|
||||
return err("Cannot get tx receipt: " & txReceipt.error)
|
||||
return ok((tx, txReceipt.get()))
|
||||
let txReceipt = receiptFuture.read().valueOr:
|
||||
return err("Cannot get tx receipt: " & error)
|
||||
return ok((tx, txReceipt))
|
||||
|
||||
proc isEligibleTxId*(
|
||||
eligibilityManager: EligibilityManager,
|
||||
|
||||
@ -1,9 +0,0 @@
|
||||
import
|
||||
./api/filter as filter_api,
|
||||
./api/lightpush as lightpush_api,
|
||||
./api/store as store_api,
|
||||
./api/relay as relay_api,
|
||||
./api/peer_exchange as peer_exchange_api,
|
||||
./api/ping as ping_api
|
||||
|
||||
export filter_api, lightpush_api, store_api, relay_api, peer_exchange_api, ping_api
|
||||
@ -17,10 +17,8 @@ const PeerStoreMigrationPath: string = projectRoot / "migrations" / "sent_msgs"
|
||||
proc migrate*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
info "starting peer store's sqlite database migration for sent messages"
|
||||
|
||||
let migrationRes =
|
||||
migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath)
|
||||
if migrationRes.isErr():
|
||||
return err("failed to execute migration scripts: " & migrationRes.error)
|
||||
migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath).isOkOr:
|
||||
return err("failed to execute migration scripts: " & error)
|
||||
|
||||
info "finished peer store's sqlite database migration for sent messages"
|
||||
ok()
|
||||
|
||||
@ -8,7 +8,7 @@ import
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../api,
|
||||
../kernel_api,
|
||||
../../waku_rln_relay,
|
||||
../../waku_relay,
|
||||
../peer_manager,
|
||||
|
||||
9
waku/node/kernel_api.nim
Normal file
@ -0,0 +1,9 @@
import
./kernel_api/filter as filter_api,
./kernel_api/lightpush as lightpush_api,
./kernel_api/store as store_api,
./kernel_api/relay as relay_api,
./kernel_api/peer_exchange as peer_exchange_api,
./kernel_api/ping as ping_api
export filter_api, lightpush_api, store_api, relay_api, peer_exchange_api, ping_api
@ -108,13 +108,10 @@ proc filterSubscribe*(
|
||||
error = "waku filter client is not set up"
|
||||
return err(FilterSubscribeError.serviceUnavailable())
|
||||
|
||||
let remotePeerRes = parsePeerInfo(peer)
|
||||
if remotePeerRes.isErr():
|
||||
error "Couldn't parse the peer info properly", error = remotePeerRes.error
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
error "Couldn't parse the peer info properly", error = error
|
||||
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
|
||||
|
||||
let remotePeer = remotePeerRes.value
|
||||
|
||||
if pubsubTopic.isSome():
|
||||
info "registering filter subscription to content",
|
||||
pubsubTopic = pubsubTopic.get(),
|
||||
@ -143,15 +140,11 @@ proc filterSubscribe*(
|
||||
else:
|
||||
# No pubsub topic, autosharding is used to deduce it
|
||||
# but content topics must be well-formed for this
|
||||
let topicMapRes =
|
||||
node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics)
|
||||
|
||||
let topicMap =
|
||||
if topicMapRes.isErr():
|
||||
error "can't get shard", error = topicMapRes.error
|
||||
let topicMap = node.wakuAutoSharding
|
||||
.get()
|
||||
.getShardsFromContentTopics(contentTopics).valueOr:
|
||||
error "can't get shard", error = error
|
||||
return err(FilterSubscribeError.badResponse("can't get shard"))
|
||||
else:
|
||||
topicMapRes.get()
|
||||
|
||||
var futures = collect(newSeq):
|
||||
for shard, topics in topicMap.pairs:
|
||||
@ -195,13 +188,10 @@ proc filterUnsubscribe*(
|
||||
): Future[FilterSubscribeResult] {.async: (raises: []).} =
|
||||
## Unsubscribe from a content filter V2".
|
||||
|
||||
let remotePeerRes = parsePeerInfo(peer)
|
||||
if remotePeerRes.isErr():
|
||||
error "couldn't parse remotePeerInfo", error = remotePeerRes.error
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
error "couldn't parse remotePeerInfo", error = error
|
||||
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
|
||||
|
||||
let remotePeer = remotePeerRes.value
|
||||
|
||||
if pubsubTopic.isSome():
|
||||
info "deregistering filter subscription to content",
|
||||
pubsubTopic = pubsubTopic.get(),
|
||||
@ -226,15 +216,11 @@ proc filterUnsubscribe*(
|
||||
error "Failed filter un-subscription, pubsub topic must be specified with static sharding"
|
||||
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
|
||||
else: # pubsubTopic.isNone
|
||||
let topicMapRes =
|
||||
node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics)
|
||||
|
||||
let topicMap =
|
||||
if topicMapRes.isErr():
|
||||
error "can't get shard", error = topicMapRes.error
|
||||
let topicMap = node.wakuAutoSharding
|
||||
.get()
|
||||
.getShardsFromContentTopics(contentTopics).valueOr:
|
||||
error "can't get shard", error = error
|
||||
return err(FilterSubscribeError.badResponse("can't get shard"))
|
||||
else:
|
||||
topicMapRes.get()
|
||||
|
||||
var futures = collect(newSeq):
|
||||
for shard, topics in topicMap.pairs:
|
||||
@ -275,13 +261,10 @@ proc filterUnsubscribeAll*(
|
||||
): Future[FilterSubscribeResult] {.async: (raises: []).} =
|
||||
## Unsubscribe from a content filter V2".
|
||||
|
||||
let remotePeerRes = parsePeerInfo(peer)
|
||||
if remotePeerRes.isErr():
|
||||
error "couldn't parse remotePeerInfo", error = remotePeerRes.error
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
error "couldn't parse remotePeerInfo", error = error
|
||||
return err(FilterSubscribeError.serviceUnavailable("No peers available"))
|
||||
|
||||
let remotePeer = remotePeerRes.value
|
||||
|
||||
info "deregistering all filter subscription to content", peer = remotePeer.peerId
|
||||
|
||||
let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer)
|
||||
@ -17,7 +17,7 @@ import
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility,
|
||||
mix
|
||||
libp2p/protocols/mix
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
@ -114,14 +114,8 @@ proc legacyLightpushPublish*(
if node.wakuAutoSharding.isNone():
return err("Pubsub topic must be specified when static sharding is enabled")
let topicMapRes =
node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic)
let topicMap =
if topicMapRes.isErr():
return err(topicMapRes.error)
else:
topicMapRes.get()
?node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic)
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
return await internalPublish(node, $pubsub, message, peer)
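The replacement above leans on the `?` operator from the results package: when the enclosing proc itself returns a compatible Result, `?expr` unwraps a success and returns the error to the caller, which is what the removed topicMapRes branching did by hand. A small sketch (shardFor and publish are hypothetical stand-ins, not project code):

import results

proc shardFor(contentTopic: string): Result[string, string] =
  ## Hypothetical stand-in for getShardsFromContentTopics.
  if contentTopic.len == 0:
    return err("empty content topic")
  ok("/waku/2/rs/1/0")

proc publish(contentTopic: string): Result[void, string] =
  # `?` unwraps on success; on failure it makes `publish` return the error.
  let shard = ?shardFor(contentTopic)
  echo "publishing on ", shard
  ok()

discard publish("/app/1/chat/proto")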
@ -207,10 +201,8 @@ proc lightpushPublishHandler(
|
||||
let conn = node.wakuMix.toConnection(
|
||||
MixDestination.init(peer.peerId, peer.addrs[0]),
|
||||
WakuLightPushCodec,
|
||||
Opt.some(
|
||||
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1)))
|
||||
# indicating we only want a single path to be used for reply hence numSurbs = 1
|
||||
),
|
||||
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))),
|
||||
# indicating we only want a single path to be used for reply hence numSurbs = 1
|
||||
).valueOr:
|
||||
error "could not create mix connection"
|
||||
return lighpushErrorResult(
|
||||
@ -111,10 +111,9 @@ proc setPeerExchangePeer*(
|
||||
|
||||
info "Set peer-exchange peer", peer = peer
|
||||
|
||||
let remotePeerRes = parsePeerInfo(peer)
|
||||
if remotePeerRes.isErr():
|
||||
error "could not parse peer info", error = remotePeerRes.error
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
error "could not parse peer info", error = error
|
||||
return
|
||||
|
||||
node.peerManager.addPeer(remotePeerRes.value, PeerExchange)
|
||||
node.peerManager.addPeer(remotePeer, PeerExchange)
|
||||
waku_px_peers.inc()
|
||||
@ -240,11 +240,8 @@ proc mountRlnRelay*(
|
||||
CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay"
|
||||
)
|
||||
|
||||
let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler)
|
||||
if rlnRelayRes.isErr():
|
||||
raise
|
||||
newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
|
||||
let rlnRelay = rlnRelayRes.get()
|
||||
let rlnRelay = (await WakuRlnRelay.new(rlnConf, registrationHandler)).valueOr:
|
||||
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & error)
|
||||
if (rlnConf.userMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit):
|
||||
error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"
|
||||
let validator = generateRlnValidator(rlnRelay, spamHandler)
|
||||
@ -87,30 +87,27 @@ proc toArchiveQuery(
|
||||
proc toHistoryResult*(
|
||||
res: waku_archive_legacy.ArchiveResult
|
||||
): legacy_store_common.HistoryResult =
|
||||
if res.isErr():
|
||||
let error = res.error
|
||||
case res.error.kind
|
||||
let response = res.valueOr:
|
||||
case error.kind
|
||||
of waku_archive_legacy.ArchiveErrorKind.DRIVER_ERROR,
|
||||
waku_archive_legacy.ArchiveErrorKind.INVALID_QUERY:
|
||||
err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: res.error.cause))
|
||||
return err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: error.cause))
|
||||
else:
|
||||
err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
|
||||
else:
|
||||
let response = res.get()
|
||||
ok(
|
||||
HistoryResponse(
|
||||
messages: response.messages,
|
||||
cursor: response.cursor.map(
|
||||
proc(cursor: waku_archive_legacy.ArchiveCursor): HistoryCursor =
|
||||
HistoryCursor(
|
||||
pubsubTopic: cursor.pubsubTopic,
|
||||
senderTime: cursor.senderTime,
|
||||
storeTime: cursor.storeTime,
|
||||
digest: cursor.digest,
|
||||
)
|
||||
),
|
||||
)
|
||||
return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
|
||||
return ok(
|
||||
HistoryResponse(
|
||||
messages: response.messages,
|
||||
cursor: response.cursor.map(
|
||||
proc(cursor: waku_archive_legacy.ArchiveCursor): HistoryCursor =
|
||||
HistoryCursor(
|
||||
pubsubTopic: cursor.pubsubTopic,
|
||||
senderTime: cursor.senderTime,
|
||||
storeTime: cursor.storeTime,
|
||||
digest: cursor.digest,
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
proc mountLegacyStore*(
|
||||
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
|
||||
@ -126,8 +123,7 @@ proc mountLegacyStore*(
|
||||
request: HistoryQuery
|
||||
): Future[legacy_store_common.HistoryResult] {.async.} =
|
||||
if request.cursor.isSome():
|
||||
request.cursor.get().checkHistCursor().isOkOr:
|
||||
return err(error)
|
||||
?request.cursor.get().checkHistCursor()
|
||||
|
||||
let request = request.toArchiveQuery()
|
||||
let response = await node.wakuLegacyArchive.findMessagesV2(request)
|
||||
@ -160,11 +156,8 @@ proc query*(
|
||||
if node.wakuLegacyStoreClient.isNil():
|
||||
return err("waku legacy store client is nil")
|
||||
|
||||
let queryRes = await node.wakuLegacyStoreClient.query(query, peer)
|
||||
if queryRes.isErr():
|
||||
return err("legacy store client query error: " & $queryRes.error)
|
||||
|
||||
let response = queryRes.get()
|
||||
let response = (await node.wakuLegacyStoreClient.query(query, peer)).valueOr:
|
||||
return err("legacy store client query error: " & $error)
|
||||
|
||||
return ok(response)
|
||||
|
||||
@ -201,9 +194,8 @@ when defined(waku_exp_store_resume):
|
||||
if node.wakuLegacyStoreClient.isNil():
|
||||
return
|
||||
|
||||
let retrievedMessages = await node.wakuLegacyStoreClient.resume(peerList)
|
||||
if retrievedMessages.isErr():
|
||||
error "failed to resume store", error = retrievedMessages.error
|
||||
let retrievedMessages = (await node.wakuLegacyStoreClient.resume(peerList)).valueOr:
|
||||
error "failed to resume store", error = error
|
||||
return
|
||||
|
||||
info "the number of retrieved messages since the last online time: ",
|
||||
@ -18,16 +18,14 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migrations scripts once the db is updated, it sets the
## `user_version` to the `tragetVersion`.
##
##
## If not `targetVersion` is provided, it defaults to `SchemaVersion`.
##
## NOTE: Down migration it is not currently supported
info "starting peer store's sqlite database migration"
let migrationRes =
migrate(db, targetVersion, migrationsScriptsDir = PeerStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)
migrate(db, targetVersion, migrationsScriptsDir = PeerStoreMigrationPath).isOkOr:
return err("failed to execute migration scripts: " & error)
info "finished peer store's sqlite database migration"
ok()
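The docstring above describes the user_version flow: read the current schema version, run the pending scripts, then stamp the new version. A simplified sketch of that flow, assuming the db_connector SQLite wrapper listed in the .nimble file earlier (file name and target version are hypothetical; a real migration would execute the SQL scripts where the comment sits):

import std/strutils
import db_connector/db_sqlite

let db = open("peer_store.sqlite3", "", "", "")
const targetVersion = 1

let current = db.getValue(sql"PRAGMA user_version;").parseInt()
if current < targetVersion:
  # ... run the scripts whose version lies in (current, targetVersion] ...
  db.exec(sql("PRAGMA user_version = " & $targetVersion))
db.close()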
@ -67,7 +67,7 @@ proc encode*(remotePeerInfo: RemotePeerInfo): PeerStorageResult[ProtoBuffer] =
let catchRes = catch:
pb.write(4, remotePeerInfo.publicKey)
if catchRes.isErr():
catchRes.isOkOr:
return err("Enncoding public key failed: " & catchRes.error.msg)
pb.write(5, uint32(ord(remotePeerInfo.connectedness)))
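The `catch:` block used above wraps an exception-raising call into a Result (assuming the catch template provided by the results package), so the failure can be handled with the same isOkOr/valueOr style as everything else in this commit. A minimal sketch with a hypothetical failing call:

import std/strutils
import results

let parsed = catch:
  "not-a-number".parseInt()

parsed.isOkOr:
  echo "parse failed: ", error.msg  # `error` is the captured exception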
@ -154,14 +154,11 @@ method getAll*(
|
||||
let catchRes = catch:
|
||||
db.database.query("SELECT peerId, storedInfo FROM Peer", peer)
|
||||
|
||||
let queryRes =
|
||||
if catchRes.isErr():
|
||||
return err("failed to extract peer from query result: " & catchRes.error.msg)
|
||||
else:
|
||||
catchRes.get()
|
||||
let queryRes = catchRes.valueOr:
|
||||
return err("failed to extract peer from query result: " & catchRes.error.msg)
|
||||
|
||||
if queryRes.isErr():
|
||||
return err("peer storage query failed: " & queryRes.error)
|
||||
queryRes.isOkOr:
|
||||
return err("peer storage query failed: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
@ -22,9 +22,8 @@ import
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility,
|
||||
mix,
|
||||
mix/mix_node,
|
||||
mix/mix_protocol
|
||||
libp2p/protocols/mix,
|
||||
libp2p/protocols/mix/mix_protocol
|
||||
|
||||
import
|
||||
../waku_core,
|
||||
@ -226,8 +225,8 @@ proc mountMetadata*(
|
||||
|
||||
let catchRes = catch:
|
||||
node.switch.mount(node.wakuMetadata, protocolMatcher(WakuMetadataCodec))
|
||||
if catchRes.isErr():
|
||||
return err(catchRes.error.msg)
|
||||
catchRes.isOkOr:
|
||||
return err(error.msg)
|
||||
|
||||
return ok()
|
||||
|
||||
@ -267,8 +266,8 @@ proc mountMix*(
|
||||
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
|
||||
let catchRes = catch:
|
||||
node.switch.mount(node.wakuMix)
|
||||
if catchRes.isErr():
|
||||
return err(catchRes.error.msg)
|
||||
catchRes.isOkOr:
|
||||
return err(error.msg)
|
||||
return ok()
|
||||
|
||||
## Waku Sync
|
||||
@ -301,8 +300,8 @@ proc mountStoreSync*(
|
||||
node.switch.mount(
|
||||
node.wakuStoreReconciliation, protocolMatcher(WakuReconciliationCodec)
|
||||
)
|
||||
if reconMountRes.isErr():
|
||||
return err(reconMountRes.error.msg)
|
||||
reconMountRes.isOkOr:
|
||||
return err(error.msg)
|
||||
|
||||
let transfer = SyncTransfer.new(
|
||||
node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel
|
||||
@ -312,8 +311,8 @@ proc mountStoreSync*(
|
||||
|
||||
let transMountRes = catch:
|
||||
node.switch.mount(node.wakuStoreTransfer, protocolMatcher(WakuTransferCodec))
|
||||
if transMountRes.isErr():
|
||||
return err(transMountRes.error.msg)
|
||||
transMountRes.isOkOr:
|
||||
return err(error.msg)
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
@ -426,14 +426,13 @@ proc installAdminV1GetFilterSubsHandler(router: var RestRouter, node: WakuNode)
|
||||
FilterSubscription(peerId: $peerId, filterCriteria: filterCriteria)
|
||||
)
|
||||
|
||||
let resp = RestApiResponse.jsonResponse(subscriptions, status = Http200)
|
||||
if resp.isErr():
|
||||
error "An error ocurred while building the json respose: ", error = resp.error
|
||||
let resp = RestApiResponse.jsonResponse(subscriptions, status = Http200).valueOr:
|
||||
error "An error ocurred while building the json respose", error = error
|
||||
return RestApiResponse.internalServerError(
|
||||
fmt("An error ocurred while building the json respose: {resp.error}")
|
||||
fmt("An error ocurred while building the json respose: {error}")
|
||||
)
|
||||
|
||||
return resp.get()
|
||||
return resp
|
||||
|
||||
proc installAdminV1PostLogLevelHandler(router: var RestRouter, node: WakuNode) =
|
||||
router.api(MethodPost, ROUTE_ADMIN_V1_POST_LOG_LEVEL) do(
|
||||
@ -5,18 +5,18 @@ import presto
|
||||
import
|
||||
waku/waku_node,
|
||||
waku/discovery/waku_discv5,
|
||||
waku/waku_api/message_cache,
|
||||
waku/waku_api/handlers,
|
||||
waku/waku_api/rest/server,
|
||||
waku/waku_api/rest/debug/handlers as rest_debug_api,
|
||||
waku/waku_api/rest/relay/handlers as rest_relay_api,
|
||||
waku/waku_api/rest/filter/handlers as rest_filter_api,
|
||||
waku/waku_api/rest/legacy_lightpush/handlers as rest_legacy_lightpush_api,
|
||||
waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
|
||||
waku/waku_api/rest/store/handlers as rest_store_api,
|
||||
waku/waku_api/rest/legacy_store/handlers as rest_store_legacy_api,
|
||||
waku/waku_api/rest/health/handlers as rest_health_api,
|
||||
waku/waku_api/rest/admin/handlers as rest_admin_api,
|
||||
waku/rest_api/message_cache,
|
||||
waku/rest_api/handlers,
|
||||
waku/rest_api/endpoint/server,
|
||||
waku/rest_api/endpoint/debug/handlers as rest_debug_endpoint,
|
||||
waku/rest_api/endpoint/relay/handlers as rest_relay_endpoint,
|
||||
waku/rest_api/endpoint/filter/handlers as rest_filter_endpoint,
|
||||
waku/rest_api/endpoint/legacy_lightpush/handlers as rest_legacy_lightpush_endpoint,
|
||||
waku/rest_api/endpoint/lightpush/handlers as rest_lightpush_endpoint,
|
||||
waku/rest_api/endpoint/store/handlers as rest_store_endpoint,
|
||||
waku/rest_api/endpoint/legacy_store/handlers as rest_store_legacy_endpoint,
|
||||
waku/rest_api/endpoint/health/handlers as rest_health_endpoint,
|
||||
waku/rest_api/endpoint/admin/handlers as rest_admin_endpoint,
|
||||
waku/waku_core/topics,
|
||||
waku/waku_relay/protocol
|
||||
|
||||
@ -180,7 +180,7 @@ proc startRestServerProtocolSupport*(
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_filter_api.installFilterRestApiHandlers(
|
||||
rest_filter_endpoint.installFilterRestApiHandlers(
|
||||
router, node, filterCache, filterDiscoHandler
|
||||
)
|
||||
else:
|
||||
@ -193,8 +193,8 @@ proc startRestServerProtocolSupport*(
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_store_api.installStoreApiHandlers(router, node, storeDiscoHandler)
|
||||
rest_store_legacy_api.installStoreApiHandlers(router, node, storeDiscoHandler)
|
||||
rest_store_endpoint.installStoreApiHandlers(router, node, storeDiscoHandler)
|
||||
rest_store_legacy_endpoint.installStoreApiHandlers(router, node, storeDiscoHandler)
|
||||
|
||||
## Light push API
|
||||
## Install it either if client is mounted)
|
||||
@ -208,10 +208,12 @@ proc startRestServerProtocolSupport*(
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_legacy_lightpush_api.installLightPushRequestHandler(
|
||||
rest_legacy_lightpush_endpoint.installLightPushRequestHandler(
|
||||
router, node, lightDiscoHandler
|
||||
)
|
||||
rest_lightpush_endpoint.installLightPushRequestHandler(
|
||||
router, node, lightDiscoHandler
|
||||
)
|
||||
rest_lightpush_api.installLightPushRequestHandler(router, node, lightDiscoHandler)
|
||||
else:
|
||||
restServerNotInstalledTab["lightpush"] = "/lightpush endpoints are not available."
|
||||
|
||||
@ -15,12 +15,11 @@ const ROUTE_DEBUG_INFOV1 = "/debug/v1/info"
|
||||
proc installDebugInfoV1Handler(router: var RestRouter, node: WakuNode) =
|
||||
let getInfo = proc(): RestApiResponse =
|
||||
let info = node.info().toDebugWakuInfo()
|
||||
let resp = RestApiResponse.jsonResponse(info, status = Http200)
|
||||
if resp.isErr():
|
||||
info "An error occurred while building the json respose", error = resp.error
|
||||
let resp = RestApiResponse.jsonResponse(info, status = Http200).valueOr:
|
||||
info "An error occurred while building the json respose", error = error
|
||||
return RestApiResponse.internalServerError()
|
||||
|
||||
return resp.get()
|
||||
return resp
|
||||
|
||||
# /debug route is deprecated, will be removed
|
||||
router.api(MethodGet, ROUTE_DEBUG_INFOV1) do() -> RestApiResponse:
|
||||
@ -49,15 +49,12 @@ func decodeRequestBody[T](
|
||||
|
||||
let reqBodyData = contentBody.get().data
|
||||
|
||||
let requestResult = decodeFromJsonBytes(T, reqBodyData)
|
||||
if requestResult.isErr():
|
||||
let requestResult = decodeFromJsonBytes(T, reqBodyData).valueOr:
|
||||
return err(
|
||||
RestApiResponse.badRequest(
|
||||
"Invalid content body, could not decode. " & $requestResult.error
|
||||
)
|
||||
RestApiResponse.badRequest("Invalid content body, could not decode. " & $error)
|
||||
)
|
||||
|
||||
return ok(requestResult.get())
|
||||
return ok(requestResult)
|
||||
|
||||
proc getStatusDesc(
|
||||
protocolClientRes: filter_protocol_type.FilterSubscribeResult
|
||||
@ -129,16 +126,15 @@ proc makeRestResponse(
|
||||
httpStatus = convertErrorKindToHttpStatus(protocolClientRes.error().kind)
|
||||
# TODO: convert status codes!
|
||||
|
||||
let resp =
|
||||
RestApiResponse.jsonResponse(filterSubscriptionResponse, status = httpStatus)
|
||||
|
||||
if resp.isErr():
|
||||
error "An error ocurred while building the json respose: ", error = resp.error
|
||||
let resp = RestApiResponse.jsonResponse(
|
||||
filterSubscriptionResponse, status = httpStatus
|
||||
).valueOr:
|
||||
error "An error ocurred while building the json respose: ", error = error
|
||||
return RestApiResponse.internalServerError(
|
||||
fmt("An error ocurred while building the json respose: {resp.error}")
|
||||
fmt("An error ocurred while building the json respose: {error}")
|
||||
)
|
||||
|
||||
return resp.get()
|
||||
return resp
|
||||
|
||||
proc makeRestResponse(
|
||||
requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError
|
||||
@ -149,16 +145,15 @@ proc makeRestResponse(
|
||||
let httpStatus = convertErrorKindToHttpStatus(protocolClientRes.kind)
|
||||
# TODO: convert status codes!
|
||||
|
||||
let resp =
|
||||
RestApiResponse.jsonResponse(filterSubscriptionResponse, status = httpStatus)
|
||||
|
||||
if resp.isErr():
|
||||
error "An error ocurred while building the json respose: ", error = resp.error
|
||||
let resp = RestApiResponse.jsonResponse(
|
||||
filterSubscriptionResponse, status = httpStatus
|
||||
).valueOr:
|
||||
error "An error ocurred while building the json respose: ", error = error
|
||||
return RestApiResponse.internalServerError(
|
||||
fmt("An error ocurred while building the json respose: {resp.error}")
|
||||
fmt("An error ocurred while building the json respose: {error}")
|
||||
)
|
||||
|
||||
return resp.get()
|
||||
return resp
|
||||
|
||||
const NoPeerNoDiscoError = FilterSubscribeError.serviceUnavailable(
|
||||
"No suitable service peer & no discovery method"
|
||||
@ -175,18 +170,14 @@ proc filterPostPutSubscriptionRequestHandler(
|
||||
): Future[RestApiResponse] {.async.} =
|
||||
## handles any filter subscription requests, adds or modifies.
|
||||
|
||||
let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)
|
||||
|
||||
if decodedBody.isErr():
|
||||
let req: FilterSubscribeRequest = decodeRequestBody[FilterSubscribeRequest](
|
||||
contentBody
|
||||
).valueOr:
|
||||
return makeRestResponse(
|
||||
"unknown",
|
||||
FilterSubscribeError.badRequest(
|
||||
fmt("Failed to decode request: {decodedBody.error}")
|
||||
),
|
||||
FilterSubscribeError.badRequest(fmt("Failed to decode request: {error}")),
|
||||
)
|
||||
|
||||
let req: FilterSubscribeRequest = decodedBody.value()
|
||||
|
||||
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
let handler = discHandler.valueOr:
|
||||
return makeRestResponse(req.requestId, NoPeerNoDiscoError)
|
||||
@ -256,18 +247,14 @@ proc installFilterDeleteSubscriptionsHandler(
|
||||
## Subscribes a node to a list of contentTopics of a PubSub topic
|
||||
info "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
|
||||
|
||||
let decodedBody = decodeRequestBody[FilterUnsubscribeRequest](contentBody)
|
||||
|
||||
if decodedBody.isErr():
|
||||
let req: FilterUnsubscribeRequest = decodeRequestBody[FilterUnsubscribeRequest](
|
||||
contentBody
|
||||
).valueOr:
|
||||
return makeRestResponse(
|
||||
"unknown",
|
||||
FilterSubscribeError.badRequest(
|
||||
fmt("Failed to decode request: {decodedBody.error}")
|
||||
),
|
||||
FilterSubscribeError.badRequest(fmt("Failed to decode request: {error}")),
|
||||
)
|
||||
|
||||
let req: FilterUnsubscribeRequest = decodedBody.value()
|
||||
|
||||
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
let handler = discHandler.valueOr:
|
||||
return makeRestResponse(req.requestId, NoPeerNoDiscoError)
|
||||
@ -308,18 +295,14 @@ proc installFilterDeleteAllSubscriptionsHandler(
|
||||
## Subscribes a node to a list of contentTopics of a PubSub topic
|
||||
info "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody
|
||||
|
||||
let decodedBody = decodeRequestBody[FilterUnsubscribeAllRequest](contentBody)
|
||||
|
||||
if decodedBody.isErr():
|
||||
let req: FilterUnsubscribeAllRequest = decodeRequestBody[
|
||||
FilterUnsubscribeAllRequest
|
||||
](contentBody).valueOr:
|
||||
return makeRestResponse(
|
||||
"unknown",
|
||||
FilterSubscribeError.badRequest(
|
||||
fmt("Failed to decode request: {decodedBody.error}")
|
||||
),
|
||||
FilterSubscribeError.badRequest(fmt("Failed to decode request: {error}")),
|
||||
)
|
||||
|
||||
let req: FilterUnsubscribeAllRequest = decodedBody.value()
|
||||
|
||||
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
let handler = discHandler.valueOr:
|
||||
return makeRestResponse(req.requestId, NoPeerNoDiscoError)
|
||||
@ -399,24 +382,20 @@ proc installFilterGetMessagesHandler(
|
||||
## TODO: ability to specify a return message limit, maybe use cursor to control paging response.
|
||||
info "get", ROUTE_FILTER_MESSAGES, contentTopic = contentTopic
|
||||
|
||||
if contentTopic.isErr():
|
||||
let contentTopic = contentTopic.valueOr:
|
||||
return RestApiResponse.badRequest("Missing contentTopic")
|
||||
|
||||
let contentTopic = contentTopic.get()
|
||||
|
||||
let msgRes = cache.getAutoMessages(contentTopic, clear = true)
|
||||
if msgRes.isErr():
|
||||
let msg = cache.getAutoMessages(contentTopic, clear = true).valueOr:
|
||||
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)
|
||||
|
||||
let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage))
|
||||
let resp = RestApiResponse.jsonResponse(data, status = Http200)
|
||||
if resp.isErr():
|
||||
error "An error ocurred while building the json respose: ", error = resp.error
|
||||
let data = FilterGetMessagesResponse(msg.map(toFilterWakuMessage))
|
||||
let resp = RestApiResponse.jsonResponse(data, status = Http200).valueOr:
|
||||
error "An error ocurred while building the json respose: ", error = error
|
||||
return RestApiResponse.internalServerError(
|
||||
"An error ocurred while building the json respose"
|
||||
)
|
||||
|
||||
return resp.get()
|
||||
return resp
|
||||
|
||||
proc installFilterRestApiHandlers*(
|
||||
router: var RestRouter,
|
||||
@ -50,12 +50,8 @@ proc installLightPushRequestHandler*(
|
||||
## Send a request to push a waku message
|
||||
info "post", ROUTE_LIGHTPUSH, contentBody
|
||||
|
||||
let decodedBody = decodeRequestBody[PushRequest](contentBody)
|
||||
|
||||
if decodedBody.isErr():
|
||||
return decodedBody.error()
|
||||
|
||||
let req: PushRequest = decodedBody.value()
|
||||
let req: PushRequest = decodeRequestBody[PushRequest](contentBody).valueOr:
|
||||
return error
|
||||
|
||||
let msg = req.message.toWakuMessage().valueOr:
|
||||
return RestApiResponse.badRequest("Invalid message: " & $error)
|
||||
@ -80,12 +76,12 @@
error "Failed to request a message push due to timeout!"
return RestApiResponse.serviceUnavailable("Push request timed out")
if subFut.value().isErr():
if subFut.value().error == TooManyRequestsMessage:
subFut.value().isOkOr:
if error == TooManyRequestsMessage:
return RestApiResponse.tooManyRequests("Request rate limmit reached")
return RestApiResponse.serviceUnavailable(
fmt("Failed to request a message push: {subFut.value().error}")
fmt("Failed to request a message push: {error}")
)
return RestApiResponse.ok()