Mirror of https://github.com/logos-messaging/logos-messaging-nim.git (synced 2026-01-09)

Commit 49931966ae: Merge branch 'master' into rlnv2-only
@@ -15,9 +15,11 @@ The following options are available:
  -p, --protocol                 Protocol required to be supported: store,relay,lightpush,filter (can be used
                                 multiple times).
  -l, --log-level                Sets the log level [=LogLevel.DEBUG].
  -np, --node-port               Listening port for waku node [=60000].
  --websocket-secure-key-path    Secure websocket key path: '/path/to/key.txt' .
  --websocket-secure-cert-path   Secure websocket Certificate path: '/path/to/cert.txt' .
+ -c, --cluster-id               Cluster ID of the fleet node to check status [Default=1]
+ -s, --shard                    Shards index to subscribe to topics [ Argument may be repeated ]
 ```
@@ -84,6 +84,21 @@ type WakuCanaryConf* = object
     desc: "Ping the peer node to measure latency", defaultValue: true, name: "ping"
   .}: bool

+  shards* {.
+    desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
+    defaultValue: @[],
+    name: "shard",
+    abbr: "s"
+  .}: seq[uint16]
+
+  clusterId* {.
+    desc:
+      "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
+    defaultValue: 1,
+    name: "cluster-id",
+    abbr: "c"
+  .}: uint16
+
 proc parseCmdArg*(T: type chronos.Duration, p: string): T =
   try:
     result = chronos.seconds(parseInt(p))
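For context, a minimal standalone sketch of how these confutils pragmas behave. The config type and program name here are hypothetical, and it assumes nim-confutils provides a command-line parser for `uint16`, as the surrounding project does:

```nim
import confutils

type CanaryShardConf = object
  shards {.
    desc: "Shard index to subscribe to. Argument may be repeated.",
    defaultValue: @[],
    name: "shard",
    abbr: "s"
  .}: seq[uint16]

  clusterId {.
    desc: "Cluster id that the node is running in.",
    defaultValue: 1,
    name: "cluster-id",
    abbr: "c"
  .}: uint16

when isMainModule:
  # e.g. `./canary -c 2 -s 0 -s 5` yields clusterId = 2, shards = @[0'u16, 5'u16]
  let conf = CanaryShardConf.load()
  echo "cluster=", conf.clusterId, " shards=", conf.shards
```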
@@ -190,6 +205,13 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =

   var enrBuilder = EnrBuilder.init(nodeKey)

+  let relayShards = RelayShards.init(conf.clusterId, conf.shards).valueOr:
+    error "Relay shards initialization failed", error = error
+    return 1
+  enrBuilder.withWakuRelaySharding(relayShards).isOkOr:
+    error "Building ENR with relay sharding failed", error = error
+    return 1
+
   let recordRes = enrBuilder.build()
   let record =
     if recordRes.isErr():
@@ -214,6 +236,8 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
   )

   let node = builder.build().tryGet()
+  node.mountMetadata(conf.clusterId).isOkOr:
+    error "failed to mount waku metadata protocol: ", err = error

   if conf.ping:
     try:
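The `valueOr` / `isOkOr` forms used above come from the nim-results package; a self-contained sketch of the pattern, with an illustrative parser (not from the codebase):

```nim
import std/strutils
import results

proc parseShard(s: string): Result[uint16, string] =
  try:
    return ok(s.parseInt().uint16)
  except ValueError:
    return err("not a shard index: " & s)

when isMainModule:
  # valueOr: unwrap the ok value, or run the block with `error` injected
  let shard = parseShard("5").valueOr:
    echo "failed: ", error
    quit(1)
  echo "shard = ", shard

  # isOkOr: ignore the value, handle only the error case
  parseShard("oops").isOkOr:
    echo "expected failure: ", error
```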
@@ -1,7 +1,7 @@
 {.used.}

 import
-  std/[options, tables, sequtils],
+  std/[options, tables, sequtils, tempfiles],
   stew/shims/net as stewNet,
   testutils/unittests,
   chronos,
@@ -23,6 +23,7 @@ import
     waku_lightpush/client,
     waku_lightpush/protocol_metrics,
     waku_lightpush/rpc,
+    waku_rln_relay
   ],
   ../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils],
   ../resources/payloads
@@ -59,7 +60,7 @@ suite "Waku Lightpush - End To End":
       await server.start()

       await server.mountRelay()
-      await server.mountLightpush()
+      await server.mountLightpush() # without rln-relay
       client.mountLightpushClient()

       serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
@@ -103,4 +104,83 @@ suite "Waku Lightpush - End To End":

       check:
         publishResponse.isErr()
         publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
+
+suite "RLN Proofs as a Lightpush Service":
+  var
+    handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
+    handler {.threadvar.}: PushMessageHandler
+
+    server {.threadvar.}: WakuNode
+    client {.threadvar.}: WakuNode
+
+    serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
+    pubsubTopic {.threadvar.}: PubsubTopic
+    contentTopic {.threadvar.}: ContentTopic
+    message {.threadvar.}: WakuMessage
+
+  asyncSetup:
+    handlerFuture = newPushHandlerFuture()
+    handler = proc(
+        peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
+    ): Future[WakuLightPushResult[void]] {.async.} =
+      handlerFuture.complete((pubsubTopic, message))
+      return ok()
+
+    let
+      serverKey = generateSecp256k1Key()
+      clientKey = generateSecp256k1Key()
+
+    server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
+    client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
+
+    # mount rln-relay
+    when defined(rln_v2):
+      let wakuRlnConfig = WakuRlnConfig(
+        rlnRelayDynamic: false,
+        rlnRelayCredIndex: some(1.uint),
+        rlnRelayUserMessageLimit: 1,
+        rlnEpochSizeSec: 1,
+        rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
+      )
+    else:
+      let wakuRlnConfig = WakuRlnConfig(
+        rlnRelayDynamic: false,
+        rlnRelayCredIndex: some(1.uint),
+        rlnEpochSizeSec: 1,
+        rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
+      )
+
+    await allFutures(server.start(), client.start())
+    await server.start()
+
+    await server.mountRelay()
+    await server.mountRlnRelay(wakuRlnConfig)
+    await server.mountLightpush()
+    client.mountLightpushClient()
+
+    serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
+    pubsubTopic = DefaultPubsubTopic
+    contentTopic = DefaultContentTopic
+    message = fakeWakuMessage()
+
+  asyncTeardown:
+    await server.stop()
+
+  suite "Lightpush attaching RLN proofs":
+    asyncTest "Message is published when RLN enabled":
+      # Given a light lightpush client
+      let lightpushClient =
+        newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
+      lightpushClient.mountLightpushClient()
+
+      # When the client publishes a message
+      let publishResponse = await lightpushClient.lightpushPublish(
+        some(pubsubTopic), message, serverRemotePeerInfo
+      )
+
+      if not publishResponse.isOk():
+        echo "Publish failed: ", publishResponse.error()
+
+      # Then the message is relayed to the server
+      assertResultOk publishResponse
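Note the `when defined(rln_v2)` split in the asyncSetup above: the branch is chosen at compile time (`nim c -d:rln_v2 ...`), so v2-only fields like `rlnRelayUserMessageLimit` never exist in a v1 build. A tiny illustration with a stand-in type (not the real WakuRlnConfig):

```nim
type DemoRlnConfig = object
  epochSizeSec: uint64
  when defined(rln_v2):
    userMessageLimit: uint64 # field only exists in -d:rln_v2 builds

when defined(rln_v2):
  let conf = DemoRlnConfig(epochSizeSec: 1, userMessageLimit: 1)
else:
  let conf = DemoRlnConfig(epochSizeSec: 1)
echo conf
```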
@@ -6,7 +6,7 @@ else:
   {.push raises: [].}

 import
-  std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles],
+  std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles, strformat],
   stew/[results, byteutils],
   testutils/unittests,
   chronos,
@@ -108,10 +108,11 @@ proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} =
   tx.to = some(acc)
   tx.gasPrice = some(gasPrice)

-  # Send 10 eth to acc
+  # Send 1000 eth to acc
   discard await web3.send(tx)
   let balance = await web3.provider.eth_getBalance(acc, "latest")
-  assert(balance == ethToWei(1000.u256))
+  assert balance == ethToWei(1000.u256),
+    fmt"Balance is {balance} but expected {ethToWei(1000.u256)}"

   return (pk, acc)
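The assertion change above moves from `assert(cond)` to the two-argument `assert cond, msg` form, whose message is only rendered when the condition fails. A standalone illustration with made-up values:

```nim
import std/strformat

let balance = 1000
let expected = 1000
# the fmt message is only produced if the assertion fails
assert balance == expected, fmt"Balance is {balance} but expected {expected}"
```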
@@ -48,9 +48,10 @@ suite "Waku v2 Rest API - Admin":
     await allFutures(node1.start(), node2.start(), node3.start())
     await allFutures(node1.mountRelay(), node2.mountRelay(), node3.mountRelay())

-    let restPort = Port(58011)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("127.0.0.1")
     restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installAdminApiHandlers(restServer.router, node1)
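This hunk introduces the pattern repeated throughout the rest of the diff: bind to `Port(0)` so the OS assigns a free port, then read the bound port back for the client instead of hard-coding a port that can collide across parallel test runs. The same idea in plain std/net terms:

```nim
import std/net

let sock = newSocket()
sock.bindAddr(Port(0), "127.0.0.1")      # OS assigns any free port
sock.listen()
let (_, boundPort) = sock.getLocalAddr() # read back the assigned port
echo "listening on 127.0.0.1:", boundPort.uint16
sock.close()
```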
@@ -27,7 +27,7 @@ proc testWakuNode(): WakuNode =
     privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
     bindIp = parseIpAddress("0.0.0.0")
     extIp = parseIpAddress("127.0.0.1")
-    port = Port(58000)
+    port = Port(0)

   newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
@@ -86,8 +86,7 @@ proc checkResponse(
       expectedOrigin.isSome() and
         response.headers.contains("Access-Control-Allow-Origin") and
         response.headers.getLastString("Access-Control-Allow-Origin") ==
-        expectedOrigin.get() and
-        response.headers.contains("Access-Control-Allow-Headers") and
+          expectedOrigin.get() and response.headers.contains("Access-Control-Allow-Headers") and
         response.headers.getLastString("Access-Control-Allow-Headers") == "Content-Type"
       )
     ):
@@ -106,7 +105,7 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef
     .init(
@@ -116,6 +115,7 @@ suite "Waku v2 REST API CORS Handling":
         some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"),
       )
       .tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -158,7 +158,7 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef
     .init(
@@ -168,6 +168,7 @@ suite "Waku v2 REST API CORS Handling":
         some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"),
       )
       .tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -213,10 +214,11 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer =
       WakuRestServerRef.init(restAddress, restPort, allowedOrigin = some("*")).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -259,7 +261,7 @@ suite "Waku v2 REST API CORS Handling":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef
     .init(
@@ -269,6 +271,7 @@ suite "Waku v2 REST API CORS Handling":
         some("test.net:1234,https://localhost:*,http://127.0.0.1:?8,?waku*.net:*80*"),
       )
       .tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -26,7 +26,7 @@ proc testWakuNode(): WakuNode =
     privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
     bindIp = parseIpAddress("0.0.0.0")
     extIp = parseIpAddress("127.0.0.1")
-    port = Port(58000)
+    port = Port(0)

   newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
@@ -37,9 +37,10 @@ suite "Waku v2 REST API - Debug":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -65,9 +66,10 @@ suite "Waku v2 REST API - Debug":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58002)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installDebugApiHandlers(restServer.router, node)
     restServer.start()
@@ -59,13 +59,17 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
       testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec
     )

-  let restPort = Port(58011)
+  var restPort = Port(0)
   let restAddress = parseIpAddress("127.0.0.1")
   testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+  restPort = testSetup.restServer.httpServer.address.port
+    # update with bound port for client use

-  let restPort2 = Port(58012)
+  var restPort2 = Port(0)
   testSetup.restServerForService =
     WakuRestServerRef.init(restAddress, restPort2).tryGet()
+  restPort2 = testSetup.restServerForService.httpServer.address.port
+    # update with bound port for client use

   # through this one we will see if messages are pushed according to our content topic sub
   testSetup.messageCache = MessageCache.init()
@@ -45,9 +45,10 @@ suite "Waku v2 REST API - health":

     healthMonitor.setOverallHealth(HealthStatus.INITIALIZING)

-    let restPort = Port(58001)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installHealthApiHandler(restServer.router, healthMonitor)
     restServer.start()
@@ -74,9 +74,11 @@ proc init(
       testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec
     )

-  let restPort = Port(58011)
+  var restPort = Port(0)
   let restAddress = parseIpAddress("127.0.0.1")
   testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+  restPort = testSetup.restServer.httpServer.address.port
+    # update with bound port for client use

   installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode)
@@ -92,9 +92,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58011)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -170,9 +171,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58011)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -239,9 +241,10 @@ procSuite "Waku Rest API - Store v3":
     let node = testWakuNode()
     await node.start()

-    let restPort = Port(58012)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -334,9 +337,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58013)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -406,9 +410,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58014)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -494,9 +499,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58015)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -549,9 +555,10 @@ procSuite "Waku Rest API - Store v3":
     await node.start()
     await node.mountRelay()

-    let restPort = Port(58016)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -617,9 +624,10 @@ procSuite "Waku Rest API - Store v3":
     let node = testWakuNode()
     await node.start()

-    let restPort = Port(58017)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -680,7 +688,6 @@ procSuite "Waku Rest API - Store v3":
     var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
-
     restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
@@ -726,9 +733,10 @@ procSuite "Waku Rest API - Store v3":
     let node = testWakuNode()
     await node.start()

-    let restPort = Port(58018)
+    var restPort = Port(0)
     let restAddress = parseIpAddress("0.0.0.0")
     let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
+    restPort = restServer.httpServer.address.port # update with bound port for client use

     installStoreApiHandlers(restServer.router, node)
     restServer.start()
@@ -380,8 +380,8 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
       pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
         not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
     ):
-      let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & "]"
-      let otherShardsString = "[ " & metadata.shards.join(", ") & "]"
+      let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]"
+      let otherShardsString = "[ " & metadata.shards.join(", ") & " ]"
       reason =
         "no shards in common: my_shards = " & myShardsString & " others_shards = " &
         otherShardsString
@@ -44,6 +44,7 @@ import
   ../waku_lightpush/common,
   ../waku_lightpush/protocol,
   ../waku_lightpush/self_req_handler,
+  ../waku_lightpush/callbacks,
   ../waku_enr,
   ../waku_peer_exchange,
   ../waku_rln_relay,
@@ -224,19 +225,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
   if node.wakuRelay.isSubscribed(topic):
     return

-  proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
-    notice "waku.relay received",
-      my_peer_id = node.peerId,
-      pubsubTopic = topic,
-      msg_hash = topic.computeMessageHash(msg).to0xHex(),
-      receivedTime = getNowInNanosecondTime(),
-      payloadSizeBytes = msg.payload.len
-
-    let msgSizeKB = msg.payload.len / 1000
-
-    waku_node_messages.inc(labelValues = ["relay"])
-    waku_histogram_message_size.observe(msgSizeKB)
-
   proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
     if node.wakuFilter.isNil():
       return
@@ -252,7 +240,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
   let defaultHandler = proc(
       topic: PubsubTopic, msg: WakuMessage
   ): Future[void] {.async, gcsafe.} =
-    await traceHandler(topic, msg)
     await filterHandler(topic, msg)
     await archiveHandler(topic, msg)
@@ -389,6 +376,61 @@ proc startRelay*(node: WakuNode) {.async.} =

   info "relay started successfully"

+proc generateRelayObserver(node: WakuNode): PubSubObserver =
+  proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
+    for msg in msgs.messages:
+      let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
+        warn "Error generating message id",
+          my_peer_id = node.peerId,
+          from_peer_id = peer.peerId,
+          topic = msg.topic,
+          error = $error
+        continue
+
+      let msg_id_short = shortLog(msg_id)
+
+      let wakuMessage = WakuMessage.decode(msg.data).valueOr:
+        warn "Error decoding to Waku Message",
+          my_peer_id = node.peerId,
+          msg_id = msg_id_short,
+          from_peer_id = peer.peerId,
+          topic = msg.topic,
+          error = $error
+        continue
+
+      let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()
+
+      if onRecv:
+        notice "received relay message",
+          my_peer_id = node.peerId,
+          msg_hash = msg_hash,
+          msg_id = msg_id_short,
+          from_peer_id = peer.peerId,
+          topic = msg.topic,
+          receivedTime = getNowInNanosecondTime(),
+          payloadSizeBytes = wakuMessage.payload.len
+
+        let msgSizeKB = wakuMessage.payload.len / 1000
+        waku_node_messages.inc(labelValues = ["relay"])
+        waku_histogram_message_size.observe(msgSizeKB)
+      else:
+        notice "sent relay message",
+          my_peer_id = node.peerId,
+          msg_hash = msg_hash,
+          msg_id = msg_id_short,
+          to_peer_id = peer.peerId,
+          topic = msg.topic,
+          sentTime = getNowInNanosecondTime(),
+          payloadSizeBytes = wakuMessage.payload.len
+
+  proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
+    logMessageInfo(peer, msgs, onRecv = true)
+
+  proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
+    discard
+
+  return PubSubObserver(onRecv: onRecv, onSend: onSend)
+
 proc mountRelay*(
     node: WakuNode,
     pubsubTopics: seq[string] = @[],
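For reference, a minimal observer in the same shape, against nim-libp2p's gossipsub hooks (module paths and the `PubSubObserver`/`RPCMsg` types assumed as used above). Observers receive whole RPC batches, which is why `logMessageInfo` iterates `msgs.messages`:

```nim
import libp2p/protocols/pubsub/pubsubpeer
import libp2p/protocols/pubsub/rpc/messages

proc countingObserver(counter: ref int): PubSubObserver =
  proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
    counter[] += msgs.messages.len # every inbound message in the batch
  proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
    discard # outbound side intentionally unused, mirroring the diff
  PubSubObserver(onRecv: onRecv, onSend: onSend)
```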
@@ -409,6 +451,11 @@ proc mountRelay*(

   node.wakuRelay = initRes.value

+  # register relay observers for logging
+  debug "Registering Relay observers"
+  let observerLogger = node.generateRelayObserver()
+  node.wakuRelay.addObserver(observerLogger)
+
   ## Add peer exchange handler
   if peerExchangeHandler.isSome():
     node.wakuRelay.parameters.enablePX = true
@@ -930,34 +977,22 @@ proc mountLightPush*(
     node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
 ) {.async.} =
   info "mounting light push"

-  var pushHandler: PushMessageHandler
-  if node.wakuRelay.isNil():
-    debug "mounting lightpush without relay (nil)"
-    pushHandler = proc(
-        peer: PeerId, pubsubTopic: string, message: WakuMessage
-    ): Future[WakuLightPushResult[void]] {.async.} =
-      return err("no waku relay found")
-  else:
-    pushHandler = proc(
-        peer: PeerId, pubsubTopic: string, message: WakuMessage
-    ): Future[WakuLightPushResult[void]] {.async.} =
-      let validationRes = await node.wakuRelay.validateMessage(pubSubTopic, message)
-      if validationRes.isErr():
-        return err(validationRes.error)
-
-      let publishedCount =
-        await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
-
-      if publishedCount == 0:
-        ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
-        let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
-        notice "Lightpush request has not been published to any peers",
-          msg_hash = msgHash
-
-      return ok()
-
-    debug "mounting lightpush with relay"
+  var pushHandler =
+    if node.wakuRelay.isNil:
+      debug "mounting lightpush without relay (nil)"
+      getNilPushHandler()
+    else:
+      debug "mounting lightpush with relay"
+      let rlnPeer =
+        if isNil(node.wakuRlnRelay):
+          debug "mounting lightpush without rln-relay"
+          none(WakuRLNRelay)
+        else:
+          debug "mounting lightpush with rln-relay"
+          some(node.wakuRlnRelay)
+      getRelayPushHandler(node.wakuRelay, rlnPeer)

   node.wakuLightPush =
     WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))
@@ -70,6 +70,9 @@ proc addPartitionInfo*(
   trace "Adding partition info"
   self.partitions.addLast(partitionInfo)

+proc clearPartitionInfo*(self: PartitionManager) =
+  self.partitions.clear()
+
 proc removeOldestPartitionName*(self: PartitionManager) =
   ## Simply removed the partition from the tracked/known partitions queue.
   ## Just remove it and ignore it.
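`clearPartitionInfo` simply empties the tracked-partition deque so a later refresh can rebuild it from the database. The same operation in plain std/deques terms, with made-up partition names:

```nim
import std/deques

var partitions = ["messages_1", "messages_2"].toDeque()
partitions.clear() # forget everything; the next refresh repopulates the queue
assert partitions.len == 0
```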
@@ -866,6 +866,11 @@ proc acquireDatabaseLock*(
     s: PostgresDriver, lockId: int = 841886
 ): Future[ArchiveDriverResult[void]] {.async.} =
   ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time)
+  ## This should only be used in the migrations module because this approach doesn't ensure
+  ## that the lock is acquired/released by the same connection. The preferable "lock"
+  ## approach is using the "performWriteQueryWithLock" proc. However, we can't use
+  ## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
+  ## scripts.
   let locked = (
     await s.getStr(
       fmt"""
@@ -908,6 +913,46 @@ proc performWriteQuery*(

   return ok()

+const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
+
+proc performWriteQueryWithLock*(
+    self: PostgresDriver, queryToProtect: string
+): Future[ArchiveDriverResult[void]] {.async.} =
+  ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
+  debug "performWriteQueryWithLock", queryToProtect
+  let query =
+    fmt"""
+DO $$
+DECLARE
+    lock_acquired boolean;
+BEGIN
+    -- Try to acquire the advisory lock
+    lock_acquired := pg_try_advisory_lock(123456789);
+
+    IF NOT lock_acquired THEN
+        RAISE EXCEPTION '{COULD_NOT_ACQUIRE_ADVISORY_LOCK}';
+    END IF;
+
+    -- Perform the query
+    BEGIN
+        {queryToProtect}
+    EXCEPTION WHEN OTHERS THEN
+        -- Ensure the lock is released if an error occurs
+        PERFORM pg_advisory_unlock(123456789);
+        RAISE;
+    END;
+
+    -- Release the advisory lock after the query completes successfully
+    PERFORM pg_advisory_unlock(123456789);
+END $$;
+"""
+  (await self.performWriteQuery(query)).isOkOr:
+    debug "protected query ended with error", error = $error
+    return err("protected query ended with error:" & $error)
+
+  debug "protected query ended correctly"
+  return ok()
+
 proc addPartition(
     self: PostgresDriver, startTime: Timestamp
 ): Future[ArchiveDriverResult[void]] {.async.} =
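A hypothetical caller sketch (the helper name `runGuardedDdl` is illustrative; chronos, results and std/strutils are assumed in scope). The `COULD_NOT_ACQUIRE_ADVISORY_LOCK` string is the contract by which callers distinguish "another instance holds the lock" from a genuine failure, exactly as `addPartition` does in the next hunk:

```nim
proc runGuardedDdl(
    driver: PostgresDriver, ddl: string
): Future[ArchiveDriverResult[void]] {.async.} =
  (await driver.performWriteQueryWithLock(ddl)).isOkOr:
    if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
      return ok() # someone else is applying the same DDL; treat as done
    return err("guarded DDL failed: " & $error)
  return ok()
```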
@@ -930,21 +975,15 @@ proc addPartition(
     "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
     "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"

-  # Lock the db
-  (await self.acquireDatabaseLock()).isOkOr:
-    error "failed to acquire lock", error = error
-    return err("failed to lock the db")
-
-  defer:
-    (await self.releaseDatabaseLock()).isOkOr:
-      error "failed to release lock", error = error
-      return err("failed to unlock the db.")
-
-  (await self.performWriteQuery(createPartitionQuery)).isOkOr:
+  (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr:
     if error.contains("already exists"):
       debug "skip create new partition as it already exists: ", skipped_error = $error
       return ok()

+    if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
+      debug "skip create new partition because the advisory lock is acquired by other"
+      return ok()
+
     ## for any different error, just consider it
     return err(fmt"error adding partition [{partitionName}]: " & $error)
@@ -953,9 +992,12 @@ proc addPartition(
   self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
   return ok()

-proc initializePartitionsInfo(
+proc refreshPartitionsInfo(
     self: PostgresDriver
 ): Future[ArchiveDriverResult[void]] {.async.} =
+  debug "refreshPartitionsInfo"
+  self.partitionMngr.clearPartitionInfo()
+
   let partitionNamesRes = await self.getPartitionsList()
   if not partitionNamesRes.isOk():
     return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
@@ -994,13 +1036,13 @@ proc loopPartitionFactory(

   debug "starting loopPartitionFactory"

-  ## First of all, let's make the 'partition_manager' aware of the current partitions
-  (await self.initializePartitionsInfo()).isOkOr:
-    onFatalError("issue in loopPartitionFactory: " & $error)
-
   while true:
     trace "Check if we need to create a new partition"

+    ## Let's make the 'partition_manager' aware of the current partitions
+    (await self.refreshPartitionsInfo()).isOkOr:
+      onFatalError("issue in loopPartitionFactory: " & $error)
+
     let now = times.now().toTime().toUnix()

     if self.partitionMngr.isEmpty():
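Moving the call inside the loop means the partition view is re-synced on every iteration rather than once at startup, so partitions added or dropped by other instances are picked up on the next pass. Schematically (chronos assumed, names and interval illustrative):

```nim
import chronos

proc partitionLoopShape(refresh: proc(): Future[void] {.gcsafe.}) {.async.} =
  while true:
    await refresh() # re-sync the tracked partitions each iteration
    await sleepAsync(10.seconds)
```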
waku/waku_lightpush/callbacks.nim (new file, 63 lines)
@@ -0,0 +1,63 @@
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
../waku_core,
|
||||
../waku_relay,
|
||||
./common,
|
||||
./protocol,
|
||||
../waku_rln_relay,
|
||||
../waku_rln_relay/protocol_types,
|
||||
../common/ratelimit
|
||||
import
|
||||
std/times,
|
||||
libp2p/peerid,
|
||||
stew/byteutils
|
||||
|
||||
proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): Result[WakuMessage, string] =
|
||||
# check if the message already has RLN proof
|
||||
if message.proof.len > 0:
|
||||
return ok(message)
|
||||
|
||||
if rlnPeer.isNone():
|
||||
notice "Publishing message without RLN proof"
|
||||
return ok(message)
|
||||
# generate and append RLN proof
|
||||
let
|
||||
time = getTime().toUnix()
|
||||
senderEpochTime = float64(time)
|
||||
var msgWithProof = message
|
||||
rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime).isOkOr:
|
||||
return err(error)
|
||||
return ok(msgWithProof)
|
||||
|
||||
proc getNilPushHandler*(): PushMessageHandler =
|
||||
return proc(
|
||||
peer: PeerId, pubsubTopic: string, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
return err("no waku relay found")
|
||||
|
||||
proc getRelayPushHandler*(
|
||||
wakuRelay: WakuRelay,
|
||||
rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
|
||||
): PushMessageHandler =
|
||||
return proc(
|
||||
peer: PeerId, pubsubTopic: string, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
# append RLN proof
|
||||
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
|
||||
if msgWithProof.isErr():
|
||||
return err(msgWithProof.error)
|
||||
|
||||
(await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr:
|
||||
return err(error)
|
||||
|
||||
let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof.value)
|
||||
if publishedCount == 0:
|
||||
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
notice "Lightpush request has not been published to any peers", msg_hash = msgHash
|
||||
|
||||
return ok()
|
||||
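A hedged illustration of the pass-through cases in `checkAndGenerateRLNProof` (types from the new file above; `toBytes` from stew/byteutils and `none` from std/options assumed in scope): with no RLN peer, or when a proof is already attached, the message is returned unchanged, so lightpush keeps working on nodes that never mount rln-relay:

```nim
let msg = WakuMessage(payload: "hello".toBytes())
let res = checkAndGenerateRLNProof(none(WakuRLNRelay), msg)
doAssert res.isOk() and res.get().proof.len == 0 # unchanged, no proof added
```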
@@ -115,4 +115,4 @@ proc new*(
     requestRateLimiter: newTokenBucket(rateLimitSetting),
   )
   wl.initProtocolHandler()
   return wl
@@ -180,6 +180,9 @@ proc addValidator*(
 ) {.gcsafe.} =
   w.wakuValidators.add((handler, errorMessage))

+proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
+  procCall GossipSub(w).addObserver(observer)
+
 method start*(w: WakuRelay) {.async, base.} =
   debug "start"
   await procCall GossipSub(w).start()