Add ip address voting through pong responses

This commit is contained in:
kdeme 2021-01-21 17:21:36 +01:00 committed by zah
parent 51a8795e56
commit e43ee6ef9c
5 changed files with 217 additions and 8 deletions

View File

@ -53,6 +53,7 @@ proc runP2pTests() =
"test_enr", "test_enr",
"test_hkdf", "test_hkdf",
"test_lru", "test_lru",
"test_ip_vote",
"test_discoveryv5", "test_discoveryv5",
"test_discoveryv5_encoding", "test_discoveryv5_encoding",
"test_routing_table" "test_routing_table"
@ -106,6 +107,7 @@ proc runDiscv5Tests() =
"test_enr", "test_enr",
"test_hkdf", "test_hkdf",
"test_lru", "test_lru",
"test_ip_vote",
"test_discoveryv5", "test_discoveryv5",
"test_discoveryv5_encoding", "test_discoveryv5_encoding",
"test_routing_table" "test_routing_table"

View File

@ -0,0 +1,78 @@
# nim-eth - Node Discovery Protocol v5
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, (LICENSE-APACHEv2)
# * MIT license (LICENSE-MIT)
# at your option.
# This file may not be copied, modified, or distributed except
# according to those terms.
## IP:port address votes implemented similarly as in
## https://github.com/sigp/discv5
##
## This allows the selection of a node its own public IP based on address
## information that is received from other nodes.
## This can be used in conjuction with discovery v5 ping-pong request responses
## that provide this information.
## To select the right address, a majority count is done. This is done over a
## sort of moving window as votes expire after `IpVoteTimeout`.
import
std/[tables, options],
chronos,
./node
export options
{.push raises: [Defect].}
const IpVoteTimeout = 5.minutes ## Duration until a vote expires
type
IpVote* = object
votes: Table[NodeId, (Address, chronos.Moment)]
threshold: uint ## Minimum threshold to allow for a majority to count
func init*(T: type IpVote, threshold: uint = 10): T =
## Initialize IpVote.
##
## If provided threshold is lower than 2 it will be set to 2.
if threshold < 2:
IpVote(threshold: 2)
else:
IpVote(threshold: threshold)
proc insert*(ipvote: var IpVote, key: NodeId, address: Address) =
## Insert a vote for an address coming from a specific `NodeId`. A `NodeId`
## can only hold 1 vote.
ipvote.votes[key] = (address, now(chronos.Moment) + IpVoteTimeout)
proc majority*(ipvote: var IpVote): Option[Address] =
## Get the majority of votes on an address. Pruning of votes older than
## `IpVoteTime` will be done before the majority count.
## Note: When there is a draw the selected "majority" will depend on whichever
## address comes first in the CountTable. This seems acceptable as there is no
## other criteria to make a selection.
let now = now(chronos.Moment)
var
pruneList: seq[NodeId]
ipCount: CountTable[Address]
for k, v in ipvote.votes:
if now > v[1]:
pruneList.add(k)
else:
ipCount.inc(v[0])
for id in pruneList:
ipvote.votes.del(id)
if ipCount.len <= 0:
return none(Address)
let (address, count) = ipCount.largest()
if uint(count) >= ipvote.threshold:
some(address)
else:
none(Address)

View File

@ -3,6 +3,8 @@ import
nimcrypto, stint, chronos, stew/shims/net, chronicles, nimcrypto, stint, chronos, stew/shims/net, chronicles,
eth/keys, enr eth/keys, enr
export stint
{.push raises: [Defect].} {.push raises: [Defect].}
type type
@ -60,10 +62,17 @@ proc updateNode*(n: Node, pk: PrivateKey, ip: Option[ValidIpAddress],
ok() ok()
func hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw) func hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw)
func `==`*(a, b: Node): bool = func `==`*(a, b: Node): bool =
(a.isNil and b.isNil) or (a.isNil and b.isNil) or
(not a.isNil and not b.isNil and a.pubkey == b.pubkey) (not a.isNil and not b.isNil and a.pubkey == b.pubkey)
proc random*(T: type NodeId, rng: var BrHmacDrbgContext): T =
var id: NodeId
brHmacDrbgGenerate(addr rng, addr id, csize_t(sizeof(id)))
id
func `$`*(id: NodeId): string = func `$`*(id: NodeId): string =
id.toHex() id.toHex()
@ -81,6 +90,9 @@ func shortLog*(id: NodeId): string =
result.add(sid[i]) result.add(sid[i])
chronicles.formatIt(NodeId): shortLog(it) chronicles.formatIt(NodeId): shortLog(it)
func hash*(a: Address): hashes.Hash =
hashData(unsafeAddr a, sizeof(a))
func `$`*(a: Address): string = func `$`*(a: Address): string =
result.add($a.ip) result.add($a.ip)
result.add(":" & $a.port) result.add(":" & $a.port)

