mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
Merge branch 'master' into update-master-from-release-v30
This commit is contained in:
commit
7ec831fd9d
3
.github/ISSUE_TEMPLATE/feature_request.md
vendored
3
.github/ISSUE_TEMPLATE/feature_request.md
vendored
@ -21,3 +21,6 @@ Add any other context or screenshots about the feature request here.
|
|||||||
|
|
||||||
### Acceptance criteria
|
### Acceptance criteria
|
||||||
A list of tasks that need to be done for the issue to be considered resolved.
|
A list of tasks that need to be done for the issue to be considered resolved.
|
||||||
|
|
||||||
|
### Epic
|
||||||
|
Epic title and link the feature refers to.
|
||||||
|
|||||||
@ -8,13 +8,13 @@ explained in [nwaku-compose](https://github.com/waku-org/nwaku-compose/blob/1b56
|
|||||||
|
|
||||||
### Release highlights
|
### Release highlights
|
||||||
|
|
||||||
* RLN_v2 is used. The maximum rate can be set to
|
* RLN_v2 is used. The maximum rate can be set to `N` messages per epoch, instead of just one message per epoch. See [this](https://github.com/waku-org/nwaku/issues/2345) for more details. Notice that we established an epoch of 10 minutes.
|
||||||
`N` messages per epoch, instead of just one message per epoch. See [this](https://github.com/waku-org/nwaku/issues/2345) for more details. Notice that we established an epoch of 10 minutes.
|
|
||||||
|
|
||||||
### Changes
|
### Changes
|
||||||
|
|
||||||
- rln-relay: add chain-id flag to wakunode and restrict usage if mismatches rpc provider ([#2858](https://github.com/waku-org/nwaku/pull/2858))
|
- rln-relay: add chain-id flag to wakunode and restrict usage if mismatches rpc provider ([#2858](https://github.com/waku-org/nwaku/pull/2858))
|
||||||
- rln: nullifierlog vulnerability ([#2855](https://github.com/waku-org/nwaku/pull/2855))
|
- rln: fix nullifierlog vulnerability ([#2855](https://github.com/waku-org/nwaku/pull/2855))
|
||||||
- chore: add TWN parameters for RLNv2 ([#2843](https://github.com/waku-org/nwaku/pull/2843))
|
- chore: add TWN parameters for RLNv2 ([#2843](https://github.com/waku-org/nwaku/pull/2843))
|
||||||
- fix(rln-relay): clear nullifier log only if length is over max epoch gap ([#2836](https://github.com/waku-org/nwaku/pull/2836))
|
- fix(rln-relay): clear nullifier log only if length is over max epoch gap ([#2836](https://github.com/waku-org/nwaku/pull/2836))
|
||||||
- rlnv2: clean fork of rlnv2 ([#2828](https://github.com/waku-org/nwaku/issues/2828)) ([a02832fe](https://github.com/waku-org/nwaku/commit/a02832fe))
|
- rlnv2: clean fork of rlnv2 ([#2828](https://github.com/waku-org/nwaku/issues/2828)) ([a02832fe](https://github.com/waku-org/nwaku/commit/a02832fe))
|
||||||
|
|||||||
4
Makefile
4
Makefile
@ -130,6 +130,10 @@ ifeq ($(POSTGRES), 1)
|
|||||||
NIM_PARAMS := $(NIM_PARAMS) -d:postgres -d:nimDebugDlOpen
|
NIM_PARAMS := $(NIM_PARAMS) -d:postgres -d:nimDebugDlOpen
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
ifeq ($(DEBUG_DISCV5), 1)
|
||||||
|
NIM_PARAMS := $(NIM_PARAMS) -d:debugDiscv5
|
||||||
|
endif
|
||||||
|
|
||||||
clean: | clean-libbacktrace
|
clean: | clean-libbacktrace
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -4,10 +4,7 @@
|
|||||||
when not (compileOption("threads")):
|
when not (compileOption("threads")):
|
||||||
{.fatal: "Please, compile this program with the --threads:on option!".}
|
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[strformat, strutils, times, options, random]
|
import std/[strformat, strutils, times, options, random]
|
||||||
import
|
import
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, times, strutils, hashes, sequtils],
|
std/[tables, times, strutils, hashes, sequtils],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, strutils, os, sequtils, net],
|
std/[options, strutils, os, sequtils, net],
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, tables, strutils, sequtils, options, strformat],
|
std/[sets, tables, strutils, sequtils, options, strformat],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles,
|
chronicles,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, strutils, times, sequtils, random],
|
std/[tables, strutils, times, sequtils, random],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[json, tables, sequtils],
|
std/[json, tables, sequtils],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/json,
|
std/json,
|
||||||
|
|||||||
@ -15,9 +15,11 @@ The following options are available:
|
|||||||
-p, --protocol Protocol required to be supported: store,relay,lightpush,filter (can be used
|
-p, --protocol Protocol required to be supported: store,relay,lightpush,filter (can be used
|
||||||
multiple times).
|
multiple times).
|
||||||
-l, --log-level Sets the log level [=LogLevel.DEBUG].
|
-l, --log-level Sets the log level [=LogLevel.DEBUG].
|
||||||
-np, --node-port Listening port for waku node [=60000].
|
-np, --node-port Listening port for waku node [=60000].
|
||||||
--websocket-secure-key-path Secure websocket key path: '/path/to/key.txt' .
|
--websocket-secure-key-path Secure websocket key path: '/path/to/key.txt' .
|
||||||
--websocket-secure-cert-path Secure websocket Certificate path: '/path/to/cert.txt' .
|
--websocket-secure-cert-path Secure websocket Certificate path: '/path/to/cert.txt' .
|
||||||
|
-c, --cluster-id Cluster ID of the fleet node to check status [Default=1]
|
||||||
|
-s, --shard Shards index to subscribe to topics [ Argument may be repeated ]
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@ -84,6 +84,21 @@ type WakuCanaryConf* = object
|
|||||||
desc: "Ping the peer node to measure latency", defaultValue: true, name: "ping"
|
desc: "Ping the peer node to measure latency", defaultValue: true, name: "ping"
|
||||||
.}: bool
|
.}: bool
|
||||||
|
|
||||||
|
shards* {.
|
||||||
|
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
|
||||||
|
defaultValue: @[],
|
||||||
|
name: "shard",
|
||||||
|
abbr: "s"
|
||||||
|
.}: seq[uint16]
|
||||||
|
|
||||||
|
clusterId* {.
|
||||||
|
desc:
|
||||||
|
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
|
||||||
|
defaultValue: 1,
|
||||||
|
name: "cluster-id",
|
||||||
|
abbr: "c"
|
||||||
|
.}: uint16
|
||||||
|
|
||||||
proc parseCmdArg*(T: type chronos.Duration, p: string): T =
|
proc parseCmdArg*(T: type chronos.Duration, p: string): T =
|
||||||
try:
|
try:
|
||||||
result = chronos.seconds(parseInt(p))
|
result = chronos.seconds(parseInt(p))
|
||||||
@ -190,6 +205,13 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
|||||||
|
|
||||||
var enrBuilder = EnrBuilder.init(nodeKey)
|
var enrBuilder = EnrBuilder.init(nodeKey)
|
||||||
|
|
||||||
|
let relayShards = RelayShards.init(conf.clusterId, conf.shards).valueOr:
|
||||||
|
error "Relay shards initialization failed", error = error
|
||||||
|
return 1
|
||||||
|
enrBuilder.withWakuRelaySharding(relayShards).isOkOr:
|
||||||
|
error "Building ENR with relay sharding failed", error = error
|
||||||
|
return 1
|
||||||
|
|
||||||
let recordRes = enrBuilder.build()
|
let recordRes = enrBuilder.build()
|
||||||
let record =
|
let record =
|
||||||
if recordRes.isErr():
|
if recordRes.isErr():
|
||||||
@ -214,6 +236,8 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
|||||||
)
|
)
|
||||||
|
|
||||||
let node = builder.build().tryGet()
|
let node = builder.build().tryGet()
|
||||||
|
node.mountMetadata(conf.clusterId).isOkOr:
|
||||||
|
error "failed to mount waku metadata protocol: ", err = error
|
||||||
|
|
||||||
if conf.ping:
|
if conf.ping:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, strutils, sequtils, net],
|
std/[options, strutils, sequtils, net],
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
#!/usr/bin/env groovy
|
#!/usr/bin/env groovy
|
||||||
library 'status-jenkins-lib@v1.8.9'
|
library 'status-jenkins-lib@v1.8.17'
|
||||||
|
|
||||||
pipeline {
|
pipeline {
|
||||||
agent { label 'linux' }
|
agent { label 'linux' }
|
||||||
@ -68,7 +68,7 @@ pipeline {
|
|||||||
"${params.IMAGE_NAME}:${params.IMAGE_TAG ?: env.GIT_COMMIT.take(8)}",
|
"${params.IMAGE_NAME}:${params.IMAGE_TAG ?: env.GIT_COMMIT.take(8)}",
|
||||||
"--label=build='${env.BUILD_URL}' " +
|
"--label=build='${env.BUILD_URL}' " +
|
||||||
"--label=commit='${git.commit()}' " +
|
"--label=commit='${git.commit()}' " +
|
||||||
"--label=version='${git.describe()}' " +
|
"--label=version='${git.describe('--tags')}' " +
|
||||||
"--build-arg=MAKE_TARGET='${params.MAKE_TARGET}' " +
|
"--build-arg=MAKE_TARGET='${params.MAKE_TARGET}' " +
|
||||||
"--build-arg=NIMFLAGS='${params.NIMFLAGS} -d:postgres ' " +
|
"--build-arg=NIMFLAGS='${params.NIMFLAGS} -d:postgres ' " +
|
||||||
"--build-arg=LOG_LEVEL='${params.LOWEST_LOG_LEVEL_ALLOWED}' " +
|
"--build-arg=LOG_LEVEL='${params.LOWEST_LOG_LEVEL_ALLOWED}' " +
|
||||||
|
|||||||
@ -1,8 +1,5 @@
|
|||||||
## Nim wrappers for the functions defined in librln
|
## Nim wrappers for the functions defined in librln
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import stew/results
|
import stew/results
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
../../waku/common/logging, ../../waku/factory/[waku, networks_config, external_config]
|
../../waku/common/logging, ../../waku/factory/[waku, networks_config, external_config]
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
stew/results,
|
stew/results,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
stew/results, chronicles, ./node_spec as Waku, ./stealth_commitment_protocol as SCP
|
stew/results, chronicles, ./node_spec as Waku, ./stealth_commitment_protocol as SCP
|
||||||
|
|||||||
@ -351,7 +351,7 @@ suite "Peer Manager":
|
|||||||
await server2.stop()
|
await server2.stop()
|
||||||
|
|
||||||
suite "Tracked Peer Metadata":
|
suite "Tracked Peer Metadata":
|
||||||
xasyncTest "Metadata Recording":
|
asyncTest "Metadata Recording":
|
||||||
# When adding a peer other than self to the peer store
|
# When adding a peer other than self to the peer store
|
||||||
serverRemotePeerInfo.enr = some(server.enr)
|
serverRemotePeerInfo.enr = some(server.enr)
|
||||||
client.peerManager.addPeer(serverRemotePeerInfo)
|
client.peerManager.addPeer(serverRemotePeerInfo)
|
||||||
|
|||||||
@ -233,13 +233,13 @@ procSuite "Waku Archive - find messages":
|
|||||||
response.messages.anyIt(it == msg1)
|
response.messages.anyIt(it == msg1)
|
||||||
response.messages.anyIt(it == msg3)
|
response.messages.anyIt(it == msg3)
|
||||||
|
|
||||||
test "handle query with more than 10 content filters":
|
test "handle query with more than 100 content filters":
|
||||||
## Setup
|
## Setup
|
||||||
let
|
let
|
||||||
driver = newSqliteArchiveDriver()
|
driver = newSqliteArchiveDriver()
|
||||||
archive = newWakuArchive(driver)
|
archive = newWakuArchive(driver)
|
||||||
|
|
||||||
let queryTopics = toSeq(1 .. 15).mapIt(ContentTopic($it))
|
let queryTopics = toSeq(1 .. 150).mapIt(ContentTopic($it))
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let req = ArchiveQuery(contentTopics: queryTopics)
|
let req = ArchiveQuery(contentTopics: queryTopics)
|
||||||
|
|||||||
@ -69,7 +69,7 @@ suite "Waku Message - Content topics namespacing":
|
|||||||
let err = ns.tryError()
|
let err = ns.tryError()
|
||||||
check:
|
check:
|
||||||
err.kind == ParsingErrorKind.InvalidFormat
|
err.kind == ParsingErrorKind.InvalidFormat
|
||||||
err.cause == "topic must start with slash"
|
err.cause == "content-topic '" & topic & "' must start with slash"
|
||||||
|
|
||||||
test "Parse content topic string - Invalid string: not namespaced":
|
test "Parse content topic string - Invalid string: not namespaced":
|
||||||
## Given
|
## Given
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles, strformat],
|
std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles, strformat],
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
./rln/waku_rln_relay_utils,
|
./rln/waku_rln_relay_utils,
|
||||||
|
|||||||
@ -37,17 +37,18 @@ suite "Waku v2 Rest API - Admin":
|
|||||||
asyncSetup:
|
asyncSetup:
|
||||||
node1 =
|
node1 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600))
|
||||||
peerInfo1 = node1.switch.peerInfo
|
|
||||||
node2 =
|
node2 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602))
|
||||||
peerInfo2 = node2.switch.peerInfo
|
|
||||||
node3 =
|
node3 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604))
|
||||||
peerInfo3 = node3.switch.peerInfo
|
|
||||||
|
|
||||||
await allFutures(node1.start(), node2.start(), node3.start())
|
await allFutures(node1.start(), node2.start(), node3.start())
|
||||||
await allFutures(node1.mountRelay(), node2.mountRelay(), node3.mountRelay())
|
await allFutures(node1.mountRelay(), node2.mountRelay(), node3.mountRelay())
|
||||||
|
|
||||||
|
peerInfo1 = node1.switch.peerInfo
|
||||||
|
peerInfo2 = node2.switch.peerInfo
|
||||||
|
peerInfo3 = node3.switch.peerInfo
|
||||||
|
|
||||||
var restPort = Port(0)
|
var restPort = Port(0)
|
||||||
let restAddress = parseIpAddress("127.0.0.1")
|
let restAddress = parseIpAddress("127.0.0.1")
|
||||||
restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
||||||
@ -165,3 +166,29 @@ suite "Waku v2 Rest API - Admin":
|
|||||||
check:
|
check:
|
||||||
getRes.status == 400
|
getRes.status == 400
|
||||||
getRes.data == "Error: Filter Protocol is not mounted to the node"
|
getRes.data == "Error: Filter Protocol is not mounted to the node"
|
||||||
|
|
||||||
|
asyncTest "Get peer origin":
|
||||||
|
# Adding peers to the Peer Store
|
||||||
|
node1.peerManager.addPeer(peerInfo2, Discv5)
|
||||||
|
node1.peerManager.addPeer(peerInfo3, PeerExchange)
|
||||||
|
|
||||||
|
# Connecting to both peers
|
||||||
|
let conn2 = await node1.peerManager.connectRelay(peerInfo2)
|
||||||
|
let conn3 = await node1.peerManager.connectRelay(peerInfo3)
|
||||||
|
|
||||||
|
# Check successful connections
|
||||||
|
check:
|
||||||
|
conn2 == true
|
||||||
|
conn3 == true
|
||||||
|
|
||||||
|
# Query peers REST endpoint
|
||||||
|
let getRes = await client.getPeers()
|
||||||
|
|
||||||
|
check:
|
||||||
|
getRes.status == 200
|
||||||
|
$getRes.contentType == $MIMETYPE_JSON
|
||||||
|
getRes.data.len() == 2
|
||||||
|
# Check peer 2
|
||||||
|
getRes.data.anyIt(it.origin == Discv5)
|
||||||
|
# Check peer 3
|
||||||
|
getRes.data.anyIt(it.origin == PeerExchange)
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
|
std/os,
|
||||||
|
chronos/timer,
|
||||||
stew/byteutils,
|
stew/byteutils,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
@ -52,7 +54,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
|
|||||||
await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())
|
await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())
|
||||||
|
|
||||||
await testSetup.serviceNode.mountRelay()
|
await testSetup.serviceNode.mountRelay()
|
||||||
await testSetup.serviceNode.mountFilter()
|
await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds)
|
||||||
await testSetup.subscriberNode.mountFilterClient()
|
await testSetup.subscriberNode.mountFilterClient()
|
||||||
|
|
||||||
testSetup.subscriberNode.peerManager.addServicePeer(
|
testSetup.subscriberNode.peerManager.addServicePeer(
|
||||||
@ -315,3 +317,147 @@ suite "Waku v2 Rest API - Filter V2":
|
|||||||
messages == @[testMessage]
|
messages == @[testMessage]
|
||||||
|
|
||||||
await restFilterTest.shutdown()
|
await restFilterTest.shutdown()
|
||||||
|
|
||||||
|
asyncTest "duplicate message push to filter subscriber":
|
||||||
|
# setup filter service and client node
|
||||||
|
let restFilterTest = await RestFilterTest.init()
|
||||||
|
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
|
||||||
|
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
|
||||||
|
|
||||||
|
let requestBody = FilterSubscribeRequest(
|
||||||
|
requestId: "1001",
|
||||||
|
contentFilters: @[DefaultContentTopic],
|
||||||
|
pubsubTopic: some(DefaultPubsubTopic),
|
||||||
|
)
|
||||||
|
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)
|
||||||
|
|
||||||
|
# subscribe fiter service
|
||||||
|
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(
|
||||||
|
DefaultPubsubTopic, DefaultContentTopic
|
||||||
|
)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
$response.contentType == $MIMETYPE_JSON
|
||||||
|
response.data.requestId == "1001"
|
||||||
|
subscribedPeer.len() == 1
|
||||||
|
|
||||||
|
# ping subscriber node
|
||||||
|
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
|
||||||
|
|
||||||
|
let pingResponse = await restFilterTest.client.filterSubscriberPing("1002")
|
||||||
|
|
||||||
|
check:
|
||||||
|
pingResponse.status == 200
|
||||||
|
pingResponse.data.requestId == "1002"
|
||||||
|
pingResponse.data.statusDesc == "OK"
|
||||||
|
|
||||||
|
# first - message push from service node to subscriber client
|
||||||
|
let testMessage = WakuMessage(
|
||||||
|
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
|
||||||
|
contentTopic: DefaultContentTopic,
|
||||||
|
timestamp: int64(2022),
|
||||||
|
meta: "test-meta".toBytes(),
|
||||||
|
)
|
||||||
|
|
||||||
|
let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
|
||||||
|
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
|
||||||
|
)
|
||||||
|
|
||||||
|
# check messages received client side or not
|
||||||
|
let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
|
||||||
|
|
||||||
|
check:
|
||||||
|
postMsgResponse1.status == 200
|
||||||
|
$postMsgResponse1.contentType == $MIMETYPE_TEXT
|
||||||
|
postMsgResponse1.data == "OK"
|
||||||
|
len(messages1.data) == 1
|
||||||
|
|
||||||
|
# second - message push from service node to subscriber client
|
||||||
|
let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
|
||||||
|
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
|
||||||
|
)
|
||||||
|
|
||||||
|
# check message received client side or not
|
||||||
|
let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
|
||||||
|
|
||||||
|
check:
|
||||||
|
postMsgResponse2.status == 200
|
||||||
|
$postMsgResponse2.contentType == $MIMETYPE_TEXT
|
||||||
|
postMsgResponse2.data == "OK"
|
||||||
|
len(messages2.data) == 0
|
||||||
|
|
||||||
|
await restFilterTest.shutdown()
|
||||||
|
|
||||||
|
asyncTest "duplicate message push to filter subscriber ( sleep in between )":
|
||||||
|
# setup filter service and client node
|
||||||
|
let restFilterTest = await RestFilterTest.init()
|
||||||
|
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
|
||||||
|
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
|
||||||
|
|
||||||
|
let requestBody = FilterSubscribeRequest(
|
||||||
|
requestId: "1001",
|
||||||
|
contentFilters: @[DefaultContentTopic],
|
||||||
|
pubsubTopic: some(DefaultPubsubTopic),
|
||||||
|
)
|
||||||
|
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)
|
||||||
|
|
||||||
|
# subscribe fiter service
|
||||||
|
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(
|
||||||
|
DefaultPubsubTopic, DefaultContentTopic
|
||||||
|
)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
$response.contentType == $MIMETYPE_JSON
|
||||||
|
response.data.requestId == "1001"
|
||||||
|
subscribedPeer.len() == 1
|
||||||
|
|
||||||
|
# ping subscriber node
|
||||||
|
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
|
||||||
|
|
||||||
|
let pingResponse = await restFilterTest.client.filterSubscriberPing("1002")
|
||||||
|
|
||||||
|
check:
|
||||||
|
pingResponse.status == 200
|
||||||
|
pingResponse.data.requestId == "1002"
|
||||||
|
pingResponse.data.statusDesc == "OK"
|
||||||
|
|
||||||
|
# first - message push from service node to subscriber client
|
||||||
|
let testMessage = WakuMessage(
|
||||||
|
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
|
||||||
|
contentTopic: DefaultContentTopic,
|
||||||
|
timestamp: int64(2022),
|
||||||
|
meta: "test-meta".toBytes(),
|
||||||
|
)
|
||||||
|
|
||||||
|
let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
|
||||||
|
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
|
||||||
|
)
|
||||||
|
|
||||||
|
# check messages received client side or not
|
||||||
|
let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
|
||||||
|
|
||||||
|
check:
|
||||||
|
postMsgResponse1.status == 200
|
||||||
|
$postMsgResponse1.contentType == $MIMETYPE_TEXT
|
||||||
|
postMsgResponse1.data == "OK"
|
||||||
|
len(messages1.data) == 1
|
||||||
|
|
||||||
|
# Pause execution for 1 seconds to test TimeCache functionality of service node
|
||||||
|
await sleepAsync(1.seconds)
|
||||||
|
|
||||||
|
# second - message push from service node to subscriber client
|
||||||
|
let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
|
||||||
|
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
|
||||||
|
)
|
||||||
|
|
||||||
|
# check message received client side or not
|
||||||
|
let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
|
||||||
|
|
||||||
|
check:
|
||||||
|
postMsgResponse2.status == 200
|
||||||
|
$postMsgResponse2.contentType == $MIMETYPE_TEXT
|
||||||
|
postMsgResponse2.data == "OK"
|
||||||
|
len(messages2.data) == 1
|
||||||
|
await restFilterTest.shutdown()
|
||||||
|
|||||||
@ -511,11 +511,12 @@ suite "Waku v2 Rest API - Relay":
|
|||||||
|
|
||||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
|
|
||||||
|
let invalidContentTopic = "invalidContentTopic"
|
||||||
# When
|
# When
|
||||||
let response = await client.relayPostAutoMessagesV1(
|
let response = await client.relayPostAutoMessagesV1(
|
||||||
RelayWakuMessage(
|
RelayWakuMessage(
|
||||||
payload: base64.encode("TEST-PAYLOAD"),
|
payload: base64.encode("TEST-PAYLOAD"),
|
||||||
contentTopic: some("invalidContentTopic"),
|
contentTopic: some(invalidContentTopic),
|
||||||
timestamp: some(int64(2022)),
|
timestamp: some(int64(2022)),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -525,7 +526,8 @@ suite "Waku v2 Rest API - Relay":
|
|||||||
response.status == 400
|
response.status == 400
|
||||||
$response.contentType == $MIMETYPE_TEXT
|
$response.contentType == $MIMETYPE_TEXT
|
||||||
response.data ==
|
response.data ==
|
||||||
"Failed to publish. Autosharding error: invalid format: topic must start with slash"
|
"Failed to publish. Autosharding error: invalid format: content-topic '" &
|
||||||
|
invalidContentTopic & "' must start with slash"
|
||||||
|
|
||||||
await restServer.stop()
|
await restServer.stop()
|
||||||
await restServer.closeWait()
|
await restServer.closeWait()
|
||||||
|
|||||||
2
vendor/nim-chronos
vendored
2
vendor/nim-chronos
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 8a306763cec8105fa83574b56734b0f66823f844
|
Subproject commit 4ad38079dec8407c396ebaaf6ba60e5e94e3fce5
|
||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import stew/[results, byteutils, base64]
|
import stew/[results, byteutils, base64]
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import confutils/defs as confutilsDefs
|
import confutils/defs as confutilsDefs
|
||||||
import ../../envvar_serialization
|
import ../../envvar_serialization
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[strutils, net]
|
import std/[strutils, net]
|
||||||
import ../../../envvar_serialization
|
import ../../../envvar_serialization
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
# Simple async pool driver for postgress.
|
# Simple async pool driver for postgress.
|
||||||
# Inspired by: https://github.com/treeform/pg/
|
# Inspired by: https://github.com/treeform/pg/
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[sequtils, nre, strformat, sets], stew/results, chronos
|
import std/[sequtils, nre, strformat, sets], stew/results, chronos
|
||||||
import ./dbconn, ../common
|
import ./dbconn, ../common
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, net],
|
std/[options, net],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/options, stew/results, eth/keys as eth_keys, libp2p/crypto/crypto as libp2p_crypto
|
std/options, stew/results, eth/keys as eth_keys, libp2p/crypto/crypto as libp2p_crypto
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import stew/shims/macros, serialization
|
import stew/shims/macros, serialization
|
||||||
import ./envvar_serialization/reader, ./envvar_serialization/writer
|
import ./envvar_serialization/reader, ./envvar_serialization/writer
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, typetraits, options, os],
|
std/[tables, typetraits, options, os],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[os, strutils], stew/byteutils, stew/ptrops
|
import std/[os, strutils], stew/byteutils, stew/ptrops
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
type
|
type
|
||||||
HexDataStr* = distinct string
|
HexDataStr* = distinct string
|
||||||
|
|||||||
@ -8,10 +8,7 @@ import
|
|||||||
|
|
||||||
export chronicles.LogLevel
|
export chronicles.LogLevel
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
type LogFormat* = enum
|
type LogFormat* = enum
|
||||||
TEXT
|
TEXT
|
||||||
|
|||||||
@ -1,8 +1,5 @@
|
|||||||
## An extension wrapper around nim-chronos
|
## An extension wrapper around nim-chronos
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
|
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
# Extensions for libp2p's protobuf library implementation
|
# Extensions for libp2p's protobuf library implementation
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/options, libp2p/protobuf/minprotobuf, libp2p/varint
|
import std/options, libp2p/protobuf/minprotobuf, libp2p/varint
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/options, chronos/timer, libp2p/stream/connection
|
import std/options, chronos/timer, libp2p/stream/connection
|
||||||
|
|
||||||
@ -43,10 +40,10 @@ template checkUsageLimit*(
|
|||||||
bodyWithinLimit, bodyRejected: untyped,
|
bodyWithinLimit, bodyRejected: untyped,
|
||||||
) =
|
) =
|
||||||
if t.checkUsage(proto, conn):
|
if t.checkUsage(proto, conn):
|
||||||
waku_service_requests.inc(labelValues = [proto])
|
waku_service_requests.inc(labelValues = [proto, "served"])
|
||||||
bodyWithinLimit
|
bodyWithinLimit
|
||||||
else:
|
else:
|
||||||
waku_service_requests_rejected.inc(labelValues = [proto])
|
waku_service_requests.inc(labelValues = [proto, "rejected"])
|
||||||
bodyRejected
|
bodyRejected
|
||||||
|
|
||||||
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronos
|
import chronos
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[httpclient, json, uri, options], stew/results
|
import std/[httpclient, json, uri, options], stew/results
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[options, strutils, net]
|
import std/[options, strutils, net]
|
||||||
import chronicles, eth/net/nat, stew/results, nativesockets
|
import chronicles, eth/net/nat, stew/results, nativesockets
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
proc flatten*[T](a: seq[seq[T]]): seq[T] =
|
proc flatten*[T](a: seq[seq[T]]): seq[T] =
|
||||||
var aFlat = newSeq[T](0)
|
var aFlat = newSeq[T](0)
|
||||||
|
|||||||
@ -1,18 +1,9 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import metrics
|
import metrics
|
||||||
|
|
||||||
declarePublicCounter waku_service_requests,
|
declarePublicCounter waku_service_requests,
|
||||||
"number of non-relay service requests received", ["service"]
|
"number of non-relay service requests received", ["service", "state"]
|
||||||
declarePublicCounter waku_service_requests_rejected,
|
|
||||||
"number of non-relay service requests received being rejected due to limit overdue",
|
|
||||||
["service"]
|
|
||||||
|
|
||||||
declarePublicCounter waku_service_inbound_network_bytes,
|
declarePublicCounter waku_service_network_bytes,
|
||||||
"total incoming traffic of specific waku services", labels = ["service"]
|
"total incoming traffic of specific waku services", labels = ["service", "direction"]
|
||||||
|
|
||||||
declarePublicCounter waku_service_outbound_network_bytes,
|
|
||||||
"total outgoing traffic of specific waku services", labels = ["service"]
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, strutils, options, sets, net, json],
|
std/[sequtils, strutils, options, sets, net, json],
|
||||||
@ -172,6 +169,31 @@ proc updateENRShards(
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
proc logDiscv5FoundPeers(discoveredRecords: seq[waku_enr.Record]) =
|
||||||
|
for record in discoveredRecords:
|
||||||
|
let recordUri = record.toURI()
|
||||||
|
let capabilities = record.getCapabilities()
|
||||||
|
|
||||||
|
let typedRecord = record.toTyped().valueOr:
|
||||||
|
warn "Could not parse to typed record", error = error, enr = recordUri
|
||||||
|
continue
|
||||||
|
|
||||||
|
let peerInfo = record.toRemotePeerInfo().valueOr:
|
||||||
|
warn "Could not generate remote peer info", error = error, enr = recordUri
|
||||||
|
continue
|
||||||
|
|
||||||
|
let addrs = peerInfo.constructMultiaddrStr()
|
||||||
|
|
||||||
|
let rs = typedRecord.relaySharding()
|
||||||
|
let shardsStr =
|
||||||
|
if rs.isSome():
|
||||||
|
$rs.get()
|
||||||
|
else:
|
||||||
|
"no shards found"
|
||||||
|
|
||||||
|
notice "Received discv5 node",
|
||||||
|
addrs = addrs, enr = recordUri, capabilities = capabilities, shards = shardsStr
|
||||||
|
|
||||||
proc findRandomPeers*(
|
proc findRandomPeers*(
|
||||||
wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predicate)
|
wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predicate)
|
||||||
): Future[seq[waku_enr.Record]] {.async.} =
|
): Future[seq[waku_enr.Record]] {.async.} =
|
||||||
@ -180,6 +202,9 @@ proc findRandomPeers*(
|
|||||||
|
|
||||||
var discoveredRecords = discoveredNodes.mapIt(it.record)
|
var discoveredRecords = discoveredNodes.mapIt(it.record)
|
||||||
|
|
||||||
|
when defined(debugDiscv5):
|
||||||
|
logDiscv5FoundPeers(discoveredRecords)
|
||||||
|
|
||||||
# Filter out nodes that do not match the predicate
|
# Filter out nodes that do not match the predicate
|
||||||
if overridePred.isSome():
|
if overridePred.isSome():
|
||||||
discoveredRecords = discoveredRecords.filter(overridePred.get())
|
discoveredRecords = discoveredRecords.filter(overridePred.get())
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
## A set of utilities to integrate EIP-1459 DNS-based discovery
|
## A set of utilities to integrate EIP-1459 DNS-based discovery
|
||||||
## for Waku v2 nodes.
|
## for Waku v2 nodes.
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, net],
|
std/[options, net],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
type ClusterConf* = object
|
type ClusterConf* = object
|
||||||
maxMessageSize*: string
|
maxMessageSize*: string
|
||||||
|
|||||||
@ -135,17 +135,21 @@ proc setupProtocols(
|
|||||||
# only peers with populated records
|
# only peers with populated records
|
||||||
.mapIt(toRemotePeerInfo(it.record.get()))
|
.mapIt(toRemotePeerInfo(it.record.get()))
|
||||||
|
|
||||||
debug "connecting to exchanged peers",
|
debug "adding exchanged peers",
|
||||||
src = peer, topic = topic, numPeers = exchangedPeers.len
|
src = peer, topic = topic, numPeers = exchangedPeers.len
|
||||||
|
|
||||||
# asyncSpawn, as we don't want to block here
|
for peer in exchangedPeers:
|
||||||
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
|
# Peers added are filtered by the peer manager
|
||||||
|
node.peerManager.addPeer(peer, PeerOrigin.PeerExchange)
|
||||||
|
|
||||||
peerExchangeHandler = some(handlePeerExchange)
|
peerExchangeHandler = some(handlePeerExchange)
|
||||||
|
|
||||||
|
let shards =
|
||||||
|
conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard"))
|
||||||
|
debug "Shards created from content topics",
|
||||||
|
contentTopics = conf.contentTopics, shards = shards
|
||||||
|
|
||||||
if conf.relay:
|
if conf.relay:
|
||||||
let shards =
|
|
||||||
conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard"))
|
|
||||||
let pubsubTopics = conf.pubsubTopics & shards
|
let pubsubTopics = conf.pubsubTopics & shards
|
||||||
|
|
||||||
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
|
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles,
|
chronicles,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sequtils],
|
std/[options, sequtils],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sequtils, strutils, net],
|
std/[options, sequtils, strutils, net],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[options], chronos
|
import std/[options], chronos
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sets, sequtils, times, strutils, math, random],
|
std/[options, sets, sequtils, times, strutils, math, random],
|
||||||
@ -128,9 +125,16 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
|
|||||||
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
|
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
|
||||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and
|
pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and
|
||||||
pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0:
|
pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0:
|
||||||
trace "peer already managed and ENR info is already saved",
|
let incomingEnr = remotePeerInfo.enr.valueOr:
|
||||||
remote_peer_id = $remotePeerInfo.peerId
|
trace "peer already managed and incoming ENR is empty",
|
||||||
return
|
remote_peer_id = $remotePeerInfo.peerId
|
||||||
|
return
|
||||||
|
|
||||||
|
if pm.peerStore[ENRBook][remotePeerInfo.peerId].raw == incomingEnr.raw or
|
||||||
|
pm.peerStore[ENRBook][remotePeerInfo.peerId].seqNum > incomingEnr.seqNum:
|
||||||
|
trace "peer already managed and ENR info is already saved",
|
||||||
|
remote_peer_id = $remotePeerInfo.peerId
|
||||||
|
return
|
||||||
|
|
||||||
trace "Adding peer to manager",
|
trace "Adding peer to manager",
|
||||||
peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
||||||
@ -138,6 +142,8 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
|
|||||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
||||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey
|
pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey
|
||||||
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
|
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
|
||||||
|
pm.peerStore[ProtoVersionBook][remotePeerInfo.peerId] = remotePeerInfo.protoVersion
|
||||||
|
pm.peerStore[AgentBook][remotePeerInfo.peerId] = remotePeerInfo.agent
|
||||||
|
|
||||||
if remotePeerInfo.protocols.len > 0:
|
if remotePeerInfo.protocols.len > 0:
|
||||||
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols
|
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols
|
||||||
@ -380,8 +386,8 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
|||||||
pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
|
pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
|
||||||
not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
|
not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
|
||||||
):
|
):
|
||||||
let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & "]"
|
let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]"
|
||||||
let otherShardsString = "[ " & metadata.shards.join(", ") & "]"
|
let otherShardsString = "[ " & metadata.shards.join(", ") & " ]"
|
||||||
reason =
|
reason =
|
||||||
"no shards in common: my_shards = " & myShardsString & " others_shards = " &
|
"no shards in common: my_shards = " & myShardsString & " others_shards = " &
|
||||||
otherShardsString
|
otherShardsString
|
||||||
@ -435,7 +441,9 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
|||||||
|
|
||||||
if not pm.storage.isNil:
|
if not pm.storage.isNil:
|
||||||
var remotePeerInfo = pm.peerStore.get(peerId)
|
var remotePeerInfo = pm.peerStore.get(peerId)
|
||||||
remotePeerInfo.disconnectTime = getTime().toUnix
|
|
||||||
|
if event.kind == PeerEventKind.Left:
|
||||||
|
remotePeerInfo.disconnectTime = getTime().toUnix
|
||||||
|
|
||||||
pm.storage.insertOrReplace(remotePeerInfo)
|
pm.storage.insertOrReplace(remotePeerInfo)
|
||||||
|
|
||||||
@ -640,18 +648,29 @@ proc connectToNodes*(
|
|||||||
info "Dialing multiple peers", numOfPeers = nodes.len
|
info "Dialing multiple peers", numOfPeers = nodes.len
|
||||||
|
|
||||||
var futConns: seq[Future[bool]]
|
var futConns: seq[Future[bool]]
|
||||||
|
var connectedPeers: seq[RemotePeerInfo]
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
let node = parsePeerInfo(node)
|
let node = parsePeerInfo(node)
|
||||||
if node.isOk():
|
if node.isOk():
|
||||||
futConns.add(pm.connectRelay(node.value))
|
futConns.add(pm.connectRelay(node.value))
|
||||||
|
connectedPeers.add(node.value)
|
||||||
else:
|
else:
|
||||||
error "Couldn't parse node info", error = node.error
|
error "Couldn't parse node info", error = node.error
|
||||||
|
|
||||||
await allFutures(futConns)
|
await allFutures(futConns)
|
||||||
let successfulConns = futConns.mapIt(it.read()).countIt(it == true)
|
|
||||||
|
# Filtering successful connectedPeers based on futConns
|
||||||
|
let combined = zip(connectedPeers, futConns)
|
||||||
|
connectedPeers = combined.filterIt(it[1].read() == true).mapIt(it[0])
|
||||||
|
|
||||||
|
when defined(debugDiscv5):
|
||||||
|
let peerIds = connectedPeers.mapIt(it.peerId)
|
||||||
|
let origin = connectedPeers.mapIt(it.origin)
|
||||||
|
notice "established connections with found peers",
|
||||||
|
peerIds = peerIds, origin = origin
|
||||||
|
|
||||||
info "Finished dialing multiple peers",
|
info "Finished dialing multiple peers",
|
||||||
successfulConns = successfulConns, attempted = nodes.len
|
successfulConns = connectedPeers.len, attempted = nodes.len
|
||||||
|
|
||||||
# The issue seems to be around peers not being fully connected when
|
# The issue seems to be around peers not being fully connected when
|
||||||
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
||||||
@ -717,15 +736,15 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
|||||||
if outRelayPeers.len >= pm.outRelayPeersTarget:
|
if outRelayPeers.len >= pm.outRelayPeersTarget:
|
||||||
return
|
return
|
||||||
|
|
||||||
let notConnectedPeers =
|
let notConnectedPeers = pm.peerStore.getNotConnectedPeers()
|
||||||
pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
|
||||||
|
|
||||||
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||||
|
|
||||||
shuffle(outsideBackoffPeers)
|
shuffle(outsideBackoffPeers)
|
||||||
|
|
||||||
var index = 0
|
var index = 0
|
||||||
var numPendingConnReqs = outsideBackoffPeers.len
|
var numPendingConnReqs =
|
||||||
|
min(outsideBackoffPeers.len, pm.outRelayPeersTarget - outRelayPeers.len)
|
||||||
## number of outstanding connection requests
|
## number of outstanding connection requests
|
||||||
|
|
||||||
while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget:
|
while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget:
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/[tables, strutils, os], stew/results, chronicles
|
import std/[tables, strutils, os], stew/results, chronicles
|
||||||
import ../../../common/databases/db_sqlite, ../../../common/databases/common
|
import ../../../common/databases/db_sqlite, ../../../common/databases/common
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import stew/results
|
import stew/results
|
||||||
import ../../../waku_core, ../waku_peer_store
|
import ../../../waku_core, ../waku_peer_store
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, options],
|
std/[sets, options],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, sequtils, sets, options, strutils],
|
std/[tables, sequtils, sets, options, strutils],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronicles, chronos, metrics, metrics/chronos_httpserver
|
import chronicles, chronos, metrics, metrics/chronos_httpserver
|
||||||
import
|
import
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[hashes, options, sugar, tables, strutils, sequtils, os, net],
|
std/[hashes, options, sugar, tables, strutils, sequtils, os, net],
|
||||||
@ -437,12 +434,14 @@ proc mountFilter*(
|
|||||||
filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
|
filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
|
||||||
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
|
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
|
||||||
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
|
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
|
||||||
|
messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
|
||||||
) {.async: (raises: []).} =
|
) {.async: (raises: []).} =
|
||||||
## Mounting filter v2 protocol
|
## Mounting filter v2 protocol
|
||||||
|
|
||||||
info "mounting filter protocol"
|
info "mounting filter protocol"
|
||||||
node.wakuFilter = WakuFilter.new(
|
node.wakuFilter = WakuFilter.new(
|
||||||
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
|
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer,
|
||||||
|
messageCacheTTL,
|
||||||
)
|
)
|
||||||
|
|
||||||
if node.started:
|
if node.started:
|
||||||
|
|||||||
@ -1,8 +1,5 @@
|
|||||||
# Waku Switch utils.
|
# Waku Switch utils.
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/options,
|
std/options,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import metrics
|
import metrics
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import stew/results
|
import stew/results
|
||||||
import ../waku_core, ../waku_noise/noise_types, ../waku_noise/noise_utils
|
import ../waku_core, ../waku_noise/noise_types, ../waku_noise/noise_utils
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
# Request utils.
|
# Request utils.
|
||||||
|
|
||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import bearssl/rand, stew/byteutils
|
import bearssl/rand, stew/byteutils
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronos, std/[options, sequtils], stew/results
|
import chronos, std/[options, sequtils], stew/results
|
||||||
import ../discovery/waku_discv5, ../waku_relay, ../waku_core, ./message_cache
|
import ../discovery/waku_discv5, ../waku_relay, ../waku_core, ./message_cache
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, sugar, algorithm, options],
|
std/[sequtils, sugar, algorithm, options],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles,
|
chronicles,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[strformat, sequtils, tables],
|
std/[strformat, sequtils, tables],
|
||||||
@ -32,11 +29,12 @@ logScope:
|
|||||||
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers"
|
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers"
|
||||||
const ROUTE_ADMIN_V1_FILTER_SUBS* = "/admin/v1/filter/subscriptions"
|
const ROUTE_ADMIN_V1_FILTER_SUBS* = "/admin/v1/filter/subscriptions"
|
||||||
|
|
||||||
type PeerProtocolTuple = tuple[multiaddr: string, protocol: string, connected: bool]
|
type PeerProtocolTuple =
|
||||||
|
tuple[multiaddr: string, protocol: string, connected: bool, origin: PeerOrigin]
|
||||||
|
|
||||||
proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) =
|
proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) =
|
||||||
for peer in peersTup:
|
for peer in peersTup:
|
||||||
peers.add(peer.multiaddr, peer.protocol, peer.connected)
|
peers.add(peer.multiaddr, peer.protocol, peer.connected, peer.origin)
|
||||||
|
|
||||||
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
||||||
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
|
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
|
||||||
@ -48,6 +46,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
multiaddr: constructMultiaddrStr(it),
|
multiaddr: constructMultiaddrStr(it),
|
||||||
protocol: WakuRelayCodec,
|
protocol: WakuRelayCodec,
|
||||||
connected: it.connectedness == Connectedness.Connected,
|
connected: it.connectedness == Connectedness.Connected,
|
||||||
|
origin: it.origin,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
tuplesToWakuPeers(peers, relayPeers)
|
tuplesToWakuPeers(peers, relayPeers)
|
||||||
@ -60,6 +59,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
multiaddr: constructMultiaddrStr(it),
|
multiaddr: constructMultiaddrStr(it),
|
||||||
protocol: WakuFilterSubscribeCodec,
|
protocol: WakuFilterSubscribeCodec,
|
||||||
connected: it.connectedness == Connectedness.Connected,
|
connected: it.connectedness == Connectedness.Connected,
|
||||||
|
origin: it.origin,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
tuplesToWakuPeers(peers, filterV2Peers)
|
tuplesToWakuPeers(peers, filterV2Peers)
|
||||||
@ -70,6 +70,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
multiaddr: constructMultiaddrStr(it),
|
multiaddr: constructMultiaddrStr(it),
|
||||||
protocol: WakuStoreCodec,
|
protocol: WakuStoreCodec,
|
||||||
connected: it.connectedness == Connectedness.Connected,
|
connected: it.connectedness == Connectedness.Connected,
|
||||||
|
origin: it.origin,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
tuplesToWakuPeers(peers, storePeers)
|
tuplesToWakuPeers(peers, storePeers)
|
||||||
@ -82,6 +83,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
multiaddr: constructMultiaddrStr(it),
|
multiaddr: constructMultiaddrStr(it),
|
||||||
protocol: WakuLegacyStoreCodec,
|
protocol: WakuLegacyStoreCodec,
|
||||||
connected: it.connectedness == Connectedness.Connected,
|
connected: it.connectedness == Connectedness.Connected,
|
||||||
|
origin: it.origin,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
tuplesToWakuPeers(peers, legacyStorePeers)
|
tuplesToWakuPeers(peers, legacyStorePeers)
|
||||||
@ -93,6 +95,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
multiaddr: constructMultiaddrStr(it),
|
multiaddr: constructMultiaddrStr(it),
|
||||||
protocol: WakuLightPushCodec,
|
protocol: WakuLightPushCodec,
|
||||||
connected: it.connectedness == Connectedness.Connected,
|
connected: it.connectedness == Connectedness.Connected,
|
||||||
|
origin: it.origin,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
tuplesToWakuPeers(peers, lightpushPeers)
|
tuplesToWakuPeers(peers, lightpushPeers)
|
||||||
|
|||||||
@ -1,14 +1,11 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles,
|
chronicles,
|
||||||
json_serialization,
|
json_serialization,
|
||||||
json_serialization/std/options,
|
json_serialization/std/options,
|
||||||
json_serialization/lexer
|
json_serialization/lexer
|
||||||
import ../serdes
|
import ../serdes, ../../../waku_core
|
||||||
|
|
||||||
#### Types
|
#### Types
|
||||||
|
|
||||||
@ -19,6 +16,7 @@ type ProtocolState* = object
|
|||||||
type WakuPeer* = object
|
type WakuPeer* = object
|
||||||
multiaddr*: string
|
multiaddr*: string
|
||||||
protocols*: seq[ProtocolState]
|
protocols*: seq[ProtocolState]
|
||||||
|
origin*: PeerOrigin
|
||||||
|
|
||||||
type WakuPeers* = seq[WakuPeer]
|
type WakuPeers* = seq[WakuPeer]
|
||||||
|
|
||||||
@ -46,6 +44,7 @@ proc writeValue*(
|
|||||||
writer.beginRecord()
|
writer.beginRecord()
|
||||||
writer.writeField("multiaddr", value.multiaddr)
|
writer.writeField("multiaddr", value.multiaddr)
|
||||||
writer.writeField("protocols", value.protocols)
|
writer.writeField("protocols", value.protocols)
|
||||||
|
writer.writeField("origin", value.origin)
|
||||||
writer.endRecord()
|
writer.endRecord()
|
||||||
|
|
||||||
proc writeValue*(
|
proc writeValue*(
|
||||||
@ -100,6 +99,7 @@ proc readValue*(
|
|||||||
var
|
var
|
||||||
multiaddr: Option[string]
|
multiaddr: Option[string]
|
||||||
protocols: Option[seq[ProtocolState]]
|
protocols: Option[seq[ProtocolState]]
|
||||||
|
origin: Option[PeerOrigin]
|
||||||
|
|
||||||
for fieldName in readObjectFields(reader):
|
for fieldName in readObjectFields(reader):
|
||||||
case fieldName
|
case fieldName
|
||||||
@ -111,6 +111,10 @@ proc readValue*(
|
|||||||
if protocols.isSome():
|
if protocols.isSome():
|
||||||
reader.raiseUnexpectedField("Multiple `protocols` fields found", "WakuPeer")
|
reader.raiseUnexpectedField("Multiple `protocols` fields found", "WakuPeer")
|
||||||
protocols = some(reader.readValue(seq[ProtocolState]))
|
protocols = some(reader.readValue(seq[ProtocolState]))
|
||||||
|
of "origin":
|
||||||
|
if origin.isSome():
|
||||||
|
reader.raiseUnexpectedField("Multiple `origin` fields found", "WakuPeer")
|
||||||
|
origin = some(reader.readValue(PeerOrigin))
|
||||||
else:
|
else:
|
||||||
unrecognizedFieldWarning()
|
unrecognizedFieldWarning()
|
||||||
|
|
||||||
@ -120,7 +124,12 @@ proc readValue*(
|
|||||||
if protocols.isNone():
|
if protocols.isNone():
|
||||||
reader.raiseUnexpectedValue("Field `protocols` are missing")
|
reader.raiseUnexpectedValue("Field `protocols` are missing")
|
||||||
|
|
||||||
value = WakuPeer(multiaddr: multiaddr.get(), protocols: protocols.get())
|
if origin.isNone():
|
||||||
|
reader.raiseUnexpectedValue("Field `origin` is missing")
|
||||||
|
|
||||||
|
value = WakuPeer(
|
||||||
|
multiaddr: multiaddr.get(), protocols: protocols.get(), origin: origin.get()
|
||||||
|
)
|
||||||
|
|
||||||
proc readValue*(
|
proc readValue*(
|
||||||
reader: var JsonReader[RestJson], value: var FilterTopic
|
reader: var JsonReader[RestJson], value: var FilterTopic
|
||||||
@ -196,10 +205,17 @@ func `==`*(a: ProtocolState, b: string): bool {.inline.} =
|
|||||||
func `==`*(a, b: WakuPeer): bool {.inline.} =
|
func `==`*(a, b: WakuPeer): bool {.inline.} =
|
||||||
return a.multiaddr == b.multiaddr
|
return a.multiaddr == b.multiaddr
|
||||||
|
|
||||||
proc add*(peers: var WakuPeers, multiaddr: string, protocol: string, connected: bool) =
|
proc add*(
|
||||||
|
peers: var WakuPeers,
|
||||||
|
multiaddr: string,
|
||||||
|
protocol: string,
|
||||||
|
connected: bool,
|
||||||
|
origin: PeerOrigin,
|
||||||
|
) =
|
||||||
var peer: WakuPeer = WakuPeer(
|
var peer: WakuPeer = WakuPeer(
|
||||||
multiaddr: multiaddr,
|
multiaddr: multiaddr,
|
||||||
protocols: @[ProtocolState(protocol: protocol, connected: connected)],
|
protocols: @[ProtocolState(protocol: protocol, connected: connected)],
|
||||||
|
origin: origin,
|
||||||
)
|
)
|
||||||
let idx = peers.find(peer)
|
let idx = peers.find(peer)
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import net, tables
|
import net, tables
|
||||||
import presto
|
import presto
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import presto/client
|
import presto/client
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronicles, json_serialization, presto/route
|
import chronicles, json_serialization, presto/route
|
||||||
import ../../../waku_node, ../responses, ../serdes, ./types
|
import ../../../waku_node, ../responses, ../serdes, ./types
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronicles, json_serialization, json_serialization/std/options
|
import chronicles, json_serialization, json_serialization/std/options
|
||||||
import ../../../waku_node, ../serdes
|
import ../../../waku_node, ../serdes
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
json,
|
json,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/strformat,
|
std/strformat,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, strformat],
|
std/[sets, strformat],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronicles, json_serialization, presto/route
|
import chronicles, json_serialization, presto/route
|
||||||
import ../../../waku_node, ../responses, ../serdes, ./types
|
import ../../../waku_node, ../responses, ../serdes, ./types
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import chronicles, json_serialization, json_serialization/std/options
|
import chronicles, json_serialization, json_serialization/std/options
|
||||||
import ../../../waku_node, ../serdes
|
import ../../../waku_node, ../serdes
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/strformat, stew/results, chronicles, uri, json_serialization, presto/route
|
import std/strformat, stew/results, chronicles, uri, json_serialization, presto/route
|
||||||
import
|
import
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, strformat, uri],
|
std/[sets, strformat, uri],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
json,
|
json,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/strformat,
|
std/strformat,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, strformat],
|
std/[sets, strformat],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, strutils, re, net],
|
std/[options, strutils, re, net],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/sets,
|
std/sets,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/sequtils,
|
std/sequtils,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, strformat, times],
|
std/[sets, strformat, times],
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/typetraits, stew/results, chronicles, presto/common
|
import std/typetraits, stew/results, chronicles, presto/common
|
||||||
import ./serdes
|
import ./serdes
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/typetraits,
|
std/typetraits,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/typetraits,
|
std/typetraits,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import std/net
|
import std/net
|
||||||
import
|
import
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
{.push raises: [].}
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
||||||
|
|||||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user