discv4: prefer ipv6 (#751)

This allows bonding with ipv4-mapped nodes above all, which fixes a
bunch of warnings
This commit is contained in:
Jacek Sieka 2024-10-29 10:03:13 +01:00 committed by GitHub
parent 30476de038
commit 5d78c6a879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 111 additions and 83 deletions

View File

@ -84,7 +84,7 @@ proc newEthereumNode*(
bootstrapNodes: seq[ENode] = @[], bootstrapNodes: seq[ENode] = @[],
bindUdpPort: Port, bindUdpPort: Port,
bindTcpPort: Port, bindTcpPort: Port,
bindIp = IPv4_any(), bindIp = IPv6_any(),
rng = newRng()): EthereumNode = rng = newRng()): EthereumNode =
if rng == nil: # newRng could fail if rng == nil: # newRng could fail

View File

@ -9,14 +9,17 @@
import import
std/[times, net], std/[times, net],
chronos, stint, nimcrypto/keccak, chronicles, chronos,
stew/objects, results, stint,
nimcrypto/keccak,
chronicles,
stew/objects,
results,
../rlp, ../rlp,
../common/keys, ../common/keys,
"."/[kademlia, enode] "."/[kademlia, enode]
export export Node, results
Node, results
logScope: logScope:
topics = "eth p2p discovery" topics = "eth p2p discovery"
@ -55,9 +58,14 @@ proc append*(w: var RlpWriter, a: IpAddress) =
of IpAddressFamily.IPv4: of IpAddressFamily.IPv4:
w.append(a.address_v4) w.append(a.address_v4)
proc append(w: var RlpWriter, p: Port) = w.append(p.uint) proc append(w: var RlpWriter, p: Port) =
proc append(w: var RlpWriter, pk: PublicKey) = w.append(pk.toRaw()) w.append(p.uint)
proc append(w: var RlpWriter, h: MDigest[256]) = w.append(h.data)
proc append(w: var RlpWriter, pk: PublicKey) =
w.append(pk.toRaw())
proc append(w: var RlpWriter, h: MDigest[256]) =
w.append(h.data)
proc pack(cmdId: CommandId, payload: openArray[byte], pk: PrivateKey): seq[byte] = proc pack(cmdId: CommandId, payload: openArray[byte], pk: PrivateKey): seq[byte] =
## Create and sign a UDP message to be sent to a remote node. ## Create and sign a UDP message to be sent to a remote node.
@ -65,11 +73,13 @@ proc pack(cmdId: CommandId, payload: openArray[byte], pk: PrivateKey): seq[byte]
## See https://github.com/ethereum/devp2p/blob/master/rlpx.md#node-discovery for information on ## See https://github.com/ethereum/devp2p/blob/master/rlpx.md#node-discovery for information on
## how UDP packets are structured. ## how UDP packets are structured.
# TODO: There is a lot of unneeded allocations here result = newSeq[byte](HEAD_SIZE + 1 + payload.len)
let encodedData = @[cmdId.byte] & @payload result[HEAD_SIZE] = cmdId.byte
let signature = @(pk.sign(encodedData).toRaw()) result[HEAD_SIZE + 1 ..< result.len] = payload
let msgHash = keccak256.digest(signature & encodedData) result[MAC_SIZE ..< MAC_SIZE + SIG_SIZE] =
result = @(msgHash.data) & signature & encodedData pk.sign(result.toOpenArray(HEAD_SIZE, result.high)).toRaw()
result[0 ..< MAC_SIZE] =
keccak256.digest(result.toOpenArray(MAC_SIZE, result.high)).data
proc validateMsgHash(msg: openArray[byte]): DiscResult[MDigest[256]] = proc validateMsgHash(msg: openArray[byte]): DiscResult[MDigest[256]] =
if msg.len > HEAD_SIZE: if msg.len > HEAD_SIZE:
@ -85,20 +95,20 @@ proc validateMsgHash(msg: openArray[byte]): DiscResult[MDigest[256]] =
proc recoverMsgPublicKey(msg: openArray[byte]): DiscResult[PublicKey] = proc recoverMsgPublicKey(msg: openArray[byte]): DiscResult[PublicKey] =
if msg.len <= HEAD_SIZE: if msg.len <= HEAD_SIZE:
return err("disc: can't get public key") return err("disc: can't get public key")
let sig = ? Signature.fromRaw(msg.toOpenArray(MAC_SIZE, HEAD_SIZE)) let sig = ?Signature.fromRaw(msg.toOpenArray(MAC_SIZE, HEAD_SIZE))
recover(sig, msg.toOpenArray(HEAD_SIZE, msg.high)) recover(sig, msg.toOpenArray(HEAD_SIZE, msg.high))
proc unpack(msg: openArray[byte]): tuple[cmdId: CommandId, payload: seq[byte]] proc unpack(
{.raises: [DiscProtocolError].} = msg: openArray[byte]
): tuple[cmdId: CommandId, payload: seq[byte]] {.raises: [DiscProtocolError].} =
# Check against possible RangeDefect # Check against possible RangeDefect
if msg[HEAD_SIZE].int < CommandId.low.ord or if msg[HEAD_SIZE].int < CommandId.low.ord or msg[HEAD_SIZE].int > CommandId.high.ord:
msg[HEAD_SIZE].int > CommandId.high.ord:
raise newException(DiscProtocolError, "Unsupported packet id") raise newException(DiscProtocolError, "Unsupported packet id")
result = (cmdId: msg[HEAD_SIZE].CommandId, payload: msg[HEAD_SIZE + 1 .. ^1]) (cmdId: msg[HEAD_SIZE].CommandId, payload: msg[HEAD_SIZE + 1 .. ^1])
proc expiration(): uint32 = proc expiration(): uint64 =
result = uint32(epochTime() + EXPIRATION) uint64(getTime().toUnix() + EXPIRATION)
# Wire protocol # Wire protocol
@ -110,15 +120,16 @@ proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
when defined(chronicles_log_level): when defined(chronicles_log_level):
try: try:
# readError will raise FutureError # readError will raise FutureError
debug "Discovery send failed", msg = f.readError.msg debug "Discovery send failed",
msg = f.readError.msg, address = $n.node.address
except FutureError as exc: except FutureError as exc:
error "Failed to get discovery send future error", msg=exc.msg error "Failed to get discovery send future error", msg = exc.msg
f.addCallback cb f.addCallback cb
proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] = proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
let payload = rlp.encode((PROTO_VERSION.uint, d.address, n.node.address, let payload =
expiration())) rlp.encode((PROTO_VERSION.uint, d.address, n.node.address, expiration()))
let msg = pack(cmdPing, payload, d.privKey) let msg = pack(cmdPing, payload, d.privKey)
result = msg[0 ..< MAC_SIZE] result = msg[0 ..< MAC_SIZE]
trace ">>> ping ", n trace ">>> ping ", n
@ -135,7 +146,7 @@ proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
data[32 .. ^1] = targetNodeId.toByteArrayBE() data[32 .. ^1] = targetNodeId.toByteArrayBE()
let payload = rlp.encode((data, expiration())) let payload = rlp.encode((data, expiration()))
let msg = pack(cmdFindNode, payload, d.privKey) let msg = pack(cmdFindNode, payload, d.privKey)
trace ">>> find_node to ", n#, ": ", msg.toHex() trace ">>> find_node to ", n #, ": ", msg.toHex()
d.send(n, msg) d.send(n, msg)
proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) = proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
@ -152,18 +163,23 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
nodes.setLen(0) nodes.setLen(0)
for i, n in neighbours: for i, n in neighbours:
nodes.add((n.node.address.ip, n.node.address.udpPort, nodes.add(
n.node.address.tcpPort, n.node.pubkey)) (n.node.address.ip, n.node.address.udpPort, n.node.address.tcpPort, n.node.pubkey)
)
if nodes.len == MAX_NEIGHBOURS_PER_PACKET: if nodes.len == MAX_NEIGHBOURS_PER_PACKET:
flush() flush()
if nodes.len != 0: flush() if nodes.len != 0:
flush()
proc newDiscoveryProtocol*( proc newDiscoveryProtocol*(
privKey: PrivateKey, address: Address, privKey: PrivateKey,
address: Address,
bootstrapNodes: openArray[ENode], bootstrapNodes: openArray[ENode],
bindPort: Port, bindIp = IPv4_any(), bindPort: Port,
rng = newRng()): DiscoveryProtocol = bindIp = IPv6_any(),
rng = newRng(),
): DiscoveryProtocol =
let let
localNode = newNode(privKey.toPublicKey(), address) localNode = newNode(privKey.toPublicKey(), address)
discovery = DiscoveryProtocol( discovery = DiscoveryProtocol(
@ -171,27 +187,32 @@ proc newDiscoveryProtocol*(
address: address, address: address,
localNode: localNode, localNode: localNode,
bindIp: bindIp, bindIp: bindIp,
bindPort: bindPort) bindPort: bindPort,
)
kademlia = newKademliaProtocol(localNode, discovery, rng = rng) kademlia = newKademliaProtocol(localNode, discovery, rng = rng)
discovery.kademlia = kademlia discovery.kademlia = kademlia
for n in bootstrapNodes: discovery.bootstrapNodes.add(newNode(n)) for n in bootstrapNodes:
discovery.bootstrapNodes.add(newNode(n))
discovery discovery
proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) proc recvPing(
{.raises: [ValueError].} = d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]
) {.raises: [ValueError].} =
d.kademlia.recvPing(node, msgHash) d.kademlia.recvPing(node, msgHash)
proc recvPong(d: DiscoveryProtocol, node: Node, payload: seq[byte]) proc recvPong(
{.raises: [RlpError].} = d: DiscoveryProtocol, node: Node, payload: seq[byte]
) {.raises: [RlpError].} =
let rlp = rlpFromBytes(payload) let rlp = rlpFromBytes(payload)
let tok = rlp.listElem(1).toBytes() let tok = rlp.listElem(1).toBytes()
d.kademlia.recvPong(node, tok) d.kademlia.recvPong(node, tok)
proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: seq[byte]) proc recvNeighbours(
{.raises: [RlpError].} = d: DiscoveryProtocol, node: Node, payload: seq[byte]
) {.raises: [RlpError].} =
let rlp = rlpFromBytes(payload) let rlp = rlpFromBytes(payload)
let neighboursList = rlp.listElem(0) let neighboursList = rlp.listElem(0)
let sz = neighboursList.listLen() let sz = neighboursList.listLen()
@ -203,11 +224,9 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: seq[byte])
var ip: IpAddress var ip: IpAddress
case ipBlob.len case ipBlob.len
of 4: of 4:
ip = IpAddress( ip = IpAddress(family: IpAddressFamily.IPv4, address_v4: toArray(4, ipBlob))
family: IpAddressFamily.IPv4, address_v4: toArray(4, ipBlob))
of 16: of 16:
ip = IpAddress( ip = IpAddress(family: IpAddressFamily.IPv6, address_v6: toArray(16, ipBlob))
family: IpAddressFamily.IPv6, address_v6: toArray(16, ipBlob))
else: else:
error "Wrong ip address length!" error "Wrong ip address length!"
continue continue
@ -222,8 +241,9 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: seq[byte])
neighbours.add(newNode(pk[], Address(ip: ip, udpPort: udpPort, tcpPort: tcpPort))) neighbours.add(newNode(pk[], Address(ip: ip, udpPort: udpPort, tcpPort: tcpPort)))
d.kademlia.recvNeighbours(node, neighbours) d.kademlia.recvNeighbours(node, neighbours)
proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: openArray[byte]) proc recvFindNode(
{.raises: [RlpError, ValueError].} = d: DiscoveryProtocol, node: Node, payload: openArray[byte]
) {.raises: [RlpError, ValueError].} =
let rlp = rlpFromBytes(payload) let rlp = rlpFromBytes(payload)
trace "<<< find_node from ", node trace "<<< find_node from ", node
let rng = rlp.listElem(0).toBytes let rng = rlp.listElem(0).toBytes
@ -234,8 +254,9 @@ proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: openArray[byte])
else: else:
trace "Invalid target public key received" trace "Invalid target public key received"
proc expirationValid(cmdId: CommandId, rlpEncodedPayload: openArray[byte]): proc expirationValid(
bool {.raises: [DiscProtocolError, RlpError].} = cmdId: CommandId, rlpEncodedPayload: openArray[byte]
): bool {.raises: [DiscProtocolError, RlpError].} =
## Can only raise `DiscProtocolError` and all of `RlpError` ## Can only raise `DiscProtocolError` and all of `RlpError`
# Check if there is a payload # Check if there is a payload
if rlpEncodedPayload.len <= 0: if rlpEncodedPayload.len <= 0:
@ -250,8 +271,9 @@ proc expirationValid(cmdId: CommandId, rlpEncodedPayload: openArray[byte]):
else: else:
raise newException(DiscProtocolError, "Invalid RLP list for this packet id") raise newException(DiscProtocolError, "Invalid RLP list for this packet id")
proc receive*(d: DiscoveryProtocol, a: Address, msg: openArray[byte]) proc receive*(
{.raises: [DiscProtocolError, RlpError, ValueError].} = d: DiscoveryProtocol, a: Address, msg: openArray[byte]
) {.raises: [DiscProtocolError, RlpError, ValueError].} =
# Note: export only needed for testing # Note: export only needed for testing
let msgHash = validateMsgHash(msg) let msgHash = validateMsgHash(msg)
if msgHash.isOk(): if msgHash.isOk():
@ -280,10 +302,13 @@ proc receive*(d: DiscoveryProtocol, a: Address, msg: openArray[byte])
else: else:
notice "Wrong msg mac from ", a notice "Wrong msg mac from ", a
proc processClient(transp: DatagramTransport, raddr: TransportAddress): proc processClient(
Future[void] {.async: (raises: []).} = transp: DatagramTransport, raddr: TransportAddress
): Future[void] {.async: (raises: []).} =
var proto = getUserData[DiscoveryProtocol](transp) var proto = getUserData[DiscoveryProtocol](transp)
let buf = try: transp.getMessage() let buf =
try:
transp.getMessage()
except TransportOsError as e: except TransportOsError as e:
# This is likely to be local network connection issues. # This is likely to be local network connection issues.
warn "Transport getMessage", exception = e.name, msg = e.msg warn "Transport getMessage", exception = e.name, msg = e.msg
@ -326,26 +351,30 @@ proc randomNodes*(d: DiscoveryProtocol, count: int): seq[Node] =
d.kademlia.randomNodes(count) d.kademlia.randomNodes(count)
when isMainModule: when isMainModule:
import logging, stew/byteutils import stew/byteutils, ./bootnodes
const LOCAL_BOOTNODES = [
"enode://6456719e7267e061161c88720287a77b80718d2a3a4ff5daeba614d029dc77601b75e32190aed1c9b0b9ccb6fac3bcf000f48e54079fa79e339c25d8e9724226@127.0.0.1:30301"
]
addHandler(newConsoleLogger())
block: block:
let m = hexToSeqByte"79664bff52ee17327b4a2d8f97d8fb32c9244d719e5038eb4f6b64da19ca6d271d659c3ad9ad7861a928ca85f8d8debfbe6b7ade26ad778f2ae2ba712567fcbd55bc09eb3e74a893d6b180370b266f6aaf3fe58a0ad95f7435bf3ddf1db940d20102f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2" let m =
hexToSeqByte"79664bff52ee17327b4a2d8f97d8fb32c9244d719e5038eb4f6b64da19ca6d271d659c3ad9ad7861a928ca85f8d8debfbe6b7ade26ad778f2ae2ba712567fcbd55bc09eb3e74a893d6b180370b266f6aaf3fe58a0ad95f7435bf3ddf1db940d20102f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2"
discard validateMsgHash(m).expect("valid hash") discard validateMsgHash(m).expect("valid hash")
var remotePubkey = recoverMsgPublicKey(m).expect("valid key") var remotePubkey = recoverMsgPublicKey(m).expect("valid key")
let (cmdId, payload) = unpack(m) let (cmdId, payload) = unpack(m)
doAssert(payload == hexToSeqByte"f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2") doAssert(
payload ==
hexToSeqByte"f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2"
)
doAssert(cmdId == cmdPong) doAssert(cmdId == cmdPong)
doAssert(remotePubkey == PublicKey.fromHex( doAssert(
"78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d")[]) remotePubkey ==
PublicKey.fromHex(
"78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d"
)[]
)
let privKey = PrivateKey.fromHex("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[] let privKey = PrivateKey.fromHex(
"a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617"
)[]
# echo privKey # echo privKey
@ -355,14 +384,14 @@ when isMainModule:
# let (remotePubkey, cmdId, payload) = unpack(m) # let (remotePubkey, cmdId, payload) = unpack(m)
# doAssert(remotePubkey.raw_key.toHex == privKey.public_key.raw_key.toHex) # doAssert(remotePubkey.raw_key.toHex == privKey.public_key.raw_key.toHex)
var bootnodes = newSeq[ENode]() var nodes = newSeq[ENode]()
for item in LOCAL_BOOTNODES: for item in MainnetBootnodes:
bootnodes.add(ENode.fromString(item)[]) nodes.add(ENode.fromString(item)[])
let listenPort = Port(30310) let listenPort = Port(30310)
var address = Address(udpPort: listenPort, tcpPort: listenPort) var address = Address(udpPort: listenPort, tcpPort: listenPort)
address.ip.family = IpAddressFamily.IPv4 address.ip.family = IpAddressFamily.IPv4
let discovery = newDiscoveryProtocol(privkey, address, bootnodes, listenPort) let discovery = newDiscoveryProtocol(privKey, address, nodes, bindPort = listenPort)
echo discovery.localNode.node.pubkey echo discovery.localNode.node.pubkey
echo "this_node.id: ", discovery.localNode.id.toHex() echo "this_node.id: ", discovery.localNode.id.toHex()
@ -372,5 +401,6 @@ when isMainModule:
proc test() {.async.} = proc test() {.async.} =
{.gcsafe.}: {.gcsafe.}:
await discovery.bootstrap() await discovery.bootstrap()
for node in discovery.randomNodes(discovery.kademlia.nodesDiscovered):
echo node
waitFor test() waitFor test()

View File

@ -49,7 +49,6 @@ type
istart, iend: UInt256 istart, iend: UInt256
nodes: seq[Node] nodes: seq[Node]
replacementCache: seq[Node] replacementCache: seq[Node]
lastUpdated: float # epochTime
CommandId* = enum CommandId* = enum
cmdPing = 1 cmdPing = 1
@ -216,7 +215,6 @@ proc add(k: KBucket, n: Node): Node =
## If the bucket is full, we add the node to the bucket's replacement cache and return the ## If the bucket is full, we add the node to the bucket's replacement cache and return the
## node at the head of the list (i.e. the least recently seen), which should be evicted if it ## node at the head of the list (i.e. the least recently seen), which should be evicted if it
## fails to respond to a ping. ## fails to respond to a ping.
k.lastUpdated = epochTime()
let nodeIdx = k.nodes.find(n) let nodeIdx = k.nodes.find(n)
if nodeIdx != -1: if nodeIdx != -1:
k.nodes.delete(nodeIdx) k.nodes.delete(nodeIdx)