View File

@ -77,7 +77,7 @@ import
stew/shims/net as stewNet, json_serialization/std/net, stew/shims/net as stewNet, json_serialization/std/net,
stew/endians2, chronicles, chronos, stint, bearssl, metrics, stew/endians2, chronicles, chronos, stint, bearssl, metrics,
eth/[rlp, keys, async_utils], eth/[rlp, keys, async_utils],
types, encoding, node, routing_table, enr, random2, sessions types, encoding, node, routing_table, enr, random2, sessions, ip_vote
import nimcrypto except toHex import nimcrypto except toHex
@ -126,6 +126,7 @@ type
revalidateLoop: Future[void] revalidateLoop: Future[void]
lastLookup: chronos.Moment lastLookup: chronos.Moment
bootstrapRecords*: seq[Record] bootstrapRecords*: seq[Record]
ipVote: IpVote
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
PendingRequest = object PendingRequest = object
@ -783,12 +784,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
proc queryRandom*(d: Protocol): Future[seq[Node]] proc queryRandom*(d: Protocol): Future[seq[Node]]
{.async, raises:[Exception, Defect].} = {.async, raises:[Exception, Defect].} =
## Perform a query for a random target, return all nodes discovered. ## Perform a query for a random target, return all nodes discovered.
var id: NodeId return await d.query(NodeId.random(d.rng[]))
var buf: array[sizeof(id), byte]
brHmacDrbgGenerate(d.rng[], buf)
copyMem(addr id, addr buf[0], sizeof(id))
return await d.query(id)
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])): proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
Future[seq[Node]] {.async, raises:[Exception, Defect].} = Future[seq[Node]] {.async, raises:[Exception, Defect].} =
@ -860,12 +856,27 @@ proc revalidateNode*(d: Protocol, n: Node)
let pong = await d.ping(n) let pong = await d.ping(n)
if pong.isOK(): if pong.isOK():
if pong.get().enrSeq > n.record.seqNum: let res = pong.get()
if res.enrSeq > n.record.seqNum:
# Request new ENR # Request new ENR
let nodes = await d.findNode(n, @[0'u32]) let nodes = await d.findNode(n, @[0'u32])
if nodes.isOk() and nodes[].len > 0: if nodes.isOk() and nodes[].len > 0:
discard d.addNode(nodes[][0]) discard d.addNode(nodes[][0])
# Get IP and port from pong message and add it to the ip votes
if res.ip.len == 4:
var ip: array[4, byte]
copyMem(addr ip, unsafeAddr res.ip[0], sizeof(ip))
let a = Address(ip: ipv4(ip), port: Port(res.port))
d.ipVote.insert(n.id, a);
elif res.ip.len == 16:
var ip: array[16, byte]
copyMem(addr ip, unsafeAddr res.ip[0], sizeof(ip))
let a = Address(ip: ipv6(ip), port: Port(res.port))
d.ipVote.insert(n.id, a);
else:
warn "Invalid IP address format", ip = res.ip, node = n
proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
## Loop which revalidates the nodes in the routing table by sending the ping ## Loop which revalidates the nodes in the routing table by sending the ping
## message. ## message.
@ -882,6 +893,7 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} = proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
## Loop that refreshes the routing table by starting a random query in case ## Loop that refreshes the routing table by starting a random query in case
## no queries were done since `refreshInterval` or more. ## no queries were done since `refreshInterval` or more.
## It also refreshes the majority address voted for via pong responses.
# TODO: General Exception raised. # TODO: General Exception raised.
try: try:
await d.populateTable() await d.populateTable()
@ -893,6 +905,11 @@ proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
trace "Discovered nodes in random target query", nodes = randomQuery.len trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len() debug "Total nodes in discv5 routing table", total = d.routingTable.len()
let majority = d.ipVote.majority()
if majority.isSome():
let address = majority.get()
debug "Majority on voted address", address
await sleepAsync(refreshInterval) await sleepAsync(refreshInterval)
except CancelledError: except CancelledError:
trace "refreshLoop canceled" trace "refreshLoop canceled"
@ -935,6 +952,7 @@ proc newProtocol*(privKey: PrivateKey,
codec: Codec(localNode: node, privKey: privKey, codec: Codec(localNode: node, privKey: privKey,
sessions: Sessions.init(256)), sessions: Sessions.init(256)),
bootstrapRecords: @bootstrapRecords, bootstrapRecords: @bootstrapRecords,
ipVote: IpVote.init(),
rng: rng) rng: rng)
result.routingTable.init(node, DefaultBitsPerHop, tableIpLimits, rng) result.routingTable.init(node, DefaultBitsPerHop, tableIpLimits, rng)

