mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-02-27 09:13:15 +00:00
feat: remove --relay-peer-exchange option
Not a protocol we want to use now.
This commit is contained in:
parent
5674377b6f
commit
4b72ad443e
@ -77,7 +77,6 @@ import
|
||||
./test_waku_keepalive,
|
||||
./test_waku_enr,
|
||||
./test_waku_dnsdisc,
|
||||
./test_relay_peer_exchange,
|
||||
./test_waku_noise,
|
||||
./test_waku_noise_sessions,
|
||||
./test_waku_netconfig,
|
||||
|
||||
@ -1,96 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, options],
|
||||
stew/shims/net,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/peerid,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/gossipsub
|
||||
|
||||
import waku/waku_core, waku/waku_node, ./testlib/wakucore, ./testlib/wakunode
|
||||
|
||||
procSuite "Relay (GossipSub) Peer Exchange":
|
||||
asyncTest "Mount relay without peer exchange handler":
|
||||
# Given two nodes
|
||||
let
|
||||
listenAddress = parseIpAddress("0.0.0.0")
|
||||
port = Port(0)
|
||||
node1Key = generateSecp256k1Key()
|
||||
node1 = newTestWakuNode(node1Key, listenAddress, port)
|
||||
node2Key = generateSecp256k1Key()
|
||||
node2 =
|
||||
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)
|
||||
|
||||
# When both client and server mount relay without a handler
|
||||
await node1.mountRelay(@[DefaultPubsubTopic])
|
||||
await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler))
|
||||
|
||||
# Then the relays are mounted without a handler
|
||||
check:
|
||||
node1.wakuRelay.parameters.enablePX == false
|
||||
node1.wakuRelay.routingRecordsHandler.len == 0
|
||||
node2.wakuRelay.parameters.enablePX == false
|
||||
node2.wakuRelay.routingRecordsHandler.len == 0
|
||||
|
||||
asyncTest "Mount relay with peer exchange handler":
|
||||
## Given three nodes
|
||||
# Create nodes and ENR. These will be added to the discoverable list
|
||||
let
|
||||
bindIp = parseIpAddress("0.0.0.0")
|
||||
port = Port(0)
|
||||
nodeKey1 = generateSecp256k1Key()
|
||||
node1 = newTestWakuNode(nodeKey1, bindIp, port)
|
||||
nodeKey2 = generateSecp256k1Key()
|
||||
node2 = newTestWakuNode(nodeKey2, bindIp, port, sendSignedPeerRecord = true)
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = newTestWakuNode(nodeKey3, bindIp, port, sendSignedPeerRecord = true)
|
||||
|
||||
# Given some peer exchange handlers
|
||||
proc emptyPeerExchangeHandler(
|
||||
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
|
||||
) {.gcsafe.} =
|
||||
discard
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
proc peerExchangeHandler(
|
||||
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
|
||||
) {.gcsafe.} =
|
||||
## Handle peers received via gossipsub peer exchange
|
||||
let peerRecords = peers.mapIt(it.record.get())
|
||||
|
||||
check:
|
||||
# Node 3 is informed of node 2 via peer exchange
|
||||
peer == node1.switch.peerInfo.peerId
|
||||
topic == DefaultPubsubTopic
|
||||
peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1
|
||||
|
||||
if (not completionFut.completed()):
|
||||
completionFut.complete(true)
|
||||
|
||||
let
|
||||
emptyPeerExchangeHandle: RoutingRecordsHandler = emptyPeerExchangeHandler
|
||||
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler
|
||||
|
||||
# Givem the nodes mount relay with a peer exchange handler
|
||||
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
|
||||
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
|
||||
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle))
|
||||
|
||||
# Ensure that node1 prunes all peers after the first connection
|
||||
node1.wakuRelay.parameters.dHigh = 1
|
||||
|
||||
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||
|
||||
# When nodes are connected
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# Verify that the handlePeerExchange was called (node3)
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
# Clean up
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
@ -247,12 +247,6 @@ type WakuNodeConf* = object
|
||||
desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"
|
||||
.}: bool
|
||||
|
||||
relayPeerExchange* {.
|
||||
desc: "Enable gossipsub peer exchange in relay protocol: true|false",
|
||||
defaultValue: false,
|
||||
name: "relay-peer-exchange"
|
||||
.}: bool
|
||||
|
||||
relayShardedPeerManagement* {.
|
||||
desc:
|
||||
"Enable experimental shard aware peer manager for relay protocol: true|false",
|
||||
|
||||
@ -94,8 +94,6 @@ proc initNode(
|
||||
secureKey = some(conf.websocketSecureKeyPath),
|
||||
secureCert = some(conf.websocketSecureCertPath),
|
||||
nameResolver = dnsResolver,
|
||||
sendSignedPeerRecord = conf.relayPeerExchange,
|
||||
# We send our own signed peer record when peer exchange enabled
|
||||
agentString = some("nwaku"),
|
||||
)
|
||||
builder.withColocationLimit(conf.colocationLimit)
|
||||
@ -131,26 +129,6 @@ proc setupProtocols(
|
||||
return err("failed to mount waku sharding: " & error)
|
||||
|
||||
# Mount relay on all nodes
|
||||
var peerExchangeHandler = none(RoutingRecordsHandler)
|
||||
if conf.relayPeerExchange:
|
||||
proc handlePeerExchange(
|
||||
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
|
||||
) {.gcsafe.} =
|
||||
## Handle peers received via gossipsub peer exchange
|
||||
# TODO: Only consider peers on pubsub topics we subscribe to
|
||||
let exchangedPeers = peers.filterIt(it.record.isSome())
|
||||
# only peers with populated records
|
||||
.mapIt(toRemotePeerInfo(it.record.get()))
|
||||
|
||||
debug "adding exchanged peers",
|
||||
src = peer, topic = topic, numPeers = exchangedPeers.len
|
||||
|
||||
for peer in exchangedPeers:
|
||||
# Peers added are filtered by the peer manager
|
||||
node.peerManager.addPeer(peer, PeerOrigin.PeerExchange)
|
||||
|
||||
peerExchangeHandler = some(handlePeerExchange)
|
||||
|
||||
let shards =
|
||||
conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard"))
|
||||
debug "Shards created from content topics",
|
||||
@ -168,7 +146,6 @@ proc setupProtocols(
|
||||
await mountRelay(
|
||||
node,
|
||||
pubsubTopics,
|
||||
peerExchangeHandler = peerExchangeHandler,
|
||||
int(parsedMaxMsgSize),
|
||||
)
|
||||
except CatchableError:
|
||||
|
||||
@ -446,7 +446,6 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||
proc mountRelay*(
|
||||
node: WakuNode,
|
||||
pubsubTopics: seq[string] = @[],
|
||||
peerExchangeHandler = none(RoutingRecordsHandler),
|
||||
maxMessageSize = int(DefaultMaxWakuMessageSize),
|
||||
) {.async, gcsafe.} =
|
||||
if not node.wakuRelay.isNil():
|
||||
@ -463,12 +462,6 @@ proc mountRelay*(
|
||||
|
||||
node.wakuRelay = initRes.value
|
||||
|
||||
## Add peer exchange handler
|
||||
if peerExchangeHandler.isSome():
|
||||
node.wakuRelay.parameters.enablePX = true
|
||||
# Feature flag for peer exchange in nim-libp2p
|
||||
node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
|
||||
|
||||
if node.started:
|
||||
await node.startRelay()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user