don't ping nodes during revalidation if they were pinged recently (#1779)
This commit is contained in:
parent
1bb389ad4e
commit
20885f4ca5
|
@ -11,7 +11,7 @@
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, sets, algorithm],
|
std/[sequtils, sets, algorithm, tables],
|
||||||
stew/[results, byteutils, leb128, endians2], chronicles, chronos,
|
stew/[results, byteutils, leb128, endians2], chronicles, chronos,
|
||||||
nimcrypto/hash, bearssl, ssz_serialization, metrics, faststreams,
|
nimcrypto/hash, bearssl, ssz_serialization, metrics, faststreams,
|
||||||
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
|
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
|
||||||
|
@ -171,6 +171,7 @@ type
|
||||||
offerQueue: AsyncQueue[OfferRequest]
|
offerQueue: AsyncQueue[OfferRequest]
|
||||||
offerWorkers: seq[Future[void]]
|
offerWorkers: seq[Future[void]]
|
||||||
disablePoke: bool
|
disablePoke: bool
|
||||||
|
pingTimings: Table[NodeId, chronos.Moment]
|
||||||
|
|
||||||
PortalResult*[T] = Result[T, string]
|
PortalResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
@ -476,7 +477,9 @@ proc new*(T: type PortalProtocol,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
radiusCache: RadiusCache.init(256),
|
radiusCache: RadiusCache.init(256),
|
||||||
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers),
|
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers),
|
||||||
disablePoke: config.disablePoke)
|
disablePoke: config.disablePoke,
|
||||||
|
pingTimings: initTable[NodeId, chronos.Moment]()
|
||||||
|
)
|
||||||
|
|
||||||
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
|
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
|
||||||
"Only one protocol should have this id")
|
"Only one protocol should have this id")
|
||||||
|
@ -520,6 +523,7 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
||||||
else:
|
else:
|
||||||
debug "Error receiving message response", error = messageResponse.error,
|
debug "Error receiving message response", error = messageResponse.error,
|
||||||
srcId = dst.id, srcAddress = dst.address
|
srcId = dst.id, srcAddress = dst.address
|
||||||
|
p.pingTimings.del(dst.id)
|
||||||
p.routingTable.replaceNode(dst)
|
p.routingTable.replaceNode(dst)
|
||||||
|
|
||||||
return messageResponse
|
return messageResponse
|
||||||
|
@ -569,6 +573,9 @@ proc ping*(p: PortalProtocol, dst: Node):
|
||||||
let pongResponse = await p.pingImpl(dst)
|
let pongResponse = await p.pingImpl(dst)
|
||||||
|
|
||||||
if pongResponse.isOk():
|
if pongResponse.isOk():
|
||||||
|
# Update last time we pinged this node
|
||||||
|
p.pingTimings[dst.id] = now(chronos.Moment)
|
||||||
|
|
||||||
let pong = pongResponse.get()
|
let pong = pongResponse.get()
|
||||||
# TODO: This should become custom per Portal Network
|
# TODO: This should become custom per Portal Network
|
||||||
let customPayloadDecoded =
|
let customPayloadDecoded =
|
||||||
|
@ -1252,15 +1259,28 @@ proc revalidateNode*(p: PortalProtocol, n: Node) {.async.} =
|
||||||
if nodes.len > 0: # Normally a node should only return 1 record actually
|
if nodes.len > 0: # Normally a node should only return 1 record actually
|
||||||
discard p.routingTable.addNode(nodes[0])
|
discard p.routingTable.addNode(nodes[0])
|
||||||
|
|
||||||
|
proc getNodeForRevalidation(p: PortalProtocol): Opt[Node] =
|
||||||
|
let node = p.routingTable.nodeToRevalidate()
|
||||||
|
if node.isNil:
|
||||||
|
return Opt.none(Node)
|
||||||
|
|
||||||
|
let now = now(chronos.Moment)
|
||||||
|
let timestamp = p.pingTimings.getOrDefault(node.id, now)
|
||||||
|
|
||||||
|
if (timestamp + revalidationTimeout) <= now:
|
||||||
|
Opt.some(node)
|
||||||
|
else:
|
||||||
|
Opt.none(Node)
|
||||||
|
|
||||||
proc revalidateLoop(p: PortalProtocol) {.async.} =
|
proc revalidateLoop(p: PortalProtocol) {.async.} =
|
||||||
## Loop which revalidates the nodes in the routing table by sending the ping
|
## Loop which revalidates the nodes in the routing table by sending the ping
|
||||||
## message.
|
## message.
|
||||||
try:
|
try:
|
||||||
while true:
|
while true:
|
||||||
await sleepAsync(milliseconds(p.baseProtocol.rng[].rand(revalidateMax)))
|
await sleepAsync(milliseconds(p.baseProtocol.rng[].rand(revalidateMax)))
|
||||||
let n = p.routingTable.nodeToRevalidate()
|
let n = getNodeForRevalidation(p)
|
||||||
if not n.isNil:
|
if n.isSome:
|
||||||
asyncSpawn p.revalidateNode(n)
|
asyncSpawn p.revalidateNode(n.get())
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "revalidateLoop canceled"
|
trace "revalidateLoop canceled"
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
import
|
import
|
||||||
std/strutils,
|
std/strutils,
|
||||||
confutils,
|
confutils,
|
||||||
|
chronos,
|
||||||
eth/p2p/discoveryv5/routing_table
|
eth/p2p/discoveryv5/routing_table
|
||||||
|
|
||||||
type
|
type
|
||||||
|
@ -29,10 +30,12 @@ type
|
||||||
radiusConfig*: RadiusConfig
|
radiusConfig*: RadiusConfig
|
||||||
disablePoke*: bool
|
disablePoke*: bool
|
||||||
|
|
||||||
|
|
||||||
const
|
const
|
||||||
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
|
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
|
||||||
defaultRadiusConfigDesc* = $defaultRadiusConfig.kind
|
defaultRadiusConfigDesc* = $defaultRadiusConfig.kind
|
||||||
defaultDisablePoke* = false
|
defaultDisablePoke* = false
|
||||||
|
revalidationTimeout* = chronos.seconds(30)
|
||||||
|
|
||||||
defaultPortalProtocolConfig* = PortalProtocolConfig(
|
defaultPortalProtocolConfig* = PortalProtocolConfig(
|
||||||
tableIpLimits: DefaultTableIpLimits,
|
tableIpLimits: DefaultTableIpLimits,
|
||||||
|
|
Loading…
Reference in New Issue