Mirror of https://github.com/waku-org/nwaku.git, synced 2025-02-12 15:06:38 +00:00
deploy: 3b6db722873e15f345d5276e9029b03e838d1c17
parent: fcc362359f
commit: 47f9d881ce
@@ -1 +1 @@
-1613033905
+1613120032
@@ -47,7 +47,7 @@ procSuite "Peer Manager":
 
     # Check connectedness
     check:
-      node1.peerManager.connectedness(peerInfo2.peerId)
+      node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected
 
     await allFutures([node1.stop(), node2.stop()])
 
@@ -118,3 +118,45 @@ procSuite "Peer Manager":
         it.protos.contains(WakuStoreCodec))
 
     await node.stop()
+
+  asyncTest "Peer manager keeps track of connections":
+    let
+      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
+        Port(60000))
+      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
+        Port(60002))
+      peerInfo2 = node2.peerInfo
+
+    await node1.start()
+
+    node1.mountRelay()
+    node2.mountRelay()
+
+    # Test default connectedness for new peers
+    node1.peerManager.addPeer(peerInfo2, WakuRelayCodec)
+    check:
+      # No information about node2's connectedness
+      node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+
+    # Purposefully don't start node2
+    # Attempt dialing node2 from node1
+    discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
+    check:
+      # Cannot connect to node2
+      node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
+
+    # Successful connection
+    await node2.start()
+    discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
+    check:
+      # Currently connected to node2
+      node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+
+    # Stop node. Gracefully disconnect from all peers.
+    await node1.stop()
+    check:
+      # Not currently connected to node2, but had recent, successful connection.
+      node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
@@ -10,7 +10,7 @@ generated by GNU Autoconf 2.69. Invocation command line was
 ## Platform. ##
 ## --------- ##
 
-hostname = fv-az244-0
+hostname = fv-az56-962
 uname -m = x86_64
 uname -r = 5.4.0-1039-azure
 uname -s = Linux
@@ -841,7 +841,7 @@ configure:12482: $? = 0
 configure:12482: result: yes
 configure:12499: checking for getexecname
 configure:12499: gcc -o conftest -g -O3 -std=gnu11 -pipe -Wall -Wextra -fPIC conftest.c >&5
-/tmp/ccBeOD8e.o: In function `main':
+/tmp/cc2xS9FZ.o: In function `main':
 /home/runner/work/nim-waku/nim-waku/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/conftest.c:73: undefined reference to `getexecname'
 collect2: error: ld returned 1 exit status
 configure:12499: $? = 1
@@ -1134,7 +1134,7 @@ generated by GNU Autoconf 2.69. Invocation command line was
 CONFIG_COMMANDS =
 $ ./config.status
 
-on fv-az244-0
+on fv-az56-962
 
 config.status:1150: creating Makefile
 config.status:1150: creating backtrace-supported.h
@@ -2,7 +2,7 @@
 
 # libtool - Provide generalized library-building support services.
 # Generated automatically by config.status (libbacktrace) version-unused
-# Libtool was configured on host fv-az244-0:
+# Libtool was configured on host fv-az56-962:
 # NOTE: Changes made to this file will be lost: look at ltmain.sh.
 #
 # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
@@ -42,7 +42,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
       wPeers.insert(node.peerManager.peers(WakuRelayCodec)
         .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                         protocol: WakuRelayCodec,
-                        connected: node.peerManager.connectedness(it.peerId))),
+                        connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                 wPeers.len) # Append to the end of the sequence
 
     if not node.wakuFilter.isNil:
@@ -50,7 +50,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
      wPeers.insert(node.peerManager.peers(WakuFilterCodec)
        .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                        protocol: WakuFilterCodec,
-                       connected: node.peerManager.connectedness(it.peerId))),
+                       connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                wPeers.len) # Append to the end of the sequence
 
     if not node.wakuSwap.isNil:
@@ -58,7 +58,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
      wPeers.insert(node.peerManager.peers(WakuSwapCodec)
        .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                        protocol: WakuSwapCodec,
-                       connected: node.peerManager.connectedness(it.peerId))),
+                       connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                wPeers.len) # Append to the end of the sequence
 
     if not node.wakuStore.isNil:
@@ -66,7 +66,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
      wPeers.insert(node.peerManager.peers(WakuStoreCodec)
        .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                        protocol: WakuStoreCodec,
-                       connected: node.peerManager.connectedness(it.peerId))),
+                       connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                wPeers.len) # Append to the end of the sequence
 
     # @TODO filter output on protocol/connected-status
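Note: each of the four handlers above repeats the `connectedness(...) == Connectedness.Connected` comparison to fill the boolean `connected` field of `WakuPeer`. A possible follow-up, sketched below and not part of this commit, would be a small helper on the peer manager; the helper name and import path are assumptions for illustration only.

import libp2p/peerid
import ./peer_manager   # hypothetical path to the module changed later in this diff

proc isConnected*(pm: PeerManager, peerId: PeerId): bool =
  ## True only when the peer manager currently reports an active connection.
  pm.connectedness(peerId) == Connectedness.Connected

# The mapIt expressions above could then read:
#   connected: node.peerManager.isConnected(it.peerId)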
@@ -14,16 +14,54 @@ logScope:
   topics = "wakupeers"
 
 type
+  Connectedness* = enum
+    # NotConnected: default state for a new peer. No connection and no further information on connectedness.
+    NotConnected,
+    # CannotConnect: attempted to connect to peer, but failed.
+    CannotConnect,
+    # CanConnect: was recently connected to peer and disconnected gracefully.
+    CanConnect,
+    # Connected: actively connected to peer.
+    Connected
+
+  ConnectionBook* = object of PeerBook[Connectedness]
+
+  WakuPeerStore* = ref object of PeerStore
+    connectionBook*: ConnectionBook
+
   PeerManager* = ref object of RootObj
     switch*: Switch
-    peerStore*: PeerStore
+    peerStore*: WakuPeerStore
 
 const
   defaultDialTimeout = 1.minutes # @TODO should this be made configurable?
 
+proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
+  case event.kind
+  of ConnEventKind.Connected:
+    pm.peerStore.connectionBook.set(peerId, Connected)
+    return
+  of ConnEventKind.Disconnected:
+    pm.peerStore.connectionBook.set(peerId, CanConnect)
+    return
+
+proc new*(T: type WakuPeerStore): WakuPeerStore =
+  var p: WakuPeerStore
+  new(p)
+  return p
+
 proc new*(T: type PeerManager, switch: Switch): PeerManager =
-  T(switch: switch,
-    peerStore: PeerStore.new())
+  let pm = PeerManager(switch: switch,
+                       peerStore: WakuPeerStore.new())
+
+  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
+    onConnEvent(pm, peerId, event)
+
+  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
+  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
+
+  return pm
 
 ####################
 # Helper functions #
@@ -46,7 +84,7 @@ proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
   # Return the known info for all peers registered on the specified protocol
   pm.peers.filterIt(it.protos.contains(proto))
 
-proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
+proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness =
   # Return the connection state of the given, managed peer
   # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
   # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
@@ -55,9 +93,9 @@ proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
 
   if (storedInfo == StoredInfo()):
     # Peer is not managed, therefore not connected
-    return false
+    return NotConnected
   else:
-    pm.switch.isConnected(peerId)
+    pm.peerStore.connectionBook.get(peerId)
 
 proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
   # Returns `true` if peer is included in manager for the specified protocol
@@ -123,10 +161,12 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
     # @TODO indicate CannotConnect on peer metadata
     debug "Dialing remote peer timed out"
     waku_peers_dials.inc(labelValues = ["timeout"])
+    pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect)
     return none(Connection)
   except CatchableError as e:
     # @TODO any redial attempts?
     # @TODO indicate CannotConnect on peer metadata
     debug "Dialing remote peer failed", msg = e.msg
     waku_peers_dials.inc(labelValues = ["failed"])
+    pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect)
     return none(Connection)
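With `connectedness` now returning the `Connectedness` enum rather than a bool, callers can distinguish peers that were never dialled from peers whose last dial failed or that disconnected gracefully. A minimal sketch of such a caller follows; it is not part of this commit, `maybeDial` is a hypothetical name, and the import paths are assumptions.

import chronos
import libp2p/peerinfo
import ./peer_manager   # hypothetical path to the module changed above

proc maybeDial(pm: PeerManager, peerInfo: PeerInfo, proto: string) {.async.} =
  # Decide whether to (re)dial based on the tracked connection state.
  case pm.connectedness(peerInfo.peerId)
  of Connected:
    # Already connected: nothing to do.
    discard
  of NotConnected, CanConnect:
    # Never dialled, or previously disconnected gracefully: try (again).
    discard await pm.dialPeer(peerInfo, proto, 2.seconds)
  of CannotConnect:
    # Last attempt failed; a real implementation might back off before retrying.
    discard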
@@ -394,7 +394,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
   let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
 
   if peerOpt.isNone():
-    error "failed to connect to remote peer"
+    error "no suitable remote peers"
     waku_store_errors.inc(labelValues = [dialFailure])
     return
 
@@ -433,7 +433,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
   let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
 
   if peerOpt.isNone():
-    error "failed to connect to remote peer"
+    error "no suitable remote peers"
     waku_store_errors.inc(labelValues = [dialFailure])
     return
 
@@ -93,7 +93,7 @@ proc sendCheque*(ws: WakuSwap) {.async.} =
   let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec)
 
   if peerOpt.isNone():
-    error "failed to connect to remote peer"
+    error "no suitable remote peers"
     waku_swap_errors.inc(labelValues = [dialFailure])
     return