use peer id in a number of places

This commit is contained in:
Jacek Sieka 2020-08-02 21:27:36 +02:00 committed by zah
parent 4b8ebb5d71
commit 221f372dbc
3 changed files with 71 additions and 69 deletions

View File

@ -7,7 +7,7 @@
import
# Standard library
algorithm, os, tables, strutils, sequtils, times, math, terminal,
std/[algorithm, os, tables, strutils, sequtils, times, math, terminal],
# Nimble packages
stew/[objects, byteutils, endians2], stew/shims/macros,

View File

@ -1,7 +1,7 @@
import
# Std lib
typetraits, strutils, os, random, algorithm, sequtils, math,
options as stdOptions,
std/[typetraits, strutils, os, random, algorithm, sequtils, math, sets],
std/options as stdOptions,
# Status libs
stew/[varints, base58, base64, endians2, results, byteutils], bearssl,
@ -45,7 +45,7 @@ type
GossipMsg = messages.Message
SeenItem* = object
pinfo*: PeerInfo
peerId*: PeerID
stamp*: chronos.Moment
# TODO Is this really needed?
@ -60,10 +60,10 @@ type
metadata*: Eth2Metadata
connectTimeout*: chronos.Duration
seenThreshold*: chronos.Duration
connQueue: AsyncQueue[PeerInfo]
connQueue: AsyncQueue[PeerAddr]
seenTable: Table[PeerID, SeenItem]
connWorkers: seq[Future[void]]
connTable: Table[PeerID, PeerInfo]
connTable: HashSet[PeerID]
forkId: ENRForkID
rng*: ref BrHmacDrbgContext
@ -94,6 +94,10 @@ type
score*: int
lacksSnappy: bool
PeerAddr* = object
peerId*: PeerID
addrs*: seq[MultiAddress]
ConnectionState* = enum
None,
Connecting,
@ -275,7 +279,7 @@ proc openStream(node: Eth2Node,
protocolId: string): Future[Connection] {.async.} =
let protocolId = protocolId & (if peer.lacksSnappy: "ssz" else: "ssz_snappy")
try:
result = await dial(node.switch, peer.info, protocolId)
result = await dial(node.switch, peer.info.peerId, peer.info.addrs, protocolId)
except CancelledError:
raise
except CatchableError:
@ -290,16 +294,15 @@ proc openStream(node: Eth2Node,
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
let peerId = peerInfo.peerId
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peerPool.getOrDefault(peerId)
if result == nil:
# TODO: We should register this peer in the pool!
result = Peer.init(node, peerInfo)
result = Peer.init(node, PeerInfo.init(peerId))
proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} =
# TODO: Can this be `nil`?
return network.getPeer(conn.peerInfo)
return network.getPeer(conn.peerInfo.peerId)
proc getKey*(peer: Peer): PeerID {.inline.} =
result = peer.info.peerId
@ -355,29 +358,28 @@ proc `<`*(a, b: Peer): bool =
else:
false
proc isSeen*(network: ETh2Node, pinfo: PeerInfo): bool =
proc isSeen*(network: ETh2Node, peerId: PeerID): bool =
let currentTime = now(chronos.Moment)
let item = network.seenTable.getOrDefault(pinfo.peerId)
if isNil(item.pinfo):
# Peer is not in SeenTable.
if peerId notin network.seenTable:
return false
let item = network.seenTable[peerId]
if currentTime >= item.stamp:
# Peer is in SeenTable, but the time period has expired.
network.seenTable.del(pinfo.peerId)
network.seenTable.del(peerId)
return false
return true
proc addSeen*(network: ETh2Node, pinfo: PeerInfo,
proc addSeen*(network: ETh2Node, peerId: PeerID,
period: chronos.Duration) =
let item = SeenItem(pinfo: pinfo, stamp: now(chronos.Moment) + period)
network.seenTable[pinfo.peerId] = item
let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period)
network.seenTable[peerId] = item
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
await peer.network.switch.disconnect(peer.info)
await peer.network.switch.disconnect(peer.info.peerId)
peer.connectionState = Disconnected
peer.network.peerPool.release(peer)
let seenTime = case reason
@ -387,7 +389,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
SeenTableTimeIrrelevantNetwork
of FaultOrError:
SeemTableTimeFaultOrError
peer.network.addSeen(peer.info, seenTime)
peer.network.addSeen(peer.info.peerId, seenTime)
peer.info.close()
include eth/p2p/p2p_backends_helpers
@ -718,42 +720,42 @@ proc handleIncomingPeer*(peer: Peer): Future[bool] {.async.} =
nbc_peers.set int64(len(network.peerPool))
proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
if r.secp256k1.isSome:
var pubKey = keys.PublicKey.fromRaw(r.secp256k1.get)
if pubkey.isErr:
return # TODO
proc toPeerAddr*(r: enr.TypedRecord):
Result[PeerAddr, cstring] {.raises: [Defect].} =
if not r.secp256k1.isSome:
return err("enr: no secp256k1 key in record")
let peerId = PeerID.init crypto.PublicKey(
scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey[]))
var addresses = newSeq[MultiAddress]()
let
pubKey = ? keys.PublicKey.fromRaw(r.secp256k1.get)
peerId = ? PeerID.init(crypto.PublicKey(
scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey)))
if r.ip.isSome and r.tcp.isSome:
let ip = ipv4(r.ip.get)
addresses.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
var addrs = newSeq[MultiAddress]()
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.tcp6.isSome:
addresses.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
elif r.tcp.isSome:
addresses.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
else:
discard
if r.ip.isSome and r.tcp.isSome:
let ip = ipv4(r.ip.get)
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
if addresses.len > 0:
return PeerInfo.init(peerId.tryGet(), addresses)
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.tcp6.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
elif r.tcp.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
else:
discard
proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo =
if r.isSome:
return r.get.toPeerInfo
if addrs.len == 0:
return err("enr: no addresses in record")
proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
logScope: peer = peerInfo.id
ok(PeerAddr(peerId: peerId, addrs: addrs))
proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr) {.async.} =
logScope: peer = peerAddr.peerId
debug "Connecting to discovered peer"
await node.switch.connect(peerInfo)
var peer = node.getPeer(peerInfo)
await node.switch.connect(peerAddr.peerId, peerAddr.addrs)
var peer = node.getPeer(peerAddr.peerId)
peer.wasDialed = true
#let msDial = newMultistream()
@ -773,17 +775,17 @@ proc connectWorker(network: Eth2Node) {.async.} =
while true:
let
remotePeerInfo = await network.connQueue.popFirst()
peerPoolHasRemotePeer = network.peerPool.hasPeer(remotePeerInfo.peerId)
seenTableHasRemotePeer = network.isSeen(remotePeerInfo)
remotePeerAlreadyConnected = network.connTable.hasKey(remotePeerInfo.peerId)
remotePeerAddr = await network.connQueue.popFirst()
peerPoolHasRemotePeer = network.peerPool.hasPeer(remotePeerAddr.peerId)
seenTableHasRemotePeer = network.isSeen(remotePeerAddr.peerId)
remotePeerAlreadyConnected = remotePeerAddr.peerId in network.connTable
if not(peerPoolHasRemotePeer) and not(seenTableHasRemotePeer) and not(remotePeerAlreadyConnected):
network.connTable[remotePeerInfo.peerId] = remotePeerInfo
network.connTable.incl(remotePeerAddr.peerId)
try:
# We trying to connect to peers which are not in PeerPool, SeenTable and
# ConnTable.
var fut = network.dialPeer(remotePeerInfo)
var fut = network.dialPeer(remotePeerAddr)
# We discarding here just because we going to check future state, to avoid
# condition where connection happens and timeout reached.
discard await withTimeout(fut, network.connectTimeout)
@ -791,19 +793,19 @@ proc connectWorker(network: Eth2Node) {.async.} =
# will be stored in PeerPool.
if fut.finished():
if fut.failed() and not(fut.cancelled()):
debug "Unable to establish connection with peer", peer = remotePeerInfo.id,
debug "Unable to establish connection with peer", peer = remotePeerAddr.peerId,
errMsg = fut.readError().msg
inc nbc_failed_dials
network.addSeen(remotePeerInfo, SeenTableTimeDeadPeer)
network.addSeen(remotePeerAddr.peerId, SeenTableTimeDeadPeer)
continue
debug "Connection to remote peer timed out", peer = remotePeerInfo.id
debug "Connection to remote peer timed out", peer = remotePeerAddr.peerId
inc nbc_timeout_dials
network.addSeen(remotePeerInfo, SeenTableTimeTimeout)
network.addSeen(remotePeerAddr.peerId, SeenTableTimeTimeout)
finally:
network.connTable.del(remotePeerInfo.peerId)
network.connTable.excl(remotePeerAddr.peerId)
else:
trace "Peer is already connected, connecting or already seen",
peer = remotePeerInfo.id, peer_pool_has_peer = $peerPoolHasRemotePeer, seen_table_has_peer = $seenTableHasRemotePeer,
peer = remotePeerAddr.peerId, peer_pool_has_peer = $peerPoolHasRemotePeer, seen_table_has_peer = $seenTableHasRemotePeer,
connecting_peer = $remotePeerAlreadyConnected, seen_table_size = len(network.seenTable)
# Prevent (a purely theoretical) high CPU usage when losing connectivity.
@ -824,12 +826,12 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
try:
let peerRecord = peer.record.toTypedRecord
if peerRecord.isOk:
let peerInfo = peerRecord.value.toPeerInfo
if peerInfo != nil:
if not node.switch.isConnected(peerInfo):
await node.connQueue.addLast(peerInfo)
let peerAddr = peerRecord.value.toPeerAddr
if peerAddr.isOk:
if not node.switch.isConnected(peerAddr.get().peerId):
await node.connQueue.addLast(peerAddr.get())
else:
peerInfo.close()
discard # peerInfo.close()
except CatchableError as err:
debug "Failed to connect to peer", peer = $peer, err = err.msg
except CatchableError as err:
@ -858,8 +860,8 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
result.connectTimeout = 1.minutes
result.seenThreshold = 1.minutes
result.seenTable = initTable[PeerID, SeenItem]()
result.connTable = initTable[PeerID, PeerInfo]()
result.connQueue = newAsyncQueue[PeerInfo](ConcurrentConnections)
result.connTable = initHashSet[PeerID]()
result.connQueue = newAsyncQueue[PeerAddr](ConcurrentConnections)
result.metadata = getPersistentNetMetadata(conf)
result.forkId = enrForkId
result.discovery = Eth2DiscoveryProtocol.new(

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 74a6dccd800153ba0df6c8bb0989bc79d7c81542
Subproject commit 568f81caad1dfd53e7e5a6792f4ed6e41b367b4c