fix discovery v4 ping pong handler

This commit is contained in:
jangko 2022-09-06 22:19:47 +07:00
parent 2186d67163
commit 84e60c6930
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
2 changed files with 97 additions and 9 deletions

View File

@ -39,14 +39,6 @@ type
bindIp: IpAddress bindIp: IpAddress
bindPort: Port bindPort: Port
CommandId = enum
cmdPing = 1
cmdPong = 2
cmdFindNode = 3
cmdNeighbours = 4
cmdENRRequest = 5
cmdENRResponse = 6
DiscProtocolError* = object of CatchableError DiscProtocolError* = object of CatchableError
DiscResult*[T] = Result[T, cstring] DiscResult*[T] = Result[T, cstring]

View File

@ -19,6 +19,9 @@ logScope:
topics = "kademlia" topics = "kademlia"
type type
# 32 bytes NodeId | 16 bytes ip | 1 byte mode
TimeKey = array[49, byte]
KademliaProtocol* [Wire] = ref object KademliaProtocol* [Wire] = ref object
wire: Wire wire: Wire
thisNode: Node thisNode: Node
@ -27,6 +30,7 @@ type
pingFutures: Table[Node, Future[bool]] pingFutures: Table[Node, Future[bool]]
neighboursCallbacks: Table[Node, proc(n: seq[Node]) {.gcsafe, raises: [Defect].}] neighboursCallbacks: Table[Node, proc(n: seq[Node]) {.gcsafe, raises: [Defect].}]
rng: ref HmacDrbgContext rng: ref HmacDrbgContext
pingPongTime: OrderedTable[TimeKey, int64] # int64 -> unix time
NodeId* = UInt256 NodeId* = UInt256
@ -44,12 +48,21 @@ type
replacementCache: seq[Node] replacementCache: seq[Node]
lastUpdated: float # epochTime lastUpdated: float # epochTime
CommandId* = enum
cmdPing = 1
cmdPong = 2
cmdFindNode = 3
cmdNeighbours = 4
cmdENRRequest = 5
cmdENRResponse = 6
const const
BUCKET_SIZE = 16 BUCKET_SIZE = 16
BITS_PER_HOP = 8 BITS_PER_HOP = 8
REQUEST_TIMEOUT = chronos.milliseconds(5000) # timeout of message round trips REQUEST_TIMEOUT = chronos.milliseconds(5000) # timeout of message round trips
FIND_CONCURRENCY = 3 # parallel find node lookups FIND_CONCURRENCY = 3 # parallel find node lookups
ID_SIZE = 256 ID_SIZE = 256
BOND_EXPIRATION = initDuration(hours = 12)
proc toNodeId*(pk: PublicKey): NodeId = proc toNodeId*(pk: PublicKey): NodeId =
readUintBE[256](keccak256.digest(pk.toRaw()).data) readUintBE[256](keccak256.digest(pk.toRaw()).data)
@ -81,6 +94,68 @@ proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.toRaw)
proc `==`*(a, b: Node): bool = (a.isNil and b.isNil) or proc `==`*(a, b: Node): bool = (a.isNil and b.isNil) or
(not a.isNil and not b.isNil and a.node.pubkey == b.node.pubkey) (not a.isNil and not b.isNil and a.node.pubkey == b.node.pubkey)
proc timeKey(id: NodeId, ip: IpAddress, cmd: CommandId): TimeKey =
result[0..31] = id.toByteArrayBE()[0..31]
case ip.family
of IpAddressFamily.IPv6:
result[32..47] = ip.address_v6[0..15]
of IpAddressFamily.IPv4:
result[32..35] = ip.address_v4[0..3]
result[48] = cmd.byte
proc ip(n: Node): IpAddress =
n.node.address.ip
proc timeKeyPong(n: Node): TimeKey =
timeKey(n.id, n.ip, cmdPong)
proc timeKeyPing(n: Node): TimeKey =
timeKey(n.id, n.ip, cmdPing)
proc lastPingReceived(k: KademliaProtocol, n: Node): Time =
k.pingPongTime.getOrDefault(n.timeKeyPing, 0'i64).fromUnix
proc lastPongReceived(k: KademliaProtocol, n: Node): Time =
k.pingPongTime.getOrDefault(n.timeKeyPong, 0'i64).fromUnix
proc cmp(x, y: (TimeKey, int64)): int =
if x[1] < y[1]: return -1
if x[1] > y[1]: return 1
0
proc removeTooOldPingPongTime(k: KademliaProtocol) =
const
MaxEntries = 128
MaxRC = 5
if k.pingPongTime.len < MaxEntries:
return
k.pingPongTime.sort(cmp, order = SortOrder.Descending)
var
num = 0
rc: array[MaxRC, TimeKey]
for v in keys(k.pingPongTime):
if num < MaxRC:
rc[num] = v
inc num
for i in 0..<num:
k.pingPongTime.del(rc[i])
proc updateLastPingReceived(k: KademliaProtocol, n: Node, t: Time) =
k.removeTooOldPingPongTime()
k.pingPongTime[n.timeKeyPing] = t.toUnix
proc updateLastPongReceived(k: KademliaProtocol, n: Node, t: Time) =
k.removeTooOldPingPongTime()
k.pingPongTime[n.timeKeyPong] = t.toUnix
# checkBond checks if the given node has a recent enough endpoint proof.
proc checkBond(k: KademliaProtocol, n: Node): bool =
getTime() - k.lastPongReceived(n) < BOND_EXPIRATION
proc newKBucket(istart, iend: NodeId): KBucket = proc newKBucket(istart, iend: NodeId): KBucket =
result.new() result.new()
result.istart = istart result.istart = istart
@ -496,16 +571,37 @@ proc recvPong*(k: KademliaProtocol, n: Node, token: seq[byte]) =
var future: Future[bool] var future: Future[bool]
if k.pongFutures.take(pingid, future): if k.pongFutures.take(pingid, future):
future.complete(true) future.complete(true)
k.updateLastPongReceived(n, getTime())
proc recvPing*(k: KademliaProtocol, n: Node, msgHash: any) proc recvPing*(k: KademliaProtocol, n: Node, msgHash: any)
{.raises: [ValueError, Defect].} = {.raises: [ValueError, Defect].} =
trace "<<< ping from ", n trace "<<< ping from ", n
k.updateRoutingTable(n)
k.wire.sendPong(n, msgHash) k.wire.sendPong(n, msgHash)
if getTime() - k.lastPongReceived(n) > BOND_EXPIRATION:
let pingId = pingId(n, k.ping(n))
let fut = if pingId in k.pongFutures:
k.pongFutures[pingId]
else:
k.waitPong(n, pingId)
let cb = proc(data: pointer) {.gcsafe.} =
# fut.read == true if pingid exists
try:
if fut.completed and fut.read:
k.updateRoutingTable(n)
except CatchableError as ex:
error "recvPing:WaitPong exception", msg=ex.msg
fut.addCallback cb
else:
k.updateRoutingTable(n)
var future: Future[bool] var future: Future[bool]
if k.pingFutures.take(n, future): if k.pingFutures.take(n, future):
future.complete(true) future.complete(true)
k.updateLastPingReceived(n, getTime())
proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) = proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) =
## Process a neighbours response. ## Process a neighbours response.