mirror of https://github.com/waku-org/nwaku.git
fix(networking): fix wrong peer connected state (#1560)
This commit is contained in:
parent
740e4f2bd6
commit
edce01cd6a
|
@ -45,7 +45,7 @@ const
|
||||||
MaxParalelDials = 10
|
MaxParalelDials = 10
|
||||||
|
|
||||||
# Delay between consecutive relayConnectivityLoop runs
|
# Delay between consecutive relayConnectivityLoop runs
|
||||||
ConnectivityLoopInterval = chronos.seconds(30)
|
ConnectivityLoopInterval = chronos.seconds(15)
|
||||||
|
|
||||||
# How often the peer store is pruned
|
# How often the peer store is pruned
|
||||||
PrunePeerStoreInterval = chronos.minutes(5)
|
PrunePeerStoreInterval = chronos.minutes(5)
|
||||||
|
@ -156,24 +156,36 @@ proc loadFromStorage(pm: PeerManager) =
|
||||||
# Initialisation #
|
# Initialisation #
|
||||||
##################
|
##################
|
||||||
|
|
||||||
|
# currently disabled. note that peer connection state connected/disconnected
|
||||||
|
# cant be tracked using this handler when more than one conn is allowed and
|
||||||
|
# when using autonat. eg if a peer has 2 conns and one is disconnected we cant
|
||||||
|
# assume that the peer is disconnected, because the other one might still be active.
|
||||||
|
# note that even with maxconn = 1, autonat forces more than one connection.
|
||||||
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
||||||
|
|
||||||
case event.kind
|
case event.kind
|
||||||
of ConnEventKind.Connected:
|
of ConnEventKind.Connected:
|
||||||
let direction = if event.incoming: Inbound else: Outbound
|
let direction = if event.incoming: Inbound else: Outbound
|
||||||
|
discard
|
||||||
|
of ConnEventKind.Disconnected:
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||||
|
if event.kind == PeerEventKind.Joined:
|
||||||
|
let direction = if event.initiator: Outbound else: Inbound
|
||||||
pm.peerStore[ConnectionBook][peerId] = Connected
|
pm.peerStore[ConnectionBook][peerId] = Connected
|
||||||
pm.peerStore[DirectionBook][peerId] = direction
|
pm.peerStore[DirectionBook][peerId] = direction
|
||||||
|
|
||||||
waku_connected_peers.inc(1, labelValues=[$direction])
|
waku_connected_peers.inc(1, labelValues=[$direction])
|
||||||
|
|
||||||
if not pm.storage.isNil:
|
if not pm.storage.isNil:
|
||||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
|
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
|
||||||
return
|
return
|
||||||
of ConnEventKind.Disconnected:
|
|
||||||
waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])
|
|
||||||
|
|
||||||
|
elif event.kind == PeerEventKind.Left:
|
||||||
pm.peerStore[DirectionBook][peerId] = UnknownDirection
|
pm.peerStore[DirectionBook][peerId] = UnknownDirection
|
||||||
pm.peerStore[ConnectionBook][peerId] = CanConnect
|
pm.peerStore[ConnectionBook][peerId] = CanConnect
|
||||||
|
waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])
|
||||||
|
|
||||||
if not pm.storage.isNil:
|
if not pm.storage.isNil:
|
||||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||||
return
|
return
|
||||||
|
@ -199,14 +211,21 @@ proc new*(T: type PeerManager,
|
||||||
initialBackoffInSec: initialBackoffInSec,
|
initialBackoffInSec: initialBackoffInSec,
|
||||||
backoffFactor: backoffFactor,
|
backoffFactor: backoffFactor,
|
||||||
maxFailedAttempts: maxFailedAttempts)
|
maxFailedAttempts: maxFailedAttempts)
|
||||||
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||||
onConnEvent(pm, peerId, event)
|
onConnEvent(pm, peerId, event)
|
||||||
|
|
||||||
|
proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
|
||||||
|
onPeerEvent(pm, peerId, event)
|
||||||
|
|
||||||
proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
|
proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
|
||||||
waku_peer_store_size.set(toSeq(pm.peerStore[AddressBook].book.keys).len.int64)
|
waku_peer_store_size.set(toSeq(pm.peerStore[AddressBook].book.keys).len.int64)
|
||||||
|
|
||||||
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
|
# currently disabled
|
||||||
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
|
#pm.switch.addConnEventHandler(connHook, ConnEventKind.Connected)
|
||||||
|
#pm.switch.addConnEventHandler(connHook, ConnEventKind.Disconnected)
|
||||||
|
|
||||||
|
pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined)
|
||||||
|
pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left)
|
||||||
|
|
||||||
# called every time the peerstore is updated
|
# called every time the peerstore is updated
|
||||||
pm.peerStore[AddressBook].addHandler(peerStoreChanged)
|
pm.peerStore[AddressBook].addHandler(peerStoreChanged)
|
||||||
|
|
Loading…
Reference in New Issue