Added basic peer manager (#364)

* Added basic peer manager
Hanno Cornelius, 2021-02-04 12:32:58 +02:00 (committed by GitHub)
parent 9faff49e5b
commit 080d82dd9a
6 changed files with 125 additions and 10 deletions

.gitmodules (vendored)

@@ -12,7 +12,7 @@
 path = vendor/nim-libp2p
 url = https://github.com/status-im/nim-libp2p.git
 ignore = dirty
-branch = master
+branch = unstable
 [submodule "vendor/nim-stew"]
 path = vendor/nim-stew
 url = https://github.com/status-im/nim-stew.git

tests/all_tests_v2.nim

@@ -10,5 +10,6 @@ import
   ./v2/test_waku_swap,
   ./v2/test_message_store,
   ./v2/test_jsonrpc_waku,
+  ./v2/test_peer_manager,
   ./v2/test_web3 # will remove it when rln-relay tests get added
   # TODO ./v2/test_waku_rln_relay

tests/v2/test_peer_manager.nim (new file)

@@ -0,0 +1,48 @@
import
  std/[unittest, options, sets, tables, sequtils],
  stew/shims/net as stewNet,
  json_rpc/[rpcserver, rpcclient],
  eth/[keys, rlp], eth/common/eth_types,
  libp2p/[standard_setup, switch, multiaddress],
  libp2p/protobuf/minprotobuf,
  libp2p/stream/[bufferstream, connection],
  libp2p/crypto/crypto,
  libp2p/protocols/pubsub/pubsub,
  libp2p/protocols/pubsub/rpc/message,
  ../../waku/v2/node/wakunode2,
  ../../waku/v2/node/peer_manager,
  ../../waku/v2/protocol/waku_relay,
  ../test_helpers

procSuite "Peer Manager":
  asyncTest "Peer dialing":
    let
      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
        Port(60000))
      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
        Port(60002))
      peerInfo2 = node2.peerInfo

    await allFutures([node1.start(), node2.start()])

    node1.mountRelay()
    node2.mountRelay()

    # Dial node2 from node1
    let conn = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec)

    # Check connection
    check:
      conn.activity
      conn.peerInfo.peerId == peerInfo2.peerId

    # Check that node2 is being managed in node1
    check:
      node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)

    # Check connectedness
    check:
      node1.peerManager.connectedness(peerInfo2.peerId)

vendor/nim-libp2p (vendored submodule)

@@ -1 +1 @@
-Subproject commit 5aebf0990e5315a0349e2414e7bc11da96f703bc
+Subproject commit 090ea91cb06ac6e35fd1401c2214871b24a77363

waku/v2/node/peer_manager.nim (new file)

@@ -0,0 +1,67 @@
{.push raises: [Defect, Exception].}

import
  chronos, chronicles,
  libp2p/standard_setup,
  libp2p/peerstore

type
  PeerManager* = ref object of RootObj
    switch*: Switch
    peerStore*: PeerStore

proc new*(T: type PeerManager, switch: Switch): PeerManager =
  T(switch: switch,
    peerStore: PeerStore.new())

####################
# Dialer interface #
####################

proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): Future[Connection] {.async.} =
  # Dial a given peer and add it to the list of known peers
  # @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed.

  # First add dialed peer info to peer store...
  debug "Adding dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto

  # ...known addresses
  for multiaddr in peerInfo.addrs:
    pm.peerStore.addressBook.add(peerInfo.peerId, multiaddr)

  # ...public key
  var publicKey: PublicKey
  discard peerInfo.peerId.extractPublicKey(publicKey)
  pm.peerStore.keyBook.set(peerInfo.peerId, publicKey)

  # ...associated protocols
  pm.peerStore.protoBook.add(peerInfo.peerId, proto)

  info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId

  # Dial Peer
  # @TODO Keep track of conn and connected state in peer store
  return await pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto)

#####################
# Manager interface #
#####################

proc peers*(pm: PeerManager): seq[StoredInfo] =
  # Return the known info for all peers
  pm.peerStore.peers()

proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
  # Return the connection state of the given, managed peer
  # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
  # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
  let storedInfo = pm.peerStore.get(peerId)

  if (storedInfo == StoredInfo()):
    # Peer is not managed, therefore not connected
    return false
  else:
    pm.switch.isConnected(peerId)
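
For orientation, a minimal usage sketch of the new manager, mirroring the node setup from the test file above: dial through node.peerManager, then inspect the managed peers. This is not code from this commit; the ports, the sketch proc name and the trailing waitFor driver are illustrative assumptions, and crypto.newRng() is assumed available from libp2p's crypto module.

import
  chronos, stew/shims/net as stewNet,
  libp2p/crypto/crypto,
  ../../waku/v2/node/wakunode2,
  ../../waku/v2/protocol/waku_relay

proc sketch() {.async.} =
  let rng = crypto.newRng()  # assumed helper from libp2p/crypto/crypto
  let
    keyA = crypto.PrivateKey.random(Secp256k1, rng[])[]
    nodeA = WakuNode.init(keyA, ValidIpAddress.init("0.0.0.0"), Port(60010))
    keyB = crypto.PrivateKey.random(Secp256k1, rng[])[]
    nodeB = WakuNode.init(keyB, ValidIpAddress.init("0.0.0.0"), Port(60012))

  await allFutures([nodeA.start(), nodeB.start()])
  nodeA.mountRelay()
  nodeB.mountRelay()

  # Dialing through the manager both opens the connection and records the
  # peer's addresses, public key and protocols in its PeerStore
  discard await nodeA.peerManager.dialPeer(nodeB.peerInfo, WakuRelayCodec)

  # Inspect every managed peer and its current connection state
  for stored in nodeA.peerManager.peers():
    echo "peer: ", stored.peerId,
      " connected: ", nodeA.peerManager.connectedness(stored.peerId)

waitFor sketch()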

waku/v2/node/wakunode2.nim

@@ -8,7 +8,6 @@ import
   libp2p/protocols/protocol,
   # NOTE For TopicHandler, solve with exports?
   libp2p/protocols/pubsub/pubsub,
-  libp2p/peerinfo,
   libp2p/standard_setup,
   ../protocol/[waku_relay, message_notifier],
   ../protocol/waku_store/waku_store,
@@ -16,7 +15,8 @@ import
   ../protocol/waku_filter/waku_filter,
   ../utils/peers,
   ./message_store/message_store,
-  ../utils/requests
+  ../utils/requests,
+  ./peer_manager

 declarePublicCounter waku_node_messages, "number of messages received", ["type"]
 declarePublicGauge waku_node_filters, "number of content filter subscriptions"
@@ -45,6 +45,7 @@ type
   # NOTE based on Eth2Node in NBC eth2_network.nim
   WakuNode* = ref object of RootObj
+    peerManager*: PeerManager
     switch*: Switch
     wakuRelay*: WakuRelay
     wakuStore*: WakuStore
@@ -130,6 +131,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
   #   triggerSelf = true, sign = false,
   #   verifySignature = false).PubSub
   result = WakuNode(
+    peerManager: PeerManager.new(switch),
     switch: switch,
     rng: rng,
     peerInfo: peerInfo,
@@ -344,11 +346,8 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} =
   info "Dialing peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
   # NOTE This is dialing on WakuRelay protocol specifically
-  # TODO Keep track of conn and connected state somewhere (WakuRelay?)
-  #p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
-  #p.connected = true
-  discard await n.switch.dial(remotePeer.peerId, remotePeer.addrs, WakuRelayCodec)
-  info "Post switch dial"
+  discard await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)
+  info "Post peerManager dial"

 proc setStorePeer*(n: WakuNode, address: string) =
   info "dialPeer", address = address
@@ -381,7 +380,7 @@ proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
 proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} =
   for peerInfo in nodes:
     info "connectToNodes", peer = peerInfo
-    discard await n.switch.dial(peerInfo.peerId, peerInfo.addrs, WakuRelayCodec)
+    discard await n.peerManager.dialPeer(peerInfo, WakuRelayCodec)

   # The issue seems to be around peers not being fully connected when
   # trying to subscribe. So what we do is sleep to guarantee nodes are