mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
feat: waku peer exchange (RFC34) (#1152)
This commit is contained in:
parent
568ead13af
commit
3e894b0072
@ -27,6 +27,7 @@ import ../../waku/v2/protocol/waku_message,
|
|||||||
../../waku/v2/protocol/waku_store,
|
../../waku/v2/protocol/waku_store,
|
||||||
../../waku/v2/node/[wakunode2, waku_payload],
|
../../waku/v2/node/[wakunode2, waku_payload],
|
||||||
../../waku/v2/node/dnsdisc/waku_dnsdisc,
|
../../waku/v2/node/dnsdisc/waku_dnsdisc,
|
||||||
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../waku/v2/utils/[peers, time],
|
../../waku/v2/utils/[peers, time],
|
||||||
../../waku/common/utils/nat,
|
../../waku/common/utils/nat,
|
||||||
./config_chat2
|
./config_chat2
|
||||||
|
|||||||
@ -17,6 +17,7 @@ import
|
|||||||
# Waku Filter
|
# Waku Filter
|
||||||
./v2/test_waku_filter,
|
./v2/test_waku_filter,
|
||||||
./v2/test_wakunode_filter,
|
./v2/test_wakunode_filter,
|
||||||
|
./v2/test_waku_peer_exchange,
|
||||||
./v2/test_waku_payload,
|
./v2/test_waku_payload,
|
||||||
./v2/test_waku_swap,
|
./v2/test_waku_swap,
|
||||||
./v2/test_utils_peers,
|
./v2/test_utils_peers,
|
||||||
|
|||||||
146
tests/v2/test_waku_peer_exchange.nim
Normal file
146
tests/v2/test_waku_peer_exchange.nim
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options, tables, sets],
|
||||||
|
testutils/unittests,
|
||||||
|
chronos,
|
||||||
|
chronicles,
|
||||||
|
stew/shims/net,
|
||||||
|
libp2p/switch,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/multistream,
|
||||||
|
eth/keys,
|
||||||
|
eth/p2p/discoveryv5/enr
|
||||||
|
import
|
||||||
|
../../waku/v2/node/wakunode2,
|
||||||
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
|
../../waku/v2/node/discv5/waku_discv5,
|
||||||
|
../../waku/v2/protocol/waku_peer_exchange,
|
||||||
|
../../waku/v2/protocol/waku_relay,
|
||||||
|
../test_helpers,
|
||||||
|
./utils
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Extend test coverage
|
||||||
|
procSuite "Waku Peer Exchange":
|
||||||
|
|
||||||
|
asyncTest "encode and decode peer exchange response":
|
||||||
|
## Setup
|
||||||
|
var
|
||||||
|
enr1 = enr.Record(seqNum: 0, raw: @[])
|
||||||
|
enr2 = enr.Record(seqNum: 0, raw: @[])
|
||||||
|
|
||||||
|
discard enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
|
||||||
|
discard enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")
|
||||||
|
|
||||||
|
let peerInfos = @[
|
||||||
|
PeerExchangePeerInfo(enr: enr1.raw),
|
||||||
|
PeerExchangePeerInfo(enr: enr2.raw),
|
||||||
|
]
|
||||||
|
|
||||||
|
var rpc = PeerExchangeRpc(
|
||||||
|
response: PeerExchangeResponse(
|
||||||
|
peerInfos: peerInfos
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let
|
||||||
|
rpcBuffer: seq[byte] = rpc.encode().buffer
|
||||||
|
res = PeerExchangeRpc.init(rpcBuffer)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isOk
|
||||||
|
res.get().response.peerInfos == peerInfos
|
||||||
|
|
||||||
|
## When
|
||||||
|
var
|
||||||
|
resEnr1 = enr.Record(seqNum: 0, raw: @[])
|
||||||
|
resEnr2 = enr.Record(seqNum: 0, raw: @[])
|
||||||
|
|
||||||
|
discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr)
|
||||||
|
discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
resEnr1 == enr1
|
||||||
|
resEnr2 == enr2
|
||||||
|
|
||||||
|
asyncTest "retrieve and provide peer exchange peers from discv5":
|
||||||
|
## Setup (copied from test_waku_discv5.nim)
|
||||||
|
let
|
||||||
|
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||||
|
extIp = ValidIpAddress.init("127.0.0.1")
|
||||||
|
|
||||||
|
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
nodeTcpPort1 = Port(60000)
|
||||||
|
nodeUdpPort1 = Port(9000)
|
||||||
|
node1 = WakuNode.new(nodeKey1, bindIp, nodeTcpPort1)
|
||||||
|
|
||||||
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
nodeTcpPort2 = Port(60002)
|
||||||
|
nodeUdpPort2 = Port(9002)
|
||||||
|
node2 = WakuNode.new(nodeKey2, bindIp, nodeTcpPort2)
|
||||||
|
|
||||||
|
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
nodeTcpPort3 = Port(60004)
|
||||||
|
nodeUdpPort3 = Port(9004)
|
||||||
|
node3 = WakuNode.new(nodeKey3, bindIp, nodeTcpPort3)
|
||||||
|
|
||||||
|
# todo: px flag
|
||||||
|
flags = initWakuFlags(lightpush = false,
|
||||||
|
filter = false,
|
||||||
|
store = false,
|
||||||
|
relay = true)
|
||||||
|
|
||||||
|
# Mount discv5
|
||||||
|
node1.wakuDiscv5 = WakuDiscoveryV5.new(
|
||||||
|
some(extIp), some(nodeTcpPort1), some(nodeUdpPort1),
|
||||||
|
bindIp,
|
||||||
|
nodeUdpPort1,
|
||||||
|
newSeq[string](),
|
||||||
|
false,
|
||||||
|
keys.PrivateKey(nodeKey1.skkey),
|
||||||
|
flags,
|
||||||
|
[], # Empty enr fields, for now
|
||||||
|
node1.rng
|
||||||
|
)
|
||||||
|
|
||||||
|
node2.wakuDiscv5 = WakuDiscoveryV5.new(
|
||||||
|
some(extIp), some(nodeTcpPort2), some(nodeUdpPort2),
|
||||||
|
bindIp,
|
||||||
|
nodeUdpPort2,
|
||||||
|
@[node1.wakuDiscv5.protocol.localNode.record.toURI()], # Bootstrap with node1
|
||||||
|
false,
|
||||||
|
keys.PrivateKey(nodeKey2.skkey),
|
||||||
|
flags,
|
||||||
|
[], # Empty enr fields, for now
|
||||||
|
node2.rng
|
||||||
|
)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||||
|
await allFutures([node1.startDiscv5(), node2.startDiscv5()])
|
||||||
|
|
||||||
|
# Mount peer exchange
|
||||||
|
await node1.mountWakuPeerExchange()
|
||||||
|
await node3.mountWakuPeerExchange()
|
||||||
|
|
||||||
|
await sleepAsync(3000.millis) # Give the algorithm some time to work its magic
|
||||||
|
|
||||||
|
asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()
|
||||||
|
|
||||||
|
node3.wakuPeerExchange.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
## When
|
||||||
|
discard waitFor node3.wakuPeerExchange.request(1)
|
||||||
|
|
||||||
|
await sleepAsync(2000.millis) # Give the algorithm some time to work its magic
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
node1.wakuDiscv5.protocol.nodesDiscovered > 0
|
||||||
|
node3.switch.peerStore[AddressBook].contains(node2.switch.peerInfo.peerId)
|
||||||
|
|
||||||
|
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||||
@ -20,6 +20,7 @@ import
|
|||||||
../v2/utils/time,
|
../v2/utils/time,
|
||||||
../v2/protocol/waku_message,
|
../v2/protocol/waku_message,
|
||||||
../v2/node/wakunode2,
|
../v2/node/wakunode2,
|
||||||
|
../v2/node/peer_manager/peer_manager,
|
||||||
# Common cli config
|
# Common cli config
|
||||||
./config_bridge
|
./config_bridge
|
||||||
|
|
||||||
|
|||||||
@ -390,6 +390,17 @@ type
|
|||||||
defaultValue: 1
|
defaultValue: 1
|
||||||
name: "discv5-bits-per-hop" .}: int
|
name: "discv5-bits-per-hop" .}: int
|
||||||
|
|
||||||
|
## waku peer exchange config
|
||||||
|
peerExchange* {.
|
||||||
|
desc: "Enable waku peer exchange protocol (responder side): true|false",
|
||||||
|
defaultValue: false
|
||||||
|
name: "peer-exchange" }: bool
|
||||||
|
|
||||||
|
peerExchangeNode* {.
|
||||||
|
desc: "Peer multiaddr to send peer exchange requests to. (enables peer exchange protocol requester side)",
|
||||||
|
defaultValue: ""
|
||||||
|
name: "peer-exchange-node" }: string
|
||||||
|
|
||||||
## websocket config
|
## websocket config
|
||||||
websocketSupport* {.
|
websocketSupport* {.
|
||||||
desc: "Enable websocket: true|false",
|
desc: "Enable websocket: true|false",
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import
|
|||||||
export waku_peer_store, peer_storage, peers
|
export waku_peer_store, peer_storage, peers
|
||||||
|
|
||||||
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
|
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
|
||||||
|
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
|
||||||
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
|
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
@ -292,3 +293,48 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def
|
|||||||
let addrs = pm.switch.peerStore[AddressBook][peerId]
|
let addrs = pm.switch.peerStore[AddressBook][peerId]
|
||||||
|
|
||||||
return await pm.dialPeer(peerId, addrs, proto, dialTimeout)
|
return await pm.dialPeer(peerId, addrs, proto, dialTimeout)
|
||||||
|
|
||||||
|
proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} =
|
||||||
|
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||||
|
info "Connecting to node", remotePeer = remotePeer, source = source
|
||||||
|
|
||||||
|
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
||||||
|
let connOpt = await pm.dialPeer(remotePeer, proto)
|
||||||
|
|
||||||
|
if connOpt.isSome():
|
||||||
|
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
||||||
|
waku_node_conns_initiated.inc(labelValues = [source])
|
||||||
|
else:
|
||||||
|
error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
||||||
|
waku_peers_errors.inc(labelValues = ["conn_init_failure"])
|
||||||
|
|
||||||
|
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
|
||||||
|
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||||
|
info "connectToNodes", len = nodes.len
|
||||||
|
|
||||||
|
for nodeId in nodes:
|
||||||
|
await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source)
|
||||||
|
|
||||||
|
# The issue seems to be around peers not being fully connected when
|
||||||
|
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
||||||
|
# fully connected.
|
||||||
|
#
|
||||||
|
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
|
||||||
|
# later.
|
||||||
|
await sleepAsync(chronos.seconds(5))
|
||||||
|
|
||||||
|
proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} =
|
||||||
|
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||||
|
info "connectToNodes", len = nodes.len
|
||||||
|
|
||||||
|
for remotePeerInfo in nodes:
|
||||||
|
await connectToNode(pm, remotePeerInfo, proto, source)
|
||||||
|
|
||||||
|
# The issue seems to be around peers not being fully connected when
|
||||||
|
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
||||||
|
# fully connected.
|
||||||
|
#
|
||||||
|
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
|
||||||
|
# later.
|
||||||
|
await sleepAsync(chronos.seconds(5))
|
||||||
|
|
||||||
|
|||||||
@ -20,7 +20,8 @@ import
|
|||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_filter,
|
../protocol/waku_filter,
|
||||||
../protocol/waku_lightpush,
|
../protocol/waku_lightpush,
|
||||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
|
../protocol/waku_peer_exchange,
|
||||||
../utils/[peers, requests, wakuenr],
|
../utils/[peers, requests, wakuenr],
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
./storage/message/waku_store_queue,
|
./storage/message/waku_store_queue,
|
||||||
@ -42,7 +43,6 @@ when defined(rln):
|
|||||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||||
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
||||||
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
|
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
|
||||||
declarePublicCounter waku_node_conns_initiated, "number of connections initiated by this node", ["source"]
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode"
|
topics = "wakunode"
|
||||||
@ -564,6 +564,20 @@ proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
|||||||
|
|
||||||
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
||||||
|
|
||||||
|
proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||||
|
info "mounting waku peer exchange"
|
||||||
|
|
||||||
|
var discv5Opt: Option[WakuDiscoveryV5]
|
||||||
|
if not node.wakuDiscV5.isNil():
|
||||||
|
discv5Opt = some(node.wakuDiscV5)
|
||||||
|
node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt)
|
||||||
|
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start Waku peer exchange too.
|
||||||
|
await node.wakuPeerExchange.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
|
||||||
|
|
||||||
proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||||
info "mounting libp2p ping protocol"
|
info "mounting libp2p ping protocol"
|
||||||
|
|
||||||
@ -611,22 +625,6 @@ proc startKeepalive*(node: WakuNode) =
|
|||||||
|
|
||||||
asyncSpawn node.keepaliveLoop(defaultKeepalive)
|
asyncSpawn node.keepaliveLoop(defaultKeepalive)
|
||||||
|
|
||||||
## Helpers
|
|
||||||
proc connectToNode(n: WakuNode, remotePeer: RemotePeerInfo, source = "api") {.async.} =
|
|
||||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
|
||||||
info "Connecting to node", remotePeer = remotePeer, source = source
|
|
||||||
|
|
||||||
# NOTE This is dialing on WakuRelay protocol specifically
|
|
||||||
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
|
||||||
let connOpt = await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)
|
|
||||||
|
|
||||||
if connOpt.isSome():
|
|
||||||
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
|
||||||
waku_node_conns_initiated.inc(labelValues = [source])
|
|
||||||
else:
|
|
||||||
error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
|
||||||
waku_node_errors.inc(labelValues = ["conn_init_failure"])
|
|
||||||
|
|
||||||
proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) =
|
proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) =
|
||||||
n.wakuStore.setPeer(peer)
|
n.wakuStore.setPeer(peer)
|
||||||
|
|
||||||
@ -654,35 +652,18 @@ proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueErr
|
|||||||
let peer = parseRemotePeerInfo(address)
|
let peer = parseRemotePeerInfo(address)
|
||||||
n.wakuLightPush.setPeer(peer)
|
n.wakuLightPush.setPeer(peer)
|
||||||
|
|
||||||
proc connectToNodes*(n: WakuNode, nodes: seq[string], source = "api") {.async.} =
|
proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
|
||||||
|
info "Set peer exchange peer", address = address
|
||||||
|
|
||||||
|
let remotePeer = parseRemotePeerInfo(address)
|
||||||
|
|
||||||
|
n.wakuPeerExchange.setPeer(remotePeer)
|
||||||
|
|
||||||
|
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
|
||||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||||
info "connectToNodes", len = nodes.len
|
# NOTE This is dialing on WakuRelay protocol specifically
|
||||||
|
await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source)
|
||||||
for nodeId in nodes:
|
|
||||||
await connectToNode(n, parseRemotePeerInfo(nodeId), source)
|
|
||||||
|
|
||||||
# The issue seems to be around peers not being fully connected when
|
|
||||||
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
|
||||||
# fully connected.
|
|
||||||
#
|
|
||||||
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
|
|
||||||
# later.
|
|
||||||
await sleepAsync(5.seconds)
|
|
||||||
|
|
||||||
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo], source = "api") {.async.} =
|
|
||||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
|
||||||
info "connectToNodes", len = nodes.len
|
|
||||||
|
|
||||||
for remotePeerInfo in nodes:
|
|
||||||
await connectToNode(n, remotePeerInfo, source)
|
|
||||||
|
|
||||||
# The issue seems to be around peers not being fully connected when
|
|
||||||
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
|
||||||
# fully connected.
|
|
||||||
#
|
|
||||||
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
|
|
||||||
# later.
|
|
||||||
await sleepAsync(5.seconds)
|
|
||||||
|
|
||||||
proc runDiscv5Loop(node: WakuNode) {.async.} =
|
proc runDiscv5Loop(node: WakuNode) {.async.} =
|
||||||
## Continuously add newly discovered nodes
|
## Continuously add newly discovered nodes
|
||||||
@ -933,7 +914,7 @@ when isMainModule:
|
|||||||
trace "resolving", domain=domain
|
trace "resolving", domain=domain
|
||||||
let resolved = await dnsResolver.resolveTxt(domain)
|
let resolved = await dnsResolver.resolveTxt(domain)
|
||||||
return resolved[0] # Use only first answer
|
return resolved[0] # Use only first answer
|
||||||
|
|
||||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
|
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
|
||||||
resolver)
|
resolver)
|
||||||
if wakuDnsDiscovery.isOk:
|
if wakuDnsDiscovery.isOk:
|
||||||
@ -1107,6 +1088,13 @@ when isMainModule:
|
|||||||
if conf.filternode != "":
|
if conf.filternode != "":
|
||||||
setFilterPeer(node, conf.filternode)
|
setFilterPeer(node, conf.filternode)
|
||||||
|
|
||||||
|
# waku peer exchange setup
|
||||||
|
if (conf.peerExchangeNode != "") or (conf.peerExchange):
|
||||||
|
waitFor mountWakuPeerExchange(node)
|
||||||
|
|
||||||
|
if conf.peerExchangeNode != "":
|
||||||
|
setPeerExchangePeer(node, conf.peerExchangeNode)
|
||||||
|
|
||||||
ok(true) # Success
|
ok(true) # Success
|
||||||
|
|
||||||
# 5/7 Start node and mounted protocols
|
# 5/7 Start node and mounted protocols
|
||||||
@ -1135,7 +1123,13 @@ when isMainModule:
|
|||||||
if dynamicBootstrapNodes.len > 0:
|
if dynamicBootstrapNodes.len > 0:
|
||||||
info "Connecting to dynamic bootstrap peers"
|
info "Connecting to dynamic bootstrap peers"
|
||||||
waitFor connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
|
waitFor connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
|
||||||
|
|
||||||
|
# retrieve and connect to peer exchange peers
|
||||||
|
if conf.peerExchangeNode != "":
|
||||||
|
info "Retrieving peer info via peer exchange protocol"
|
||||||
|
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
|
||||||
|
discard waitFor node.wakuPeerExchange.request(desiredOutDegree)
|
||||||
|
|
||||||
# Start keepalive, if enabled
|
# Start keepalive, if enabled
|
||||||
if conf.keepAlive:
|
if conf.keepAlive:
|
||||||
node.startKeepalive()
|
node.startKeepalive()
|
||||||
|
|||||||
@ -9,10 +9,12 @@ import
|
|||||||
metrics/chronos_httpserver,
|
metrics/chronos_httpserver,
|
||||||
./config,
|
./config,
|
||||||
./wakunode2,
|
./wakunode2,
|
||||||
|
./peer_manager/peer_manager,
|
||||||
../protocol/waku_filter,
|
../protocol/waku_filter,
|
||||||
../protocol/waku_store,
|
../protocol/waku_store,
|
||||||
../protocol/waku_lightpush,
|
../protocol/waku_lightpush,
|
||||||
../protocol/waku_swap/waku_swap
|
../protocol/waku_swap/waku_swap,
|
||||||
|
../protocol/waku_peer_exchange
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode.setup.metrics"
|
topics = "wakunode.setup.metrics"
|
||||||
@ -64,10 +66,11 @@ proc startMetricsLog*() =
|
|||||||
info "Total filter peers", count = parseCollectorIntoF64(waku_filter_peers)
|
info "Total filter peers", count = parseCollectorIntoF64(waku_filter_peers)
|
||||||
info "Total store peers", count = parseCollectorIntoF64(waku_store_peers)
|
info "Total store peers", count = parseCollectorIntoF64(waku_store_peers)
|
||||||
info "Total lightpush peers", count = parseCollectorIntoF64(waku_lightpush_peers)
|
info "Total lightpush peers", count = parseCollectorIntoF64(waku_lightpush_peers)
|
||||||
|
info "Total peer exchange peers", count = parseCollectorIntoF64(waku_px_peers)
|
||||||
info "Total errors", count = freshErrorCount
|
info "Total errors", count = freshErrorCount
|
||||||
info "Total active filter subscriptions", count = parseCollectorIntoF64(waku_filter_subscribers)
|
info "Total active filter subscriptions", count = parseCollectorIntoF64(waku_filter_subscribers)
|
||||||
|
|
||||||
discard setTimer(Moment.fromNow(30.seconds), logMetrics)
|
discard setTimer(Moment.fromNow(30.seconds), logMetrics)
|
||||||
|
|
||||||
discard setTimer(Moment.fromNow(30.seconds), logMetrics)
|
discard setTimer(Moment.fromNow(30.seconds), logMetrics)
|
||||||
|
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import
|
|||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_filter,
|
../protocol/waku_filter,
|
||||||
../protocol/waku_lightpush,
|
../protocol/waku_lightpush,
|
||||||
|
../protocol/waku_peer_exchange,
|
||||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
./discv5/waku_discv5
|
./discv5/waku_discv5
|
||||||
@ -39,6 +40,7 @@ type
|
|||||||
wakuSwap*: WakuSwap
|
wakuSwap*: WakuSwap
|
||||||
wakuRlnRelay*: WakuRLNRelay
|
wakuRlnRelay*: WakuRLNRelay
|
||||||
wakuLightPush*: WakuLightPush
|
wakuLightPush*: WakuLightPush
|
||||||
|
wakuPeerExchange*: WakuPeerExchange
|
||||||
enr*: enr.Record
|
enr*: enr.Record
|
||||||
libp2pPing*: Ping
|
libp2pPing*: Ping
|
||||||
filters*: Filters
|
filters*: Filters
|
||||||
|
|||||||
13
waku/v2/protocol/waku_peer_exchange.nim
Normal file
13
waku/v2/protocol/waku_peer_exchange.nim
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
./waku_peer_exchange/rpc,
|
||||||
|
./waku_peer_exchange/rpc_codec,
|
||||||
|
./waku_peer_exchange/protocol,
|
||||||
|
./waku_peer_exchange/client
|
||||||
|
|
||||||
|
export
|
||||||
|
rpc,
|
||||||
|
rpc_codec,
|
||||||
|
protocol,
|
||||||
|
client
|
||||||
3
waku/v2/protocol/waku_peer_exchange/README.md
Normal file
3
waku/v2/protocol/waku_peer_exchange/README.md
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# Waku Peer Exchange
|
||||||
|
|
||||||
|
Implementation of [34/WAKU2-PEER-EXCHANGE](https://rfc.vac.dev/spec/34/).
|
||||||
9
waku/v2/protocol/waku_peer_exchange/client.nim
Normal file
9
waku/v2/protocol/waku_peer_exchange/client.nim
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[tables, sequtils],
|
||||||
|
chronicles
|
||||||
|
import
|
||||||
|
../waku_message,
|
||||||
|
./rpc
|
||||||
|
|
||||||
209
waku/v2/protocol/waku_peer_exchange/protocol.nim
Normal file
209
waku/v2/protocol/waku_peer_exchange/protocol.nim
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
import
|
||||||
|
std/[options, sets, tables, sequtils, random],
|
||||||
|
stew/results,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
metrics,
|
||||||
|
libp2p/protocols/protocol,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
eth/p2p/discoveryv5/enr
|
||||||
|
import
|
||||||
|
../../node/peer_manager/peer_manager,
|
||||||
|
../../node/discv5/waku_discv5,
|
||||||
|
../../utils/requests,
|
||||||
|
../waku_message,
|
||||||
|
../waku_relay,
|
||||||
|
./rpc,
|
||||||
|
./rpc_codec
|
||||||
|
|
||||||
|
|
||||||
|
declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol"
|
||||||
|
declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange"
|
||||||
|
declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange"
|
||||||
|
declarePublicGauge waku_px_peers_sent, "number of ENRs sent to peer exchange requesters"
|
||||||
|
declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached"
|
||||||
|
declarePublicGauge waku_px_errors, "number of peer exchange errors", ["type"]
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakupx"
|
||||||
|
|
||||||
|
|
||||||
|
const
|
||||||
|
# We add a 64kB safety buffer for protocol overhead.
|
||||||
|
# 10x-multiplier also for safety
|
||||||
|
MaxRpcSize = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
|
||||||
|
MaxCacheSize = 1000
|
||||||
|
CacheCleanWindow = 200
|
||||||
|
|
||||||
|
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
|
||||||
|
|
||||||
|
# Error types (metric label values)
|
||||||
|
const
|
||||||
|
dialFailure = "dial_failure"
|
||||||
|
peerNotFoundFailure = "peer_not_found_failure"
|
||||||
|
decodeRpcFailure = "decode_rpc_failure"
|
||||||
|
retrievePeersDiscv5Error= "retrieve_peers_discv5_failure"
|
||||||
|
pxFailure = "px_failure"
|
||||||
|
|
||||||
|
type
|
||||||
|
WakuPeerExchangeResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
WakuPeerExchange* = ref object of LPProtocol
|
||||||
|
peerManager*: PeerManager
|
||||||
|
wakuDiscv5: Option[WakuDiscoveryV5]
|
||||||
|
enrCache: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
||||||
|
|
||||||
|
proc sendPeerExchangeRpcToPeer(wpx: WakuPeerExchange, rpc: PeerExchangeRpc, peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||||
|
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
|
||||||
|
if connOpt.isNone():
|
||||||
|
return err(dialFailure)
|
||||||
|
|
||||||
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||||
|
let rpc = PeerExchangeRpc(
|
||||||
|
request: PeerExchangeRequest(
|
||||||
|
numPeers: numPeers
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer)
|
||||||
|
if res.isErr():
|
||||||
|
waku_px_errors.inc(labelValues = [res.error()])
|
||||||
|
return err(res.error())
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||||
|
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
|
||||||
|
return err(peerNotFoundFailure)
|
||||||
|
|
||||||
|
return await wpx.request(numPeers, peerOpt.get())
|
||||||
|
|
||||||
|
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||||
|
var peerInfos: seq[PeerExchangePeerInfo] = @[]
|
||||||
|
for e in enrs:
|
||||||
|
let pi = PeerExchangePeerInfo(
|
||||||
|
enr: e.raw
|
||||||
|
)
|
||||||
|
peerInfos.add(pi)
|
||||||
|
|
||||||
|
let rpc = PeerExchangeRpc(
|
||||||
|
response: PeerExchangeResponse(
|
||||||
|
peerInfos: peerInfos
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer)
|
||||||
|
if res.isErr():
|
||||||
|
waku_px_errors.inc(labelValues = [res.error()])
|
||||||
|
return err(res.error())
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||||
|
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
|
||||||
|
return err(peerNotFoundFailure)
|
||||||
|
|
||||||
|
return await wpx.respond(enrs, peerOpt.get())
|
||||||
|
|
||||||
|
proc cleanCache(px: WakuPeerExchange) {.gcsafe.} =
|
||||||
|
px.enrCache.delete(0, CacheCleanWindow-1)
|
||||||
|
|
||||||
|
proc runPeerExchangeDiscv5Loop*(px: WakuPeerExchange) {.async, gcsafe.} =
|
||||||
|
## Runs a discv5 loop adding new peers to the px peer cache
|
||||||
|
if px.wakuDiscv5.isNone():
|
||||||
|
warn "Trying to run discovery v5 (for PX) while it's disabled"
|
||||||
|
return
|
||||||
|
|
||||||
|
info "Starting peer exchange discovery v5 loop"
|
||||||
|
|
||||||
|
while px.wakuDiscv5.get().listening:
|
||||||
|
trace "Running px discv5 discovery loop"
|
||||||
|
let discoveredPeers = await px.wakuDiscv5.get().findRandomPeers()
|
||||||
|
info "Discovered px peers via discv5", count=discoveredPeers.get().len()
|
||||||
|
if discoveredPeers.isOk:
|
||||||
|
for dp in discoveredPeers.get():
|
||||||
|
if dp.enr.isSome() and not px.enrCache.contains(dp.enr.get()):
|
||||||
|
px.enrCache.add(dp.enr.get())
|
||||||
|
|
||||||
|
if px.enrCache.len() >= MaxCacheSize:
|
||||||
|
px.cleanCache()
|
||||||
|
|
||||||
|
## This loop "competes" with the loop in wakunode2
|
||||||
|
## For the purpose of collecting px peers, 30 sec intervals should be enough
|
||||||
|
await sleepAsync(30.seconds)
|
||||||
|
|
||||||
|
proc getEnrsFromCache(px: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
|
||||||
|
randomize()
|
||||||
|
if px.enrCache.len() == 0:
|
||||||
|
debug "peer exchange ENR cache is empty"
|
||||||
|
return @[]
|
||||||
|
for i in 0..<min(numPeers, px.enrCache.len().uint64()):
|
||||||
|
let ri = rand(0..<px.enrCache.len())
|
||||||
|
result.add(px.enrCache[ri])
|
||||||
|
|
||||||
|
proc initProtocolHandler*(px: WakuPeerExchange) =
|
||||||
|
|
||||||
|
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
|
let message = await conn.readLp(MaxRpcSize.int)
|
||||||
|
|
||||||
|
let res = PeerExchangeRpc.init(message)
|
||||||
|
if res.isErr():
|
||||||
|
waku_px_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
return
|
||||||
|
|
||||||
|
let rpc = res.get()
|
||||||
|
|
||||||
|
# handle peer exchange request
|
||||||
|
if rpc.request != PeerExchangeRequest():
|
||||||
|
trace "peer exchange request received"
|
||||||
|
let enrs = px.getEnrsFromCache(rpc.request.numPeers)
|
||||||
|
discard await px.respond(enrs, conn.peerId)
|
||||||
|
waku_px_peers_sent.inc(enrs.len().int64())
|
||||||
|
|
||||||
|
# handle peer exchange response
|
||||||
|
if rpc.response != PeerExchangeResponse():
|
||||||
|
# todo: error handling
|
||||||
|
trace "peer exchange response received"
|
||||||
|
var record: enr.Record
|
||||||
|
var remotePeerInfoList: seq[RemotePeerInfo]
|
||||||
|
waku_px_peers_received_total.inc(rpc.response.peerInfos.len().int64())
|
||||||
|
for pi in rpc.response.peerInfos:
|
||||||
|
discard enr.fromBytes(record, pi.enr)
|
||||||
|
remotePeerInfoList.add(record.toRemotePeerInfo().get)
|
||||||
|
|
||||||
|
let newPeers = remotePeerInfoList.filterIt(
|
||||||
|
not px.peerManager.switch.isConnected(it.peerId))
|
||||||
|
|
||||||
|
if newPeers.len() > 0:
|
||||||
|
waku_px_peers_received_unknown.inc(newPeers.len().int64())
|
||||||
|
debug "Connecting to newly discovered peers", count=newPeers.len()
|
||||||
|
await px.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange")
|
||||||
|
|
||||||
|
px.handler = handler
|
||||||
|
px.codec = WakuPeerExchangeCodec
|
||||||
|
|
||||||
|
proc init*(T: type WakuPeerExchange,
|
||||||
|
peerManager: PeerManager,
|
||||||
|
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)
|
||||||
|
): T =
|
||||||
|
let px = WakuPeerExchange(
|
||||||
|
peerManager: peerManager,
|
||||||
|
wakuDiscv5: wakuDiscv5
|
||||||
|
)
|
||||||
|
px.initProtocolHandler()
|
||||||
|
return px
|
||||||
|
|
||||||
|
proc setPeer*(wpx: WakuPeerExchange, peer: RemotePeerInfo) =
|
||||||
|
wpx.peerManager.addPeer(peer, WakuPeerExchangeCodec)
|
||||||
|
waku_px_peers.inc()
|
||||||
|
|
||||||
13
waku/v2/protocol/waku_peer_exchange/rpc.nim
Normal file
13
waku/v2/protocol/waku_peer_exchange/rpc.nim
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
type
|
||||||
|
PeerExchangePeerInfo* = object
|
||||||
|
enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778
|
||||||
|
|
||||||
|
PeerExchangeRequest* = object
|
||||||
|
numPeers*: uint64
|
||||||
|
|
||||||
|
PeerExchangeResponse* = object
|
||||||
|
peerInfos*: seq[PeerExchangePeerInfo]
|
||||||
|
|
||||||
|
PeerExchangeRpc* = object
|
||||||
|
request*: PeerExchangeRequest
|
||||||
|
response*: PeerExchangeResponse
|
||||||
91
waku/v2/protocol/waku_peer_exchange/rpc_codec.nim
Normal file
91
waku/v2/protocol/waku_peer_exchange/rpc_codec.nim
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
libp2p/protobuf/minprotobuf,
|
||||||
|
libp2p/varint
|
||||||
|
import
|
||||||
|
../../utils/protobuf,
|
||||||
|
./rpc
|
||||||
|
|
||||||
|
proc encode*(rpc: PeerExchangeRequest): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
|
||||||
|
output.write3(1, rpc.numPeers)
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PeerExchangeRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PeerExchangeRequest(numPeers: 0)
|
||||||
|
|
||||||
|
var numPeers: uint64
|
||||||
|
if ?pb.getField(1, numPeers):
|
||||||
|
rpc.numPeers = numPeers
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
|
|
||||||
|
proc encode*(rpc: PeerExchangePeerInfo): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
|
||||||
|
output.write3(1, rpc.enr)
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PeerExchangePeerInfo(enr: @[])
|
||||||
|
|
||||||
|
var peerInfoBuffer: seq[byte]
|
||||||
|
if ?pb.getField(1, peerInfoBuffer):
|
||||||
|
rpc.enr = peerInfoBuffer
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
|
|
||||||
|
proc encode*(rpc: PeerExchangeResponse): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
|
||||||
|
for pi in rpc.peerInfos:
|
||||||
|
output.write3(1, pi.encode())
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PeerExchangeResponse(peerInfos: @[])
|
||||||
|
|
||||||
|
var peerInfoBuffers: seq[seq[byte]]
|
||||||
|
if ?pb.getRepeatedField(1, peerInfoBuffers):
|
||||||
|
for pib in peerInfoBuffers:
|
||||||
|
rpc.peerInfos.add(?PeerExchangePeerInfo.init(pib))
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
|
|
||||||
|
proc encode*(rpc: PeerExchangeRpc): ProtoBuffer =
|
||||||
|
var output = initProtoBuffer()
|
||||||
|
output.write3(1, rpc.request.encode())
|
||||||
|
output.write3(2, rpc.response.encode())
|
||||||
|
output.finish3()
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
proc init*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
let pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
var rpc = PeerExchangeRpc()
|
||||||
|
|
||||||
|
var requestBuffer: seq[byte]
|
||||||
|
discard ?pb.getField(1, requestBuffer)
|
||||||
|
rpc.request = ?PeerExchangeRequest.init(requestBuffer)
|
||||||
|
|
||||||
|
var responseBuffer: seq[byte]
|
||||||
|
discard ?pb.getField(2, responseBuffer)
|
||||||
|
rpc.response = ?PeerExchangeResponse.init(responseBuffer)
|
||||||
|
|
||||||
|
return ok(rpc)
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user