View File

@ -0,0 +1,99 @@
import
std/unittest,
eth/keys, stew/shims/net,
eth/p2p/discoveryv5/[node, ip_vote]
suite "IP vote":
let rng = newRng()
test "Majority vote":
var
votes = IpVote.init(2)
let
addr1 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(1))
addr2 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(2))
addr3 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(3))
votes.insert(NodeId.random(rng[]), addr1);
votes.insert(NodeId.random(rng[]), addr1);
votes.insert(NodeId.random(rng[]), addr2);
votes.insert(NodeId.random(rng[]), addr2);
votes.insert(NodeId.random(rng[]), addr2);
votes.insert(NodeId.random(rng[]), addr3);
votes.insert(NodeId.random(rng[]), addr3);
check votes.majority() == some(addr2)
test "Votes below threshold":
const threshold = 10
var
votes = IpVote.init(threshold)
let
addr1 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(1))
addr2 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(2))
addr3 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(3))
votes.insert(NodeId.random(rng[]), addr1);
votes.insert(NodeId.random(rng[]), addr2);
for i in 0..<(threshold - 1):
votes.insert(NodeId.random(rng[]), addr3);
check votes.majority().isNone()
test "Votes at threshold":
const threshold = 10
var
votes = IpVote.init(threshold)
let
addr1 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(1))
addr2 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(2))
addr3 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(3))
votes.insert(NodeId.random(rng[]), addr1);
votes.insert(NodeId.random(rng[]), addr2);
for i in 0..<(threshold):
votes.insert(NodeId.random(rng[]), addr3);
check votes.majority() == some(addr3)
test "Double votes with same address":
const threshold = 2
var
votes = IpVote.init(threshold)
let
addr1 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(1))
addr2 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(2))
let nodeIdA = NodeId.random(rng[])
votes.insert(nodeIdA, addr1);
votes.insert(nodeIdA, addr1);
votes.insert(nodeIdA, addr1);
votes.insert(NodeId.random(rng[]), addr2);
votes.insert(NodeId.random(rng[]), addr2);
check votes.majority() == some(addr2)
test "Double votes with different address":
const threshold = 2
var
votes = IpVote.init(threshold)
let
addr1 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(1))
addr2 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(2))
addr3 = Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(3))
let nodeIdA = NodeId.random(rng[])
votes.insert(nodeIdA, addr1);
votes.insert(nodeIdA, addr2);
votes.insert(nodeIdA, addr3);
votes.insert(NodeId.random(rng[]), addr1);
votes.insert(NodeId.random(rng[]), addr2);
votes.insert(NodeId.random(rng[]), addr3);
check votes.majority() == some(addr3)