diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index d61b2c9b8..62026499d 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -7,7 +7,7 @@ import # Standard library - algorithm, os, tables, strutils, sequtils, times, math, terminal, + std/[algorithm, os, tables, strutils, sequtils, times, math, terminal], # Nimble packages stew/[objects, byteutils, endians2], stew/shims/macros, diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 6f10637bd..038ec5ecd 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1,7 +1,7 @@ import # Std lib - typetraits, strutils, os, random, algorithm, sequtils, math, - options as stdOptions, + std/[typetraits, strutils, os, random, algorithm, sequtils, math, sets], + std/options as stdOptions, # Status libs stew/[varints, base58, base64, endians2, results, byteutils], bearssl, @@ -45,7 +45,7 @@ type GossipMsg = messages.Message SeenItem* = object - pinfo*: PeerInfo + peerId*: PeerID stamp*: chronos.Moment # TODO Is this really needed? @@ -60,10 +60,10 @@ type metadata*: Eth2Metadata connectTimeout*: chronos.Duration seenThreshold*: chronos.Duration - connQueue: AsyncQueue[PeerInfo] + connQueue: AsyncQueue[PeerAddr] seenTable: Table[PeerID, SeenItem] connWorkers: seq[Future[void]] - connTable: Table[PeerID, PeerInfo] + connTable: HashSet[PeerID] forkId: ENRForkID rng*: ref BrHmacDrbgContext @@ -94,6 +94,10 @@ type score*: int lacksSnappy: bool + PeerAddr* = object + peerId*: PeerID + addrs*: seq[MultiAddress] + ConnectionState* = enum None, Connecting, @@ -275,7 +279,7 @@ proc openStream(node: Eth2Node, protocolId: string): Future[Connection] {.async.} = let protocolId = protocolId & (if peer.lacksSnappy: "ssz" else: "ssz_snappy") try: - result = await dial(node.switch, peer.info, protocolId) + result = await dial(node.switch, peer.info.peerId, peer.info.addrs, protocolId) except CancelledError: raise except CatchableError: @@ -290,16 +294,15 @@ proc openStream(node: Eth2Node, proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.} -proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} = - let peerId = peerInfo.peerId +proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} = result = node.peerPool.getOrDefault(peerId) if result == nil: # TODO: We should register this peer in the pool! - result = Peer.init(node, peerInfo) + result = Peer.init(node, PeerInfo.init(peerId)) proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} = # TODO: Can this be `nil`? - return network.getPeer(conn.peerInfo) + return network.getPeer(conn.peerInfo.peerId) proc getKey*(peer: Peer): PeerID {.inline.} = result = peer.info.peerId @@ -355,29 +358,28 @@ proc `<`*(a, b: Peer): bool = else: false -proc isSeen*(network: ETh2Node, pinfo: PeerInfo): bool = +proc isSeen*(network: ETh2Node, peerId: PeerID): bool = let currentTime = now(chronos.Moment) - let item = network.seenTable.getOrDefault(pinfo.peerId) - if isNil(item.pinfo): - # Peer is not in SeenTable. + if peerId notin network.seenTable: return false + let item = network.seenTable[peerId] if currentTime >= item.stamp: # Peer is in SeenTable, but the time period has expired. - network.seenTable.del(pinfo.peerId) + network.seenTable.del(peerId) return false return true -proc addSeen*(network: ETh2Node, pinfo: PeerInfo, +proc addSeen*(network: ETh2Node, peerId: PeerID, period: chronos.Duration) = - let item = SeenItem(pinfo: pinfo, stamp: now(chronos.Moment) + period) - network.seenTable[pinfo.peerId] = item + let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period) + network.seenTable[peerId] = item proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = # TODO: How should we notify the other peer? if peer.connectionState notin {Disconnecting, Disconnected}: peer.connectionState = Disconnecting - await peer.network.switch.disconnect(peer.info) + await peer.network.switch.disconnect(peer.info.peerId) peer.connectionState = Disconnected peer.network.peerPool.release(peer) let seenTime = case reason @@ -387,7 +389,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, SeenTableTimeIrrelevantNetwork of FaultOrError: SeemTableTimeFaultOrError - peer.network.addSeen(peer.info, seenTime) + peer.network.addSeen(peer.info.peerId, seenTime) peer.info.close() include eth/p2p/p2p_backends_helpers @@ -718,42 +720,42 @@ proc handleIncomingPeer*(peer: Peer): Future[bool] {.async.} = nbc_peers.set int64(len(network.peerPool)) -proc toPeerInfo*(r: enr.TypedRecord): PeerInfo = - if r.secp256k1.isSome: - var pubKey = keys.PublicKey.fromRaw(r.secp256k1.get) - if pubkey.isErr: - return # TODO +proc toPeerAddr*(r: enr.TypedRecord): + Result[PeerAddr, cstring] {.raises: [Defect].} = + if not r.secp256k1.isSome: + return err("enr: no secp256k1 key in record") - let peerId = PeerID.init crypto.PublicKey( - scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey[])) - var addresses = newSeq[MultiAddress]() + let + pubKey = ? keys.PublicKey.fromRaw(r.secp256k1.get) + peerId = ? PeerID.init(crypto.PublicKey( + scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey))) - if r.ip.isSome and r.tcp.isSome: - let ip = ipv4(r.ip.get) - addresses.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) + var addrs = newSeq[MultiAddress]() - if r.ip6.isSome: - let ip = ipv6(r.ip6.get) - if r.tcp6.isSome: - addresses.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get) - elif r.tcp.isSome: - addresses.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) - else: - discard + if r.ip.isSome and r.tcp.isSome: + let ip = ipv4(r.ip.get) + addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) - if addresses.len > 0: - return PeerInfo.init(peerId.tryGet(), addresses) + if r.ip6.isSome: + let ip = ipv6(r.ip6.get) + if r.tcp6.isSome: + addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get) + elif r.tcp.isSome: + addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) + else: + discard -proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo = - if r.isSome: - return r.get.toPeerInfo + if addrs.len == 0: + return err("enr: no addresses in record") -proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} = - logScope: peer = peerInfo.id + ok(PeerAddr(peerId: peerId, addrs: addrs)) + +proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr) {.async.} = + logScope: peer = peerAddr.peerId debug "Connecting to discovered peer" - await node.switch.connect(peerInfo) - var peer = node.getPeer(peerInfo) + await node.switch.connect(peerAddr.peerId, peerAddr.addrs) + var peer = node.getPeer(peerAddr.peerId) peer.wasDialed = true #let msDial = newMultistream() @@ -773,17 +775,17 @@ proc connectWorker(network: Eth2Node) {.async.} = while true: let - remotePeerInfo = await network.connQueue.popFirst() - peerPoolHasRemotePeer = network.peerPool.hasPeer(remotePeerInfo.peerId) - seenTableHasRemotePeer = network.isSeen(remotePeerInfo) - remotePeerAlreadyConnected = network.connTable.hasKey(remotePeerInfo.peerId) + remotePeerAddr = await network.connQueue.popFirst() + peerPoolHasRemotePeer = network.peerPool.hasPeer(remotePeerAddr.peerId) + seenTableHasRemotePeer = network.isSeen(remotePeerAddr.peerId) + remotePeerAlreadyConnected = remotePeerAddr.peerId in network.connTable if not(peerPoolHasRemotePeer) and not(seenTableHasRemotePeer) and not(remotePeerAlreadyConnected): - network.connTable[remotePeerInfo.peerId] = remotePeerInfo + network.connTable.incl(remotePeerAddr.peerId) try: # We trying to connect to peers which are not in PeerPool, SeenTable and # ConnTable. - var fut = network.dialPeer(remotePeerInfo) + var fut = network.dialPeer(remotePeerAddr) # We discarding here just because we going to check future state, to avoid # condition where connection happens and timeout reached. discard await withTimeout(fut, network.connectTimeout) @@ -791,19 +793,19 @@ proc connectWorker(network: Eth2Node) {.async.} = # will be stored in PeerPool. if fut.finished(): if fut.failed() and not(fut.cancelled()): - debug "Unable to establish connection with peer", peer = remotePeerInfo.id, + debug "Unable to establish connection with peer", peer = remotePeerAddr.peerId, errMsg = fut.readError().msg inc nbc_failed_dials - network.addSeen(remotePeerInfo, SeenTableTimeDeadPeer) + network.addSeen(remotePeerAddr.peerId, SeenTableTimeDeadPeer) continue - debug "Connection to remote peer timed out", peer = remotePeerInfo.id + debug "Connection to remote peer timed out", peer = remotePeerAddr.peerId inc nbc_timeout_dials - network.addSeen(remotePeerInfo, SeenTableTimeTimeout) + network.addSeen(remotePeerAddr.peerId, SeenTableTimeTimeout) finally: - network.connTable.del(remotePeerInfo.peerId) + network.connTable.excl(remotePeerAddr.peerId) else: trace "Peer is already connected, connecting or already seen", - peer = remotePeerInfo.id, peer_pool_has_peer = $peerPoolHasRemotePeer, seen_table_has_peer = $seenTableHasRemotePeer, + peer = remotePeerAddr.peerId, peer_pool_has_peer = $peerPoolHasRemotePeer, seen_table_has_peer = $seenTableHasRemotePeer, connecting_peer = $remotePeerAlreadyConnected, seen_table_size = len(network.seenTable) # Prevent (a purely theoretical) high CPU usage when losing connectivity. @@ -824,12 +826,12 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} = try: let peerRecord = peer.record.toTypedRecord if peerRecord.isOk: - let peerInfo = peerRecord.value.toPeerInfo - if peerInfo != nil: - if not node.switch.isConnected(peerInfo): - await node.connQueue.addLast(peerInfo) + let peerAddr = peerRecord.value.toPeerAddr + if peerAddr.isOk: + if not node.switch.isConnected(peerAddr.get().peerId): + await node.connQueue.addLast(peerAddr.get()) else: - peerInfo.close() + discard # peerInfo.close() except CatchableError as err: debug "Failed to connect to peer", peer = $peer, err = err.msg except CatchableError as err: @@ -858,8 +860,8 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID, result.connectTimeout = 1.minutes result.seenThreshold = 1.minutes result.seenTable = initTable[PeerID, SeenItem]() - result.connTable = initTable[PeerID, PeerInfo]() - result.connQueue = newAsyncQueue[PeerInfo](ConcurrentConnections) + result.connTable = initHashSet[PeerID]() + result.connQueue = newAsyncQueue[PeerAddr](ConcurrentConnections) result.metadata = getPersistentNetMetadata(conf) result.forkId = enrForkId result.discovery = Eth2DiscoveryProtocol.new( diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 74a6dccd8..568f81caa 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 74a6dccd800153ba0df6c8bb0989bc79d7c81542 +Subproject commit 568f81caad1dfd53e7e5a6792f4ed6e41b367b4c