Mirror of https://github.com/logos-messaging/logos-messaging-nim.git
Synced 2026-01-07 08:23:08 +00:00

Merge branch 'master' into release/v0.37
Commit 282a929826
@@ -7,7 +7,7 @@ ARG NIM_COMMIT
ARG LOG_LEVEL=TRACE

# Get build tools and required header files
-RUN apk add --no-cache bash git build-base openssl-dev pcre-dev linux-headers curl jq
+RUN apk add --no-cache bash git build-base openssl-dev linux-headers curl jq

WORKDIR /app
COPY . .
@@ -40,14 +40,12 @@ LABEL version="unknown"
EXPOSE 30303 60000 8545

# Referenced in the binary
-RUN apk add --no-cache libgcc pcre-dev libpq-dev \
+RUN apk add --no-cache libgcc libpq-dev \
    wget \
    iproute2 \
    python3 \
    jq

-# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
-RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3

COPY --from=nim-build /app/build/lightpush_publisher_mix /usr/bin/
RUN chmod +x /usr/bin/lightpush_publisher_mix
@@ -28,10 +28,9 @@ proc benchmark(
      iter = i, elapsed_ms = (getTime() - start_time).inMilliseconds

  discard await manager.updateRoots()
-  let proofResult = await manager.fetchMerkleProofElements()
-  if proofResult.isErr():
-    error "Failed to fetch Merkle proof", error = proofResult.error
-  manager.merkleProofCache = proofResult.get()
+  manager.merkleProofCache = (await manager.fetchMerkleProofElements()).valueOr:
+    error "Failed to fetch Merkle proof", error = error
    quit(QuitFailure)

  let epoch = default(Epoch)
  debug "epoch in bytes", epochHex = epoch.inHex()
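The hunk above shows the pattern repeated throughout this commit: explicit isOk/isErr checks followed by get() are collapsed into the valueOr template from nim-results, which yields the ok value or runs its block with `error` injected. A minimal standalone sketch of that pattern; fetchConfig is a made-up placeholder, not part of this repository:

import results

proc fetchConfig(): Result[string, string] =
  ok("config")

proc run() =
  # valueOr returns the ok value; on err it runs the block, where `error`
  # is available, and the block must supply a fallback value or leave scope.
  let cfg = fetchConfig().valueOr:
    echo "failed to fetch config: ", error
    quit(QuitFailure)
  echo "using ", cfg

run()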
@@ -132,25 +132,14 @@ proc showChatPrompt(c: Chat) =
  except IOError:
    discard

-proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] =
+proc getChatLine(payload: seq[byte]): string =
  # No payload encoding/encryption from Waku
-  let
-    pb = Chat2Message.init(msg.payload)
-    chatLine =
-      if pb.isOk:
-        pb[].toString()
-      else:
-        string.fromBytes(msg.payload)
-  return ok(chatline)
+  let pb = Chat2Message.init(payload).valueOr:
+    return string.fromBytes(payload)
+  return $pb

proc printReceivedMessage(c: Chat, msg: WakuMessage) =
-  let
-    pb = Chat2Message.init(msg.payload)
-    chatLine =
-      if pb.isOk:
-        pb[].toString()
-      else:
-        string.fromBytes(msg.payload)
+  let chatLine = getChatLine(msg.payload)
  try:
    echo &"{chatLine}"
  except ValueError:
@@ -173,18 +162,16 @@ proc startMetricsServer(
): Result[MetricsHttpServerRef, string] =
  info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort

-  let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
-  if metricsServerRes.isErr():
-    return err("metrics HTTP server start failed: " & $metricsServerRes.error)
+  let server = MetricsHttpServerRef.new($serverIp, serverPort).valueOr:
+    return err("metrics HTTP server start failed: " & $error)

-  let server = metricsServerRes.value
  try:
    waitFor server.start()
  except CatchableError:
    return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())

  info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
-  ok(metricsServerRes.value)
+  ok(server)

proc publish(c: Chat, line: string) =
  # First create a Chat2Message protobuf with this line of text
@@ -202,19 +189,17 @@ proc publish(c: Chat, line: string) =
    version: 0,
    timestamp: getNanosecondTime(time),
  )

  if not isNil(c.node.wakuRlnRelay):
    # for future version when we support more than one rln protected content topic,
    # we should check the message content topic as well
-    let appendRes = c.node.wakuRlnRelay.appendRLNProof(message, float64(time))
-    if appendRes.isErr():
+    if c.node.wakuRlnRelay.appendRLNProof(message, float64(time)).isErr():
      debug "could not append rate limit proof to the message"
    else:
      debug "rate limit proof is appended to the message"
-      let decodeRes = RateLimitProof.init(message.proof)
-      if decodeRes.isErr():
+      let proof = RateLimitProof.init(message.proof).valueOr:
        error "could not decode the RLN proof"

-      let proof = decodeRes.get()
+        return
      # TODO move it to log after dogfooding
      let msgEpoch = fromEpoch(proof.epoch)
      if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == msgEpoch:
@@ -438,7 +423,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
      let resolved = await dnsResolver.resolveTxt(domain)
      return resolved[0] # Use only first answer

-    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
+    let wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
    if wakuDnsDiscovery.isOk:
      let discoveredPeers = await wakuDnsDiscovery.get().findPeers()
      if discoveredPeers.isOk:
@@ -446,8 +431,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
        discoveredNodes = discoveredPeers.get()
        echo "Discovered and connecting to " & $discoveredNodes
        waitFor chat.node.connectToNodes(discoveredNodes)
      else:
        warn "Failed to find peers via DNS discovery", error = discoveredPeers.error
    else:
-      warn "Failed to init Waku DNS discovery"
+      warn "Failed to init Waku DNS discovery", error = wakuDnsDiscovery.error

  let peerInfo = node.switch.peerInfo
  let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
@@ -483,21 +470,19 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
        else:
          newSeq[byte](0)

-      let
-        pb = Chat2Message.init(payload)
-        chatLine =
-          if pb.isOk:
-            pb[].toString()
-          else:
-            string.fromBytes(payload)
+      let chatLine = getChatLine(payload)
      echo &"{chatLine}"
    info "Hit store handler"

-  let queryRes = await node.query(
-    StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
-  )
-  if queryRes.isOk():
-    storeHandler(queryRes.value)
+  block storeQueryBlock:
+    let queryRes = (
+      await node.query(
+        StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
+      )
+    ).valueOr:
+      error "Store query failed", error = error
+      break storeQueryBlock
+    storeHandler(queryRes)

  # NOTE Must be mounted after relay
  if conf.lightpushnode != "":
@@ -511,8 +496,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
        error = peerInfo.error

  if conf.filternode != "":
-    let peerInfo = parsePeerInfo(conf.filternode)
-    if peerInfo.isOk():
+    if (let peerInfo = parsePeerInfo(conf.filternode); peerInfo.isErr()):
+      error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error
+    else:
      await node.mountFilter()
      await node.mountFilterClient()
@@ -523,8 +509,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
        chat.printReceivedMessage(msg)

      # TODO: Here to support FilterV2 relevant subscription.
-    else:
-      error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error

  # Subscribe to a topic, if relay is mounted
  if conf.relay:
@@ -545,11 +529,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =

    proc spamHandler(wakuMessage: WakuMessage) {.gcsafe, closure.} =
      debug "spam handler is called"
-      let chatLineResult = chat.getChatLine(wakuMessage)
-      if chatLineResult.isOk():
-        echo "A spam message is found and discarded : ", chatLineResult.value
-      else:
-        echo "A spam message is found and discarded"
+      let chatLineResult = getChatLine(wakuMessage.payload)
+      echo "spam message is found and discarded : " & chatLineResult
      chat.prompt = false
      showChatPrompt(chat)

@@ -136,9 +136,7 @@ proc toMatterbridge(

proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} =
  while cmb.running:
-    let getRes = cmb.mbClient.getMessages()
-
-    if getRes.isOk():
+    if (let getRes = cmb.mbClient.getMessages(); getRes.isOk()):
      for jsonNode in getRes[]:
        await handler(jsonNode)
    else:
@@ -169,9 +167,7 @@ proc new*(
  let mbClient = MatterbridgeClient.new(mbHostUri, mbGateway)

  # Let's verify the Matterbridge configuration before continuing
-  let clientHealth = mbClient.isHealthy()
-
-  if clientHealth.isOk() and clientHealth[]:
+  if mbClient.isHealthy().valueOr(false):
    info "Reached Matterbridge host", host = mbClient.host
  else:
    raise newException(ValueError, "Matterbridge client not reachable/healthy")
@@ -91,7 +91,7 @@ type Chat2MatterbridgeConf* = object
    name: "filternode"
  .}: string

-  # Matterbridge options
+  # Matterbridge options
  mbHostAddress* {.
    desc: "Listening address of the Matterbridge host",
    defaultValue: parseIpAddress("127.0.0.1"),
@@ -126,11 +126,9 @@ proc completeCmdArg*(T: type keys.KeyPair, val: string): seq[string] =
  return @[]

proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
-  let key = SkPrivateKey.init(p)
-  if key.isOk():
-    crypto.PrivateKey(scheme: Secp256k1, skkey: key.get())
-  else:
+  let key = SkPrivateKey.init(p).valueOr:
    raise newException(ValueError, "Invalid private key")
+  return crypto.PrivateKey(scheme: Secp256k1, skkey: key)

proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] =
  return @[]

@@ -146,25 +146,14 @@ proc showChatPrompt(c: Chat) =
  except IOError:
    discard

-proc getChatLine(c: Chat, msg: WakuMessage): Result[string, string] =
+proc getChatLine(payload: seq[byte]): string =
  # No payload encoding/encryption from Waku
-  let
-    pb = Chat2Message.init(msg.payload)
-    chatLine =
-      if pb.isOk:
-        pb[].toString()
-      else:
-        string.fromBytes(msg.payload)
-  return ok(chatline)
+  let pb = Chat2Message.init(payload).valueOr:
+    return string.fromBytes(payload)
+  return $pb

proc printReceivedMessage(c: Chat, msg: WakuMessage) =
-  let
-    pb = Chat2Message.init(msg.payload)
-    chatLine =
-      if pb.isOk:
-        pb[].toString()
-      else:
-        string.fromBytes(msg.payload)
+  let chatLine = getChatLine(msg.payload)
  try:
    echo &"{chatLine}"
  except ValueError:
@@ -375,18 +364,17 @@ proc maintainSubscription(
      let peerOpt = selectRandomServicePeer(
        wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
      )
-      if peerOpt.isOk():
-        actualFilterPeer = peerOpt.get()
-
-        info "Found new peer for codec",
-          codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
-
-        noFailedSubscribes = 0
-        continue # try again with new peer without delay
-      else:
+      peerOpt.isOkOr:
        error "Failed to find new service peer. Exiting."
        noFailedServiceNodeSwitches += 1
        break

+      actualFilterPeer = peerOpt.get()
+      info "Found new peer for codec",
+        codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
+
+      noFailedSubscribes = 0
+      continue # try again with new peer without delay
    else:
      if noFailedSubscribes > 0:
        noFailedSubscribes -= 1
@@ -530,7 +518,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
      let resolved = await dnsResolver.resolveTxt(domain)
      return resolved[0] # Use only first answer

-    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
+    let wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver)
    if wakuDnsDiscovery.isOk:
      let discoveredPeers = await wakuDnsDiscovery.get().findPeers()
      if discoveredPeers.isOk:
@@ -538,8 +526,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
        discoveredNodes = discoveredPeers.get()
        echo "Discovered and connecting to " & $discoveredNodes
        waitFor chat.node.connectToNodes(discoveredNodes)
      else:
        warn "Failed to find peers via DNS discovery", error = discoveredPeers.error
    else:
-      warn "Failed to init Waku DNS discovery"
+      warn "Failed to init Waku DNS discovery", error = wakuDnsDiscovery.error

  let peerInfo = node.switch.peerInfo
  let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
@@ -575,13 +565,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
        else:
          newSeq[byte](0)

-      let
-        pb = Chat2Message.init(payload)
-        chatLine =
-          if pb.isOk:
-            pb[].toString()
-          else:
-            string.fromBytes(payload)
+      let chatLine = getChatLine(payload)
      echo &"{chatLine}"
    info "Hit store handler"

@@ -1,37 +1,33 @@
# TESTING IMAGE --------------------------------------------------------------

## NOTICE: This is a short cut build file for ubuntu users who compiles nwaku in ubuntu distro.
## This is used for faster turnaround time for testing the compiled binary.
## Prerequisites: compiled liteprotocoltester binary in build/ directory

FROM ubuntu:noble AS prod

LABEL maintainer="zoltan@status.im"
LABEL source="https://github.com/waku-org/nwaku"
LABEL description="Lite Protocol Tester: Waku light-client"
LABEL commit="unknown"
LABEL version="unknown"

# DevP2P, LibP2P, and JSON RPC ports
EXPOSE 30303 60000 8545

# Referenced in the binary
RUN apt-get update && apt-get install -y --no-install-recommends \
    libgcc1 \
-    libpcre3 \
    libpq-dev \
    wget \
    iproute2 \
    && rm -rf /var/lib/apt/lists/*

-# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
-RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3

COPY build/liteprotocoltester /usr/bin/
COPY apps/liteprotocoltester/run_tester_node.sh /usr/bin/
COPY apps/liteprotocoltester/run_tester_node_on_fleet.sh /usr/bin/

ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"]

# # By default just show help if called without arguments
CMD ["--help"]
@@ -45,9 +45,6 @@ RUN apk add --no-cache libgcc libpq-dev \
    iproute2 \
    python3

-# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
-RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
-
COPY --from=nim-build /app/build/liteprotocoltester /usr/bin/
RUN chmod +x /usr/bin/liteprotocoltester

@@ -190,25 +190,22 @@ proc publishMessages(
      )
      if not preventPeerSwitch and noFailedPush > maxFailedPush:
        info "Max push failure limit reached, Try switching peer."
-        let peerOpt = selectRandomServicePeer(
+        actualServicePeer = selectRandomServicePeer(
          wakuNode.peerManager, some(actualServicePeer), WakuLightPushCodec
-        )
-        if peerOpt.isOk():
-          actualServicePeer = peerOpt.get()
-
-          info "New service peer in use",
-            codec = lightpushPubsubTopic,
-            peer = constructMultiaddrStr(actualServicePeer)
-
-          noFailedPush = 0
-          noOfServicePeerSwitches += 1
-          lpt_change_service_peer_count.inc(labelValues = ["publisher"])
-          continue # try again with new peer without delay
-        else:
+        ).valueOr:
          error "Failed to find new service peer. Exiting."
          noFailedServiceNodeSwitches += 1
          break

+        info "New service peer in use",
+          codec = lightpushPubsubTopic,
+          peer = constructMultiaddrStr(actualServicePeer)
+
+        noFailedPush = 0
+        noOfServicePeerSwitches += 1
+        lpt_change_service_peer_count.inc(labelValues = ["publisher"])
+        continue # try again with new peer without delay

      await sleepAsync(messageInterval)

proc setupAndPublish*(
@@ -89,23 +89,20 @@ proc maintainSubscription(
        await sleepAsync(2.seconds) # Wait a bit before retrying
        continue
      elif not preventPeerSwitch:
-        let peerOpt = selectRandomServicePeer(
+        actualFilterPeer = selectRandomServicePeer(
          wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec
-        )
-        if peerOpt.isOk():
-          actualFilterPeer = peerOpt.get()
-
-          info "Found new peer for codec",
-            codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
-
-          noFailedSubscribes = 0
-          lpt_change_service_peer_count.inc(labelValues = ["receiver"])
-          isFirstPingOnNewPeer = true
-          continue # try again with new peer without delay
-        else:
+        ).valueOr:
          error "Failed to find new service peer. Exiting."
          noFailedServiceNodeSwitches += 1
          break

+        info "Found new peer for codec",
+          codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer)
+
+        noFailedSubscribes = 0
+        lpt_change_service_peer_count.inc(labelValues = ["receiver"])
+        isFirstPingOnNewPeer = true
+        continue # try again with new peer without delay
      else:
        if noFailedSubscribes > 0:
          noFailedSubscribes -= 1
@@ -1,7 +1,7 @@
{.push raises: [].}

import
-  std/[net, tables, strutils, times, sequtils, random],
+  std/[net, tables, strutils, times, sequtils, random, sugar],
  results,
  chronicles,
  chronicles/topics_registry,
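The std/sugar import added here appears to support the `=>` anonymous-proc shorthand used later in this file's mapErr call. A small standalone illustration of that shorthand together with nim-results; the values are arbitrary and not from this repository:

import std/sugar
import results

# `e => expr` expands to an anonymous proc; here it converts an error value
# to a string, mirroring the mapErr(e => $e) usage further down in this diff.
let r = Result[int, int].err(42).mapErr(e => "code " & $e)
assert r.error == "code 42"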
@@ -183,16 +183,14 @@ proc setConnectedPeersMetrics(
      for maddr in peerInfo.addrs:
        if $maddr notin customPeerInfo.maddrs:
          customPeerInfo.maddrs.add $maddr
-      let typedRecord = discNode.toTypedRecord()
-      if not typedRecord.isOk():
+      let typedRecord = discNode.toTypedRecord().valueOr:
        warn "could not convert record to typed record", record = discNode
        continue

-      if not typedRecord.get().ip.isSome():
-        warn "ip field is not set", record = typedRecord.get()
+      let ipAddr = typedRecord.ip.valueOr:
+        warn "ip field is not set", record = typedRecord
        continue

-      let ip = $typedRecord.get().ip.get().join(".")
-      customPeerInfo.ip = ip
+      customPeerInfo.ip = $ipAddr.join(".")

      # try to ping the peer
      if shouldReconnect(customPeerInfo):
@@ -374,14 +372,9 @@ proc retrieveDynamicBootstrapNodes(
      if resolved.len > 0:
        return resolved[0] # Use only first answer

-    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
-    if wakuDnsDiscovery.isOk():
-      return (await wakuDnsDiscovery.get().findPeers()).mapErr(
-        proc(e: cstring): string =
-          $e
-      )
-    else:
-      warn "Failed to init Waku DNS discovery"
+    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver).errorOr:
+      return (await value.findPeers()).mapErr(e => $e)
+    warn "Failed to init Waku DNS discovery"

  debug "No method for retrieving dynamic bootstrap nodes specified."
  ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
@@ -391,11 +384,10 @@ proc getBootstrapFromDiscDns(
): Future[Result[seq[enr.Record], string]] {.async.} =
  try:
    let dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")]
-    let dynamicBootstrapNodesRes =
+    let dynamicBootstrapNodes = (
      await retrieveDynamicBootstrapNodes(conf.dnsDiscoveryUrl, dnsNameServers)
-    if not dynamicBootstrapNodesRes.isOk():
-      error("failed discovering peers from DNS")
-    let dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
+    ).valueOr:
+      return err("Failed retrieving dynamic bootstrap nodes: " & $error)

    # select dynamic bootstrap nodes that have an ENR containing a udp port.
    # Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md)
@@ -411,7 +403,7 @@ proc getBootstrapFromDiscDns(
        discv5BootstrapEnrs.add(enr)
    return ok(discv5BootstrapEnrs)
  except CatchableError:
-    error("failed discovering peers from DNS")
+    error("failed discovering peers from DNS: " & getCurrentExceptionMsg())

proc initAndStartApp(
    conf: NetworkMonitorConf
@@ -14,12 +14,9 @@ EXPOSE 30303 60000 8545

# Referenced in the binary
RUN apt-get update &&\
-    apt-get install -y libpcre3 libpq-dev curl iproute2 wget jq dnsutils &&\
+    apt-get install -y libpq-dev curl iproute2 wget jq dnsutils &&\
    apt-get clean && rm -rf /var/lib/apt/lists/*

-# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
-RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
-
# Copy to separate location to accomodate different MAKE_TARGET values
ADD ./build/$MAKE_TARGET /usr/local/bin/

@@ -86,7 +86,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
      timestamp: now(),
    ) # current timestamp

-  let lightpushPeer = parsePeerInfo(LightpushPeer).get()
+  let lightpushPeer = parsePeerInfo(LightpushPeer).valueOr:
+    error "failed to parse LightpushPeer", error = error
+    quit(QuitFailure)

  let res = await node.legacyLightpushPublish(
    some(LightpushPubsubTopic), message, lightpushPeer
@@ -6,6 +6,7 @@ import
  ../../../waku/discovery/waku_discv5,
  ../../../waku/waku_core/peers,
  ../../../waku/node/waku_node,
+  ../../../waku/node/api,
  ../../alloc

type DiscoveryMsgType* = enum
@@ -8,6 +8,7 @@ import
  ../../../../waku/waku_core/subscription/push_handler,
  ../../../../waku/node/peer_manager/peer_manager,
  ../../../../waku/node/waku_node,
+  ../../../../waku/node/api,
  ../../../../waku/waku_core/topics/pubsub_topic,
  ../../../../waku/waku_core/topics/content_topic,
  ../../../alloc
@@ -12,6 +12,7 @@ import
  waku_core,
  node/peer_manager,
  node/waku_node,
+  node/api,
  waku_filter_v2,
  waku_filter_v2/client,
  waku_filter_v2/subscriptions,
@@ -12,6 +12,7 @@ import
  waku_core,
  node/peer_manager,
  node/waku_node,
+  node/api,
  waku_lightpush_legacy,
  waku_lightpush_legacy/common,
  waku_lightpush_legacy/protocol_metrics,
@@ -6,6 +6,7 @@ import
  waku/[
    common/paging,
    node/waku_node,
+    node/api,
    node/peer_manager,
    waku_core,
    waku_store_legacy,
@@ -8,7 +8,14 @@ import
  libp2p/crypto/crypto

import
-  waku/[waku_core, node/peer_manager, node/waku_node, waku_lightpush, waku_rln_relay],
+  waku/[
+    waku_core,
+    node/peer_manager,
+    node/waku_node,
+    node/api,
+    waku_lightpush,
+    waku_rln_relay,
+  ],
  ../testlib/[wakucore, wakunode, testasync, futures],
  ../resources/payloads,
  ../waku_rln_relay/[rln/waku_rln_relay_utils, utils_onchain]
@@ -12,8 +12,14 @@ import
  eth/p2p/discoveryv5/enr

import
-  waku/
-    [waku_node, discovery/waku_discv5, waku_peer_exchange, node/peer_manager, waku_core],
+  waku/[
+    waku_node,
+    node/api,
+    discovery/waku_discv5,
+    waku_peer_exchange,
+    node/peer_manager,
+    waku_core,
+  ],
  ../waku_peer_exchange/utils,
  ../testlib/[wakucore, wakunode, testasync]

@@ -17,6 +17,7 @@ import
  waku_core,
  node/peer_manager,
  node/waku_node,
+  node/api,
  discovery/waku_discv5,
  waku_filter_v2/common,
  waku_relay/protocol,
@@ -17,6 +17,7 @@ import
  node/peer_manager,
  waku_core,
  waku_node,
+  node/api,
  common/error_handling,
  waku_rln_relay,
  waku_rln_relay/rln,
@@ -16,6 +16,7 @@ import
  waku_core/topics/sharding,
  waku_store_legacy/common,
  node/waku_node,
+  node/api,
  common/paging,
  waku_core,
  waku_store/common,
@@ -6,6 +6,7 @@ import
  waku/[
    common/paging,
    node/waku_node,
+    node/api,
    node/peer_manager,
    waku_core,
    waku_core/message/digest,
@@ -22,6 +22,7 @@ import
    factory/conf_builder/conf_builder,
    factory/waku,
    node/waku_node,
+    node/api,
    node/peer_manager,
  ],
  ../testlib/[wakucore, testasync, assertions, futures, wakunode, testutils],
@@ -3,7 +3,7 @@
import std/[options, sequtils, json], testutils/unittests, results, chronos

import
-  waku/node/[peer_manager, waku_node],
+  waku/node/[peer_manager, waku_node, api],
  waku/waku_core,
  waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
  ../testlib/[wakucore, testasync, testutils, futures, sequtils, wakunode],
@@ -17,7 +17,6 @@ import
  waku_peer_exchange/rpc_codec,
  waku_peer_exchange/protocol,
  waku_peer_exchange/client,
  node/peer_manager,
  waku_core,
  common/enr/builder,
  waku_enr/sharding,
@@ -20,6 +20,7 @@ requires "nim >= 2.2.4",
  "json_rpc",
  "libbacktrace",
  "nimcrypto",
  "serialization",
  "stew",
  "stint",
  "metrics",
@@ -30,7 +31,6 @@ requires "nim >= 2.2.4",
  "results",
  "db_connector",
  "minilru",
  "quic",
  "https://github.com/vacp2p/mix#0.1.0"

### Helper functions
@@ -1,7 +1,6 @@
{.push raises: [].}

import std/options
import chronos/timer
import metrics, setting

export metrics
@@ -421,13 +421,11 @@ proc setupDiscoveryV5*(
      addBootstrapNode(enrUri, discv5BootstrapEnrs)

    for enr in discv5BootstrapEnrs:
-      let peerInfoRes = enr.toRemotePeerInfo()
-      if peerInfoRes.isOk():
-        nodePeerManager.addPeer(peerInfoRes.get(), PeerOrigin.Discv5)
-      else:
+      let peerInfo = enr.toRemotePeerInfo().valueOr:
        debug "could not convert discv5 bootstrap node to peerInfo, not adding peer to Peer Store",
-          enr = enr.toUri(), error = peerInfoRes.error
-
+          enr = enr.toUri(), error = error
+        continue
+      nodePeerManager.addPeer(peerInfo, PeerOrigin.Discv5)
    discv5BootstrapEnrs.add(dynamicBootstrapEnrs)

    let discv5Config =
@@ -6,7 +6,7 @@
## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459

import
-  std/[options, net, sequtils],
+  std/[options, net, sequtils, sugar],
  chronicles,
  chronos,
  metrics,
@@ -67,14 +67,11 @@ proc findPeers*(

  for enr in discoveredEnr:
    # Convert discovered ENR to RemotePeerInfo and add to discovered nodes
-    let res = enr.toRemotePeerInfo()
-
-    if res.isOk():
-      discoveredNodes.add(res.get())
-    else:
-      error "Failed to convert ENR to peer info", enr = $enr, err = res.error()
+    let peerInfo = enr.toRemotePeerInfo().valueOr:
+      error "Failed to convert ENR to peer info", enr = $enr, error = error
      waku_dnsdisc_errors.inc(labelValues = ["peer_info_failure"])
+      continue
+    discoveredNodes.add(peerInfo)

  if discoveredNodes.len > 0:
    info "Successfully discovered nodes", count = discoveredNodes.len
    waku_dnsdisc_discovered.inc(discoveredNodes.len.int64)
@@ -117,14 +114,9 @@ proc retrieveDynamicBootstrapNodes*(
      if resolved.len > 0:
        return resolved[0] # Use only first answer

-    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
-    if wakuDnsDiscovery.isOk():
-      return (await wakuDnsDiscovery.get().findPeers()).mapErr(
-        proc(e: cstring): string =
-          $e
-      )
-    else:
-      warn "Failed to init Waku DNS discovery"
+    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver).errorOr:
+      return (await value.findPeers()).mapErr(e => $e)
+    warn "Failed to init Waku DNS discovery"

  debug "No method for retrieving dynamic bootstrap nodes specified."
  ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
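This hunk rewrites the init-and-find-peers branch with errorOr. Judging from its use here, errorOr acts as the dual of valueOr: on success its block runs with `value` injected and returns early, otherwise the expression yields the error and execution falls through to the warning. A rough standalone sketch under that assumption; initClient and connect are made-up placeholders, not part of this repository:

import results

proc initClient(url: string): Result[string, string] =
  if url.len > 0: ok("client for " & url) else: err("empty url")

proc connect(url: string): string =
  # On ok, the errorOr block runs with `value` and returns early;
  # on err, the error is bound to `failure` and the fallthrough path runs.
  let failure = initClient(url).errorOr:
    return "connected: " & value
  "failed: " & failure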
@@ -251,21 +251,15 @@ proc setupProtocols(

    mountStoreClient(node)
    if conf.remoteStoreNode.isSome():
-      let storeNode = parsePeerInfo(conf.remoteStoreNode.get())
-      if storeNode.isOk():
-        node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
-      else:
-        return err("failed to set node waku store peer: " & storeNode.error)
+      let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr:
+        return err("failed to set node waku store peer: " & error)
+      node.peerManager.addServicePeer(storeNode, WakuStoreCodec)

    mountLegacyStoreClient(node)
    if conf.remoteStoreNode.isSome():
-      let storeNode = parsePeerInfo(conf.remoteStoreNode.get())
-      if storeNode.isOk():
-        node.peerManager.addServicePeer(
-          storeNode.value, legacy_common.WakuLegacyStoreCodec
-        )
-      else:
-        return err("failed to set node waku legacy store peer: " & storeNode.error)
+      let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr:
+        return err("failed to set node waku legacy store peer: " & error)
+      node.peerManager.addServicePeer(storeNode, WakuLegacyStoreCodec)

    if conf.storeServiceConf.isSome and conf.storeServiceConf.get().resume:
      node.setupStoreResume()
@@ -377,12 +371,10 @@ proc setupProtocols(
    mountLightPushClient(node)
    mountLegacyLightPushClient(node)
    if conf.remoteLightPushNode.isSome():
-      let lightPushNode = parsePeerInfo(conf.remoteLightPushNode.get())
-      if lightPushNode.isOk():
-        node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
-        node.peerManager.addServicePeer(lightPushNode.value, WakuLegacyLightPushCodec)
-      else:
-        return err("failed to set node waku lightpush peer: " & lightPushNode.error)
+      let lightPushNode = parsePeerInfo(conf.remoteLightPushNode.get()).valueOr:
+        return err("failed to set node waku lightpush peer: " & error)
+      node.peerManager.addServicePeer(lightPushNode, WakuLightPushCodec)
+      node.peerManager.addServicePeer(lightPushNode, WakuLegacyLightPushCodec)

  # Filter setup. NOTE Must be mounted after relay
  if conf.filterServiceConf.isSome():
@@ -400,16 +392,13 @@ proc setupProtocols(

    await node.mountFilterClient()
    if conf.remoteFilterNode.isSome():
-      let filterNode = parsePeerInfo(conf.remoteFilterNode.get())
-      if filterNode.isOk():
-        try:
-          node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
-        except CatchableError:
-          return err(
-            "failed to mount waku filter client protocol: " & getCurrentExceptionMsg()
-          )
-      else:
-        return err("failed to set node waku filter peer: " & filterNode.error)
+      let filterNode = parsePeerInfo(conf.remoteFilterNode.get()).valueOr:
+        return err("failed to set node waku filter peer: " & error)
+      try:
+        node.peerManager.addServicePeer(filterNode, WakuFilterSubscribeCodec)
+      except CatchableError:
+        return
+          err("failed to mount waku filter client protocol: " & getCurrentExceptionMsg())

  # waku peer exchange setup
  if conf.peerExchangeService:
@@ -422,12 +411,9 @@ proc setupProtocols(
        err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())

    if conf.remotePeerExchangeNode.isSome():
-      let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get())
-      if peerExchangeNode.isOk():
-        node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
-      else:
-        return
-          err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
+      let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get()).valueOr:
+        return err("failed to set node waku peer-exchange peer: " & error)
+      node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec)

    if conf.peerExchangeDiscovery:
      await node.mountPeerExchangeClient()
@@ -481,7 +467,7 @@ proc startNode*(
      error "error while fetching peers from peer exchange", error = error

  # TODO: behavior described by comment is undesired. PX as client should be used in tandem with discv5.
-  #
+  #
  # Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own
  # periodic loop to find peers and px returned peers actually come from discv5
  if conf.peerExchangeDiscovery and not conf.discv5Conf.isSome():
@@ -67,9 +67,8 @@ proc addSignedShardsValidator*(
    if msg.timestamp != 0:
      if msg.withinTimeWindow():
        let msgHash = SkMessage(topic.msgHash(msg))
-        let recoveredSignature = SkSignature.fromRaw(msg.meta)
-        if recoveredSignature.isOk():
-          if recoveredSignature.get.verify(msgHash, protectedShard.key):
+        SkSignature.fromRaw(msg.meta).isErrOr:
+          if value.verify(msgHash, protectedShard.key):
            outcome = errors.ValidationResult.Accept

  if outcome != errors.ValidationResult.Accept:
waku/node/api.nim (new file, 9 lines)
@@ -0,0 +1,9 @@
import
  ./api/filter as filter_api,
  ./api/lightpush as lightpush_api,
  ./api/store as store_api,
  ./api/relay as relay_api,
  ./api/peer_exchange as peer_exchange_api,
  ./api/ping as ping_api

export filter_api, lightpush_api, store_api, relay_api, peer_exchange_api, ping_api
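The new module only aggregates and re-exports the per-protocol node APIs, so a consumer needs a single import to reach them. A hypothetical usage sketch, not part of this diff:

import waku/node/api

# Through the re-exports above, procs such as mountFilter, mountFilterClient,
# mountLightPush, filterSubscribe and lightpushPublish resolve via this single
# module instead of being imported from each per-protocol module individually.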
waku/node/api/filter.nim (new file, 297 lines)
@@ -0,0 +1,297 @@
{.push raises: [].}

import
  std/[options, sugar, tables, sequtils, os, net],
  chronos,
  chronicles,
  metrics,
  results,
  stew/byteutils,
  eth/keys,
  nimcrypto,
  eth/p2p/discoveryv5/enr,
  libp2p/crypto/crypto,
  libp2p/protocols/ping,
  libp2p/protocols/pubsub/gossipsub,
  libp2p/protocols/pubsub/rpc/messages,
  libp2p/builders,
  libp2p/transports/tcptransport,
  libp2p/transports/wstransport,
  libp2p/utility

import
  ../waku_node,
  ../../waku_core,
  ../../waku_core/topics/sharding,
  ../../waku_filter_v2,
  ../../waku_filter_v2/client as filter_client,
  ../../waku_filter_v2/subscriptions as filter_subscriptions,
  ../../common/rate_limit/setting,
  ../peer_manager

logScope:
  topics = "waku node filter api"

## Waku filter

proc mountFilter*(
    node: WakuNode,
    subscriptionTimeout: Duration =
      filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
    maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
    maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
    messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
    rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit,
) {.async: (raises: []).} =
  ## Mounting filter v2 protocol

  info "mounting filter protocol"
  node.wakuFilter = WakuFilter.new(
    node.peerManager,
    subscriptionTimeout,
    maxFilterPeers,
    maxFilterCriteriaPerPeer,
    messageCacheTTL,
    some(rateLimitSetting),
  )

  try:
    await node.wakuFilter.start()
  except CatchableError:
    error "failed to start wakuFilter", error = getCurrentExceptionMsg()

  try:
    node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
  except LPError:
    error "failed to mount wakuFilter", error = getCurrentExceptionMsg()

proc filterHandleMessage*(
    node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage
) {.async.} =
  if node.wakuFilter.isNil():
    error "cannot handle filter message", error = "waku filter is required"
    return

  await node.wakuFilter.handleMessage(pubsubTopic, message)

proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} =
  ## Mounting both filter
  ## Giving option for application level to choose btw own push message handling or
  ## rely on node provided cache. - This only applies for v2 filter client
  info "mounting filter client"

  if not node.wakuFilterClient.isNil():
    trace "Filter client already mounted."
    return

  node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)

  try:
    await node.wakuFilterClient.start()
  except CatchableError:
    error "failed to start wakuFilterClient", error = getCurrentExceptionMsg()

  try:
    node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
  except LPError:
    error "failed to mount wakuFilterClient", error = getCurrentExceptionMsg()

proc filterSubscribe*(
    node: WakuNode,
    pubsubTopic: Option[PubsubTopic],
    contentTopics: ContentTopic | seq[ContentTopic],
    peer: RemotePeerInfo | string,
): Future[FilterSubscribeResult] {.async: (raises: []).} =
  ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
  if node.wakuFilterClient.isNil():
    error "cannot register filter subscription to topic",
      error = "waku filter client is not set up"
    return err(FilterSubscribeError.serviceUnavailable())

  let remotePeerRes = parsePeerInfo(peer)
  if remotePeerRes.isErr():
    error "Couldn't parse the peer info properly", error = remotePeerRes.error
    return err(FilterSubscribeError.serviceUnavailable("No peers available"))

  let remotePeer = remotePeerRes.value

  if pubsubTopic.isSome():
    info "registering filter subscription to content",
      pubsubTopic = pubsubTopic.get(),
      contentTopics = contentTopics,
      peer = remotePeer.peerId

    when (contentTopics is ContentTopic):
      let contentTopics = @[contentTopics]
    let subRes = await node.wakuFilterClient.subscribe(
      remotePeer, pubsubTopic.get(), contentTopics
    )
    if subRes.isOk():
      info "v2 subscribed to topic",
        pubsubTopic = pubsubTopic, contentTopics = contentTopics

      # Purpose is to update Waku Metadata
      node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic.get()))
    else:
      error "failed filter v2 subscription", error = subRes.error
      waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])

    return subRes
  elif node.wakuAutoSharding.isNone():
    error "Failed filter subscription, pubsub topic must be specified with static sharding"
    waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
  else:
    # No pubsub topic, autosharding is used to deduce it
    # but content topics must be well-formed for this
    let topicMapRes =
      node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics)

    let topicMap =
      if topicMapRes.isErr():
        error "can't get shard", error = topicMapRes.error
        return err(FilterSubscribeError.badResponse("can't get shard"))
      else:
        topicMapRes.get()

    var futures = collect(newSeq):
      for shard, topics in topicMap.pairs:
        info "registering filter subscription to content",
          shard = shard, contentTopics = topics, peer = remotePeer.peerId
        let content = topics.mapIt($it)
        node.wakuFilterClient.subscribe(remotePeer, $shard, content)

    var subRes: FilterSubscribeResult = FilterSubscribeResult.ok()
    try:
      let finished = await allFinished(futures)

      for fut in finished:
        let res = fut.read()

        if res.isErr():
          error "failed filter subscription", error = res.error
          waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
          subRes = FilterSubscribeResult.err(res.error)

      for pubsub, topics in topicMap.pairs:
        info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics

        # Purpose is to update Waku Metadata
        node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: $pubsub))
    except CatchableError:
      let errMsg = "exception in filterSubscribe: " & getCurrentExceptionMsg()
      error "exception in filterSubscribe", error = getCurrentExceptionMsg()
      waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
      subRes =
        FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg))

    # return the last error or ok
    return subRes

proc filterUnsubscribe*(
    node: WakuNode,
    pubsubTopic: Option[PubsubTopic],
    contentTopics: ContentTopic | seq[ContentTopic],
    peer: RemotePeerInfo | string,
): Future[FilterSubscribeResult] {.async: (raises: []).} =
  ## Unsubscribe from a content filter V2".

  let remotePeerRes = parsePeerInfo(peer)
  if remotePeerRes.isErr():
    error "couldn't parse remotePeerInfo", error = remotePeerRes.error
    return err(FilterSubscribeError.serviceUnavailable("No peers available"))

  let remotePeer = remotePeerRes.value

  if pubsubTopic.isSome():
    info "deregistering filter subscription to content",
      pubsubTopic = pubsubTopic.get(),
      contentTopics = contentTopics,
      peer = remotePeer.peerId

    let unsubRes = await node.wakuFilterClient.unsubscribe(
      remotePeer, pubsubTopic.get(), contentTopics
    )
    if unsubRes.isOk():
      info "unsubscribed from topic",
        pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics

      # Purpose is to update Waku Metadata
      node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic.get()))
    else:
      error "failed filter unsubscription", error = unsubRes.error
      waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])

    return unsubRes
  elif node.wakuAutoSharding.isNone():
    error "Failed filter un-subscription, pubsub topic must be specified with static sharding"
    waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
  else: # pubsubTopic.isNone
    let topicMapRes =
      node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics)

    let topicMap =
      if topicMapRes.isErr():
        error "can't get shard", error = topicMapRes.error
        return err(FilterSubscribeError.badResponse("can't get shard"))
      else:
        topicMapRes.get()

    var futures = collect(newSeq):
      for shard, topics in topicMap.pairs:
        info "deregistering filter subscription to content",
          shard = shard, contentTopics = topics, peer = remotePeer.peerId
        let content = topics.mapIt($it)
        node.wakuFilterClient.unsubscribe(remotePeer, $shard, content)

    var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok()
    try:
      let finished = await allFinished(futures)

      for fut in finished:
        let res = fut.read()

        if res.isErr():
          error "failed filter unsubscription", error = res.error
          waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
          unsubRes = FilterSubscribeResult.err(res.error)

      for pubsub, topics in topicMap.pairs:
        info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics

        # Purpose is to update Waku Metadata
        node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: $pubsub))
    except CatchableError:
      let errMsg = "exception in filterUnsubscribe: " & getCurrentExceptionMsg()
      error "exception in filterUnsubscribe", error = getCurrentExceptionMsg()
      waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
      unsubRes =
        FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg))

    # return the last error or ok
    return unsubRes

proc filterUnsubscribeAll*(
    node: WakuNode, peer: RemotePeerInfo | string
): Future[FilterSubscribeResult] {.async: (raises: []).} =
  ## Unsubscribe from a content filter V2".

  let remotePeerRes = parsePeerInfo(peer)
  if remotePeerRes.isErr():
    error "couldn't parse remotePeerInfo", error = remotePeerRes.error
    return err(FilterSubscribeError.serviceUnavailable("No peers available"))

  let remotePeer = remotePeerRes.value

  info "deregistering all filter subscription to content", peer = remotePeer.peerId

  let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer)
  if unsubRes.isOk():
    info "unsubscribed from all content-topic", peerId = remotePeer.peerId
  else:
    error "failed filter unsubscription from all content-topic", error = unsubRes.error
    waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])

  return unsubRes

# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated
# yet incompatible to handle both type of filters - use specific filter registration instead
288
waku/node/api/lightpush.nim
Normal file
288
waku/node/api/lightpush.nim
Normal file
@ -0,0 +1,288 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[hashes, options, tables, net],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
results,
|
||||
stew/byteutils,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/builders,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility,
|
||||
mix
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../../waku_core,
|
||||
../../waku_core/topics/sharding,
|
||||
../../waku_lightpush_legacy/client as legacy_lightpush_client,
|
||||
../../waku_lightpush_legacy as legacy_lightpush_protocol,
|
||||
../../waku_lightpush/client as lightpush_client,
|
||||
../../waku_lightpush as lightpush_protocol,
|
||||
../peer_manager,
|
||||
../../common/rate_limit/setting,
|
||||
../../waku_rln_relay
|
||||
|
||||
logScope:
|
||||
topics = "waku node lightpush api"
|
||||
|
||||
## Waku lightpush
|
||||
proc mountLegacyLightPush*(
|
||||
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
|
||||
) {.async.} =
|
||||
info "mounting legacy light push"
|
||||
|
||||
let pushHandler =
|
||||
if node.wakuRelay.isNil:
|
||||
debug "mounting legacy lightpush without relay (nil)"
|
||||
legacy_lightpush_protocol.getNilPushHandler()
|
||||
else:
|
||||
debug "mounting legacy lightpush with relay"
|
||||
let rlnPeer =
|
||||
if isNil(node.wakuRlnRelay):
|
||||
debug "mounting legacy lightpush without rln-relay"
|
||||
none(WakuRLNRelay)
|
||||
else:
|
||||
debug "mounting legacy lightpush with rln-relay"
|
||||
some(node.wakuRlnRelay)
|
||||
legacy_lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer)
|
||||
|
||||
node.wakuLegacyLightPush =
|
||||
WakuLegacyLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start lightpush too.
|
||||
await node.wakuLegacyLightPush.start()
|
||||
|
||||
node.switch.mount(node.wakuLegacyLightPush, protocolMatcher(WakuLegacyLightPushCodec))
|
||||
|
||||
proc mountLegacyLightPushClient*(node: WakuNode) =
|
||||
info "mounting legacy light push client"
|
||||
|
||||
if node.wakuLegacyLightpushClient.isNil():
|
||||
node.wakuLegacyLightpushClient =
|
||||
WakuLegacyLightPushClient.new(node.peerManager, node.rng)
|
||||
|
||||
proc legacyLightpushPublish*(
|
||||
node: WakuNode,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
message: WakuMessage,
|
||||
peer: RemotePeerInfo,
|
||||
): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} =
|
||||
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||
## Returns whether relaying was successful or not.
|
||||
## `WakuMessage` should contain a `contentTopic` field for light node
|
||||
## functionality.
|
||||
if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil():
|
||||
error "failed to publish message as legacy lightpush not available"
|
||||
return err("Waku lightpush not available")
|
||||
|
||||
let internalPublish = proc(
|
||||
node: WakuNode,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
peer: RemotePeerInfo,
|
||||
): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.async, gcsafe.} =
|
||||
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
if not node.wakuLegacyLightpushClient.isNil():
|
||||
notice "publishing message with legacy lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msgHash
|
||||
return await node.wakuLegacyLightpushClient.publish(pubsubTopic, message, peer)
|
||||
|
||||
if not node.wakuLegacyLightPush.isNil():
|
||||
notice "publishing message with self hosted legacy lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msgHash
|
||||
return
|
||||
await node.wakuLegacyLightPush.handleSelfLightPushRequest(pubsubTopic, message)
|
||||
try:
|
||||
if pubsubTopic.isSome():
|
||||
return await internalPublish(node, pubsubTopic.get(), message, peer)
|
||||
|
||||
if node.wakuAutoSharding.isNone():
|
||||
return err("Pubsub topic must be specified when static sharding is enabled")
|
||||
let topicMapRes =
|
||||
node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic)
|
||||
|
||||
let topicMap =
|
||||
if topicMapRes.isErr():
|
||||
return err(topicMapRes.error)
|
||||
else:
|
||||
topicMapRes.get()
|
||||
|
||||
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
|
||||
return await internalPublish(node, $pubsub, message, peer)
|
||||
except CatchableError:
|
||||
return err(getCurrentExceptionMsg())
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc legacyLightpushPublish*(
|
||||
node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage
|
||||
): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {.
|
||||
async, gcsafe, deprecated: "Use 'node.legacyLightpushPublish()' instead"
|
||||
.} =
|
||||
if node.wakuLegacyLightpushClient.isNil() and node.wakuLegacyLightPush.isNil():
|
||||
error "failed to publish message as legacy lightpush not available"
|
||||
return err("waku legacy lightpush not available")
|
||||
|
||||
var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo)
|
||||
if not node.wakuLegacyLightpushClient.isNil():
|
||||
peerOpt = node.peerManager.selectPeer(WakuLegacyLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
let msg = "no suitable remote peers"
|
||||
error "failed to publish message", err = msg
|
||||
return err(msg)
|
||||
elif not node.wakuLegacyLightPush.isNil():
|
||||
peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId))
|
||||
|
||||
return await node.legacyLightpushPublish(pubsubTopic, message, peer = peerOpt.get())
|
||||
|
||||
proc mountLightPush*(
|
||||
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
|
||||
) {.async.} =
|
||||
info "mounting light push"
|
||||
|
||||
let pushHandler =
|
||||
if node.wakuRelay.isNil():
|
||||
debug "mounting lightpush v2 without relay (nil)"
|
||||
lightpush_protocol.getNilPushHandler()
|
||||
else:
|
||||
debug "mounting lightpush with relay"
|
||||
let rlnPeer =
|
||||
if isNil(node.wakuRlnRelay):
|
||||
debug "mounting lightpush without rln-relay"
|
||||
none(WakuRLNRelay)
|
||||
else:
|
||||
debug "mounting lightpush with rln-relay"
|
||||
some(node.wakuRlnRelay)
|
||||
lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer)
|
||||
|
||||
node.wakuLightPush = WakuLightPush.new(
|
||||
node.peerManager, node.rng, pushHandler, node.wakuAutoSharding, some(rateLimit)
|
||||
)
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start lightpush too.
|
||||
await node.wakuLightPush.start()
|
||||
|
||||
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
||||
|
||||
proc mountLightPushClient*(node: WakuNode) =
|
||||
info "mounting light push client"
|
||||
|
||||
if node.wakuLightpushClient.isNil():
|
||||
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
|
||||
|
||||
proc lightpushPublishHandler(
|
||||
node: WakuNode,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
peer: RemotePeerInfo | PeerInfo,
|
||||
mixify: bool = false,
|
||||
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
|
||||
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
|
||||
if not node.wakuLightpushClient.isNil():
|
||||
notice "publishing message with lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msgHash,
|
||||
mixify = mixify
|
||||
if mixify: #indicates we want to use mix to send the message
|
||||
#TODO: How to handle multiple addresses?
|
||||
let conn = node.wakuMix.toConnection(
|
||||
MixDestination.init(peer.peerId, peer.addrs[0]),
|
||||
WakuLightPushCodec,
|
||||
Opt.some(
|
||||
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1)))
|
||||
# indicating we only want a single path to be used for reply hence numSurbs = 1
|
||||
),
|
||||
).valueOr:
|
||||
error "could not create mix connection"
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.SERVICE_NOT_AVAILABLE,
|
||||
"Waku lightpush with mix not available",
|
||||
)
|
||||
|
||||
return await node.wakuLightpushClient.publishWithConn(
|
||||
pubsubTopic, message, conn, peer.peerId
|
||||
)
|
||||
else:
|
||||
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
|
||||
|
||||
if not node.wakuLightPush.isNil():
|
||||
if mixify:
|
||||
error "mixify is not supported with self hosted lightpush"
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.SERVICE_NOT_AVAILABLE,
|
||||
"Waku lightpush with mix not available",
|
||||
)
|
||||
notice "publishing message with self hosted lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
msg_hash = msgHash
|
||||
return
|
||||
await node.wakuLightPush.handleSelfLightPushRequest(some(pubsubTopic), message)
|
||||
|
||||
proc lightpushPublish*(
|
||||
node: WakuNode,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
message: WakuMessage,
|
||||
peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo),
|
||||
mixify: bool = false,
|
||||
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
|
||||
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
|
||||
error "failed to publish message as lightpush not available"
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush not available"
|
||||
)
|
||||
if mixify and node.wakuMix.isNil():
|
||||
error "failed to publish message using mix as mix protocol is not mounted"
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available"
|
||||
)
|
||||
let toPeer: RemotePeerInfo = peerOpt.valueOr:
|
||||
if not node.wakuLightPush.isNil():
|
||||
RemotePeerInfo.init(node.peerId())
|
||||
elif not node.wakuLightpushClient.isNil():
|
||||
node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||
let msg = "no suitable remote peers"
|
||||
error "failed to publish message", msg = msg
|
||||
return lighpushErrorResult(LightPushErrorCode.NO_PEERS_TO_RELAY, msg)
|
||||
else:
|
||||
return lighpushErrorResult(
|
||||
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
|
||||
)
|
||||
|
||||
let pubsubForPublish = pubsubTopic.valueOr:
|
||||
if node.wakuAutoSharding.isNone():
|
||||
let msg = "Pubsub topic must be specified when static sharding is enabled"
|
||||
error "lightpush publish error", error = msg
|
||||
return lighpushErrorResult(LightPushErrorCode.INVALID_MESSAGE, msg)
|
||||
|
||||
let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr:
|
||||
let msg = "Invalid content-topic:" & $error
|
||||
error "lightpush request handling error", error = msg
|
||||
return lighpushErrorResult(LightPushErrorCode.INVALID_MESSAGE, msg)
|
||||
|
||||
node.wakuAutoSharding.get().getShard(parsedTopic).valueOr:
|
||||
let msg = "Autosharding error: " & error
|
||||
error "lightpush publish error", error = msg
|
||||
return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg)
|
||||
|
||||
return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify)
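# Illustrative usage sketch for lightpushPublish above. Assumes an already
# started node with either a lightpush client or a self-hosted lightpush
# mounted; imports and node construction are omitted. Passing `none` for the
# pubsub topic only works when autosharding is enabled, in which case the
# shard is derived from the content topic.
proc publishViaLightpush(node: WakuNode) {.async.} =
  let msg = WakuMessage(
    payload: @[byte 1, 2, 3], # example payload
    contentTopic: "/example/1/chat/proto", # hypothetical content topic
  )
  let res = await node.lightpushPublish(none(PubsubTopic), msg)
  if res.isErr():
    error "lightpush publish failed"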
|
||||
120
waku/node/api/peer_exchange.nim
Normal file
@ -0,0 +1,120 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, tables, net],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
results,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/builders,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../../waku_peer_exchange,
|
||||
../../waku_core,
|
||||
../peer_manager,
|
||||
../../common/rate_limit/setting
|
||||
|
||||
logScope:
|
||||
topics = "waku node peerexchange api"
|
||||
|
||||
## Waku peer-exchange
|
||||
|
||||
proc mountPeerExchange*(
|
||||
node: WakuNode,
|
||||
cluster: Option[uint16] = none(uint16),
|
||||
rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit,
|
||||
) {.async: (raises: []).} =
|
||||
info "mounting waku peer exchange"
|
||||
|
||||
node.wakuPeerExchange =
|
||||
WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit))
|
||||
|
||||
if node.started:
|
||||
try:
|
||||
await node.wakuPeerExchange.start()
|
||||
except CatchableError:
|
||||
error "failed to start wakuPeerExchange", error = getCurrentExceptionMsg()
|
||||
|
||||
try:
|
||||
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
|
||||
except LPError:
|
||||
error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg()
|
||||
|
||||
proc mountPeerExchangeClient*(node: WakuNode) {.async: (raises: []).} =
|
||||
info "mounting waku peer exchange client"
|
||||
if node.wakuPeerExchangeClient.isNil():
|
||||
node.wakuPeerExchangeClient = WakuPeerExchangeClient.new(node.peerManager)
|
||||
|
||||
proc fetchPeerExchangePeers*(
|
||||
node: WakuNode, amount = DefaultPXNumPeersReq
|
||||
): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} =
|
||||
if node.wakuPeerExchangeClient.isNil():
|
||||
error "could not get peers from px, waku peer-exchange-client is nil"
|
||||
return err(
|
||||
(
|
||||
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
|
||||
status_desc: some("PeerExchangeClient is not mounted"),
|
||||
)
|
||||
)
|
||||
|
||||
info "Retrieving peer info via peer exchange protocol", amount
|
||||
let pxPeersRes = await node.wakuPeerExchangeClient.request(amount)
|
||||
if pxPeersRes.isOk():
|
||||
var validPeers = 0
|
||||
let peers = pxPeersRes.get().peerInfos
|
||||
for pi in peers:
|
||||
var record: enr.Record
|
||||
if enr.fromBytes(record, pi.enr):
|
||||
node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExchange)
|
||||
validPeers += 1
|
||||
info "Retrieved peer info via peer exchange protocol",
|
||||
validPeers = validPeers, totalPeers = peers.len
|
||||
return ok(validPeers)
|
||||
else:
|
||||
warn "failed to retrieve peer info via peer exchange protocol",
|
||||
error = pxPeersRes.error
|
||||
return err(pxPeersRes.error)
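# Illustrative usage sketch for fetchPeerExchangePeers above. Assumes the
# peer-exchange client was mounted via mountPeerExchangeClient and that at
# least one PX-capable service peer is already known to the peer manager.
proc refreshPeersOnce(node: WakuNode) {.async.} =
  let fetched = await node.fetchPeerExchangePeers(amount = 32)
  if fetched.isOk():
    info "added peers via peer exchange", count = fetched.get()
  else:
    warn "peer exchange request failed", status = $fetched.error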
|
||||
|
||||
proc peerExchangeLoop(node: WakuNode) {.async.} =
|
||||
while true:
|
||||
if not node.started:
|
||||
await sleepAsync(5.seconds)
|
||||
continue
|
||||
(await node.fetchPeerExchangePeers()).isOkOr:
|
||||
warn "Cannot fetch peers from peer exchange", cause = error
|
||||
await sleepAsync(1.minutes)
|
||||
|
||||
proc startPeerExchangeLoop*(node: WakuNode) =
|
||||
if node.wakuPeerExchangeClient.isNil():
|
||||
error "startPeerExchangeLoop: Peer Exchange is not mounted"
|
||||
return
|
||||
info "Starting peer exchange loop"
|
||||
node.wakuPeerExchangeClient.pxLoopHandle = node.peerExchangeLoop()
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc setPeerExchangePeer*(
|
||||
node: WakuNode, peer: RemotePeerInfo | MultiAddress | string
|
||||
) =
|
||||
if node.wakuPeerExchange.isNil():
|
||||
error "could not set peer, waku peer-exchange is nil"
|
||||
return
|
||||
|
||||
info "Set peer-exchange peer", peer = peer
|
||||
|
||||
let remotePeerRes = parsePeerInfo(peer)
|
||||
if remotePeerRes.isErr():
|
||||
error "could not parse peer info", error = remotePeerRes.error
|
||||
return
|
||||
|
||||
node.peerManager.addPeer(remotePeerRes.value, PeerExchange)
|
||||
waku_px_peers.inc()
|
||||
87
waku/node/api/ping.nim
Normal file
@ -0,0 +1,87 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
results,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/builders,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/utility
|
||||
|
||||
import ../waku_node, ../peer_manager
|
||||
|
||||
logScope:
|
||||
topics = "waku node ping api"
|
||||
|
||||
proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
|
||||
info "mounting libp2p ping protocol"
|
||||
|
||||
try:
|
||||
node.libp2pPing = Ping.new(rng = node.rng)
|
||||
except Exception as e:
|
||||
error "failed to create ping", error = getCurrentExceptionMsg()
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start ping too.
|
||||
try:
|
||||
await node.libp2pPing.start()
|
||||
except CatchableError:
|
||||
error "failed to start libp2pPing", error = getCurrentExceptionMsg()
|
||||
|
||||
try:
|
||||
node.switch.mount(node.libp2pPing)
|
||||
except LPError:
|
||||
error "failed to mount libp2pPing", error = getCurrentExceptionMsg()
|
||||
|
||||
proc pingPeer(node: WakuNode, peerId: PeerId): Future[Result[void, string]] {.async.} =
|
||||
## Ping a single peer and return the result
|
||||
|
||||
try:
|
||||
# Establish a stream
|
||||
let stream = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
|
||||
error "pingPeer: failed dialing peer", peerId = peerId
|
||||
return err("pingPeer failed dialing peer peerId: " & $peerId)
|
||||
defer:
|
||||
# Always close the stream
|
||||
try:
|
||||
await stream.close()
|
||||
except CatchableError as e:
|
||||
debug "Error closing ping connection", peerId = peerId, error = e.msg
|
||||
|
||||
# Perform ping
|
||||
let pingDuration = await node.libp2pPing.ping(stream)
|
||||
|
||||
trace "Ping successful", peerId = peerId, duration = pingDuration
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
error "pingPeer: exception raised pinging peer", peerId = peerId, error = e.msg
|
||||
return err("pingPeer: exception raised pinging peer: " & e.msg)
|
||||
|
||||
# Returns the number of successful pings performed
|
||||
proc parallelPings*(node: WakuNode, peerIds: seq[PeerId]): Future[int] {.async.} =
|
||||
if len(peerIds) == 0:
|
||||
return 0
|
||||
|
||||
var pingFuts: seq[Future[Result[void, string]]]
|
||||
|
||||
# Create ping futures for each peer
|
||||
for i, peerId in peerIds:
|
||||
let fut = pingPeer(node, peerId)
|
||||
pingFuts.add(fut)
|
||||
|
||||
# Wait for all pings to complete
|
||||
discard await allFutures(pingFuts).withTimeout(5.seconds)
|
||||
|
||||
var successCount = 0
|
||||
for fut in pingFuts:
|
||||
if not fut.completed() or fut.failed():
|
||||
continue
|
||||
|
||||
let res = fut.read()
|
||||
if res.isOk():
|
||||
successCount.inc()
|
||||
|
||||
return successCount
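# Illustrative usage sketch for parallelPings above: given peer IDs obtained
# elsewhere (e.g. from the peer manager), count how many answer a libp2p ping
# within the 5-second window used internally. Node setup is assumed.
proc livenessCheck(node: WakuNode, peerIds: seq[PeerId]) {.async.} =
  let okCount = await node.parallelPings(peerIds)
  info "ping round finished", responsive = okCount, total = peerIds.len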
|
||||
256
waku/node/api/relay.nim
Normal file
@ -0,0 +1,256 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, net],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
results,
|
||||
stew/byteutils,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/builders,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../../waku_relay,
|
||||
../../waku_core,
|
||||
../../waku_core/topics/sharding,
|
||||
../../waku_filter_v2,
|
||||
../../waku_archive_legacy,
|
||||
../../waku_archive,
|
||||
../../waku_store_sync,
|
||||
../peer_manager,
|
||||
../../waku_rln_relay
|
||||
|
||||
declarePublicHistogram waku_histogram_message_size,
|
||||
"message size histogram in kB",
|
||||
buckets = [
|
||||
0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf
|
||||
]
|
||||
|
||||
logScope:
|
||||
topics = "waku node relay api"
|
||||
|
||||
## Waku relay
|
||||
|
||||
proc registerRelayHandler(
|
||||
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler
|
||||
) =
|
||||
## Registers the only handler for the given topic.
|
||||
## Notice that this handler internally calls other handlers, such as filter,
|
||||
## archive, etc, plus the handler provided by the application.
|
||||
|
||||
if node.wakuRelay.isSubscribed(topic):
|
||||
return
|
||||
|
||||
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
let msgSizeKB = msg.payload.len / 1000
|
||||
|
||||
waku_node_messages.inc(labelValues = ["relay"])
|
||||
waku_histogram_message_size.observe(msgSizeKB)
|
||||
|
||||
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuFilter.isNil():
|
||||
return
|
||||
|
||||
await node.wakuFilter.handleMessage(topic, msg)
|
||||
|
||||
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if not node.wakuLegacyArchive.isNil():
|
||||
## we try to store with legacy archive
|
||||
await node.wakuLegacyArchive.handleMessage(topic, msg)
|
||||
return
|
||||
|
||||
if node.wakuArchive.isNil():
|
||||
return
|
||||
|
||||
await node.wakuArchive.handleMessage(topic, msg)
|
||||
|
||||
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuStoreReconciliation.isNil():
|
||||
return
|
||||
|
||||
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
||||
|
||||
let uniqueTopicHandler = proc(
|
||||
topic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
await traceHandler(topic, msg)
|
||||
await filterHandler(topic, msg)
|
||||
await archiveHandler(topic, msg)
|
||||
await syncHandler(topic, msg)
|
||||
await appHandler(topic, msg)
|
||||
|
||||
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
|
||||
|
||||
proc subscribe*(
|
||||
node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler
|
||||
): Result[void, string] =
|
||||
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
|
||||
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
|
||||
|
||||
let (pubsubTopic, contentTopicOp) =
|
||||
case subscription.kind
|
||||
of ContentSub:
|
||||
if node.wakuAutoSharding.isSome():
|
||||
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
|
||||
error "Autosharding error", error = error
|
||||
return err("Autosharding error: " & error)
|
||||
($shard, some(subscription.topic))
|
||||
else:
|
||||
return err(
|
||||
"Static sharding is used, relay subscriptions must specify a pubsub topic"
|
||||
)
|
||||
of PubsubSub:
|
||||
(subscription.topic, none(ContentTopic))
|
||||
else:
|
||||
return err("Unsupported subscription type in relay subscribe")
|
||||
|
||||
if node.wakuRelay.isSubscribed(pubsubTopic):
|
||||
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
|
||||
return ok()
|
||||
|
||||
node.registerRelayHandler(pubsubTopic, handler)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
|
||||
|
||||
return ok()
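# Illustrative usage sketch for subscribe above. The tuple-style construction
# of SubscriptionEvent mirrors the emit calls in this file and is an
# assumption; a ContentSub subscription lets autosharding map the content
# topic to a shard.
proc subscribeExample(node: WakuNode): Result[void, string] =
  proc handler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    info "message received", pubsubTopic = topic, sizeBytes = msg.payload.len

  node.subscribe((kind: ContentSub, topic: "/example/1/chat/proto"), handler)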
|
||||
|
||||
proc unsubscribe*(
|
||||
node: WakuNode, subscription: SubscriptionEvent
|
||||
): Result[void, string] =
|
||||
## Unsubscribes from a specific PubSub or Content topic.
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
|
||||
|
||||
let (pubsubTopic, contentTopicOp) =
|
||||
case subscription.kind
|
||||
of ContentUnsub:
|
||||
if node.wakuAutoSharding.isSome():
|
||||
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
|
||||
error "Autosharding error", error = error
|
||||
return err("Autosharding error: " & error)
|
||||
($shard, some(subscription.topic))
|
||||
else:
|
||||
return err(
|
||||
"Static sharding is used, relay subscriptions must specify a pubsub topic"
|
||||
)
|
||||
of PubsubUnsub:
|
||||
(subscription.topic, none(ContentTopic))
|
||||
else:
|
||||
return err("Unsupported subscription type in relay unsubscribe")
|
||||
|
||||
if not node.wakuRelay.isSubscribed(pubsubTopic):
|
||||
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
|
||||
return ok()
|
||||
|
||||
debug "unsubscribe", pubsubTopic, contentTopicOp
|
||||
node.wakuRelay.unsubscribe(pubsubTopic)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
|
||||
|
||||
return ok()
|
||||
|
||||
proc publish*(
|
||||
node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage
|
||||
): Future[Result[void, string]] {.async, gcsafe.} =
|
||||
## Publish a `WakuMessage`. The pubsub topic may be none, a named shard, or a static shard.
|
||||
## `WakuMessage` should contain a `contentTopic` field for light node functionality.
|
||||
## It is also used to determine the shard.
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
let msg =
|
||||
"Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||
error "publish error", err = msg
|
||||
# TODO: Improve error handling
|
||||
return err(msg)
|
||||
|
||||
let pubsubTopic = pubsubTopicOp.valueOr:
|
||||
if node.wakuAutoSharding.isNone():
|
||||
return err("Pubsub topic must be specified when static sharding is enabled.")
|
||||
node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr:
|
||||
let msg = "Autosharding error: " & error
|
||||
return err(msg)
|
||||
|
||||
#TODO instead of discard return error when 0 peers received the message
|
||||
discard await node.wakuRelay.publish(pubsubTopic, message)
|
||||
|
||||
notice "waku.relay published",
|
||||
peerId = node.peerId,
|
||||
pubsubTopic = pubsubTopic,
|
||||
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
|
||||
publishTime = getNowInNanosecondTime()
|
||||
|
||||
return ok()
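# Illustrative usage sketch for publish above. Assumes relay is mounted and
# the node is subscribed to the target shard; leaving the pubsub topic as
# `none` relies on autosharding to derive the shard from the content topic.
proc relayPublishExample(node: WakuNode) {.async.} =
  let msg = WakuMessage(
    payload: @[byte 0xca, 0xfe], # example payload
    contentTopic: "/example/1/chat/proto", # hypothetical content topic
  )
  (await node.publish(none(PubsubTopic), msg)).isOkOr:
    error "relay publish failed", error = error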
|
||||
|
||||
proc mountRelay*(
|
||||
node: WakuNode,
|
||||
peerExchangeHandler = none(RoutingRecordsHandler),
|
||||
maxMessageSize = int(DefaultMaxWakuMessageSize),
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
if not node.wakuRelay.isNil():
|
||||
error "wakuRelay already mounted, skipping"
|
||||
return err("wakuRelay already mounted, skipping")
|
||||
|
||||
## The default relay topic set is the union of all configured topics plus the default PubsubTopic(s)
|
||||
info "mounting relay protocol"
|
||||
|
||||
node.wakuRelay = WakuRelay.new(node.switch, maxMessageSize).valueOr:
|
||||
error "failed mounting relay protocol", error = error
|
||||
return err("failed mounting relay protocol: " & error)
|
||||
|
||||
## Add peer exchange handler
|
||||
if peerExchangeHandler.isSome():
|
||||
node.wakuRelay.parameters.enablePX = true
|
||||
# Feature flag for peer exchange in nim-libp2p
|
||||
node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
|
||||
|
||||
if node.started:
|
||||
await node.startRelay()
|
||||
|
||||
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))
|
||||
|
||||
info "relay mounted successfully"
|
||||
return ok()
|
||||
|
||||
## Waku RLN Relay
|
||||
|
||||
proc mountRlnRelay*(
|
||||
node: WakuNode,
|
||||
rlnConf: WakuRlnConfig,
|
||||
spamHandler = none(SpamHandler),
|
||||
registrationHandler = none(RegistrationHandler),
|
||||
) {.async.} =
|
||||
info "mounting rln relay"
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
raise newException(
|
||||
CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay"
|
||||
)
|
||||
|
||||
let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler)
|
||||
if rlnRelayRes.isErr():
|
||||
raise
|
||||
newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
|
||||
let rlnRelay = rlnRelayRes.get()
|
||||
if (rlnConf.userMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit):
|
||||
error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"
|
||||
let validator = generateRlnValidator(rlnRelay, spamHandler)
|
||||
|
||||
# register rln validator as default validator
|
||||
debug "Registering RLN validator"
|
||||
node.wakuRelay.addValidator(validator, "RLN validation failed")
|
||||
|
||||
node.wakuRlnRelay = rlnRelay
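# Illustrative sketch of the mount order implied above: relay must be mounted
# before RLN relay, otherwise mountRlnRelay raises. The WakuRlnConfig value is
# assumed to be prepared by the caller.
proc setupRelayWithRln(node: WakuNode, rlnConf: WakuRlnConfig) {.async.} =
  (await node.mountRelay()).isOkOr:
    error "failed to mount relay", error = error
    return
  await node.mountRlnRelay(rlnConf)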
|
||||
309
waku/node/api/store.nim
Normal file
@ -0,0 +1,309 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
results,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/builders,
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/transports/wstransport,
|
||||
libp2p/utility
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../../waku_core,
|
||||
../../waku_store_legacy/protocol as legacy_store,
|
||||
../../waku_store_legacy/client as legacy_store_client,
|
||||
../../waku_store_legacy/common as legacy_store_common,
|
||||
../../waku_store/protocol as store,
|
||||
../../waku_store/client as store_client,
|
||||
../../waku_store/common as store_common,
|
||||
../../waku_store/resume,
|
||||
../peer_manager,
|
||||
../../common/rate_limit/setting,
|
||||
../../waku_archive,
|
||||
../../waku_archive_legacy
|
||||
|
||||
logScope:
|
||||
topics = "waku node store api"
|
||||
|
||||
## Waku archive
|
||||
proc mountArchive*(
|
||||
node: WakuNode,
|
||||
driver: waku_archive.ArchiveDriver,
|
||||
retentionPolicy = none(waku_archive.RetentionPolicy),
|
||||
): Result[void, string] =
|
||||
node.wakuArchive = waku_archive.WakuArchive.new(
|
||||
driver = driver, retentionPolicy = retentionPolicy
|
||||
).valueOr:
|
||||
return err("error in mountArchive: " & error)
|
||||
|
||||
node.wakuArchive.start()
|
||||
|
||||
return ok()
|
||||
|
||||
proc mountLegacyArchive*(
|
||||
node: WakuNode, driver: waku_archive_legacy.ArchiveDriver
|
||||
): Result[void, string] =
|
||||
node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(driver = driver).valueOr:
|
||||
return err("error in mountLegacyArchive: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
## Legacy Waku Store
|
||||
|
||||
# TODO: Review this mapping logic. Maybe move it to the application code
|
||||
proc toArchiveQuery(
|
||||
request: legacy_store_common.HistoryQuery
|
||||
): waku_archive_legacy.ArchiveQuery =
|
||||
waku_archive_legacy.ArchiveQuery(
|
||||
pubsubTopic: request.pubsubTopic,
|
||||
contentTopics: request.contentTopics,
|
||||
cursor: request.cursor.map(
|
||||
proc(cursor: HistoryCursor): waku_archive_legacy.ArchiveCursor =
|
||||
waku_archive_legacy.ArchiveCursor(
|
||||
pubsubTopic: cursor.pubsubTopic,
|
||||
senderTime: cursor.senderTime,
|
||||
storeTime: cursor.storeTime,
|
||||
digest: cursor.digest,
|
||||
)
|
||||
),
|
||||
startTime: request.startTime,
|
||||
endTime: request.endTime,
|
||||
pageSize: request.pageSize.uint,
|
||||
direction: request.direction,
|
||||
requestId: request.requestId,
|
||||
)
|
||||
|
||||
# TODO: Review this mapping logic. Maybe move it to the application code
|
||||
proc toHistoryResult*(
|
||||
res: waku_archive_legacy.ArchiveResult
|
||||
): legacy_store_common.HistoryResult =
|
||||
if res.isErr():
|
||||
let error = res.error
|
||||
case res.error.kind
|
||||
of waku_archive_legacy.ArchiveErrorKind.DRIVER_ERROR,
|
||||
waku_archive_legacy.ArchiveErrorKind.INVALID_QUERY:
|
||||
err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: res.error.cause))
|
||||
else:
|
||||
err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
|
||||
else:
|
||||
let response = res.get()
|
||||
ok(
|
||||
HistoryResponse(
|
||||
messages: response.messages,
|
||||
cursor: response.cursor.map(
|
||||
proc(cursor: waku_archive_legacy.ArchiveCursor): HistoryCursor =
|
||||
HistoryCursor(
|
||||
pubsubTopic: cursor.pubsubTopic,
|
||||
senderTime: cursor.senderTime,
|
||||
storeTime: cursor.storeTime,
|
||||
digest: cursor.digest,
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
proc mountLegacyStore*(
|
||||
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
|
||||
) {.async.} =
|
||||
info "mounting waku legacy store protocol"
|
||||
|
||||
if node.wakuLegacyArchive.isNil():
|
||||
error "failed to mount waku legacy store protocol", error = "waku archive not set"
|
||||
return
|
||||
|
||||
# TODO: Review this handler logic. Maybe move it to the application code
|
||||
let queryHandler: HistoryQueryHandler = proc(
|
||||
request: HistoryQuery
|
||||
): Future[legacy_store_common.HistoryResult] {.async.} =
|
||||
if request.cursor.isSome():
|
||||
request.cursor.get().checkHistCursor().isOkOr:
|
||||
return err(error)
|
||||
|
||||
let request = request.toArchiveQuery()
|
||||
let response = await node.wakuLegacyArchive.findMessagesV2(request)
|
||||
return response.toHistoryResult()
|
||||
|
||||
node.wakuLegacyStore = legacy_store.WakuStore.new(
|
||||
node.peerManager, node.rng, queryHandler, some(rateLimit)
|
||||
)
|
||||
|
||||
if node.started:
|
||||
# Node has started already. Let's start store too.
|
||||
await node.wakuLegacyStore.start()
|
||||
|
||||
node.switch.mount(
|
||||
node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec)
|
||||
)
|
||||
|
||||
proc mountLegacyStoreClient*(node: WakuNode) =
|
||||
info "mounting legacy store client"
|
||||
|
||||
node.wakuLegacyStoreClient =
|
||||
legacy_store_client.WakuStoreClient.new(node.peerManager, node.rng)
|
||||
|
||||
proc query*(
|
||||
node: WakuNode, query: legacy_store_common.HistoryQuery, peer: RemotePeerInfo
|
||||
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.
|
||||
async, gcsafe
|
||||
.} =
|
||||
## Queries known nodes for historical messages
|
||||
if node.wakuLegacyStoreClient.isNil():
|
||||
return err("waku legacy store client is nil")
|
||||
|
||||
let queryRes = await node.wakuLegacyStoreClient.query(query, peer)
|
||||
if queryRes.isErr():
|
||||
return err("legacy store client query error: " & $queryRes.error)
|
||||
|
||||
let response = queryRes.get()
|
||||
|
||||
return ok(response)
|
||||
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc query*(
|
||||
node: WakuNode, query: legacy_store_common.HistoryQuery
|
||||
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.
|
||||
async, gcsafe, deprecated: "Use 'node.query()' with peer destination instead"
|
||||
.} =
|
||||
## Queries known nodes for historical messages
|
||||
if node.wakuLegacyStoreClient.isNil():
|
||||
return err("waku legacy store client is nil")
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
error "no suitable remote peers"
|
||||
return err("peer_not_found_failure")
|
||||
|
||||
return await node.query(query, peerOpt.get())
|
||||
|
||||
when defined(waku_exp_store_resume):
|
||||
# TODO: Move to application module (e.g., wakunode2.nim)
|
||||
proc resume*(
|
||||
node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])
|
||||
) {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
|
||||
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
|
||||
## messages are stored in the wakuStore's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
|
||||
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
if node.wakuLegacyStoreClient.isNil():
|
||||
return
|
||||
|
||||
let retrievedMessages = await node.wakuLegacyStoreClient.resume(peerList)
|
||||
if retrievedMessages.isErr():
|
||||
error "failed to resume store", error = retrievedMessages.error
|
||||
return
|
||||
|
||||
info "the number of retrieved messages since the last online time: ",
|
||||
number = retrievedMessages.value
|
||||
|
||||
## Waku Store
|
||||
|
||||
proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery =
|
||||
var query = waku_archive.ArchiveQuery()
|
||||
|
||||
query.includeData = request.includeData
|
||||
query.pubsubTopic = request.pubsubTopic
|
||||
query.contentTopics = request.contentTopics
|
||||
query.startTime = request.startTime
|
||||
query.endTime = request.endTime
|
||||
query.hashes = request.messageHashes
|
||||
query.cursor = request.paginationCursor
|
||||
query.direction = request.paginationForward
|
||||
query.requestId = request.requestId
|
||||
|
||||
if request.paginationLimit.isSome():
|
||||
query.pageSize = uint(request.paginationLimit.get())
|
||||
|
||||
return query
|
||||
|
||||
proc toStoreResult(res: waku_archive.ArchiveResult): StoreQueryResult =
|
||||
let response = res.valueOr:
|
||||
return err(StoreError.new(300, "archive error: " & $error))
|
||||
|
||||
var res = StoreQueryResponse()
|
||||
|
||||
res.statusCode = 200
|
||||
res.statusDesc = "OK"
|
||||
|
||||
for i in 0 ..< response.hashes.len:
|
||||
let hash = response.hashes[i]
|
||||
|
||||
let kv = store_common.WakuMessageKeyValue(messageHash: hash)
|
||||
|
||||
res.messages.add(kv)
|
||||
|
||||
for i in 0 ..< response.messages.len:
|
||||
res.messages[i].message = some(response.messages[i])
|
||||
res.messages[i].pubsubTopic = some(response.topics[i])
|
||||
|
||||
res.paginationCursor = response.cursor
|
||||
|
||||
return ok(res)
|
||||
|
||||
proc mountStore*(
|
||||
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
|
||||
) {.async.} =
|
||||
if node.wakuArchive.isNil():
|
||||
error "failed to mount waku store protocol", error = "waku archive not set"
|
||||
return
|
||||
|
||||
info "mounting waku store protocol"
|
||||
|
||||
let requestHandler: StoreQueryRequestHandler = proc(
|
||||
request: StoreQueryRequest
|
||||
): Future[StoreQueryResult] {.async.} =
|
||||
let request = request.toArchiveQuery()
|
||||
let response = await node.wakuArchive.findMessages(request)
|
||||
|
||||
return response.toStoreResult()
|
||||
|
||||
node.wakuStore =
|
||||
store.WakuStore.new(node.peerManager, node.rng, requestHandler, some(rateLimit))
|
||||
|
||||
if node.started:
|
||||
await node.wakuStore.start()
|
||||
|
||||
node.switch.mount(node.wakuStore, protocolMatcher(store_common.WakuStoreCodec))
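# Illustrative sketch of the dependency implied above: an archive must be
# mounted before the store protocol, otherwise mountStore logs an error and
# returns without mounting anything. The archive driver construction is
# assumed to happen elsewhere.
proc setupStoreService(node: WakuNode, driver: waku_archive.ArchiveDriver) {.async.} =
  node.mountArchive(driver).isOkOr:
    error "failed to mount archive", error = error
    return
  await node.mountStore()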
|
||||
|
||||
proc mountStoreClient*(node: WakuNode) =
|
||||
info "mounting store client"
|
||||
|
||||
node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng)
|
||||
|
||||
proc query*(
|
||||
node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo
|
||||
): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {.
|
||||
async, gcsafe
|
||||
.} =
|
||||
## Queries known nodes for historical messages
|
||||
if node.wakuStoreClient.isNil():
|
||||
return err("waku store v3 client is nil")
|
||||
|
||||
let response = (await node.wakuStoreClient.query(request, peer)).valueOr:
|
||||
var res = StoreQueryResponse()
|
||||
res.statusCode = uint32(error.kind)
|
||||
res.statusDesc = $error
|
||||
|
||||
return ok(res)
|
||||
|
||||
return ok(response)
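# Illustrative usage sketch for the store query above. Field names follow the
# StoreQueryRequest handling earlier in this file; the store service peer is
# assumed to be known already, and pagination fields are left at their
# defaults.
proc fetchRecentMessages(node: WakuNode, storePeer: RemotePeerInfo) {.async.} =
  var req = StoreQueryRequest()
  req.includeData = true
  req.contentTopics = @["/example/1/chat/proto"] # hypothetical content topic
  let resp = (await node.query(req, storePeer)).valueOr:
    error "store query failed", error = error
    return
  info "store query done", messages = resp.messages.len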
|
||||
|
||||
proc setupStoreResume*(node: WakuNode) =
|
||||
node.wakuStoreResume = StoreResume.new(
|
||||
node.peerManager, node.wakuArchive, node.wakuStoreClient
|
||||
).valueOr:
|
||||
error "Failed to setup Store Resume", error = $error
|
||||
return
|
||||
@ -1,13 +1,14 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, sets, strformat, random, sequtils],
|
||||
std/[options, sets, random, sequtils],
|
||||
chronos,
|
||||
chronicles,
|
||||
libp2p/protocols/rendezvous
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../api,
|
||||
../../waku_rln_relay,
|
||||
../../waku_relay,
|
||||
../peer_manager,
|
||||
|
||||
File diff suppressed because it is too large
@ -12,11 +12,9 @@ proc decodePayloadV2*(
|
||||
case message.version
|
||||
of 2:
|
||||
# We attempt to decode the WakuMessage payload
|
||||
let deserializedPayload2 = deserializePayloadV2(message.payload)
|
||||
if deserializedPayload2.isOk():
|
||||
return ok(deserializedPayload2.get())
|
||||
else:
|
||||
let deserializedPayload2 = deserializePayloadV2(message.payload).valueOr:
|
||||
return err("Failed to decode WakuMessage")
|
||||
return ok(deserializedPayload2)
|
||||
else:
|
||||
return err("Wrong message version while decoding payload")
|
||||
|
||||
@ -28,13 +26,11 @@ proc encodePayloadV2*(
|
||||
raises: [NoiseMalformedHandshake, NoisePublicKeyError]
|
||||
.} =
|
||||
# We attempt to encode the PayloadV2
|
||||
let serializedPayload2 = serializePayloadV2(payload2)
|
||||
if not serializedPayload2.isOk():
|
||||
let serializedPayload2 = serializePayloadV2(payload2).valueOr:
|
||||
return err("Failed to encode PayloadV2")
|
||||
|
||||
# If successful, we create and return a WakuMessage
|
||||
let msg = WakuMessage(
|
||||
payload: serializedPayload2.get(), version: 2, contentTopic: contentTopic
|
||||
)
|
||||
let msg =
|
||||
WakuMessage(payload: serializedPayload2, version: 2, contentTopic: contentTopic)
|
||||
|
||||
return ok(msg)
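# Self-contained illustration of the nim-results `valueOr` idiom that the
# refactors above rely on: the block only runs on the error branch, with the
# error value bound as `error`, and must leave the enclosing scope.
import std/strutils
import results

proc parsePort(s: string): Result[int, string] =
  try:
    ok(parseInt(s))
  except ValueError:
    err("not a number: " & s)

proc describe(s: string): string =
  let port = parsePort(s).valueOr:
    return "failed: " & error
  "listening on port " & $port

echo describe("8080") # -> listening on port 8080
echo describe("oops") # -> failed: not a number: oops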
|
||||
|
||||
@ -70,30 +70,27 @@ proc parseCursor(
|
||||
digest: Option[string],
|
||||
): Result[Option[HistoryCursor], string] =
|
||||
# Parse sender time
|
||||
let parsedSenderTime = parseTime(senderTime)
|
||||
if not parsedSenderTime.isOk():
|
||||
return err(parsedSenderTime.error)
|
||||
let parsedSenderTime = parseTime(senderTime).valueOr:
|
||||
return err(error)
|
||||
|
||||
# Parse store time
|
||||
let parsedStoreTime = parseTime(storeTime)
|
||||
if not parsedStoreTime.isOk():
|
||||
return err(parsedStoreTime.error)
|
||||
let parsedStoreTime = parseTime(storeTime).valueOr:
|
||||
return err(error)
|
||||
|
||||
# Parse message digest
|
||||
let parsedMsgDigest = parseMsgDigest(digest)
|
||||
if not parsedMsgDigest.isOk():
|
||||
return err(parsedMsgDigest.error)
|
||||
let parsedMsgDigest = parseMsgDigest(digest).valueOr:
|
||||
return err(error)
|
||||
|
||||
# Parse cursor information
|
||||
if parsedPubsubTopic.isSome() and parsedSenderTime.value.isSome() and
|
||||
parsedStoreTime.value.isSome() and parsedMsgDigest.value.isSome():
|
||||
if parsedPubsubTopic.isSome() and parsedSenderTime.isSome() and
|
||||
parsedStoreTime.isSome() and parsedMsgDigest.isSome():
|
||||
return ok(
|
||||
some(
|
||||
HistoryCursor(
|
||||
pubsubTopic: parsedPubsubTopic.get(),
|
||||
senderTime: parsedSenderTime.value.get(),
|
||||
storeTime: parsedStoreTime.value.get(),
|
||||
digest: parsedMsgDigest.value.get(),
|
||||
senderTime: parsedSenderTime.get(),
|
||||
storeTime: parsedStoreTime.get(),
|
||||
digest: parsedMsgDigest.get(),
|
||||
)
|
||||
)
|
||||
)
|
||||
@ -225,16 +222,14 @@ proc installStoreApiHandlers*(
|
||||
endTime.toOpt(),
|
||||
pageSize.toOpt(),
|
||||
ascending.toOpt(),
|
||||
)
|
||||
|
||||
if not histQuery.isOk():
|
||||
return RestApiResponse.badRequest(histQuery.error)
|
||||
).valueOr:
|
||||
return RestApiResponse.badRequest(error)
|
||||
|
||||
if peerAddr.isNone() and not node.wakuLegacyStore.isNil():
|
||||
## The user didn't specify a peer address and self-node is configured as a store node.
|
||||
## In this case we assume that the user is willing to retrieve the messages stored by
|
||||
## the local/self store node.
|
||||
return await node.retrieveMsgsFromSelfNode(histQuery.get())
|
||||
return await node.retrieveMsgsFromSelfNode(histQuery)
|
||||
|
||||
# Parse the peer address parameter
|
||||
let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr:
|
||||
@ -253,4 +248,4 @@ proc installStoreApiHandlers*(
|
||||
"No suitable service peer & none discovered"
|
||||
)
|
||||
|
||||
return await node.performHistoryQuery(histQuery.value, peerAddr)
|
||||
return await node.performHistoryQuery(histQuery, peerAddr)
|
||||
|
||||
@ -60,13 +60,11 @@ proc parseMsgDigest*(
|
||||
return ok(none(waku_store_common.MessageDigest))
|
||||
|
||||
let decodedUrl = decodeUrl(input.get())
|
||||
let base64Decoded = base64.decode(Base64String(decodedUrl))
|
||||
let base64DecodedArr = base64.decode(Base64String(decodedUrl)).valueOr:
|
||||
return err(error)
|
||||
|
||||
var messageDigest = waku_store_common.MessageDigest()
|
||||
|
||||
if not base64Decoded.isOk():
|
||||
return err(base64Decoded.error)
|
||||
|
||||
let base64DecodedArr = base64Decoded.get()
|
||||
# Next snippet inspired by "nwaku/waku/waku_archive/archive.nim"
|
||||
# TODO: Improve coherence of MessageDigest type
|
||||
messageDigest = block:
|
||||
@ -220,10 +218,9 @@ proc readValue*(
|
||||
of "data":
|
||||
if data.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `data` fields found", "MessageDigest")
|
||||
let decoded = base64.decode(reader.readValue(Base64String))
|
||||
if not decoded.isOk():
|
||||
let decoded = base64.decode(reader.readValue(Base64String)).valueOr:
|
||||
reader.raiseUnexpectedField("Failed decoding data", "MessageDigest")
|
||||
data = some(decoded.get())
|
||||
data = some(decoded)
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
|
||||
@ -91,7 +91,7 @@ proc new*(
|
||||
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
|
||||
discard
|
||||
|
||||
let sres = HttpServerRef.new(
|
||||
server.httpServer = HttpServerRef.new(
|
||||
address,
|
||||
defaultProcessCallback,
|
||||
serverFlags,
|
||||
@ -106,12 +106,9 @@ proc new*(
|
||||
maxRequestBodySize,
|
||||
dualstack = dualstack,
|
||||
middlewares = middlewares,
|
||||
)
|
||||
if sres.isOk():
|
||||
server.httpServer = sres.get()
|
||||
ok(server)
|
||||
else:
|
||||
err(sres.error)
|
||||
).valueOr:
|
||||
return err(error)
|
||||
return ok(server)
|
||||
|
||||
proc getRouter(): RestRouter =
|
||||
# TODO: Review this `validate` method. Check in nim-presto what is this used for.
|
||||
|
||||
@ -51,7 +51,7 @@ const SelectNoCursorAscStmtDef =
|
||||
|
||||
const SelectNoCursorNoDataAscStmtName = "SelectWithoutCursorAndDataAsc"
|
||||
const SelectNoCursorNoDataAscStmtDef =
|
||||
"""SELECT messageHash FROM messages
|
||||
"""SELECT messageHash FROM messages
|
||||
WHERE contentTopic IN ($1) AND
|
||||
messageHash IN ($2) AND
|
||||
pubsubTopic = $3 AND
|
||||
@ -196,7 +196,7 @@ proc hashCallbackImpl(
|
||||
pqResult: ptr PGresult, rows: var seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
||||
) =
|
||||
## Callback to get a hash out of the DB.
|
||||
## Used when queries only ask for hashes
|
||||
## Used when queries only ask for hashes
|
||||
|
||||
let numFields = pqResult.pqnfields()
|
||||
if numFields != 1:
|
||||
@ -1233,33 +1233,30 @@ proc refreshPartitionsInfo(
|
||||
debug "refreshPartitionsInfo"
|
||||
self.partitionMngr.clearPartitionInfo()
|
||||
|
||||
let partitionNamesRes = await self.getPartitionsList()
|
||||
if not partitionNamesRes.isOk():
|
||||
return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
|
||||
else:
|
||||
let partitionNames = partitionNamesRes.get()
|
||||
for partitionName in partitionNames:
|
||||
## partitionName contains something like 'messages_1708449815_1708449875'
|
||||
let bothTimes = partitionName.replace("messages_", "")
|
||||
let times = bothTimes.split("_")
|
||||
if times.len != 2:
|
||||
return err(fmt"loopPartitionFactory wrong partition name {partitionName}")
|
||||
let partitionNames = (await self.getPartitionsList()).valueOr:
|
||||
return err("could not get partitions list: " & $error)
|
||||
for partitionName in partitionNames:
|
||||
## partitionName contains something like 'messages_1708449815_1708449875'
|
||||
let bothTimes = partitionName.replace("messages_", "")
|
||||
let times = bothTimes.split("_")
|
||||
if times.len != 2:
|
||||
return err(fmt"loopPartitionFactory wrong partition name {partitionName}")
|
||||
|
||||
var beginning: int64
|
||||
try:
|
||||
beginning = parseInt(times[0])
|
||||
except ValueError:
|
||||
return err("Could not parse beginning time: " & getCurrentExceptionMsg())
|
||||
var beginning: int64
|
||||
try:
|
||||
beginning = parseInt(times[0])
|
||||
except ValueError:
|
||||
return err("Could not parse beginning time: " & getCurrentExceptionMsg())
|
||||
|
||||
var `end`: int64
|
||||
try:
|
||||
`end` = parseInt(times[1])
|
||||
except ValueError:
|
||||
return err("Could not parse end time: " & getCurrentExceptionMsg())
|
||||
var `end`: int64
|
||||
try:
|
||||
`end` = parseInt(times[1])
|
||||
except ValueError:
|
||||
return err("Could not parse end time: " & getCurrentExceptionMsg())
|
||||
|
||||
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
|
||||
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
|
||||
|
||||
return ok()
|
||||
return ok()
|
||||
|
||||
const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10)
|
||||
|
||||
@ -1338,10 +1335,7 @@ proc removePartition(
|
||||
let partitionName = partition.getName()
|
||||
debug "beginning of removePartition", partitionName
|
||||
|
||||
var partSize = ""
|
||||
let partSizeRes = await self.getTableSize(partitionName)
|
||||
if partSizeRes.isOk():
|
||||
partSize = partSizeRes.get()
|
||||
let partSize = (await self.getTableSize(partitionName)).valueOr("")
|
||||
|
||||
## Detach and remove the partition concurrently to not block the parent table (messages)
|
||||
let detachPartitionQuery =
|
||||
|
||||
@ -24,8 +24,7 @@ proc checkConnectivity*(
|
||||
|
||||
var numTrial = 0
|
||||
while numTrial < MaxNumTrials:
|
||||
let res = await connPool.pgQuery(HealthCheckQuery)
|
||||
if res.isOk():
|
||||
(await connPool.pgQuery(HealthCheckQuery)).isErrOr:
|
||||
## Connection resumed. Let's go back to the normal healthcheck.
|
||||
break errorBlock
|
||||
|
||||
|
||||
@ -24,8 +24,7 @@ proc checkConnectivity*(
|
||||
|
||||
var numTrial = 0
|
||||
while numTrial < MaxNumTrials:
|
||||
let res = await connPool.pgQuery(HealthCheckQuery)
|
||||
if res.isOk():
|
||||
(await connPool.pgQuery(HealthCheckQuery)).isErrOr:
|
||||
## Connection resumed. Let's go back to the normal healthcheck.
|
||||
break errorBlock
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, sequtils, sets],
|
||||
std/[options, sequtils],
|
||||
results,
|
||||
chronicles,
|
||||
chronos,
|
||||
|
||||
@ -7,7 +7,6 @@ import
|
||||
mix/mix_protocol,
|
||||
mix/mix_node,
|
||||
mix/mix_metrics,
|
||||
mix/tag_manager,
|
||||
libp2p/[multiaddress, multicodec, peerid],
|
||||
eth/common/keys
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ import
|
||||
./node/net_config,
|
||||
./node/waku_switch as switch,
|
||||
./node/waku_node as node,
|
||||
./node/health_monitor as health_monitor
|
||||
./node/health_monitor as health_monitor,
|
||||
./node/api as api
|
||||
|
||||
export net_config, switch, node, health_monitor
|
||||
export net_config, switch, node, health_monitor, api
|
||||
|
||||
@ -1,10 +1,6 @@
|
||||
import
|
||||
../../common/error_handling,
|
||||
../protocol_types,
|
||||
../protocol_metrics,
|
||||
../constants,
|
||||
../rln
|
||||
import options, chronos, results, std/[deques, sequtils]
|
||||
import ../../common/error_handling, ../protocol_types, ../protocol_metrics, ../constants
|
||||
|
||||
import options, chronos, results, std/[deques]
|
||||
|
||||
export options, chronos, results, protocol_types, protocol_metrics, deques
|
||||
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import
|
||||
os,
|
||||
web3,
|
||||
web3/eth_api_types,
|
||||
web3/primitives,
|
||||
@ -8,14 +7,12 @@ import
|
||||
nimcrypto/keccak as keccak,
|
||||
stint,
|
||||
json,
|
||||
std/[strutils, tables, algorithm],
|
||||
stew/[byteutils, arrayops],
|
||||
std/strutils,
|
||||
stew/byteutils,
|
||||
sequtils
|
||||
|
||||
import
|
||||
../../../waku_keystore,
|
||||
../../rln,
|
||||
../../rln/rln_interface,
|
||||
../../conversion_utils,
|
||||
../../protocol_types,
|
||||
../group_manager_base
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, metrics, metrics/chronos_httpserver, ./constants, ../utils/collector
|
||||
import chronicles, metrics, metrics/chronos_httpserver, ../utils/collector
|
||||
|
||||
export metrics
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
stew/[arrayops, byteutils, endians2],
|
||||
stint,
|
||||
results,
|
||||
std/[sequtils, strutils, tables, tempfiles]
|
||||
std/[sequtils, strutils, tables]
|
||||
|
||||
import ./rln_interface, ../conversion_utils, ../protocol_types, ../protocol_metrics
|
||||
import ../../waku_core, ../../waku_keystore
|
||||
|
||||