mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 22:43:09 +00:00
Compare commits
13 Commits
master
...
v0.37.1-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bbc089f20e | ||
|
|
4664b96a7a | ||
|
|
ff93643ae9 | ||
|
|
9a341a68e5 | ||
|
|
59045461d3 | ||
|
|
9223fac807 | ||
|
|
9d53867ec2 | ||
|
|
2d9cc0a561 | ||
|
|
282a929826 | ||
|
|
902732eb77 | ||
|
|
31e237d259 | ||
|
|
1d8860a2b1 | ||
|
|
0baf273a97 |
65
CHANGELOG.md
65
CHANGELOG.md
@ -1,3 +1,68 @@
|
|||||||
|
## v0.37.1-beta (2025-12-10)
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
- Remove ENR cache from peer exchange ([#3652](https://github.com/logos-messaging/logos-messaging-nim/pull/3652)) ([7920368a](https://github.com/logos-messaging/logos-messaging-nim/commit/7920368a36687cd5f12afa52d59866792d8457ca))
|
||||||
|
|
||||||
|
## v0.37.0 (2025-10-01)
|
||||||
|
|
||||||
|
### Notes
|
||||||
|
|
||||||
|
- Deprecated parameters:
|
||||||
|
- `tree_path` and `rlnDB` (RLN-related storage paths)
|
||||||
|
- `--dns-discovery` (fully removed, including dns-discovery-name-server)
|
||||||
|
- `keepAlive` (deprecated, config updated accordingly)
|
||||||
|
- Legacy `store` protocol is no longer supported by default.
|
||||||
|
- Improved sharding configuration: now explicit and shard-specific metrics added.
|
||||||
|
- Mix nodes are limited to IPv4 addresses only.
|
||||||
|
- [lightpush legacy](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) is being deprecated. Use [lightpush v3](https://github.com/waku-org/specs/blob/master/standards/core/lightpush.md) instead.
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
- Waku API: create node via API ([#3580](https://github.com/waku-org/nwaku/pull/3580)) ([bc8acf76](https://github.com/waku-org/nwaku/commit/bc8acf76))
|
||||||
|
- Waku Sync: full topic support ([#3275](https://github.com/waku-org/nwaku/pull/3275)) ([9327da5a](https://github.com/waku-org/nwaku/commit/9327da5a))
|
||||||
|
- Mix PoC implementation ([#3284](https://github.com/waku-org/nwaku/pull/3284)) ([eb7a3d13](https://github.com/waku-org/nwaku/commit/eb7a3d13))
|
||||||
|
- Rendezvous: add request interval option ([#3569](https://github.com/waku-org/nwaku/pull/3569)) ([cc7a6406](https://github.com/waku-org/nwaku/commit/cc7a6406))
|
||||||
|
- Shard-specific metrics tracking ([#3520](https://github.com/waku-org/nwaku/pull/3520)) ([c3da29fd](https://github.com/waku-org/nwaku/commit/c3da29fd))
|
||||||
|
- Libwaku: build Windows DLL for Status-go ([#3460](https://github.com/waku-org/nwaku/pull/3460)) ([5c38a53f](https://github.com/waku-org/nwaku/commit/5c38a53f))
|
||||||
|
- RLN: add Stateless RLN support ([#3621](https://github.com/waku-org/nwaku/pull/3621))
|
||||||
|
- LOG: Reduce log level of messages from debug to info for better visibility ([#3622](https://github.com/waku-org/nwaku/pull/3622))
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
- Prevent invalid pubsub topic subscription via Relay REST API ([#3559](https://github.com/waku-org/nwaku/pull/3559)) ([a36601ab](https://github.com/waku-org/nwaku/commit/a36601ab))
|
||||||
|
- Fixed node crash when RLN is unregistered ([#3573](https://github.com/waku-org/nwaku/pull/3573)) ([3d0c6279](https://github.com/waku-org/nwaku/commit/3d0c6279))
|
||||||
|
- REST: fixed sync protocol issues ([#3503](https://github.com/waku-org/nwaku/pull/3503)) ([393e3cce](https://github.com/waku-org/nwaku/commit/393e3cce))
|
||||||
|
- Regex pattern fix for `username:password@` in URLs ([#3517](https://github.com/waku-org/nwaku/pull/3517)) ([89a3f735](https://github.com/waku-org/nwaku/commit/89a3f735))
|
||||||
|
- Sharding: applied modulus fix ([#3530](https://github.com/waku-org/nwaku/pull/3530)) ([f68d7999](https://github.com/waku-org/nwaku/commit/f68d7999))
|
||||||
|
- Metrics: switched to counter instead of gauge ([#3355](https://github.com/waku-org/nwaku/pull/3355)) ([a27eec90](https://github.com/waku-org/nwaku/commit/a27eec90))
|
||||||
|
- Fixed lightpush metrics and diagnostics ([#3486](https://github.com/waku-org/nwaku/pull/3486)) ([0ed3fc80](https://github.com/waku-org/nwaku/commit/0ed3fc80))
|
||||||
|
- Misc sync, dashboard, and CI fixes ([#3434](https://github.com/waku-org/nwaku/pull/3434), [#3508](https://github.com/waku-org/nwaku/pull/3508), [#3464](https://github.com/waku-org/nwaku/pull/3464))
|
||||||
|
- Raise log level of numerous operational messages from debug to info for better visibility ([#3622](https://github.com/waku-org/nwaku/pull/3622))
|
||||||
|
|
||||||
|
### Changes
|
||||||
|
|
||||||
|
- Enable peer-exchange by default ([#3557](https://github.com/waku-org/nwaku/pull/3557)) ([7df526f8](https://github.com/waku-org/nwaku/commit/7df526f8))
|
||||||
|
- Refactor peer-exchange client and service implementations ([#3523](https://github.com/waku-org/nwaku/pull/3523)) ([4379f9ec](https://github.com/waku-org/nwaku/commit/4379f9ec))
|
||||||
|
- Updated rendezvous to use callback-based shard/capability updates ([#3558](https://github.com/waku-org/nwaku/pull/3558)) ([028bf297](https://github.com/waku-org/nwaku/commit/028bf297))
|
||||||
|
- Config updates and explicit sharding setup ([#3468](https://github.com/waku-org/nwaku/pull/3468)) ([994d485b](https://github.com/waku-org/nwaku/commit/994d485b))
|
||||||
|
- Bumped libp2p to v1.13.0 ([#3574](https://github.com/waku-org/nwaku/pull/3574)) ([b1616e55](https://github.com/waku-org/nwaku/commit/b1616e55))
|
||||||
|
- Removed legacy dependencies (e.g., libpcre in Docker builds) ([#3552](https://github.com/waku-org/nwaku/pull/3552)) ([4db4f830](https://github.com/waku-org/nwaku/commit/4db4f830))
|
||||||
|
- Benchmarks for RLN proof generation & verification ([#3567](https://github.com/waku-org/nwaku/pull/3567)) ([794c3a85](https://github.com/waku-org/nwaku/commit/794c3a85))
|
||||||
|
- Various CI/CD & infra updates ([#3515](https://github.com/waku-org/nwaku/pull/3515), [#3505](https://github.com/waku-org/nwaku/pull/3505))
|
||||||
|
|
||||||
|
### This release supports the following [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
|
||||||
|
|
||||||
|
| Protocol | Spec status | Protocol id |
|
||||||
|
| ---: | :---: | :--- |
|
||||||
|
| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` |
|
||||||
|
| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1` <br />`/vac/waku/filter-subscribe/2.0.0-beta1` <br />`/vac/waku/filter-push/2.0.0-beta1` |
|
||||||
|
| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` |
|
||||||
|
| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
|
||||||
|
| [`WAKU2-LIGHTPUSH v3`](https://github.com/waku-org/specs/blob/master/standards/core/lightpush.md) | `draft` | `/vac/waku/lightpush/3.0.0` |
|
||||||
|
| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` |
|
||||||
|
| [`WAKU-SYNC`](https://github.com/waku-org/specs/blob/master/standards/core/sync.md) | `draft` | `/vac/waku/sync/1.0.0` |
|
||||||
|
|
||||||
## v0.36.0 (2025-06-20)
|
## v0.36.0 (2025-06-20)
|
||||||
### Notes
|
### Notes
|
||||||
|
|
||||||
|
|||||||
@ -66,15 +66,17 @@ suite "Waku Peer Exchange":
|
|||||||
|
|
||||||
suite "fetchPeerExchangePeers":
|
suite "fetchPeerExchangePeers":
|
||||||
var node2 {.threadvar.}: WakuNode
|
var node2 {.threadvar.}: WakuNode
|
||||||
|
var node3 {.threadvar.}: WakuNode
|
||||||
|
|
||||||
asyncSetup:
|
asyncSetup:
|
||||||
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
|
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
|
||||||
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
|
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
|
||||||
|
node3 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
|
||||||
|
|
||||||
await allFutures(node.start(), node2.start())
|
await allFutures(node.start(), node2.start(), node3.start())
|
||||||
|
|
||||||
asyncTeardown:
|
asyncTeardown:
|
||||||
await allFutures(node.stop(), node2.stop())
|
await allFutures(node.stop(), node2.stop(), node3.stop())
|
||||||
|
|
||||||
asyncTest "Node fetches without mounting peer exchange":
|
asyncTest "Node fetches without mounting peer exchange":
|
||||||
# When a node, without peer exchange mounted, fetches peers
|
# When a node, without peer exchange mounted, fetches peers
|
||||||
@ -104,12 +106,10 @@ suite "Waku Peer Exchange":
|
|||||||
await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()])
|
await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()])
|
||||||
check node.peerManager.switch.peerStore.peers.len == 0
|
check node.peerManager.switch.peerStore.peers.len == 0
|
||||||
|
|
||||||
# Mock that we discovered a node (to avoid running discv5)
|
# Simulate node2 discovering node3 via Discv5
|
||||||
var enr = enr.Record()
|
var rpInfo = node3.peerInfo.toRemotePeerInfo()
|
||||||
assert enr.fromUri(
|
rpInfo.enr = some(node3.enr)
|
||||||
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
|
node2.peerManager.addPeer(rpInfo, PeerOrigin.Discv5)
|
||||||
), "Failed to parse ENR"
|
|
||||||
node2.wakuPeerExchange.enrCache.add(enr)
|
|
||||||
|
|
||||||
# Set node2 as service peer (default one) for px protocol
|
# Set node2 as service peer (default one) for px protocol
|
||||||
node.peerManager.addServicePeer(
|
node.peerManager.addServicePeer(
|
||||||
@ -121,10 +121,8 @@ suite "Waku Peer Exchange":
|
|||||||
check res.tryGet() == 1
|
check res.tryGet() == 1
|
||||||
|
|
||||||
# Check that the peer ended up in the peerstore
|
# Check that the peer ended up in the peerstore
|
||||||
let rpInfo = enr.toRemotePeerInfo.get()
|
|
||||||
check:
|
check:
|
||||||
node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
|
node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
|
||||||
node.peerManager.switch.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)
|
|
||||||
|
|
||||||
suite "setPeerExchangePeer":
|
suite "setPeerExchangePeer":
|
||||||
var node2 {.threadvar.}: WakuNode
|
var node2 {.threadvar.}: WakuNode
|
||||||
|
|||||||
@ -142,9 +142,13 @@ suite "Waku Peer Exchange":
|
|||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
node2 =
|
node2 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
node3 =
|
||||||
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
node4 =
|
||||||
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
|
||||||
# Start and mount peer exchange
|
# Start and mount peer exchange
|
||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start(), node3.start(), node4.start()])
|
||||||
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
|
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
|
||||||
|
|
||||||
# Create connection
|
# Create connection
|
||||||
@ -154,18 +158,15 @@ suite "Waku Peer Exchange":
|
|||||||
require:
|
require:
|
||||||
connOpt.isSome
|
connOpt.isSome
|
||||||
|
|
||||||
# Create some enr and add to peer exchange (simulating disv5)
|
# Simulate node1 discovering node3 via Discv5
|
||||||
var enr1, enr2 = enr.Record()
|
var info3 = node3.peerInfo.toRemotePeerInfo()
|
||||||
check enr1.fromUri(
|
info3.enr = some(node3.enr)
|
||||||
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
|
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
|
||||||
)
|
|
||||||
check enr2.fromUri(
|
|
||||||
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mock that we have discovered these enrs
|
# Simulate node1 discovering node4 via Discv5
|
||||||
node1.wakuPeerExchange.enrCache.add(enr1)
|
var info4 = node4.peerInfo.toRemotePeerInfo()
|
||||||
node1.wakuPeerExchange.enrCache.add(enr2)
|
info4.enr = some(node4.enr)
|
||||||
|
node1.peerManager.addPeer(info4, PeerOrigin.Discv5)
|
||||||
|
|
||||||
# Request 2 peer from px. Test all request variants
|
# Request 2 peer from px. Test all request variants
|
||||||
let response1 = await node2.wakuPeerExchangeClient.request(2)
|
let response1 = await node2.wakuPeerExchangeClient.request(2)
|
||||||
@ -185,12 +186,12 @@ suite "Waku Peer Exchange":
|
|||||||
response3.get().peerInfos.len == 2
|
response3.get().peerInfos.len == 2
|
||||||
|
|
||||||
# Since it can return duplicates test that at least one of the enrs is in the response
|
# Since it can return duplicates test that at least one of the enrs is in the response
|
||||||
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or
|
response1.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
|
||||||
response1.get().peerInfos.anyIt(it.enr == enr2.raw)
|
response1.get().peerInfos.anyIt(it.enr == node4.enr.raw)
|
||||||
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or
|
response2.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
|
||||||
response2.get().peerInfos.anyIt(it.enr == enr2.raw)
|
response2.get().peerInfos.anyIt(it.enr == node4.enr.raw)
|
||||||
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or
|
response3.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
|
||||||
response3.get().peerInfos.anyIt(it.enr == enr2.raw)
|
response3.get().peerInfos.anyIt(it.enr == node4.enr.raw)
|
||||||
|
|
||||||
asyncTest "Request fails gracefully":
|
asyncTest "Request fails gracefully":
|
||||||
let
|
let
|
||||||
@ -265,8 +266,8 @@ suite "Waku Peer Exchange":
|
|||||||
peerInfo2.origin = PeerOrigin.Discv5
|
peerInfo2.origin = PeerOrigin.Discv5
|
||||||
|
|
||||||
check:
|
check:
|
||||||
not poolFilter(cluster, peerInfo1)
|
poolFilter(cluster, peerInfo1).isErr()
|
||||||
poolFilter(cluster, peerInfo2)
|
poolFilter(cluster, peerInfo2).isOk()
|
||||||
|
|
||||||
asyncTest "Request 0 peers, with 1 peer in PeerExchange":
|
asyncTest "Request 0 peers, with 1 peer in PeerExchange":
|
||||||
# Given two valid nodes with PeerExchange
|
# Given two valid nodes with PeerExchange
|
||||||
@ -275,9 +276,11 @@ suite "Waku Peer Exchange":
|
|||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
node2 =
|
node2 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
node3 =
|
||||||
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
|
||||||
# Start and mount peer exchange
|
# Start and mount peer exchange
|
||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||||
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
|
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
|
||||||
|
|
||||||
# Connect the nodes
|
# Connect the nodes
|
||||||
@ -286,12 +289,10 @@ suite "Waku Peer Exchange":
|
|||||||
)
|
)
|
||||||
assert dialResponse.isSome
|
assert dialResponse.isSome
|
||||||
|
|
||||||
# Mock that we have discovered one enr
|
# Simulate node1 discovering node3 via Discv5
|
||||||
var record = enr.Record()
|
var info3 = node3.peerInfo.toRemotePeerInfo()
|
||||||
check record.fromUri(
|
info3.enr = some(node3.enr)
|
||||||
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
|
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
|
||||||
)
|
|
||||||
node1.wakuPeerExchange.enrCache.add(record)
|
|
||||||
|
|
||||||
# When requesting 0 peers
|
# When requesting 0 peers
|
||||||
let response = await node2.wakuPeerExchangeClient.request(0)
|
let response = await node2.wakuPeerExchangeClient.request(0)
|
||||||
@ -312,13 +313,6 @@ suite "Waku Peer Exchange":
|
|||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start()])
|
||||||
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()])
|
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()])
|
||||||
|
|
||||||
# Mock that we have discovered one enr
|
|
||||||
var record = enr.Record()
|
|
||||||
check record.fromUri(
|
|
||||||
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
|
|
||||||
)
|
|
||||||
node2.wakuPeerExchange.enrCache.add(record)
|
|
||||||
|
|
||||||
# When making any request with an invalid peer info
|
# When making any request with an invalid peer info
|
||||||
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
|
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
|
||||||
remotePeerInfo2.peerId.data.add(255.byte)
|
remotePeerInfo2.peerId.data.add(255.byte)
|
||||||
@ -362,17 +356,17 @@ suite "Waku Peer Exchange":
|
|||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
node2 =
|
node2 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
node3 =
|
||||||
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
|
||||||
# Start and mount peer exchange
|
# Start and mount peer exchange
|
||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||||
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
|
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
|
||||||
|
|
||||||
# Mock that we have discovered these enrs
|
# Simulate node1 discovering node3 via Discv5
|
||||||
var enr1 = enr.Record()
|
var info3 = node3.peerInfo.toRemotePeerInfo()
|
||||||
check enr1.fromUri(
|
info3.enr = some(node3.enr)
|
||||||
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
|
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
|
||||||
)
|
|
||||||
node1.wakuPeerExchange.enrCache.add(enr1)
|
|
||||||
|
|
||||||
# Create connection
|
# Create connection
|
||||||
let connOpt = await node2.peerManager.dialPeer(
|
let connOpt = await node2.peerManager.dialPeer(
|
||||||
@ -396,7 +390,7 @@ suite "Waku Peer Exchange":
|
|||||||
check:
|
check:
|
||||||
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
|
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
|
||||||
decodedBuff.get().response.peerInfos.len == 1
|
decodedBuff.get().response.peerInfos.len == 1
|
||||||
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
|
decodedBuff.get().response.peerInfos[0].enr == node3.enr.raw
|
||||||
|
|
||||||
asyncTest "RateLimit as expected":
|
asyncTest "RateLimit as expected":
|
||||||
let
|
let
|
||||||
@ -404,9 +398,11 @@ suite "Waku Peer Exchange":
|
|||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
node2 =
|
node2 =
|
||||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
node3 =
|
||||||
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
|
||||||
# Start and mount peer exchange
|
# Start and mount peer exchange
|
||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||||
await allFutures(
|
await allFutures(
|
||||||
[
|
[
|
||||||
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
|
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
|
||||||
@ -414,6 +410,11 @@ suite "Waku Peer Exchange":
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Simulate node1 discovering nodeA via Discv5
|
||||||
|
var info3 = node3.peerInfo.toRemotePeerInfo()
|
||||||
|
info3.enr = some(node3.enr)
|
||||||
|
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
|
||||||
|
|
||||||
# Create connection
|
# Create connection
|
||||||
let connOpt = await node2.peerManager.dialPeer(
|
let connOpt = await node2.peerManager.dialPeer(
|
||||||
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
|
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
|
||||||
@ -421,19 +422,6 @@ suite "Waku Peer Exchange":
|
|||||||
require:
|
require:
|
||||||
connOpt.isSome
|
connOpt.isSome
|
||||||
|
|
||||||
# Create some enr and add to peer exchange (simulating disv5)
|
|
||||||
var enr1, enr2 = enr.Record()
|
|
||||||
check enr1.fromUri(
|
|
||||||
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
|
|
||||||
)
|
|
||||||
check enr2.fromUri(
|
|
||||||
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mock that we have discovered these enrs
|
|
||||||
node1.wakuPeerExchange.enrCache.add(enr1)
|
|
||||||
node1.wakuPeerExchange.enrCache.add(enr2)
|
|
||||||
|
|
||||||
await sleepAsync(150.milliseconds)
|
await sleepAsync(150.milliseconds)
|
||||||
|
|
||||||
# Request 2 peer from px. Test all request variants
|
# Request 2 peer from px. Test all request variants
|
||||||
|
|||||||
@ -202,3 +202,17 @@ proc getPeersByCapability*(
|
|||||||
): seq[RemotePeerInfo] =
|
): seq[RemotePeerInfo] =
|
||||||
return
|
return
|
||||||
peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
|
peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
|
||||||
|
|
||||||
|
template forEnrPeers*(
|
||||||
|
peerStore: PeerStore,
|
||||||
|
peerId, peerConnectedness, peerOrigin, peerEnrRecord, body: untyped,
|
||||||
|
) =
|
||||||
|
let enrBook = peerStore[ENRBook]
|
||||||
|
let connBook = peerStore[ConnectionBook]
|
||||||
|
let sourceBook = peerStore[SourceBook]
|
||||||
|
for pid, enrRecord in tables.pairs(enrBook.book):
|
||||||
|
let peerId {.inject.} = pid
|
||||||
|
let peerConnectedness {.inject.} = connBook.book.getOrDefault(pid, NotConnected)
|
||||||
|
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
|
||||||
|
let peerEnrRecord {.inject.} = enrRecord
|
||||||
|
body
|
||||||
|
|||||||
@ -489,9 +489,6 @@ proc stop*(node: WakuNode) {.async.} =
|
|||||||
if not node.wakuStoreTransfer.isNil():
|
if not node.wakuStoreTransfer.isNil():
|
||||||
node.wakuStoreTransfer.stop()
|
node.wakuStoreTransfer.stop()
|
||||||
|
|
||||||
if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
|
|
||||||
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()
|
|
||||||
|
|
||||||
if not node.wakuPeerExchangeClient.isNil() and
|
if not node.wakuPeerExchangeClient.isNil() and
|
||||||
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
|
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
|
||||||
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
||||||
|
|||||||
@ -22,7 +22,6 @@ export WakuPeerExchangeCodec
|
|||||||
|
|
||||||
declarePublicGauge waku_px_peers_received_unknown,
|
declarePublicGauge waku_px_peers_received_unknown,
|
||||||
"number of previously unknown ENRs received via peer exchange"
|
"number of previously unknown ENRs received via peer exchange"
|
||||||
declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached"
|
|
||||||
declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"]
|
declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"]
|
||||||
declarePublicCounter waku_px_peers_sent,
|
declarePublicCounter waku_px_peers_sent,
|
||||||
"number of ENRs sent to peer exchange requesters"
|
"number of ENRs sent to peer exchange requesters"
|
||||||
@ -32,11 +31,9 @@ logScope:
|
|||||||
|
|
||||||
type WakuPeerExchange* = ref object of LPProtocol
|
type WakuPeerExchange* = ref object of LPProtocol
|
||||||
peerManager*: PeerManager
|
peerManager*: PeerManager
|
||||||
enrCache*: seq[enr.Record]
|
|
||||||
cluster*: Option[uint16]
|
cluster*: Option[uint16]
|
||||||
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
||||||
requestRateLimiter*: RequestRateLimiter
|
requestRateLimiter*: RequestRateLimiter
|
||||||
pxLoopHandle*: Future[void]
|
|
||||||
|
|
||||||
proc respond(
|
proc respond(
|
||||||
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
|
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
|
||||||
@ -79,61 +76,50 @@ proc respondError(
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc getEnrsFromCache(
|
proc poolFilter*(
|
||||||
wpx: WakuPeerExchange, numPeers: uint64
|
cluster: Option[uint16], origin: PeerOrigin, enr: enr.Record
|
||||||
): seq[enr.Record] {.gcsafe.} =
|
): Result[void, string] =
|
||||||
if wpx.enrCache.len() == 0:
|
if origin != Discv5:
|
||||||
info "peer exchange ENR cache is empty"
|
trace "peer not from discv5", origin = $origin
|
||||||
return @[]
|
return err("peer not from discv5: " & $origin)
|
||||||
|
if cluster.isSome() and enr.isClusterMismatched(cluster.get()):
|
||||||
# copy and shuffle
|
trace "peer has mismatching cluster"
|
||||||
randomize()
|
return err("peer has mismatching cluster")
|
||||||
var shuffledCache = wpx.enrCache
|
return ok()
|
||||||
shuffledCache.shuffle()
|
|
||||||
|
|
||||||
# return numPeers or less if cache is smaller
|
|
||||||
return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)]
|
|
||||||
|
|
||||||
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
|
|
||||||
if peer.origin != Discv5:
|
|
||||||
trace "peer not from discv5", peer = $peer, origin = $peer.origin
|
|
||||||
return false
|
|
||||||
|
|
||||||
|
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): Result[void, string] =
|
||||||
if peer.enr.isNone():
|
if peer.enr.isNone():
|
||||||
info "peer has no ENR", peer = $peer
|
info "peer has no ENR", peer = $peer
|
||||||
return false
|
return err("peer has no ENR: " & $peer)
|
||||||
|
return poolFilter(cluster, peer.origin, peer.enr.get())
|
||||||
|
|
||||||
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
|
proc getEnrsFromStore(
|
||||||
info "peer has mismatching cluster", peer = $peer
|
wpx: WakuPeerExchange, numPeers: uint64
|
||||||
return false
|
): seq[enr.Record] {.gcsafe.} =
|
||||||
|
# Reservoir sampling (Algorithm R)
|
||||||
return true
|
var i = 0
|
||||||
|
let k = min(MaxPeersCacheSize, numPeers.int)
|
||||||
proc populateEnrCache(wpx: WakuPeerExchange) =
|
let enrStoreLen = wpx.peerManager.switch.peerStore[ENRBook].len
|
||||||
# share only peers that i) are reachable ii) come from discv5 iii) share cluster
|
var enrs = newSeqOfCap[enr.Record](min(k, enrStoreLen))
|
||||||
let withEnr = wpx.peerManager.switch.peerStore.getReachablePeers().filterIt(
|
wpx.peerManager.switch.peerStore.forEnrPeers(
|
||||||
poolFilter(wpx.cluster, it)
|
peerId, peerConnectedness, peerOrigin, peerEnrRecord
|
||||||
)
|
):
|
||||||
|
if peerConnectedness == CannotConnect:
|
||||||
# either what we have or max cache size
|
debug "Could not retrieve ENR because cannot connect to peer",
|
||||||
var newEnrCache = newSeq[enr.Record](0)
|
remotePeerId = peerId
|
||||||
for i in 0 ..< min(withEnr.len, MaxPeersCacheSize):
|
continue
|
||||||
newEnrCache.add(withEnr[i].enr.get())
|
poolFilter(wpx.cluster, peerOrigin, peerEnrRecord).isOkOr:
|
||||||
|
debug "Could not get ENR because no peer matched pool", error = error
|
||||||
# swap cache for new
|
continue
|
||||||
wpx.enrCache = newEnrCache
|
if i < k:
|
||||||
trace "ENR cache populated"
|
enrs.add(peerEnrRecord)
|
||||||
|
else:
|
||||||
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
|
# Add some randomness
|
||||||
# try more aggressively to fill the cache at startup
|
let j = rand(i)
|
||||||
var attempts = 50
|
if j < k:
|
||||||
while wpx.enrCache.len < MaxPeersCacheSize and attempts > 0:
|
enrs[j] = peerEnrRecord
|
||||||
attempts -= 1
|
inc(i)
|
||||||
wpx.populateEnrCache()
|
return enrs
|
||||||
await sleepAsync(1.seconds)
|
|
||||||
|
|
||||||
heartbeat "Updating px enr cache", CacheRefreshInterval:
|
|
||||||
wpx.populateEnrCache()
|
|
||||||
|
|
||||||
proc initProtocolHandler(wpx: WakuPeerExchange) =
|
proc initProtocolHandler(wpx: WakuPeerExchange) =
|
||||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||||
@ -174,7 +160,8 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
|
|||||||
error "Failed to respond with BAD_REQUEST:", error = $error
|
error "Failed to respond with BAD_REQUEST:", error = $error
|
||||||
return
|
return
|
||||||
|
|
||||||
let enrs = wpx.getEnrsFromCache(decBuf.request.numPeers)
|
let enrs = wpx.getEnrsFromStore(decBuf.request.numPeers)
|
||||||
|
|
||||||
info "peer exchange request received"
|
info "peer exchange request received"
|
||||||
trace "px enrs to respond", enrs = $enrs
|
trace "px enrs to respond", enrs = $enrs
|
||||||
try:
|
try:
|
||||||
@ -214,5 +201,4 @@ proc new*(
|
|||||||
)
|
)
|
||||||
wpx.initProtocolHandler()
|
wpx.initProtocolHandler()
|
||||||
setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting)
|
setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting)
|
||||||
asyncSpawn wpx.updatePxEnrCache()
|
|
||||||
return wpx
|
return wpx
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user