mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-24 09:48:24 +00:00
Portal protocol clean-up for some routing table calls (#1904)
This commit is contained in:
parent
11e3171723
commit
cf9a44f78e
@ -459,7 +459,7 @@ proc createStoreHandler*(
|
|||||||
of Dynamic:
|
of Dynamic:
|
||||||
# In case of dynamic radius setting we obey storage limits and adjust
|
# In case of dynamic radius setting we obey storage limits and adjust
|
||||||
# radius to store network fraction corresponding to those storage limits.
|
# radius to store network fraction corresponding to those storage limits.
|
||||||
let res = db.put(contentId, content, p.baseProtocol.localNode.id)
|
let res = db.put(contentId, content, p.localNode.id)
|
||||||
if res.kind == DbPruned:
|
if res.kind == DbPruned:
|
||||||
portal_pruning_counter.inc(labelValues = [$p.protocolId])
|
portal_pruning_counter.inc(labelValues = [$p.protocolId])
|
||||||
portal_pruning_deleted_elements.set(
|
portal_pruning_deleted_elements.set(
|
||||||
|
@ -227,7 +227,7 @@ type
|
|||||||
utpTransfer*: bool
|
utpTransfer*: bool
|
||||||
trace*: TraceObject
|
trace*: TraceObject
|
||||||
|
|
||||||
proc init*(
|
func init*(
|
||||||
T: type ContentKV,
|
T: type ContentKV,
|
||||||
contentKey: ByteList,
|
contentKey: ByteList,
|
||||||
content: seq[byte]): T =
|
content: seq[byte]): T =
|
||||||
@ -236,7 +236,7 @@ proc init*(
|
|||||||
content: content
|
content: content
|
||||||
)
|
)
|
||||||
|
|
||||||
proc init*(
|
func init*(
|
||||||
T: type ContentLookupResult,
|
T: type ContentLookupResult,
|
||||||
content: seq[byte],
|
content: seq[byte],
|
||||||
utpTransfer: bool,
|
utpTransfer: bool,
|
||||||
@ -260,7 +260,7 @@ proc addNode*(p: PortalProtocol, r: Record): bool =
|
|||||||
else:
|
else:
|
||||||
false
|
false
|
||||||
|
|
||||||
proc getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
|
func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
|
||||||
p.routingTable.getNode(id)
|
p.routingTable.getNode(id)
|
||||||
|
|
||||||
func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
|
func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
|
||||||
@ -268,12 +268,18 @@ func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
|
|||||||
func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
|
func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
|
||||||
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
|
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
|
||||||
|
|
||||||
proc inRange(
|
func distance(p: PortalProtocol, a, b: NodeId): UInt256 =
|
||||||
p: PortalProtocol,
|
p.routingTable.distance(a, b)
|
||||||
nodeId: NodeId,
|
|
||||||
nodeRadius: UInt256,
|
func logDistance(p: PortalProtocol, a, b: NodeId): uint16 =
|
||||||
contentId: ContentId): bool =
|
p.routingTable.logDistance(a, b)
|
||||||
let distance = p.routingTable.distance(nodeId, contentId)
|
|
||||||
|
func inRange(
|
||||||
|
p: PortalProtocol,
|
||||||
|
nodeId: NodeId,
|
||||||
|
nodeRadius: UInt256,
|
||||||
|
contentId: ContentId): bool =
|
||||||
|
let distance = p.distance(nodeId, contentId)
|
||||||
distance <= nodeRadius
|
distance <= nodeRadius
|
||||||
|
|
||||||
func inRange*(p: PortalProtocol, contentId: ContentId): bool =
|
func inRange*(p: PortalProtocol, contentId: ContentId): bool =
|
||||||
@ -306,7 +312,7 @@ func handlePing(
|
|||||||
p.radiusCache.put(srcId, customPayloadDecoded.dataRadius)
|
p.radiusCache.put(srcId, customPayloadDecoded.dataRadius)
|
||||||
|
|
||||||
let customPayload = CustomPayload(dataRadius: p.dataRadius)
|
let customPayload = CustomPayload(dataRadius: p.dataRadius)
|
||||||
let p = PongMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
let p = PongMessage(enrSeq: p.localNode.record.seqNum,
|
||||||
customPayload: ByteList(SSZ.encode(customPayload)))
|
customPayload: ByteList(SSZ.encode(customPayload)))
|
||||||
|
|
||||||
encodeMessage(p)
|
encodeMessage(p)
|
||||||
@ -317,7 +323,7 @@ proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
|
|||||||
encodeMessage(NodesMessage(total: 1, enrs: enrs))
|
encodeMessage(NodesMessage(total: 1, enrs: enrs))
|
||||||
elif fn.distances.contains(0):
|
elif fn.distances.contains(0):
|
||||||
# A request for our own record.
|
# A request for our own record.
|
||||||
let enr = ByteList(rlp.encode(p.baseProtocol.localNode.record))
|
let enr = ByteList(rlp.encode(p.localNode.record))
|
||||||
encodeMessage(NodesMessage(total: 1, enrs: List[ByteList, 32](@[enr])))
|
encodeMessage(NodesMessage(total: 1, enrs: List[ByteList, 32](@[enr])))
|
||||||
else:
|
else:
|
||||||
let distances = fn.distances.asSeq()
|
let distances = fn.distances.asSeq()
|
||||||
@ -359,7 +365,7 @@ proc handleFindContent(
|
|||||||
# discv5 layer.
|
# discv5 layer.
|
||||||
return @[]
|
return @[]
|
||||||
|
|
||||||
let logDistance = p.routingTable.logDistance(contentId, p.localNode.id)
|
let logDistance = p.logDistance(contentId, p.localNode.id)
|
||||||
portal_find_content_log_distance.observe(
|
portal_find_content_log_distance.observe(
|
||||||
int64(logDistance), labelValues = [$p.protocolId])
|
int64(logDistance), labelValues = [$p.protocolId])
|
||||||
|
|
||||||
@ -406,7 +412,7 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
|
|||||||
if contentIdResult.isOk():
|
if contentIdResult.isOk():
|
||||||
let contentId = contentIdResult.get()
|
let contentId = contentIdResult.get()
|
||||||
|
|
||||||
let logDistance = p.routingTable.logDistance(contentId, p.localNode.id)
|
let logDistance = p.logDistance(contentId, p.localNode.id)
|
||||||
portal_offer_log_distance.observe(
|
portal_offer_log_distance.observe(
|
||||||
int64(logDistance), labelValues = [$p.protocolId])
|
int64(logDistance), labelValues = [$p.protocolId])
|
||||||
|
|
||||||
@ -564,7 +570,7 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
|||||||
proc pingImpl*(p: PortalProtocol, dst: Node):
|
proc pingImpl*(p: PortalProtocol, dst: Node):
|
||||||
Future[PortalResult[PongMessage]] {.async.} =
|
Future[PortalResult[PongMessage]] {.async.} =
|
||||||
let customPayload = CustomPayload(dataRadius: p.dataRadius)
|
let customPayload = CustomPayload(dataRadius: p.dataRadius)
|
||||||
let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
let ping = PingMessage(enrSeq: p.localNode.record.seqNum,
|
||||||
customPayload: ByteList(SSZ.encode(customPayload)))
|
customPayload: ByteList(SSZ.encode(customPayload)))
|
||||||
|
|
||||||
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
|
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
|
||||||
@ -925,8 +931,8 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
|
|||||||
seenOnly = false)
|
seenOnly = false)
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
|
asked.incl(p.localNode.id) # No need to ask our own node
|
||||||
seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
|
seen.incl(p.localNode.id) # No need to discover our own node
|
||||||
for node in closestNodes:
|
for node in closestNodes:
|
||||||
seen.incl(node.id)
|
seen.incl(node.id)
|
||||||
|
|
||||||
@ -965,8 +971,8 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
|
|||||||
# If it wasn't seen before, insert node while remaining sorted
|
# If it wasn't seen before, insert node while remaining sorted
|
||||||
closestNodes.insert(n, closestNodes.lowerBound(n,
|
closestNodes.insert(n, closestNodes.lowerBound(n,
|
||||||
proc(x: Node, n: Node): int =
|
proc(x: Node, n: Node): int =
|
||||||
cmp(p.routingTable.distance(x.id, target),
|
cmp(p.distance(x.id, target),
|
||||||
p.routingTable.distance(n.id, target))
|
p.distance(n.id, target))
|
||||||
))
|
))
|
||||||
|
|
||||||
if closestNodes.len > BUCKET_SIZE:
|
if closestNodes.len > BUCKET_SIZE:
|
||||||
@ -1019,8 +1025,8 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
|
|||||||
p.baseProtocol.rng[].shuffle(closestNodes)
|
p.baseProtocol.rng[].shuffle(closestNodes)
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
|
asked.incl(p.localNode.id) # No need to ask our own node
|
||||||
seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
|
seen.incl(p.localNode.id) # No need to discover our own node
|
||||||
for node in closestNodes:
|
for node in closestNodes:
|
||||||
seen.incl(node.id)
|
seen.incl(node.id)
|
||||||
|
|
||||||
@ -1075,8 +1081,8 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
|
|||||||
# If it wasn't seen before, insert node while remaining sorted
|
# If it wasn't seen before, insert node while remaining sorted
|
||||||
closestNodes.insert(n, closestNodes.lowerBound(n,
|
closestNodes.insert(n, closestNodes.lowerBound(n,
|
||||||
proc(x: Node, n: Node): int =
|
proc(x: Node, n: Node): int =
|
||||||
cmp(p.routingTable.distance(x.id, targetId),
|
cmp(p.distance(x.id, targetId),
|
||||||
p.routingTable.distance(n.id, targetId))
|
p.distance(n.id, targetId))
|
||||||
))
|
))
|
||||||
|
|
||||||
if closestNodes.len > BUCKET_SIZE:
|
if closestNodes.len > BUCKET_SIZE:
|
||||||
@ -1114,8 +1120,8 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
var metadata = initTable[string, NodeMetadata]()
|
var metadata = initTable[string, NodeMetadata]()
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
|
asked.incl(p.localNode.id) # No need to ask our own node
|
||||||
seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
|
seen.incl(p.localNode.id) # No need to discover our own node
|
||||||
for node in closestNodes:
|
for node in closestNodes:
|
||||||
seen.incl(node.id)
|
seen.incl(node.id)
|
||||||
|
|
||||||
@ -1127,7 +1133,7 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
|
|
||||||
metadata["0x" & $p.localNode.id] = NodeMetadata(
|
metadata["0x" & $p.localNode.id] = NodeMetadata(
|
||||||
enr: p.localNode.record,
|
enr: p.localNode.record,
|
||||||
distance: p.routingTable.distance(p.localNode.id, targetId)
|
distance: p.distance(p.localNode.id, targetId)
|
||||||
)
|
)
|
||||||
|
|
||||||
# We should also have metadata for all the closes nodes
|
# We should also have metadata for all the closes nodes
|
||||||
@ -1135,7 +1141,7 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
for cn in closestNodes:
|
for cn in closestNodes:
|
||||||
metadata["0x" & $cn.id] = NodeMetadata(
|
metadata["0x" & $cn.id] = NodeMetadata(
|
||||||
enr: cn.record,
|
enr: cn.record,
|
||||||
distance: p.routingTable.distance(cn.id, targetId)
|
distance: p.distance(cn.id, targetId)
|
||||||
)
|
)
|
||||||
|
|
||||||
var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
|
var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
|
||||||
@ -1191,7 +1197,7 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
var respondedWith = newSeq[NodeId]()
|
var respondedWith = newSeq[NodeId]()
|
||||||
|
|
||||||
for n in content.nodes:
|
for n in content.nodes:
|
||||||
let dist = p.routingTable.distance(n.id, targetId)
|
let dist = p.distance(n.id, targetId)
|
||||||
|
|
||||||
metadata["0x" & $n.id] = NodeMetadata(
|
metadata["0x" & $n.id] = NodeMetadata(
|
||||||
enr: n.record,
|
enr: n.record,
|
||||||
@ -1200,17 +1206,17 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
respondedWith.add(n.id)
|
respondedWith.add(n.id)
|
||||||
|
|
||||||
if not seen.containsOrIncl(n.id):
|
if not seen.containsOrIncl(n.id):
|
||||||
discard p.routingTable.addNode(n)
|
discard p.addNode(n)
|
||||||
# If it wasn't seen before, insert node while remaining sorted
|
# If it wasn't seen before, insert node while remaining sorted
|
||||||
closestNodes.insert(n, closestNodes.lowerBound(n,
|
closestNodes.insert(n, closestNodes.lowerBound(n,
|
||||||
proc(x: Node, n: Node): int =
|
proc(x: Node, n: Node): int =
|
||||||
cmp(p.routingTable.distance(x.id, targetId), dist)
|
cmp(p.distance(x.id, targetId), dist)
|
||||||
))
|
))
|
||||||
|
|
||||||
if closestNodes.len > BUCKET_SIZE:
|
if closestNodes.len > BUCKET_SIZE:
|
||||||
closestNodes.del(closestNodes.high())
|
closestNodes.del(closestNodes.high())
|
||||||
|
|
||||||
let distance = p.routingTable.distance(content.src.id, targetId)
|
let distance = p.distance(content.src.id, targetId)
|
||||||
|
|
||||||
let address = content.src.address.get()
|
let address = content.src.address.get()
|
||||||
|
|
||||||
@ -1232,7 +1238,7 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
f.cancelSoon()
|
f.cancelSoon()
|
||||||
portal_lookup_content_requests.observe(requestAmount)
|
portal_lookup_content_requests.observe(requestAmount)
|
||||||
|
|
||||||
let distance = p.routingTable.distance(content.src.id, targetId)
|
let distance = p.distance(content.src.id, targetId)
|
||||||
|
|
||||||
responses["0x" & $content.src.id] = TraceResponse(
|
responses["0x" & $content.src.id] = TraceResponse(
|
||||||
durationMs: duration,
|
durationMs: duration,
|
||||||
@ -1250,7 +1256,7 @@ proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256)
|
|||||||
pendingNodeIds.add(pn.id)
|
pendingNodeIds.add(pn.id)
|
||||||
metadata["0x" & $pn.id] = NodeMetadata(
|
metadata["0x" & $pn.id] = NodeMetadata(
|
||||||
enr: pn.record,
|
enr: pn.record,
|
||||||
distance: p.routingTable.distance(pn.id, targetId)
|
distance: p.distance(pn.id, targetId)
|
||||||
)
|
)
|
||||||
|
|
||||||
return Opt.some(TraceContentLookupResult(
|
return Opt.some(TraceContentLookupResult(
|
||||||
@ -1285,8 +1291,8 @@ proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node
|
|||||||
var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)
|
var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
|
asked.incl(p.localNode.id) # No need to ask our own node
|
||||||
seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
|
seen.incl(p.localNode.id) # No need to discover our own node
|
||||||
for node in queryBuffer:
|
for node in queryBuffer:
|
||||||
seen.incl(node.id)
|
seen.incl(node.id)
|
||||||
|
|
||||||
@ -1484,7 +1490,7 @@ proc populateTable(p: PortalProtocol) {.async.} =
|
|||||||
logScope:
|
logScope:
|
||||||
protocolId = p.protocolId
|
protocolId = p.protocolId
|
||||||
|
|
||||||
let selfQuery = await p.query(p.baseProtocol.localNode.id)
|
let selfQuery = await p.query(p.localNode.id)
|
||||||
trace "Discovered nodes in self target query", nodes = selfQuery.len
|
trace "Discovered nodes in self target query", nodes = selfQuery.len
|
||||||
|
|
||||||
for i in 0..<initialLookups:
|
for i in 0..<initialLookups:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user