From 84e60c69302bef923c689177f1cd995e4a7d7326 Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 6 Sep 2022 22:19:47 +0700 Subject: [PATCH] fix discovery v4 ping pong handler --- eth/p2p/discovery.nim | 8 ---- eth/p2p/kademlia.nim | 98 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 9 deletions(-) diff --git a/eth/p2p/discovery.nim b/eth/p2p/discovery.nim index 0c0000e..acff0ef 100644 --- a/eth/p2p/discovery.nim +++ b/eth/p2p/discovery.nim @@ -39,14 +39,6 @@ type bindIp: IpAddress bindPort: Port - CommandId = enum - cmdPing = 1 - cmdPong = 2 - cmdFindNode = 3 - cmdNeighbours = 4 - cmdENRRequest = 5 - cmdENRResponse = 6 - DiscProtocolError* = object of CatchableError DiscResult*[T] = Result[T, cstring] diff --git a/eth/p2p/kademlia.nim b/eth/p2p/kademlia.nim index c75b502..2cf74d0 100644 --- a/eth/p2p/kademlia.nim +++ b/eth/p2p/kademlia.nim @@ -19,6 +19,9 @@ logScope: topics = "kademlia" type + # 32 bytes NodeId | 16 bytes ip | 1 byte mode + TimeKey = array[49, byte] + KademliaProtocol* [Wire] = ref object wire: Wire thisNode: Node @@ -27,6 +30,7 @@ type pingFutures: Table[Node, Future[bool]] neighboursCallbacks: Table[Node, proc(n: seq[Node]) {.gcsafe, raises: [Defect].}] rng: ref HmacDrbgContext + pingPongTime: OrderedTable[TimeKey, int64] # int64 -> unix time NodeId* = UInt256 @@ -44,12 +48,21 @@ type replacementCache: seq[Node] lastUpdated: float # epochTime + CommandId* = enum + cmdPing = 1 + cmdPong = 2 + cmdFindNode = 3 + cmdNeighbours = 4 + cmdENRRequest = 5 + cmdENRResponse = 6 + const BUCKET_SIZE = 16 BITS_PER_HOP = 8 REQUEST_TIMEOUT = chronos.milliseconds(5000) # timeout of message round trips FIND_CONCURRENCY = 3 # parallel find node lookups ID_SIZE = 256 + BOND_EXPIRATION = initDuration(hours = 12) proc toNodeId*(pk: PublicKey): NodeId = readUintBE[256](keccak256.digest(pk.toRaw()).data) @@ -81,6 +94,68 @@ proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.toRaw) proc `==`*(a, b: Node): bool = (a.isNil and b.isNil) or (not a.isNil and not b.isNil and a.node.pubkey == b.node.pubkey) +proc timeKey(id: NodeId, ip: IpAddress, cmd: CommandId): TimeKey = + result[0..31] = id.toByteArrayBE()[0..31] + case ip.family + of IpAddressFamily.IPv6: + result[32..47] = ip.address_v6[0..15] + of IpAddressFamily.IPv4: + result[32..35] = ip.address_v4[0..3] + result[48] = cmd.byte + +proc ip(n: Node): IpAddress = + n.node.address.ip + +proc timeKeyPong(n: Node): TimeKey = + timeKey(n.id, n.ip, cmdPong) + +proc timeKeyPing(n: Node): TimeKey = + timeKey(n.id, n.ip, cmdPing) + +proc lastPingReceived(k: KademliaProtocol, n: Node): Time = + k.pingPongTime.getOrDefault(n.timeKeyPing, 0'i64).fromUnix + +proc lastPongReceived(k: KademliaProtocol, n: Node): Time = + k.pingPongTime.getOrDefault(n.timeKeyPong, 0'i64).fromUnix + +proc cmp(x, y: (TimeKey, int64)): int = + if x[1] < y[1]: return -1 + if x[1] > y[1]: return 1 + 0 + +proc removeTooOldPingPongTime(k: KademliaProtocol) = + const + MaxEntries = 128 + MaxRC = 5 + + if k.pingPongTime.len < MaxEntries: + return + + k.pingPongTime.sort(cmp, order = SortOrder.Descending) + var + num = 0 + rc: array[MaxRC, TimeKey] + + for v in keys(k.pingPongTime): + if num < MaxRC: + rc[num] = v + inc num + + for i in 0.. BOND_EXPIRATION: + let pingId = pingId(n, k.ping(n)) + + let fut = if pingId in k.pongFutures: + k.pongFutures[pingId] + else: + k.waitPong(n, pingId) + + let cb = proc(data: pointer) {.gcsafe.} = + # fut.read == true if pingid exists + try: + if fut.completed and fut.read: + k.updateRoutingTable(n) + except CatchableError as ex: + error "recvPing:WaitPong exception", msg=ex.msg + + fut.addCallback cb + else: + k.updateRoutingTable(n) + var future: Future[bool] if k.pingFutures.take(n, future): future.complete(true) + k.updateLastPingReceived(n, getTime()) proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) = ## Process a neighbours response.