# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## Implementation of the Portal wire protocol as specified at:
## https://github.com/ethereum/portal-network-specs/blob/master/portal-wire-protocol.md

{.push raises: [].}

import
  std/[sequtils, sets, algorithm, tables],
  stew/[byteutils, leb128, endians2],
  results,
  chronicles,
  chronos,
  nimcrypto/hash,
  bearssl,
  ssz_serialization,
  metrics,
  faststreams,
  minilru,
  eth/rlp,
  eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification],
  "."/[portal_stream, portal_protocol_config],
  ./messages

from std/times import epochTime # For system timestamp in traceContentLookup

export messages, routing_table, protocol
|
|
|
|
declareCounter portal_message_requests_incoming,
|
|
"Portal wire protocol incoming message requests",
|
|
labels = ["protocol_id", "message_type"]
|
|
declareCounter portal_message_decoding_failures,
|
|
"Portal wire protocol message decoding failures", labels = ["protocol_id"]
|
|
declareCounter portal_message_requests_outgoing,
|
|
"Portal wire protocol outgoing message requests",
|
|
labels = ["protocol_id", "message_type"]
|
|
declareCounter portal_message_response_incoming,
|
|
"Portal wire protocol incoming message responses",
|
|
labels = ["protocol_id", "message_type"]
|
|
|
|
const requestBuckets = [1.0, 3.0, 5.0, 7.0, 9.0, Inf]
|
|
declareHistogram portal_lookup_node_requests,
|
|
"Portal wire protocol amount of requests per node lookup",
|
|
labels = ["protocol_id"],
|
|
buckets = requestBuckets
|
|
declareHistogram portal_lookup_content_requests,
|
|
"Portal wire protocol amount of requests per content lookup",
|
|
labels = ["protocol_id"],
|
|
buckets = requestBuckets
|
|
declareCounter portal_lookup_content_failures,
|
|
"Portal wire protocol content lookup failures", labels = ["protocol_id"]
|
|
|
|
const contentKeysBuckets = [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, Inf]
|
|
declareHistogram portal_content_keys_offered,
|
|
"Portal wire protocol amount of content keys per offer message send",
|
|
labels = ["protocol_id"],
|
|
buckets = contentKeysBuckets
|
|
declareHistogram portal_content_keys_accepted,
|
|
"Portal wire protocol amount of content keys per accept message received",
|
|
labels = ["protocol_id"],
|
|
buckets = contentKeysBuckets
|
|
declareCounter portal_gossip_offers_successful,
|
|
"Portal wire protocol successful content offers from neighborhood gossip",
|
|
labels = ["protocol_id"]
|
|
declareCounter portal_gossip_offers_failed,
|
|
"Portal wire protocol failed content offers from neighborhood gossip",
|
|
labels = ["protocol_id"]
|
|
declareCounter portal_gossip_with_lookup,
|
|
"Portal wire protocol neighborhood gossip that required a node lookup",
|
|
labels = ["protocol_id"]
|
|
declareCounter portal_gossip_without_lookup,
|
|
"Portal wire protocol neighborhood gossip that did not require a node lookup",
|
|
labels = ["protocol_id"]
|
|
declareCounter portal_content_cache_hits,
|
|
"Portal wire protocol local content lookups that hit the cache",
|
|
labels = ["protocol_id"]
|
|
declareCounter portal_content_cache_misses,
|
|
"Portal wire protocol local content lookups that don't hit the cache",
|
|
labels = ["protocol_id"]
|
|
|
|
declareCounter portal_poke_offers,
|
|
"Portal wire protocol offers through poke mechanism", labels = ["protocol_id"]
|
|
|
|
# Note: These metrics are to get some idea of how many ENRs are sent on average.
# Relevant issue: https://github.com/ethereum/portal-network-specs/issues/136
|
|
const enrsBuckets = [0.0, 1.0, 3.0, 5.0, 8.0, 9.0, Inf]
|
|
declareHistogram portal_nodes_enrs_packed,
|
|
"Portal wire protocol amount of enrs packed in a nodes message",
|
|
labels = ["protocol_id"],
|
|
buckets = enrsBuckets
|
|
# This one will currently hit the max numbers because all neighbours are sent,
# not only the ones closer to the content.
|
|
declareHistogram portal_content_enrs_packed,
|
|
"Portal wire protocol amount of enrs packed in a content message",
|
|
labels = ["protocol_id"],
|
|
buckets = enrsBuckets
|
|
|
|
const distanceBuckets = [
|
|
float64 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253,
|
|
254, 255, 256,
|
|
]
|
|
declareHistogram portal_find_content_log_distance,
|
|
"Portal wire protocol logarithmic distance of requested content",
|
|
labels = ["protocol_id"],
|
|
buckets = distanceBuckets
|
|
|
|
declareHistogram portal_offer_log_distance,
|
|
"Portal wire protocol logarithmic distance of offered content",
|
|
labels = ["protocol_id"],
|
|
buckets = distanceBuckets
|
|
|
|
logScope:
|
|
topics = "portal_wire"
|
|
|
|
const
|
|
enrsResultLimit* = 32 ## Maximum amount of ENRs in the total Nodes messages
|
|
## that will be processed
|
|
refreshInterval = 5.minutes ## Interval of launching a random query to
|
|
## refresh the routing table.
|
|
revalidateMax = 4000 ## Revalidation of a peer is done between 0 and this
|
|
## value in milliseconds
|
|
initialLookups = 1 ## Amount of lookups done when populating the routing table
|
|
|
|
type
|
|
ToContentIdHandler* =
|
|
proc(contentKey: ContentKeyByteList): results.Opt[ContentId] {.raises: [], gcsafe.}
|
|
|
|
DbGetHandler* = proc(
|
|
contentKey: ContentKeyByteList, contentId: ContentId
|
|
): results.Opt[seq[byte]] {.raises: [], gcsafe.}
|
|
|
|
DbStoreHandler* = proc(
|
|
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
|
|
) {.raises: [], gcsafe.}
|
|
|
|
DbContainsHandler* = proc(contentKey: ContentKeyByteList, contentId: ContentId): bool {.
|
|
raises: [], gcsafe
|
|
.}
|
|
|
|
DbRadiusHandler* = proc(): UInt256 {.raises: [], gcsafe.}
|
|
|
|
PortalProtocolId* = array[2, byte]
|
|
|
|
RadiusCache* = LruCache[NodeId, UInt256]
|
|
|
|
ContentCache = LruCache[ContentId, seq[byte]]
|
|
|
|
ContentKV* = object
|
|
contentKey*: ContentKeyByteList
|
|
content*: seq[byte]
|
|
|
|
OfferRequestType = enum
|
|
Direct
|
|
Database
|
|
|
|
OfferRequest = object
|
|
dst: Node
|
|
case kind: OfferRequestType
|
|
of Direct:
|
|
contentList: List[ContentKV, contentKeysLimit]
|
|
of Database:
|
|
contentKeys: ContentKeysList
|
|
|
|
PortalProtocol* = ref object of TalkProtocol
|
|
protocolId*: PortalProtocolId
|
|
routingTable*: RoutingTable
|
|
baseProtocol*: protocol.Protocol
|
|
toContentId*: ToContentIdHandler
|
|
contentCache: ContentCache
|
|
dbGet*: DbGetHandler
|
|
dbPut*: DbStoreHandler
|
|
dbContains*: DbContainsHandler
|
|
dataRadius*: DbRadiusHandler
|
|
bootstrapRecords*: seq[Record]
|
|
lastLookup: chronos.Moment
|
|
refreshLoop: Future[void]
|
|
revalidateLoop: Future[void]
|
|
stream*: PortalStream
|
|
radiusCache: RadiusCache
|
|
offerQueue: AsyncQueue[OfferRequest]
|
|
offerWorkers: seq[Future[void]]
|
|
pingTimings: Table[NodeId, chronos.Moment]
|
|
config*: PortalProtocolConfig
|
|
|
|
PortalResult*[T] = Result[T, string]
|
|
|
|
FoundContentKind* = enum
|
|
Nodes
|
|
Content
|
|
|
|
FoundContent* = object
|
|
src*: Node
|
|
case kind*: FoundContentKind
|
|
of Content:
|
|
content*: seq[byte]
|
|
utpTransfer*: bool
|
|
of Nodes:
|
|
nodes*: seq[Node]
|
|
|
|
ContentLookupResult* = object
|
|
content*: seq[byte]
|
|
utpTransfer*: bool
|
|
    # List of nodes that do not have the requested content, but for which
    # the content does fall within their radius
|
|
nodesInterestedInContent*: seq[Node]
|
|
|
|
TraceResponse* = object
|
|
durationMs*: int64
|
|
respondedWith*: seq[NodeId]
|
|
|
|
NodeMetadata* = object
|
|
enr*: Record
|
|
distance*: UInt256
|
|
|
|
TraceObject* = object
|
|
origin*: NodeId
|
|
targetId: UInt256
|
|
receivedFrom*: Opt[NodeId]
|
|
responses*: Table[string, TraceResponse]
|
|
metadata*: Table[string, NodeMetadata]
|
|
cancelled*: seq[NodeId]
|
|
startedAtMs*: int64
|
|
|
|
TraceContentLookupResult* = object
|
|
content*: Opt[seq[byte]]
|
|
utpTransfer*: bool
|
|
trace*: TraceObject
|
|
|
|
func init*(T: type ContentKV, contentKey: ContentKeyByteList, content: seq[byte]): T =
|
|
ContentKV(contentKey: contentKey, content: content)
|
|
|
|
func init*(
|
|
T: type ContentLookupResult,
|
|
content: seq[byte],
|
|
utpTransfer: bool,
|
|
nodesInterestedInContent: seq[Node],
|
|
): T =
|
|
ContentLookupResult(
|
|
content: content,
|
|
utpTransfer: utpTransfer,
|
|
nodesInterestedInContent: nodesInterestedInContent,
|
|
)
|
|
|
|
func getProtocolId*(
|
|
network: PortalNetwork, subnetwork: PortalSubnetwork
|
|
): PortalProtocolId =
|
|
const portalPrefix = byte(0x50)
|
|
|
|
case network
|
|
of PortalNetwork.none, PortalNetwork.mainnet:
|
|
case subnetwork
|
|
of PortalSubnetwork.state:
|
|
[portalPrefix, 0x0A]
|
|
of PortalSubnetwork.history:
|
|
[portalPrefix, 0x0B]
|
|
of PortalSubnetwork.beacon:
|
|
[portalPrefix, 0x0C]
|
|
of PortalSubnetwork.transactionIndex:
|
|
[portalPrefix, 0x0D]
|
|
of PortalSubnetwork.verkleState:
|
|
[portalPrefix, 0x0E]
|
|
of PortalSubnetwork.transactionGossip:
|
|
[portalPrefix, 0x0F]
|
|
of PortalNetwork.angelfood:
|
|
case subnetwork
|
|
of PortalSubnetwork.state:
|
|
[portalPrefix, 0x4A]
|
|
of PortalSubnetwork.history:
|
|
[portalPrefix, 0x4B]
|
|
of PortalSubnetwork.beacon:
|
|
[portalPrefix, 0x4C]
|
|
of PortalSubnetwork.transactionIndex:
|
|
[portalPrefix, 0x4D]
|
|
of PortalSubnetwork.verkleState:
|
|
[portalPrefix, 0x4E]
|
|
of PortalSubnetwork.transactionGossip:
|
|
[portalPrefix, 0x4F]
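# Example (sketch): the resulting id is the Portal prefix byte 0x50 followed by
# a network/subnetwork selector, e.g. for the history subnetwork:
#   doAssert getProtocolId(PortalNetwork.mainnet, PortalSubnetwork.history) ==
#     [byte 0x50, 0x0B]
#   doAssert getProtocolId(PortalNetwork.angelfood, PortalSubnetwork.history) ==
#     [byte 0x50, 0x4B]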
|
|
|
|
func `$`(id: PortalProtocolId): string =
|
|
id.toHex()
|
|
|
|
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
|
|
p.routingTable.addNode(node)
|
|
|
|
proc addNode*(p: PortalProtocol, r: Record): bool =
|
|
p.addNode(Node.fromRecord(r)) == Added
|
|
|
|
func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
|
|
p.routingTable.getNode(id)
|
|
|
|
func localNode*(p: PortalProtocol): Node =
|
|
p.baseProtocol.localNode
|
|
|
|
func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
|
|
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
|
|
|
|
func distance(p: PortalProtocol, a, b: NodeId): UInt256 =
|
|
p.routingTable.distance(a, b)
|
|
|
|
func logDistance(p: PortalProtocol, a, b: NodeId): uint16 =
|
|
p.routingTable.logDistance(a, b)
|
|
|
|
func inRange(
|
|
p: PortalProtocol, nodeId: NodeId, nodeRadius: UInt256, contentId: ContentId
|
|
): bool =
|
|
let distance = p.distance(nodeId, contentId)
|
|
distance <= nodeRadius
|
|
|
|
template inRange*(p: PortalProtocol, contentId: ContentId): bool =
|
|
p.inRange(p.localNode.id, p.dataRadius(), contentId)
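# Example (sketch, assuming the default XorDistanceCalculator): content is in
# range of the local node when the distance between the local node id and the
# content id does not exceed the node's data radius:
#   let d = p.distance(p.localNode.id, contentId)
#   doAssert p.inRange(contentId) == (d <= p.dataRadius())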
|
|
|
|
func truncateEnrs(
|
|
nodes: seq[Node], maxSize: int, enrOverhead: int
|
|
): List[ByteList[2048], 32] =
|
|
var enrs: List[ByteList[2048], 32]
|
|
var totalSize = 0
|
|
for n in nodes:
|
|
let enr = ByteList[2048].init(n.record.raw)
|
|
if totalSize + enr.len() + enrOverhead <= maxSize:
|
|
let res = enrs.add(enr)
|
|
# With max payload of discv5 and the sizes of ENRs this should not occur.
|
|
doAssert(res, "32 limit will not be reached")
|
|
totalSize = totalSize + enr.len() + enrOverhead
|
|
else:
|
|
break
|
|
|
|
enrs
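# Worked example (hypothetical sizes): with maxSize = 1100, enrOverhead = 4 and
# ENRs of exactly 300 bytes each, every added ENR costs 304 bytes, so
#   truncateEnrs(nodes, 1100, 4)
# packs 3 ENRs (3 * 304 = 912 <= 1100, while 4 * 304 = 1216 would exceed it).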
|
|
|
|
proc handlePing(p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] =
|
|
# TODO: This should become custom per Portal Network
|
|
  # TODO: Need to think about the effect of a malicious actor sending lots of
  # pings from different nodes to clear the LRU.
|
|
let customPayloadDecoded =
|
|
try:
|
|
SSZ.decode(ping.customPayload.asSeq(), CustomPayload)
|
|
except SerializationError:
|
|
# invalid custom payload, send empty back
|
|
return @[]
|
|
p.radiusCache.put(srcId, customPayloadDecoded.dataRadius)
|
|
|
|
let customPayload = CustomPayload(dataRadius: p.dataRadius())
|
|
let p = PongMessage(
|
|
enrSeq: p.localNode.record.seqNum,
|
|
customPayload: ByteList[2048](SSZ.encode(customPayload)),
|
|
)
|
|
|
|
encodeMessage(p)
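# Sketch of the payload used in both directions: ping and pong carry the node's
# data radius as an SSZ-encoded custom payload (expected to become custom per
# Portal network):
#   let customPayload = CustomPayload(dataRadius: p.dataRadius())
#   let pong = PongMessage(
#     enrSeq: p.localNode.record.seqNum,
#     customPayload: ByteList[2048](SSZ.encode(customPayload)),
#   )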
|
|
|
|
proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
|
|
if fn.distances.len == 0:
|
|
let enrs = List[ByteList[2048], 32](@[])
|
|
encodeMessage(NodesMessage(total: 1, enrs: enrs))
|
|
elif fn.distances.contains(0):
|
|
# A request for our own record.
|
|
let enr = ByteList[2048](rlp.encode(p.localNode.record))
|
|
encodeMessage(NodesMessage(total: 1, enrs: List[ByteList[2048], 32](@[enr])))
|
|
else:
|
|
let distances = fn.distances.asSeq()
|
|
if distances.all(
|
|
proc(x: uint16): bool =
|
|
return x <= 256
|
|
):
|
|
let nodes = p.routingTable.neighboursAtDistances(distances, seenOnly = true)
|
|
|
|
      # TODO: The total amount of messages is fixed to 1 for now, else we would
      # need to either move the sending of the talkresp messages here, or allow
      # for returning multiple messages.
      # In the long run, it might just be better to use a stream in these cases?
      # Size calculation is done to truncate the ENR results in order to not go
      # over the discv5 packet size limits. ENRs are sorted so the closest nodes
      # will still be passed.
|
|
const
|
|
nodesOverhead = 1 + 1 + 4 # msg id + total + container offset
|
|
maxPayloadSize = maxDiscv5TalkRespPayload - nodesOverhead
|
|
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
|
|
|
|
let enrs = truncateEnrs(nodes, maxPayloadSize, enrOverhead)
|
|
portal_nodes_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])
|
|
|
|
encodeMessage(NodesMessage(total: 1, enrs: enrs))
|
|
else:
|
|
# invalid request, send empty back
|
|
let enrs = List[ByteList[2048], 32](@[])
|
|
encodeMessage(NodesMessage(total: 1, enrs: enrs))
|
|
|
|
proc handleFindContent(
|
|
p: PortalProtocol, fc: FindContentMessage, srcId: NodeId
|
|
): seq[byte] =
|
|
const
|
|
contentOverhead = 1 + 1 # msg id + SSZ Union selector
|
|
maxPayloadSize = maxDiscv5TalkRespPayload - contentOverhead
|
|
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
|
|
|
|
let contentId = p.toContentId(fc.contentKey).valueOr:
|
|
# Return empty response when content key validation fails
|
|
# TODO: Better would be to return no message at all? Needs changes on
|
|
# discv5 layer.
|
|
return @[]
|
|
|
|
let logDistance = p.logDistance(contentId, p.localNode.id)
|
|
portal_find_content_log_distance.observe(
|
|
int64(logDistance), labelValues = [$p.protocolId]
|
|
)
|
|
|
|
# Clear out the timed out connections and pending transfers
|
|
p.stream.pruneAllowedRequestConnections()
|
|
|
|
# Check first if content is in range, as this is a cheaper operation
|
|
if p.inRange(contentId) and p.stream.canAddPendingTransfer(srcId, contentId):
|
|
let contentResult = p.dbGet(fc.contentKey, contentId)
|
|
if contentResult.isOk():
|
|
let content = contentResult.get()
|
|
if content.len <= maxPayloadSize:
|
|
return encodeMessage(
|
|
ContentMessage(
|
|
contentMessageType: contentType, content: ByteList[2048](content)
|
|
)
|
|
)
|
|
else:
|
|
p.stream.addPendingTransfer(srcId, contentId)
|
|
let connectionId = p.stream.addContentRequest(srcId, contentId, content)
|
|
|
|
return encodeMessage(
|
|
ContentMessage(
|
|
contentMessageType: connectionIdType, connectionId: connectionId
|
|
)
|
|
)
|
|
|
|
# Node does not have the content, or content is not even in radius,
|
|
# send closest neighbours to the requested content id.
|
|
let
|
|
closestNodes = p.routingTable.neighbours(NodeId(contentId), seenOnly = true)
|
|
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
|
|
portal_content_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])
|
|
|
|
encodeMessage(ContentMessage(contentMessageType: enrsType, enrs: enrs))
|
|
|
|
proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
|
|
# Early return when our contentQueue is full. This means there is a backlog
|
|
# of content to process and potentially gossip around. Don't accept more
|
|
# data in this case.
|
|
if p.stream.contentQueue.full():
|
|
return encodeMessage(
|
|
AcceptMessage(
|
|
connectionId: Bytes2([byte 0x00, 0x00]),
|
|
contentKeys: ContentKeysBitList.init(o.contentKeys.len),
|
|
)
|
|
)
|
|
|
|
# Clear out the timed out connections and pending transfers
|
|
p.stream.pruneAllowedOfferConnections()
|
|
|
|
var
|
|
contentKeysBitList = ContentKeysBitList.init(o.contentKeys.len)
|
|
contentKeys = ContentKeysList.init(@[])
|
|
contentIds = newSeq[ContentId]()
|
|
# TODO: Do we need some protection against a peer offering lots (64x) of
|
|
# content that fits our Radius but is actually bogus?
|
|
# Additional TODO, but more of a specification clarification: What if we don't
|
|
# want any of the content? Reply with empty bitlist and a connectionId of
|
|
# all zeroes but don't actually allow an uTP connection?
|
|
for i, contentKey in o.contentKeys:
|
|
let contentIdResult = p.toContentId(contentKey)
|
|
if contentIdResult.isOk():
|
|
let contentId = contentIdResult.get()
|
|
|
|
let logDistance = p.logDistance(contentId, p.localNode.id)
|
|
portal_offer_log_distance.observe(
|
|
int64(logDistance), labelValues = [$p.protocolId]
|
|
)
|
|
|
|
if p.inRange(contentId) and p.stream.canAddPendingTransfer(srcId, contentId) and
|
|
not p.dbContains(contentKey, contentId):
|
|
p.stream.addPendingTransfer(srcId, contentId)
|
|
contentKeysBitList.setBit(i)
|
|
discard contentKeys.add(contentKey)
|
|
contentIds.add(contentId)
|
|
else:
|
|
# Return empty response when content key validation fails
|
|
return @[]
|
|
|
|
let connectionId =
|
|
if contentKeysBitList.countOnes() != 0:
|
|
p.stream.addContentOffer(srcId, contentKeys, contentIds)
|
|
else:
|
|
# When the node does not accept any of the content offered, reply with an
|
|
# all zeroes bitlist and connectionId.
|
|
# Note: What to do in this scenario is not defined in the Portal spec.
|
|
Bytes2([byte 0x00, 0x00])
|
|
|
|
encodeMessage(
|
|
AcceptMessage(connectionId: connectionId, contentKeys: contentKeysBitList)
|
|
)
|
|
|
|
proc messageHandler(
|
|
protocol: TalkProtocol,
|
|
request: seq[byte],
|
|
srcId: NodeId,
|
|
srcUdpAddress: Address,
|
|
nodeOpt: Opt[Node],
|
|
): seq[byte] =
|
|
doAssert(protocol of PortalProtocol)
|
|
|
|
logScope:
|
|
protocolId = p.protocolId
|
|
|
|
let p = PortalProtocol(protocol)
|
|
|
|
let decoded = decodeMessage(request)
|
|
if decoded.isOk():
|
|
let message = decoded.get()
|
|
trace "Received message request", srcId, srcUdpAddress, kind = message.kind
|
|
# Received a proper Portal message, check first if an ENR is provided by
|
|
# the discovery v5 layer and add it to the portal network routing table.
|
|
# If not provided through the handshake, try to get it from the discovery v5
|
|
# routing table.
|
|
    # When the node would be eligible for the portal network routing table, it
    # is possible that it exists in the base discv5 routing table, as the same
    # node ids are used. This is not certain at all however, as more nodes might
    # exist on the base layer, and it will also depend on the distance,
    # order of lookups, etc.
    # Note: As a third measure, one could run a findNodes request with distance 0.
|
|
if nodeOpt.isSome():
|
|
let node = nodeOpt.value()
|
|
let status = p.addNode(node)
|
|
trace "Adding new node to routing table after incoming request", status, node
|
|
else:
|
|
let nodeOpt = p.baseProtocol.getNode(srcId)
|
|
if nodeOpt.isSome():
|
|
let node = nodeOpt.value()
|
|
let status = p.addNode(node)
|
|
trace "Adding new node to routing table after incoming request", status, node
|
|
|
|
portal_message_requests_incoming.inc(labelValues = [$p.protocolId, $message.kind])
|
|
|
|
case message.kind
|
|
of MessageKind.ping:
|
|
p.handlePing(message.ping, srcId)
|
|
of MessageKind.findNodes:
|
|
p.handleFindNodes(message.findNodes)
|
|
of MessageKind.findContent:
|
|
p.handleFindContent(message.findContent, srcId)
|
|
of MessageKind.offer:
|
|
p.handleOffer(message.offer, srcId)
|
|
else:
|
|
      # This would mean that a Portal wire response message is being sent over a
      # discv5 talkreq message.
|
|
debug "Invalid Portal wire message type over talkreq", kind = message.kind
|
|
@[]
|
|
else:
|
|
portal_message_decoding_failures.inc(labelValues = [$p.protocolId])
|
|
debug "Packet decoding error", error = decoded.error, srcId, srcUdpAddress
|
|
@[]
|
|
|
|
proc new*(
|
|
T: type PortalProtocol,
|
|
baseProtocol: protocol.Protocol,
|
|
protocolId: PortalProtocolId,
|
|
toContentId: ToContentIdHandler,
|
|
dbGet: DbGetHandler,
|
|
dbPut: DbStoreHandler,
|
|
dbContains: DbContainsHandler,
|
|
dbRadius: DbRadiusHandler,
|
|
stream: PortalStream,
|
|
bootstrapRecords: openArray[Record] = [],
|
|
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
|
|
config: PortalProtocolConfig = defaultPortalProtocolConfig,
|
|
): T =
|
|
let proto = PortalProtocol(
|
|
protocolHandler: messageHandler,
|
|
protocolId: protocolId,
|
|
routingTable: RoutingTable.init(
|
|
baseProtocol.localNode, config.bitsPerHop, config.tableIpLimits, baseProtocol.rng,
|
|
distanceCalculator,
|
|
),
|
|
baseProtocol: baseProtocol,
|
|
toContentId: toContentId,
|
|
contentCache:
|
|
ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize),
|
|
dbGet: dbGet,
|
|
dbPut: dbPut,
|
|
dbContains: dbContains,
|
|
dataRadius: dbRadius,
|
|
bootstrapRecords: @bootstrapRecords,
|
|
stream: stream,
|
|
radiusCache: RadiusCache.init(256),
|
|
offerQueue: newAsyncQueue[OfferRequest](config.maxConcurrentOffers),
|
|
pingTimings: Table[NodeId, chronos.Moment](),
|
|
config: config,
|
|
)
|
|
|
|
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
|
|
"Only one protocol should have this id"
|
|
)
|
|
|
|
proto
|
|
|
|
# Sends the discv5 talkreq message with provided Portal message, awaits and
|
|
# validates the proper response, and updates the Portal Network routing table.
|
|
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
|
p: PortalProtocol, dst: Node, request: Request
|
|
): Future[PortalResult[Response]] {.async: (raises: [CancelledError]).} =
|
|
logScope:
|
|
protocolId = p.protocolId
|
|
|
|
trace "Send message request", dstId = dst.id, kind = messageKind(Request)
|
|
portal_message_requests_outgoing.inc(
|
|
labelValues = [$p.protocolId, $messageKind(Request)]
|
|
)
|
|
|
|
let talkresp =
|
|
await talkReq(p.baseProtocol, dst, @(p.protocolId), encodeMessage(request))
|
|
|
|
  # Note: Failure of `decodeMessage` might also simply mean that the peer does
  # not support the specific talk protocol, as according to the specification
  # an empty response needs to be sent in that case.
|
|
# See: https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#talkreq-request-0x05
|
|
|
|
let messageResponse = talkresp
|
|
.mapErr(
|
|
proc(x: cstring): string =
|
|
$x
|
|
)
|
|
.flatMap(
|
|
proc(x: seq[byte]): Result[Message, string] =
|
|
decodeMessage(x)
|
|
)
|
|
.flatMap(
|
|
proc(m: Message): Result[Response, string] =
|
|
getInnerMessage[Response](m)
|
|
)
|
|
|
|
if messageResponse.isOk():
|
|
trace "Received message response",
|
|
srcId = dst.id, srcAddress = dst.address, kind = messageKind(Response)
|
|
portal_message_response_incoming.inc(
|
|
labelValues = [$p.protocolId, $messageKind(Response)]
|
|
)
|
|
|
|
p.routingTable.setJustSeen(dst)
|
|
else:
|
|
debug "Error receiving message response",
|
|
error = messageResponse.error, srcId = dst.id, srcAddress = dst.address
|
|
p.pingTimings.del(dst.id)
|
|
p.routingTable.replaceNode(dst)
|
|
|
|
return messageResponse
|
|
|
|
proc pingImpl*(
|
|
p: PortalProtocol, dst: Node
|
|
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
|
|
let customPayload = CustomPayload(dataRadius: p.dataRadius())
|
|
let ping = PingMessage(
|
|
enrSeq: p.localNode.record.seqNum,
|
|
customPayload: ByteList[2048](SSZ.encode(customPayload)),
|
|
)
|
|
|
|
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
|
|
|
|
proc findNodesImpl*(
|
|
p: PortalProtocol, dst: Node, distances: List[uint16, 256]
|
|
): Future[PortalResult[NodesMessage]] {.async: (raises: [CancelledError]).} =
|
|
let fn = FindNodesMessage(distances: distances)
|
|
|
|
# TODO Add nodes validation
|
|
return await reqResponse[FindNodesMessage, NodesMessage](p, dst, fn)
|
|
|
|
proc findContentImpl*(
|
|
p: PortalProtocol, dst: Node, contentKey: ContentKeyByteList
|
|
): Future[PortalResult[ContentMessage]] {.async: (raises: [CancelledError]).} =
|
|
let fc = FindContentMessage(contentKey: contentKey)
|
|
|
|
return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc)
|
|
|
|
proc offerImpl*(
|
|
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
|
|
): Future[PortalResult[AcceptMessage]] {.async: (raises: [CancelledError]).} =
|
|
let offer = OfferMessage(contentKeys: contentKeys)
|
|
|
|
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
|
|
|
|
proc recordsFromBytes*(
|
|
rawRecords: List[ByteList[2048], 32]
|
|
): PortalResult[seq[Record]] =
|
|
var records: seq[Record]
|
|
for r in rawRecords.asSeq():
|
|
let record = enr.Record.fromBytes(r.asSeq()).valueOr:
|
|
      # If any of the ENRs is invalid, fail immediately. This is similar to what
      # is done on the discovery v5 layer.
|
|
return err("Deserialization of an ENR failed")
|
|
|
|
records.add(record)
|
|
|
|
ok(records)
|
|
|
|
proc ping*(
|
|
p: PortalProtocol, dst: Node
|
|
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
|
|
let pongResponse = await p.pingImpl(dst)
|
|
|
|
if pongResponse.isOk():
|
|
# Update last time we pinged this node
|
|
p.pingTimings[dst.id] = now(chronos.Moment)
|
|
|
|
let pong = pongResponse.get()
|
|
# TODO: This should become custom per Portal Network
|
|
let customPayloadDecoded =
|
|
try:
|
|
SSZ.decode(pong.customPayload.asSeq(), CustomPayload)
|
|
except SerializationError:
|
|
# invalid custom payload
|
|
return err("Pong message contains invalid custom payload")
|
|
|
|
p.radiusCache.put(dst.id, customPayloadDecoded.dataRadius)
|
|
|
|
return pongResponse
|
|
|
|
proc findNodes*(
|
|
p: PortalProtocol, dst: Node, distances: seq[uint16]
|
|
): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
|
|
let nodesMessage = await p.findNodesImpl(dst, List[uint16, 256](distances))
|
|
if nodesMessage.isOk():
|
|
let records = recordsFromBytes(nodesMessage.get().enrs)
|
|
if records.isOk():
|
|
# TODO: distance function is wrong here for state, fix + tests
|
|
return ok(verifyNodesRecords(records.get(), dst, enrsResultLimit, distances))
|
|
else:
|
|
return err(records.error)
|
|
else:
|
|
return err(nodesMessage.error)
|
|
|
|
proc findContent*(
|
|
p: PortalProtocol, dst: Node, contentKey: ContentKeyByteList
|
|
): Future[PortalResult[FoundContent]] {.async: (raises: [CancelledError]).} =
|
|
logScope:
|
|
node = dst
|
|
contentKey
|
|
|
|
let contentMessageResponse = await p.findContentImpl(dst, contentKey)
|
|
|
|
if contentMessageResponse.isOk():
|
|
let m = contentMessageResponse.get()
|
|
case m.contentMessageType
|
|
of connectionIdType:
|
|
let nodeAddress = NodeAddress.init(dst).valueOr:
|
|
        # This should not happen as it comes after a successful talkreq/talkresp
|
|
return err("Trying to connect to node with unknown address: " & $dst.id)
|
|
|
|
let socket =
|
|
?(
|
|
await p.stream.connectTo(
|
|
# uTP protocol uses BE for all values in the header, incl. connection id
|
|
nodeAddress,
|
|
uint16.fromBytesBE(m.connectionId),
|
|
)
|
|
)
|
|
|
|
try:
|
|
# Read all bytes from the socket
|
|
# This will either end with a FIN, or because the read action times out.
|
|
# A FIN does not necessarily mean that the data read is complete.
|
|
        # Further validation is required; using a length prefix here might be
        # beneficial for this.
|
|
let readFut = socket.read()
|
|
|
|
readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
|
|
debug "Socket read cancelled", socketKey = socket.socketKey
|
|
# In case this `findContent` gets cancelled while reading the data,
|
|
# send a FIN and clean up the socket.
|
|
socket.close()
|
|
|
|
if await readFut.withTimeout(p.stream.contentReadTimeout):
|
|
let content = await readFut
|
|
          # The socket received the remote FIN and drained the whole buffer, so
          # it can be safely destroyed without notifying the remote
|
|
debug "Socket read fully", socketKey = socket.socketKey
|
|
socket.destroy()
|
|
return ok(
|
|
FoundContent(src: dst, kind: Content, content: content, utpTransfer: true)
|
|
)
|
|
else:
|
|
debug "Socket read time-out", socketKey = socket.socketKey
|
|
# Note: This might look a bit strange, but not doing a socket.close()
|
|
# here as this is already done internally. utp_socket `checkTimeouts`
|
|
# already does a socket.destroy() on timeout. Might want to change the
|
|
# API on this later though.
|
|
return err("Reading data from socket timed out, content request failed")
|
|
except CancelledError as exc:
|
|
        # Even though we already installed cancelCallback on readFut, it is worth
        # catching CancelledError in case withTimeout throws CancelledError
        # but readFut has already finished.
|
|
debug "Socket read cancelled", socketKey = socket.socketKey
|
|
|
|
socket.close()
|
|
raise exc
|
|
of contentType:
|
|
return ok(
|
|
FoundContent(
|
|
src: dst, kind: Content, content: m.content.asSeq(), utpTransfer: false
|
|
)
|
|
)
|
|
of enrsType:
|
|
let records = recordsFromBytes(m.enrs)
|
|
if records.isOk():
|
|
let verifiedNodes = verifyNodesRecords(records.get(), dst, enrsResultLimit)
|
|
|
|
return ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes))
|
|
else:
|
|
return err("Content message returned invalid ENRs")
|
|
else:
|
|
debug "FindContent failed due to find content request failure ",
|
|
error = contentMessageResponse.error
|
|
|
|
return err("No content response")
|
|
|
|
proc getContentKeys(o: OfferRequest): ContentKeysList =
|
|
case o.kind
|
|
of Direct:
|
|
var contentKeys: ContentKeysList
|
|
for info in o.contentList:
|
|
discard contentKeys.add(info.contentKey)
|
|
return contentKeys
|
|
of Database:
|
|
return o.contentKeys
|
|
|
|
func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int =
|
|
  ## Calculates how many ContentKeys will fit in one offer message which
  ## will be small enough to fit into the discv5 limit.
  ## This is necessary as contentKeysLimit (64) is sometimes too big, and even
  ## half of this can be too much to fit into discv5 limits.
|
|
|
|
let maxTalkReqPayload = maxDiscv5PacketSize - getTalkReqOverhead(int(protocolIdLen))
|
|
  # To calculate how many bytes `n` content keys of size `maxKeySize` will take,
  # we can use the following equation:
  # bytes = (n * (maxKeySize + perContentKeyOverhead)) + offerMessageOverhead
  # To calculate the maximum number of keys that fit in the given space, this
  # can be transformed to:
  # n = trunc((bytes - offerMessageOverhead) / (maxKeySize + perContentKeyOverhead))
|
|
return ((maxTalkReqPayload - 5) div (int(maxKeySize) + 4))
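# Worked example (hypothetical numbers; the real budget follows from
# maxDiscv5PacketSize and getTalkReqOverhead): with a talkreq payload budget of
# 1100 bytes and maxKeySize = 36, the 5 byte offer message overhead and the
# 4 byte per-key offset give n = (1100 - 5) div (36 + 4) = 27 content keys.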
|
|
|
|
proc offer(
|
|
p: PortalProtocol, o: OfferRequest
|
|
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
|
|
  ## Offer triggers an offer-accept interaction with one peer.
  ## The whole flow has two phases:
  ## 1. Come to an agreement on what content to transfer, by using offer and
  ## accept messages.
  ## 2. Open a uTP stream from the content provider to the content receiver and
  ## transfer the agreed content.
  ## There are two types of possible offer requests:
  ## Direct - when the caller provides the content to transfer. This way, content
  ## is guaranteed to be transferred as it stays in memory until the whole
  ## transfer is completed.
  ## Database - when the caller provides the keys of the content to be transferred.
  ## This way content is read from the database just before it is transferred over
  ## the uTP socket. This is useful when there is a lot of content to be transferred
  ## to many peers, and keeping it all in memory could exhaust node resources.
  ## The main drawback is that the content may be deleted from the node database
  ## by the cleanup process before it is transferred, so this way does not
  ## guarantee content transfer.
|
|
let contentKeys = getContentKeys(o)
|
|
|
|
logScope:
|
|
node = o.dst
|
|
contentKeys
|
|
|
|
debug "Offering content"
|
|
|
|
portal_content_keys_offered.observe(
|
|
contentKeys.len().int64, labelValues = [$p.protocolId]
|
|
)
|
|
|
|
let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys)
|
|
|
|
if acceptMessageResponse.isOk():
|
|
let m = acceptMessageResponse.get()
|
|
|
|
let contentKeysLen =
|
|
case o.kind
|
|
of Direct:
|
|
o.contentList.len()
|
|
of Database:
|
|
o.contentKeys.len()
|
|
|
|
if m.contentKeys.len() != contentKeysLen:
|
|
# TODO:
|
|
# When there is such system, the peer should get scored negatively here.
|
|
error "Accepted content key bitlist has invalid size"
|
|
return err("Accepted content key bitlist has invalid size")
|
|
|
|
let acceptedKeysAmount = m.contentKeys.countOnes()
|
|
portal_content_keys_accepted.observe(
|
|
acceptedKeysAmount.int64, labelValues = [$p.protocolId]
|
|
)
|
|
if acceptedKeysAmount == 0:
|
|
debug "No content accepted"
|
|
# Don't open an uTP stream if no content was requested
|
|
return ok(m.contentKeys)
|
|
|
|
let nodeAddress = NodeAddress.init(o.dst).valueOr:
|
|
      # This should not happen as it comes after a successful talkreq/talkresp
|
|
return err("Trying to connect to node with unknown address: " & $o.dst.id)
|
|
|
|
let socket =
|
|
?(await p.stream.connectTo(nodeAddress, uint16.fromBytesBE(m.connectionId)))
|
|
|
|
template lenu32(x: untyped): untyped =
|
|
uint32(len(x))
|
|
|
|
case o.kind
|
|
of Direct:
|
|
for i, b in m.contentKeys:
|
|
if b:
|
|
let content = o.contentList[i].content
|
|
var output = memoryOutput()
|
|
try:
|
|
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
|
|
output.write(content)
|
|
except IOError as e:
|
|
# This should not happen in case of in-memory streams
|
|
raiseAssert e.msg
|
|
|
|
let dataWritten = (await socket.write(output.getOutput)).valueOr:
|
|
debug "Error writing requested data", error
|
|
# No point in trying to continue writing data
|
|
socket.close()
|
|
return err("Error writing requested data")
|
|
|
|
trace "Offered content item send", dataWritten = dataWritten
|
|
of Database:
|
|
for i, b in m.contentKeys:
|
|
if b:
|
|
let
|
|
contentKey = o.contentKeys[i]
|
|
contentIdResult = p.toContentId(contentKey)
|
|
if contentIdResult.isOk():
|
|
let
|
|
contentId = contentIdResult.get()
|
|
contentResult = p.dbGet(contentKey, contentId)
|
|
|
|
var output = memoryOutput()
|
|
if contentResult.isOk():
|
|
let content = contentResult.get()
|
|
try:
|
|
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
|
|
output.write(content)
|
|
except IOError as e:
|
|
# This should not happen in case of in-memory streams
|
|
raiseAssert e.msg
|
|
else:
|
|
try:
|
|
# When data turns out missing, add a 0 size varint
|
|
output.write(toBytes(0'u8, Leb128).toOpenArray())
|
|
except IOError as e:
|
|
raiseAssert e.msg
|
|
|
|
let dataWritten = (await socket.write(output.getOutput)).valueOr:
|
|
debug "Error writing requested data", error
|
|
# No point in trying to continue writing data
|
|
socket.close()
|
|
return err("Error writing requested data")
|
|
|
|
trace "Offered content item send", dataWritten = dataWritten
|
|
await socket.closeWait()
|
|
debug "Content successfully offered"
|
|
|
|
return ok(m.contentKeys)
|
|
else:
|
|
debug "Offer failed due to accept request failure ",
|
|
error = acceptMessageResponse.error
|
|
return err("No or invalid accept response: " & acceptMessageResponse.error)
|
|
|
|
proc offer*(
|
|
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
|
|
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
|
|
let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys)
|
|
return await p.offer(req)
|
|
|
|
proc offer*(
|
|
p: PortalProtocol, dst: Node, content: seq[ContentKV]
|
|
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
|
|
if len(content) > contentKeysLimit:
|
|
return err("Cannot offer more than 64 content items")
|
|
if len(content) == 0:
|
|
return err("Cannot offer empty content list")
|
|
|
|
let contentList = List[ContentKV, contentKeysLimit].init(content)
|
|
let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList)
|
|
return await p.offer(req)
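# Usage sketch (`dst`, `key` and `value` are hypothetical, called from an async
# context):
#   # Database flavour: only the keys are sent, the content is read from the db
#   # just before the uTP transfer.
#   discard await p.offer(dst, ContentKeysList.init(@[key]))
#   # Direct flavour: the content itself is kept in memory until transferred.
#   discard await p.offer(dst, @[ContentKV.init(key, value)])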
|
|
|
|
proc offerWorker(p: PortalProtocol) {.async: (raises: [CancelledError]).} =
|
|
while true:
|
|
let req = await p.offerQueue.popFirst()
|
|
|
|
let res = await p.offer(req)
|
|
if res.isOk():
|
|
portal_gossip_offers_successful.inc(labelValues = [$p.protocolId])
|
|
else:
|
|
portal_gossip_offers_failed.inc(labelValues = [$p.protocolId])
|
|
|
|
proc offerQueueEmpty*(p: PortalProtocol): bool =
|
|
p.offerQueue.empty()
|
|
|
|
proc lookupWorker(
|
|
p: PortalProtocol, dst: Node, target: NodeId
|
|
): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
|
|
let distances = lookupDistances(target, dst.id)
|
|
let nodesMessage = await p.findNodes(dst, distances)
|
|
if nodesMessage.isOk():
|
|
let nodes = nodesMessage.get()
|
|
# Attempt to add all nodes discovered
|
|
for n in nodes:
|
|
discard p.addNode(n)
|
|
return nodes
|
|
else:
|
|
return @[]
|
|
|
|
proc lookup*(
|
|
p: PortalProtocol, target: NodeId
|
|
): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
|
|
## Perform a lookup for the given target, return the closest n nodes to the
|
|
## target. Maximum value for n is `BUCKET_SIZE`.
|
|
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
|
# Unvalidated nodes are used for requests as a form of validation.
|
|
var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
|
|
|
|
var asked, seen = HashSet[NodeId]()
|
|
asked.incl(p.localNode.id) # No need to ask our own node
|
|
seen.incl(p.localNode.id) # No need to discover our own node
|
|
for node in closestNodes:
|
|
seen.incl(node.id)
|
|
|
|
var pendingQueries =
|
|
newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](p.config.alpha)
|
|
var requestAmount = 0'i64
|
|
|
|
while true:
|
|
var i = 0
|
|
# Doing `p.config.alpha` amount of requests at once as long as closer non queried
|
|
# nodes are discovered.
|
|
while i < closestNodes.len and pendingQueries.len < p.config.alpha:
|
|
let n = closestNodes[i]
|
|
if not asked.containsOrIncl(n.id):
|
|
pendingQueries.add(p.lookupWorker(n, target))
|
|
requestAmount.inc()
|
|
inc i
|
|
|
|
trace "Pending lookup queries", total = pendingQueries.len
|
|
|
|
if pendingQueries.len == 0:
|
|
break
|
|
|
|
let query =
|
|
try:
|
|
await one(pendingQueries)
|
|
except ValueError:
|
|
raiseAssert("pendingQueries should not have been empty")
|
|
|
|
trace "Got lookup query response"
|
|
|
|
let index = pendingQueries.find(query)
|
|
if index != -1:
|
|
pendingQueries.del(index)
|
|
else:
|
|
error "Resulting query should have been in the pending queries"
|
|
|
|
let nodes = await query
|
|
# TODO: Remove node on timed-out query?
|
|
for n in nodes:
|
|
if not seen.containsOrIncl(n.id):
|
|
# If it wasn't seen before, insert node while remaining sorted
|
|
closestNodes.insert(
|
|
n,
|
|
closestNodes.lowerBound(
|
|
n,
|
|
proc(x: Node, n: Node): int =
|
|
cmp(p.distance(x.id, target), p.distance(n.id, target)),
|
|
),
|
|
)
|
|
|
|
if closestNodes.len > BUCKET_SIZE:
|
|
closestNodes.del(closestNodes.high())
|
|
|
|
portal_lookup_node_requests.observe(requestAmount, labelValues = [$p.protocolId])
|
|
p.lastLookup = now(chronos.Moment)
|
|
return closestNodes
|
|
|
|
proc triggerPoke*(
|
|
p: PortalProtocol,
|
|
nodes: seq[Node],
|
|
contentKey: ContentKeyByteList,
|
|
content: seq[byte],
|
|
) =
|
|
## In order to properly test gossip mechanisms (e.g. in Portal Hive),
|
|
## we need the option to turn off the POKE functionality as it influences
|
|
## how data moves around the network.
|
|
if p.config.disablePoke:
|
|
return
|
|
## Triggers asynchronous offer-accept interaction to provided nodes.
|
|
## Provided content should be in range of provided nodes.
|
|
for node in nodes:
|
|
if not p.offerQueue.full():
|
|
try:
|
|
let
|
|
contentKV = ContentKV(contentKey: contentKey, content: content)
|
|
list = List[ContentKV, contentKeysLimit].init(@[contentKV])
|
|
req = OfferRequest(dst: node, kind: Direct, contentList: list)
|
|
p.offerQueue.putNoWait(req)
|
|
portal_poke_offers.inc(labelValues = [$p.protocolId])
|
|
except AsyncQueueFullError as e:
|
|
# Should not occur as full() check is done.
|
|
raiseAssert(e.msg)
|
|
else:
|
|
# Offer queue is full, do not start more offer-accept interactions
|
|
return
|
|
|
|
# TODO: ContentLookup and Lookup look almost exactly the same, and lookups in other
# networks will probably be very similar. Extract the lookup function to a separate
# module and make it more generic.
|
|
proc contentLookup*(
|
|
p: PortalProtocol, target: ContentKeyByteList, targetId: UInt256
|
|
): Future[Opt[ContentLookupResult]] {.async: (raises: [CancelledError]).} =
|
|
## Perform a lookup for the given target, return the closest n nodes to the
|
|
## target.
|
|
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
|
# Unvalidated nodes are used for requests as a form of validation.
|
|
var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
|
|
|
|
# Shuffling the order of the nodes in order to not always hit the same node
|
|
# first for the same request.
|
|
p.baseProtocol.rng[].shuffle(closestNodes)
|
|
|
|
# Sort closestNodes so that nodes that are in range of the target content
|
|
# are queried first
|
|
proc nodesCmp(x, y: Node): int =
|
|
let
|
|
xRadius = p.radiusCache.get(x.id)
|
|
yRadius = p.radiusCache.get(y.id)
|
|
|
|
if xRadius.isSome() and p.inRange(x.id, xRadius.unsafeGet(), targetId):
|
|
-1
|
|
elif yRadius.isSome() and p.inRange(y.id, yRadius.unsafeGet(), targetId):
|
|
1
|
|
else:
|
|
0
|
|
|
|
closestNodes.sort(nodesCmp)
|
|
|
|
var asked, seen = HashSet[NodeId]()
|
|
asked.incl(p.localNode.id) # No need to ask our own node
|
|
seen.incl(p.localNode.id) # No need to discover our own node
|
|
for node in closestNodes:
|
|
seen.incl(node.id)
|
|
|
|
var pendingQueries = newSeqOfCap[
|
|
Future[PortalResult[FoundContent]].Raising([CancelledError])
|
|
](p.config.alpha)
|
|
var requestAmount = 0'i64
|
|
|
|
var nodesWithoutContent: seq[Node] = newSeq[Node]()
|
|
|
|
while true:
|
|
var i = 0
|
|
# Doing `p.config.alpha` amount of requests at once as long as closer non queried
|
|
# nodes are discovered.
|
|
while i < closestNodes.len and pendingQueries.len < p.config.alpha:
|
|
let n = closestNodes[i]
|
|
if not asked.containsOrIncl(n.id):
|
|
pendingQueries.add(p.findContent(n, target))
|
|
requestAmount.inc()
|
|
inc i
|
|
|
|
trace "Pending lookup queries", total = pendingQueries.len
|
|
|
|
if pendingQueries.len == 0:
|
|
break
|
|
|
|
let query =
|
|
try:
|
|
await one(pendingQueries)
|
|
except ValueError:
|
|
raiseAssert("pendingQueries should not have been empty")
|
|
|
|
trace "Got lookup query response"
|
|
|
|
let index = pendingQueries.find(query)
|
|
if index != -1:
|
|
pendingQueries.del(index)
|
|
else:
|
|
error "Resulting query should have been in the pending queries"
|
|
|
|
let contentResult = await query
|
|
if contentResult.isOk():
|
|
let content = contentResult.get()
|
|
|
|
case content.kind
|
|
of Nodes:
|
|
let maybeRadius = p.radiusCache.get(content.src.id)
|
|
if maybeRadius.isSome() and
|
|
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
|
|
# Only return nodes which may be interested in content.
|
|
# No need to check for duplicates in nodesWithoutContent
|
|
# as requests are never made two times to the same node.
|
|
nodesWithoutContent.add(content.src)
|
|
|
|
for n in content.nodes:
|
|
if not seen.containsOrIncl(n.id):
|
|
discard p.addNode(n)
|
|
# If it wasn't seen before, insert node while remaining sorted
|
|
closestNodes.insert(
|
|
n,
|
|
closestNodes.lowerBound(
|
|
n,
|
|
proc(x: Node, n: Node): int =
|
|
cmp(p.distance(x.id, targetId), p.distance(n.id, targetId)),
|
|
),
|
|
)
|
|
|
|
if closestNodes.len > BUCKET_SIZE:
|
|
closestNodes.del(closestNodes.high())
|
|
of Content:
|
|
# cancel any pending queries as the content has been found
|
|
for f in pendingQueries:
|
|
f.cancelSoon()
|
|
portal_lookup_content_requests.observe(
|
|
requestAmount, labelValues = [$p.protocolId]
|
|
)
|
|
return Opt.some(
|
|
ContentLookupResult.init(
|
|
content.content, content.utpTransfer, nodesWithoutContent
|
|
)
|
|
)
|
|
else:
|
|
# Note: Not doing any retries here as retries can/should be done on a
|
|
# higher layer. However, depending on the failure we could attempt a retry,
|
|
# e.g. on uTP specific errors.
|
|
discard
|
|
|
|
portal_lookup_content_failures.inc(labelValues = [$p.protocolId])
|
|
return Opt.none(ContentLookupResult)
|
|
|
|
proc traceContentLookup*(
|
|
p: PortalProtocol, target: ContentKeyByteList, targetId: UInt256
|
|
): Future[TraceContentLookupResult] {.async: (raises: [CancelledError]).} =
|
|
## Perform a lookup for the given target, return the closest n nodes to the
|
|
## target. Maximum value for n is `BUCKET_SIZE`.
|
|
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
|
# Unvalidated nodes are used for requests as a form of validation.
|
|
let startedAt = Moment.now()
|
|
# Need to use a system clock and not the mono clock for this.
|
|
let startedAtMs = int64(times.epochTime() * 1000)
|
|
|
|
var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
|
|
# Shuffling the order of the nodes in order to not always hit the same node
|
|
# first for the same request.
|
|
p.baseProtocol.rng[].shuffle(closestNodes)
|
|
|
|
# Sort closestNodes so that nodes that are in range of the target content
|
|
# are queried first
|
|
proc nodesCmp(x, y: Node): int =
|
|
let
|
|
xRadius = p.radiusCache.get(x.id)
|
|
yRadius = p.radiusCache.get(y.id)
|
|
|
|
if xRadius.isSome() and p.inRange(x.id, xRadius.unsafeGet(), targetId):
|
|
-1
|
|
elif yRadius.isSome() and p.inRange(y.id, yRadius.unsafeGet(), targetId):
|
|
1
|
|
else:
|
|
0
|
|
|
|
closestNodes.sort(nodesCmp)
|
|
|
|
var asked, seen = HashSet[NodeId]()
|
|
asked.incl(p.localNode.id) # No need to ask our own node
|
|
seen.incl(p.localNode.id) # No need to discover our own node
|
|
for node in closestNodes:
|
|
seen.incl(node.id)
|
|
|
|
# Trace data
|
|
var responses = Table[string, TraceResponse]()
|
|
var metadata = Table[string, NodeMetadata]()
|
|
# Local node should be part of the responses
|
|
responses["0x" & $p.localNode.id] =
|
|
TraceResponse(durationMs: 0, respondedWith: seen.toSeq())
|
|
metadata["0x" & $p.localNode.id] = NodeMetadata(
|
|
enr: p.localNode.record, distance: p.distance(p.localNode.id, targetId)
|
|
)
|
|
  # And metadata for all the nodes in the local node's closestNodes
|
|
for node in closestNodes:
|
|
metadata["0x" & $node.id] =
|
|
NodeMetadata(enr: node.record, distance: p.distance(node.id, targetId))
|
|
|
|
var pendingQueries = newSeqOfCap[
|
|
Future[PortalResult[FoundContent]].Raising([CancelledError])
|
|
](p.config.alpha)
|
|
var pendingNodes = newSeq[Node]()
|
|
var requestAmount = 0'i64
|
|
|
|
var nodesWithoutContent: seq[Node] = newSeq[Node]()
|
|
|
|
while true:
|
|
var i = 0
|
|
# Doing `p.config.alpha` amount of requests at once as long as closer non queried
|
|
# nodes are discovered.
|
|
while i < closestNodes.len and pendingQueries.len < p.config.alpha:
|
|
let n = closestNodes[i]
|
|
if not asked.containsOrIncl(n.id):
|
|
pendingQueries.add(p.findContent(n, target))
|
|
pendingNodes.add(n)
|
|
requestAmount.inc()
|
|
inc i
|
|
|
|
trace "Pending lookup queries", total = pendingQueries.len
|
|
|
|
if pendingQueries.len == 0:
|
|
break
|
|
|
|
let query =
|
|
try:
|
|
await one(pendingQueries)
|
|
except ValueError:
|
|
raiseAssert("pendingQueries should not have been empty")
|
|
trace "Got lookup query response"
|
|
|
|
let index = pendingQueries.find(query)
|
|
if index != -1:
|
|
pendingQueries.del(index)
|
|
pendingNodes.del(index)
|
|
else:
|
|
error "Resulting query should have been in the pending queries"
|
|
|
|
let contentResult = await query
|
|
|
|
if contentResult.isOk():
|
|
let content = contentResult.get()
|
|
|
|
case content.kind
|
|
of Nodes:
|
|
let duration = chronos.milliseconds(Moment.now() - startedAt)
|
|
|
|
let maybeRadius = p.radiusCache.get(content.src.id)
|
|
if maybeRadius.isSome() and
|
|
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
|
|
# Only return nodes which may be interested in content.
|
|
# No need to check for duplicates in nodesWithoutContent
|
|
# as requests are never made two times to the same node.
|
|
nodesWithoutContent.add(content.src)
|
|
|
|
var respondedWith = newSeq[NodeId]()
|
|
|
|
for n in content.nodes:
|
|
let dist = p.distance(n.id, targetId)
|
|
|
|
metadata["0x" & $n.id] = NodeMetadata(enr: n.record, distance: dist)
|
|
respondedWith.add(n.id)
|
|
|
|
if not seen.containsOrIncl(n.id):
|
|
discard p.addNode(n)
|
|
# If it wasn't seen before, insert node while remaining sorted
|
|
closestNodes.insert(
|
|
n,
|
|
closestNodes.lowerBound(
|
|
n,
|
|
proc(x: Node, n: Node): int =
|
|
cmp(p.distance(x.id, targetId), dist),
|
|
),
|
|
)
|
|
|
|
if closestNodes.len > BUCKET_SIZE:
|
|
closestNodes.del(closestNodes.high())
|
|
|
|
let distance = p.distance(content.src.id, targetId)
|
|
|
|
responses["0x" & $content.src.id] =
|
|
TraceResponse(durationMs: duration, respondedWith: respondedWith)
|
|
|
|
metadata["0x" & $content.src.id] =
|
|
NodeMetadata(enr: content.src.record, distance: distance)
|
|
of Content:
|
|
let duration = chronos.milliseconds(Moment.now() - startedAt)
|
|
|
|
# cancel any pending queries as the content has been found
|
|
for f in pendingQueries:
|
|
f.cancelSoon()
|
|
portal_lookup_content_requests.observe(
|
|
requestAmount, labelValues = [$p.protocolId]
|
|
)
|
|
|
|
let distance = p.distance(content.src.id, targetId)
|
|
|
|
responses["0x" & $content.src.id] =
|
|
TraceResponse(durationMs: duration, respondedWith: newSeq[NodeId]())
|
|
|
|
metadata["0x" & $content.src.id] =
|
|
NodeMetadata(enr: content.src.record, distance: distance)
|
|
|
|
var pendingNodeIds = newSeq[NodeId]()
|
|
|
|
for pn in pendingNodes:
|
|
pendingNodeIds.add(pn.id)
|
|
metadata["0x" & $pn.id] =
|
|
NodeMetadata(enr: pn.record, distance: p.distance(pn.id, targetId))
|
|
|
|
return TraceContentLookupResult(
|
|
content: Opt.some(content.content),
|
|
utpTransfer: content.utpTransfer,
|
|
trace: TraceObject(
|
|
origin: p.localNode.id,
|
|
targetId: targetId,
|
|
receivedFrom: Opt.some(content.src.id),
|
|
responses: responses,
|
|
metadata: metadata,
|
|
cancelled: pendingNodeIds,
|
|
startedAtMs: startedAtMs,
|
|
),
|
|
)
|
|
else:
|
|
# Note: Not doing any retries here as retries can/should be done on a
|
|
# higher layer. However, depending on the failure we could attempt a retry,
|
|
# e.g. on uTP specific errors.
|
|
# TODO: Ideally we get an empty response added to the responses table
|
|
# and the metadata for the node that failed to respond. In the current
|
|
# implementation there is no access to the node information however.
|
|
discard
|
|
|
|
portal_lookup_content_failures.inc(labelValues = [$p.protocolId])
|
|
return TraceContentLookupResult(
|
|
content: Opt.none(seq[byte]),
|
|
utpTransfer: false,
|
|
trace: TraceObject(
|
|
origin: p.localNode.id,
|
|
targetId: targetId,
|
|
receivedFrom: Opt.none(NodeId),
|
|
responses: responses,
|
|
metadata: metadata,
|
|
cancelled: newSeq[NodeId](),
|
|
startedAtMs: startedAtMs,
|
|
),
|
|
)
|
|
|
|
proc query*(
|
|
p: PortalProtocol, target: NodeId, k = BUCKET_SIZE
|
|
): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
|
|
## Query k nodes for the given target, returns all nodes found, including the
|
|
## nodes queried.
|
|
##
|
|
## This will take k nodes from the routing table closest to target and
|
|
## query them for nodes closest to target. If there are less than k nodes in
|
|
## the routing table, nodes returned by the first queries will be used.
|
|
var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)
|
|
|
|
var asked, seen = HashSet[NodeId]()
|
|
asked.incl(p.localNode.id) # No need to ask our own node
|
|
seen.incl(p.localNode.id) # No need to discover our own node
|
|
for node in queryBuffer:
|
|
seen.incl(node.id)
|
|
|
|
var pendingQueries =
|
|
newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](p.config.alpha)
|
|
|
|
while true:
|
|
var i = 0
|
|
while i < min(queryBuffer.len, k) and pendingQueries.len < p.config.alpha:
|
|
let n = queryBuffer[i]
|
|
if not asked.containsOrIncl(n.id):
|
|
pendingQueries.add(p.lookupWorker(n, target))
|
|
inc i
|
|
|
|
trace "Pending lookup queries", total = pendingQueries.len
|
|
|
|
if pendingQueries.len == 0:
|
|
break
|
|
|
|
let query =
|
|
try:
|
|
await one(pendingQueries)
|
|
except ValueError:
|
|
raiseAssert("pendingQueries should not have been empty")
|
|
trace "Got lookup query response"
|
|
|
|
let index = pendingQueries.find(query)
|
|
if index != -1:
|
|
pendingQueries.del(index)
|
|
else:
|
|
error "Resulting query should have been in the pending queries"
|
|
|
|
let nodes = await query
|
|
# TODO: Remove node on timed-out query?
|
|
for n in nodes:
|
|
if not seen.containsOrIncl(n.id):
|
|
queryBuffer.add(n)
|
|
|
|
p.lastLookup = now(chronos.Moment)
|
|
return queryBuffer
|
|
|
|
proc queryRandom*(
|
|
p: PortalProtocol
|
|
): Future[seq[Node]] {.async: (raw: true, raises: [CancelledError]).} =
|
|
## Perform a query for a random target, return all nodes discovered.
|
|
p.query(NodeId.random(p.baseProtocol.rng[]))
|
|
|
|
proc getNClosestNodesWithRadius*(
|
|
p: PortalProtocol, targetId: NodeId, n: int, seenOnly: bool = false
|
|
): seq[(Node, UInt256)] =
|
|
let closestLocalNodes =
|
|
p.routingTable.neighbours(targetId, k = n, seenOnly = seenOnly)
|
|
|
|
var nodesWithRadiuses: seq[(Node, UInt256)]
|
|
for node in closestLocalNodes:
|
|
let radius = p.radiusCache.get(node.id)
|
|
if radius.isSome():
|
|
nodesWithRadiuses.add((node, radius.unsafeGet()))
|
|
return nodesWithRadiuses
|
|
|
|
proc neighborhoodGossip*(
|
|
p: PortalProtocol,
|
|
srcNodeId: Opt[NodeId],
|
|
contentKeys: ContentKeysList,
|
|
content: seq[seq[byte]],
|
|
): Future[int] {.async: (raises: [CancelledError]).} =
|
|
## Run neighborhood gossip for provided content.
|
|
## Returns the number of peers to which content was attempted to be gossiped.
|
|
if content.len() == 0:
|
|
return 0
|
|
|
|
var contentList = List[ContentKV, contentKeysLimit].init(@[])
|
|
for i, contentItem in content:
|
|
let contentKV = ContentKV(contentKey: contentKeys[i], content: contentItem)
|
|
discard contentList.add(contentKV)
|
|
|
|
# Just taking the first content item as target id.
|
|
# TODO: come up with something better?
|
|
let contentId = p.toContentId(contentList[0].contentKey).valueOr:
|
|
return 0
|
|
|
|
# For selecting the closest nodes to whom to gossip the content a mixed
|
|
# approach is taken:
|
|
# 1. Select the closest neighbours in the routing table
|
|
  # 2. Check if the radius is known for these nodes and whether they are
|
|
# in range of the content to be offered.
|
|
# 3. If more than n (= maxGossipNodes) nodes are in range, offer these nodes
|
|
# the content (maxed out at n).
|
|
# 4. If less than n nodes are in range, do a node lookup, and offer the nodes
|
|
# returned from the lookup the content (max nodes set at 8)
|
|
#
|
|
  # This should give a higher rate of success and avoid the data getting stuck
  # in its propagation, compared to looking only for nodes in our own routing
  # table, while at the same time avoiding unnecessary node lookups.
  # It might still cause issues in getting the data propagated over a wider id range.
|
|
|
|
let closestLocalNodes =
|
|
p.routingTable.neighbours(NodeId(contentId), k = 16, seenOnly = true)
|
|
|
|
var gossipNodes: seq[Node]
|
|
for node in closestLocalNodes:
|
|
let radius = p.radiusCache.get(node.id)
|
|
if radius.isSome():
|
|
if p.inRange(node.id, radius.unsafeGet(), contentId):
|
|
if srcNodeId.isNone:
|
|
gossipNodes.add(node)
|
|
elif node.id != srcNodeId.get():
|
|
gossipNodes.add(node)
|
|
|
|
if gossipNodes.len >= p.config.maxGossipNodes: # use local nodes for gossip
|
|
portal_gossip_without_lookup.inc(labelValues = [$p.protocolId])
|
|
let numberOfGossipedNodes = min(gossipNodes.len, p.config.maxGossipNodes)
|
|
for node in gossipNodes[0 ..< numberOfGossipedNodes]:
|
|
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
|
|
await p.offerQueue.addLast(req)
|
|
return numberOfGossipedNodes
|
|
else: # use looked up nodes for gossip
|
|
portal_gossip_with_lookup.inc(labelValues = [$p.protocolId])
|
|
let closestNodes = await p.lookup(NodeId(contentId))
|
|
let numberOfGossipedNodes = min(closestNodes.len, p.config.maxGossipNodes)
|
|
for node in closestNodes[0 ..< numberOfGossipedNodes]:
|
|
# Note: opportunistically not checking if the radius of the node is known
|
|
# and thus if the node is in radius with the content. Reason is, these
|
|
# should really be the closest nodes in the DHT, and thus are most likely
|
|
# going to be in range of the requested content.
|
|
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
|
|
await p.offerQueue.addLast(req)
|
|
return numberOfGossipedNodes
|
|
|
|
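# Example usage (editorial sketch; `srcNode`, `contentKeyBytes` and
# `contentValue` are hypothetical caller values): a content network would
# typically propagate validated content further with:
#
#   let peers = await p.neighborhoodGossip(
#     Opt.some(srcNode.id), ContentKeysList(@[contentKeyBytes]), @[contentValue])
#   debug "Neighborhood gossip attempted", peers
#
# Passing the source node id ensures the content is not offered back to the
# node it was just received from.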
proc neighborhoodGossipDiscardPeers*(
    p: PortalProtocol,
    srcNodeId: Opt[NodeId],
    contentKeys: ContentKeysList,
    content: seq[seq[byte]],
): Future[void] {.async: (raises: [CancelledError]).} =
  discard await p.neighborhoodGossip(srcNodeId, contentKeys, content)

proc randomGossip*(
    p: PortalProtocol,
    srcNodeId: Opt[NodeId],
    contentKeys: ContentKeysList,
    content: seq[seq[byte]],
): Future[int] {.async: (raises: [CancelledError]).} =
  ## Run random gossip for the provided content.
  ## Returns the number of peers to which gossiping the content was attempted.
  if content.len() == 0:
    return 0

  var contentList = List[ContentKV, contentKeysLimit].init(@[])
  for i, contentItem in content:
    let contentKV = ContentKV(contentKey: contentKeys[i], content: contentItem)
    discard contentList.add(contentKV)

  let nodes = p.routingTable.randomNodes(p.config.maxGossipNodes)

  for node in nodes:
    let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
    await p.offerQueue.addLast(req)
  return nodes.len()

proc randomGossipDiscardPeers*(
    p: PortalProtocol,
    srcNodeId: Opt[NodeId],
    contentKeys: ContentKeysList,
    content: seq[seq[byte]],
): Future[void] {.async: (raises: [CancelledError]).} =
  discard await p.randomGossip(srcNodeId, contentKeys, content)
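# Example usage (editorial sketch; hypothetical caller values as above): random
# gossip simply pushes the content to `maxGossipNodes` random routing table
# entries, without any radius checks:
#
#   discard await p.randomGossip(
#     Opt.none(NodeId), ContentKeysList(@[contentKeyBytes]), @[contentValue])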
proc storeContent*(
    p: PortalProtocol,
    contentKey: ContentKeyByteList,
    contentId: ContentId,
    content: seq[byte],
    cacheContent = false,
): bool {.discardable.} =
  if cacheContent and not p.config.disableContentCache:
    # We cache content regardless of whether it is in our radius or not
    p.contentCache.put(contentId, content)

  # Always re-check that the key is still in the node range to make sure only
  # content in range is stored.
  if p.inRange(contentId):
    doAssert(p.dbPut != nil)
    p.dbPut(contentKey, contentId, content)
    true
  else:
    false
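# Example usage (editorial sketch; `contentKeyBytes` and `contentValue` are
# hypothetical caller values): content received over the network is typically
# stored through this call, which only persists it when the derived content id
# falls within the node's radius:
#
#   let contentId = p.toContentId(contentKeyBytes).valueOr:
#     return
#   if p.storeContent(contentKeyBytes, contentId, contentValue, cacheContent = true):
#     debug "Content stored", contentId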
proc getLocalContent*(
    p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
): Opt[seq[byte]] =
  # The cache can contain content that is not in our radius
  let maybeContent = p.contentCache.get(contentId)
  if maybeContent.isSome():
    portal_content_cache_hits.inc(labelValues = [$p.protocolId])
    return maybeContent

  portal_content_cache_misses.inc(labelValues = [$p.protocolId])

  # Check first if content is in range, as this is a cheaper operation
  # than the database lookup.
  if p.inRange(contentId):
    p.dbGet(contentKey, contentId)
  else:
    Opt.none(seq[byte])
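# Example usage (editorial sketch): a request handler would first try the local
# content and only fall back to other behaviour (e.g. returning closer nodes)
# when it is neither cached nor stored in range:
#
#   let content = p.getLocalContent(contentKeyBytes, contentId).valueOr:
#     return # hypothetical fallback path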
proc seedTable*(p: PortalProtocol) =
  ## Seed the table with the specifically provided Portal bootstrap nodes. These
  ## are nodes that must support the wire protocol for the specific content
  ## network.
  # Note: We allow replacing the bootstrap nodes in the routing table as it is
  # possible that some of them do not support the specific portal network.
  # Other note: One could also pick nodes from the discv5 routing table to
  # bootstrap the portal networks, however it would require a flag in the ENR to
  # be added and there might be none in the routing table due to the low number
  # of Portal nodes compared to other nodes.
  logScope:
    protocolId = p.protocolId

  for record in p.bootstrapRecords:
    if p.addNode(record):
      debug "Added bootstrap node", uri = toURI(record), protocolId = p.protocolId
    else:
      error "Bootstrap node could not be added",
        uri = toURI(record), protocolId = p.protocolId

proc populateTable(p: PortalProtocol) {.async: (raises: [CancelledError]).} =
  ## Do a set of initial lookups to quickly populate the table.
  # Start with a self target query (neighbour nodes).
  logScope:
    protocolId = p.protocolId

  let selfQuery = await p.query(p.localNode.id)
  trace "Discovered nodes in self target query", nodes = selfQuery.len

  for i in 0 ..< initialLookups:
    let randomQuery = await p.queryRandom()
    trace "Discovered nodes in random target query", nodes = randomQuery.len

  debug "Total nodes in routing table after populate", total = p.routingTable.len()
proc revalidateNode*(p: PortalProtocol, n: Node) {.async: (raises: [CancelledError]).} =
  let pong = await p.ping(n)

  if pong.isOk():
    let res = pong.get()
    if res.enrSeq > n.record.seqNum:
      # Request the new ENR
      let nodesMessage = await p.findNodes(n, @[0'u16])
      if nodesMessage.isOk():
        let nodes = nodesMessage.get()
        if nodes.len > 0: # Normally a node should only return 1 record
          discard p.addNode(nodes[0])

proc getNodeForRevalidation(p: PortalProtocol): Opt[Node] =
  let node = p.routingTable.nodeToRevalidate()
  if node.isNil:
    # This should not occur except when the routing table is empty
    return Opt.none(Node)

  let now = now(chronos.Moment)
  let timestamp = p.pingTimings.getOrDefault(node.id, Moment.init(0'i64, Second))

  if (timestamp + revalidationTimeout) < now:
    Opt.some(node)
  else:
    Opt.none(Node)

proc revalidateLoop(p: PortalProtocol) {.async: (raises: []).} =
  ## Loop which revalidates the nodes in the routing table by sending the ping
  ## message.
  try:
    while true:
      await sleepAsync(milliseconds(p.baseProtocol.rng[].rand(revalidateMax)))
      let n = getNodeForRevalidation(p)
      if n.isSome:
        asyncSpawn p.revalidateNode(n.get())
  except CancelledError:
    trace "revalidateLoop canceled"

proc refreshLoop(p: PortalProtocol) {.async: (raises: []).} =
  ## Loop that refreshes the routing table by starting a random query in case
  ## no queries were done for `refreshInterval` or longer.
  ## It also refreshes the majority address voted for via pong responses.
  logScope:
    protocolId = p.protocolId

  try:
    while true:
      # TODO: It would be nicer and more secure if this was event based and/or
      # steered from the routing table.
      while p.routingTable.len() == 0:
        p.seedTable()
        await p.populateTable()
        await sleepAsync(5.seconds)

      let currentTime = now(chronos.Moment)
      if currentTime > (p.lastLookup + refreshInterval):
        let randomQuery = await p.queryRandom()
        trace "Discovered nodes in random target query", nodes = randomQuery.len
        debug "Total nodes in routing table", total = p.routingTable.len()

      await sleepAsync(refreshInterval)
  except CancelledError:
    trace "refreshLoop canceled"
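# Note (editorial): the two loops above keep background traffic bounded: per
# iteration at most one node is pinged for revalidation, and only when its last
# recorded ping is older than `revalidationTimeout`, while the refresh loop only
# triggers a new random query when no lookup happened for `refreshInterval` or
# longer.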
proc start*(p: PortalProtocol) =
  p.refreshLoop = refreshLoop(p)
  p.revalidateLoop = revalidateLoop(p)

  # These workers bound the number of concurrent offers per running Portal wire
  # protocol. Using the `offerQueue` allows for limiting the number of offers
  # sent and thus how many streams can be started.
  # TODO:
  # More thought needs to go into this as it is currently on a per network
  # basis. Keep it simple like that? Or limit it better at the stream transport
  # level? In the latter case, this might still need to be checked/blocked at
  # the very start of sending the offer, because blocking/waiting too long
  # between the received accept message and actually starting the stream and
  # sending data could give issues due to timeouts on the other side.
  # And then there are still limits to be applied also for FindContent and the
  # incoming direction.
  for i in 0 ..< p.config.maxConcurrentOffers:
    p.offerWorkers.add(offerWorker(p))
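# Example usage (editorial sketch): the expected lifecycle from a caller, once
# the underlying discv5 protocol is running:
#
#   p.seedTable()
#   p.start()
#   # ... node is participating in the portal network ...
#   await p.stop()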
proc stop*(p: PortalProtocol) {.async: (raises: []).} =
  var futures: seq[Future[void]]

  if not p.revalidateLoop.isNil():
    futures.add(p.revalidateLoop.cancelAndWait())
  if not p.refreshLoop.isNil():
    futures.add(p.refreshLoop.cancelAndWait())

  for worker in p.offerWorkers:
    futures.add(worker.cancelAndWait())

  await noCancel(allFutures(futures))

  p.revalidateLoop = nil
  p.refreshLoop = nil
  p.offerWorkers = @[]
proc resolve*(
    p: PortalProtocol, id: NodeId
): Future[Opt[Node]] {.async: (raises: [CancelledError]).} =
  ## Resolve a `Node` based on the provided `NodeId`.
  ##
  ## This will first look in the own routing table. If the node is known, it
  ## will try to contact it for newer information. If the node is not known or
  ## does not reply, a lookup is done to see if a (newer) record of the node
  ## can be found on the network.
  if id == p.localNode.id:
    return Opt.some(p.localNode)

  let node = p.getNode(id)
  if node.isSome():
    let nodesMessage = await p.findNodes(node.get(), @[0'u16])
    # TODO: Handle failures better. E.g. stop on different failures than timeout
    if nodesMessage.isOk() and nodesMessage[].len > 0:
      return Opt.some(nodesMessage[][0])

  let discovered = await p.lookup(id)
  for n in discovered:
    if n.id == id:
      if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
        return node
      else:
        return Opt.some(n)

  return node
proc resolveWithRadius*(
    p: PortalProtocol, id: NodeId
): Future[Opt[(Node, UInt256)]] {.async: (raises: [CancelledError]).} =
  ## Resolve a `Node` based on the provided `NodeId`, and also try to establish
  ## the known radius of the found node.
  ##
  ## This will first look in the own routing table. If the node is known, it
  ## will try to contact it for newer information. If the node is not known or
  ## does not reply, a lookup is done to see if a (newer) record of the node
  ## can be found on the network.
  ##
  ## If the node is found, its radius is first checked in the radius cache. If
  ## the radius is not known, the node is pinged to establish its current
  ## radius.
  let n = await p.resolve(id)
  if n.isNone():
    return Opt.none((Node, UInt256))

  let node = n.unsafeGet()

  let r = p.radiusCache.get(id)
  if r.isSome():
    return Opt.some((node, r.unsafeGet()))

  let pongResult = await p.ping(node)
  if pongResult.isOk():
    let maybeRadius = p.radiusCache.get(id)
    # After a successful ping the radius should already be in the cache, but for
    # the unlikely case that it is not, check it just to be sure.
    # TODO: refactor ping to return the node radius.
    if maybeRadius.isNone():
      return Opt.none((Node, UInt256))
    else:
      return Opt.some((node, maybeRadius.unsafeGet()))
  else:
    return Opt.none((Node, UInt256))
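# Example usage (editorial sketch; `nodeId` and `contentId` are hypothetical
# caller values): resolve a peer together with its advertised radius before
# deciding whether it is worth offering content to it:
#
#   let res = await p.resolveWithRadius(nodeId)
#   if res.isSome():
#     let (node, radius) = res.unsafeGet()
#     if p.inRange(node.id, radius, contentId):
#       discard # offer the content to this node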