# Nimbus - Portal Network
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [Defect].}

import
  std/[sequtils, sets, algorithm],
  stew/results, chronicles, chronos, nimcrypto/hash,
  eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification],
  ./messages

export messages, routing_table

logScope:
  topics = "portal_wire"

const
  Alpha = 3 ## Kademlia concurrency factor
  LookupRequestLimit = 3 ## Amount of distances requested in a single Findnode
    ## message for a lookup or query
  EnrsResultLimit = 32 ## Maximum amount of ENRs in the total Nodes messages
    ## that will be processed
  RefreshInterval = 5.minutes ## Interval of launching a random query to
    ## refresh the routing table.
  RevalidateMax = 10000 ## Revalidation of a peer is done between 0 and this
    ## value in milliseconds
  InitialLookups = 1 ## Amount of lookups done when populating the routing table

type
  ContentResultKind* = enum
    ContentFound, ContentMissing, ContentKeyValidationFailure

  ContentResult* = object
    case kind*: ContentResultKind
    of ContentFound:
      content*: seq[byte]
    of ContentMissing:
      contentId*: UInt256
    of ContentKeyValidationFailure:
      error*: string

  # Treating the Result as a typed union. If the content is present the
  # handler should return it; if not, it should return the content id so that
  # the closest neighbours can be located. An illustrative handler sketch
  # follows this type section.
  ContentHandler* =
    proc(contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.}

  PortalProtocol* = ref object of TalkProtocol
    protocolId: seq[byte]
    routingTable*: RoutingTable
    baseProtocol*: protocol.Protocol
    dataRadius*: UInt256
    handleContentRequest: ContentHandler
    bootstrapRecords*: seq[Record]
    lastLookup: chronos.Moment
    refreshLoop: Future[void]
    revalidateLoop: Future[void]

  PortalResult*[T] = Result[T, cstring]

  LookupResultKind = enum
    Nodes, Content

  LookupResult = object
    case kind: LookupResultKind
    of Nodes:
      nodes: seq[Node]
    of Content:
      content: ByteList

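# Illustrative only, not part of the wire protocol: a minimal `ContentHandler`
# sketch that either returns the content or reports it as missing together
# with its content id. `lookupInLocalStore` and `toContentId` are hypothetical
# helpers of the content network using this protocol, not of this module.
#
#   proc exampleContentHandler(contentKey: ByteList): ContentResult
#       {.raises: [Defect], gcsafe.} =
#     let maybeContent = lookupInLocalStore(contentKey) # Option[seq[byte]]
#     if maybeContent.isSome():
#       ContentResult(kind: ContentFound, content: maybeContent.get())
#     else:
#       ContentResult(kind: ContentMissing, contentId: toContentId(contentKey))
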
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
  p.routingTable.addNode(node)

proc addNode*(p: PortalProtocol, r: Record): bool =
  let node = newNode(r)
  if node.isOk():
    p.addNode(node[]) == Added
  else:
    false

func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode

proc neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
  p.routingTable.neighbours(id = id, seenOnly = seenOnly)

# TODO:
# - On incoming portal ping of unknown node: add node to routing table by
# grabbing ENR from discv5 routing table (might not have it)?
# - ENRs with portal protocol capabilities as field?

proc handlePing(p: PortalProtocol, ping: PingMessage): seq[byte] =
  let pong = PongMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
    dataRadius: p.dataRadius)

  encodeMessage(pong)

proc handleFindNode(p: PortalProtocol, fn: FindNodeMessage): seq[byte] =
  if fn.distances.len == 0:
    let enrs = List[ByteList, 32](@[])
    encodeMessage(NodesMessage(total: 1, enrs: enrs))
  elif fn.distances.contains(0):
    # A request for our own record.
    let enr = ByteList(rlp.encode(p.baseProtocol.localNode.record))
    encodeMessage(NodesMessage(total: 1, enrs: List[ByteList, 32](@[enr])))
  else:
    let distances = fn.distances.asSeq()
    if distances.all(proc (x: uint16): bool = return x <= 256):
      let
        nodes = p.routingTable.neighboursAtDistances(distances, seenOnly = true)
        enrs = nodes.map(proc(x: Node): ByteList = ByteList(x.record.raw))

      # TODO: Fixed here to a total message count of 1 for now, as else we
      # would need to either move the send of the talkresp messages here, or
      # allow for returning multiple messages.
      # In the long run, it might just be better to use a stream in these cases?
      encodeMessage(
        NodesMessage(total: 1, enrs: List[ByteList, 32](List(enrs))))
    else:
      # Invalid request, send an empty response back.
      let enrs = List[ByteList, 32](@[])
      encodeMessage(NodesMessage(total: 1, enrs: enrs))

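# Pass the requested content key to the application-provided `ContentHandler`
# and encode a `foundcontent` response: the content payload itself when it is
# available locally, the ENRs of the closest known nodes to the content id
# when it is missing, or an empty response when content key validation fails.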
proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
  let contentHandlingResult = p.handleContentRequest(fc.contentKey)
  # TODO: Need to check networkId, type, trie path
  case contentHandlingResult.kind
  of ContentFound:
    let content = contentHandlingResult.content
    let enrs = List[ByteList, 32](@[]) # Empty enrs when the payload is sent
    encodeMessage(FoundContentMessage(
      enrs: enrs, payload: ByteList(content)))
  of ContentMissing:
    let
      contentId = contentHandlingResult.contentId
      # TODO: Should we first do a simple check on ContentId versus Radius?
      closestNodes = p.routingTable.neighbours(
        NodeId(contentId), seenOnly = true)
      payload = ByteList(@[]) # Empty payload when enrs are sent
      enrs =
        closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw))
    encodeMessage(FoundContentMessage(
      enrs: List[ByteList, 32](List(enrs)), payload: payload))

  of ContentKeyValidationFailure:
    # Return an empty response when content key validation fails.
    let content = ByteList(@[])
    let enrs = List[ByteList, 32](@[]) # Empty enrs when the payload is sent
    encodeMessage(FoundContentMessage(
      enrs: enrs, payload: content))

proc handleOffer(p: PortalProtocol, a: OfferMessage): seq[byte] =
  let
    # TODO: Random ID that needs to be stored together with some buffer that
    # gets shared with uTP session that needs to be set up (start listening)
    connectionId = Bytes2([byte 0x01, 0x02])
    # TODO: Not implemented: Based on the content radius and the content that is
    # already stored, interest in provided content keys needs to be indicated
    # by setting bits in this BitList
    contentKeys = ContentKeysBitList.init(a.contentKeys.len)
  encodeMessage(
    AcceptMessage(connectionId: connectionId, contentKeys: contentKeys))

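# Callback registered with discv5 for incoming talk requests on this protocol
# id: decodes the Portal message and dispatches it to the matching handler,
# returning the encoded response, or empty bytes when decoding fails or the
# message kind cannot be handled as a request.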
proc messageHandler*(protocol: TalkProtocol, request: seq[byte],
    srcId: NodeId, srcUdpAddress: Address): seq[byte] =
  doAssert(protocol of PortalProtocol)

  let p = PortalProtocol(protocol)

  let decoded = decodeMessage(request)
  if decoded.isOk():
    let message = decoded.get()
    trace "Received message request", kind = message.kind
    case message.kind
    of MessageKind.ping:
      p.handlePing(message.ping)
    of MessageKind.findnode:
      p.handleFindNode(message.findNode)
    of MessageKind.findcontent:
      p.handleFindContent(message.findcontent)
    of MessageKind.offer:
      p.handleOffer(message.offer)
    else:
      @[]
  else:
    @[]

proc new*(T: type PortalProtocol,
    baseProtocol: protocol.Protocol,
    protocolId: seq[byte],
    contentHandler: ContentHandler,
    dataRadius = UInt256.high(),
    bootstrapRecords: openarray[Record] = [],
    distanceCalculator: DistanceCalculator = XorDistanceCalculator
    ): T =
  let proto = PortalProtocol(
    protocolHandler: messageHandler,
    protocolId: protocolId,
    routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop,
      DefaultTableIpLimits, baseProtocol.rng, distanceCalculator),
    baseProtocol: baseProtocol,
    dataRadius: dataRadius,
    handleContentRequest: contentHandler,
    bootstrapRecords: @bootstrapRecords)

  proto.baseProtocol.registerTalkProtocol(proto.protocolId, proto).expect(
    "Only one protocol should have this id")

  return proto

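# Illustrative wiring sketch only: `disc` (an already configured discv5
# Protocol instance), `portalProtocolId`, `exampleContentHandler` and
# `bootstrapNodes` are placeholders supplied by the application, not by this
# module.
#
#   let portal = PortalProtocol.new(
#     disc, portalProtocolId, exampleContentHandler,
#     bootstrapRecords = bootstrapNodes)
#   portal.start()
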
# Sends the discv5 talkreq message with the provided Portal message, awaits
# and validates the proper response, and updates the Portal Network routing
# table.
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
    p: PortalProtocol,
    toNode: Node,
    request: Request
    ): Future[PortalResult[Response]] {.async.} =
  let talkresp =
    await talkreq(p.baseProtocol, toNode, p.protocolId, encodeMessage(request))

  # Note: Failure of `decodeMessage` might also simply mean that the peer is
  # not supporting the specific talk protocol, as according to the
  # specification an empty response needs to be sent in that case.
  # See: https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#talkreq-request-0x05
  let messageResponse = talkresp
    .flatMap(proc (x: seq[byte]): Result[Message, cstring] = decodeMessage(x))
    .flatMap(proc (m: Message): Result[Response, cstring] =
      getInnerMessageResult[Response](
        m, cstring"Invalid message response received")
    )

  if messageResponse.isOk():
    p.routingTable.setJustSeen(toNode)
  else:
    p.routingTable.replaceNode(toNode)

  return messageResponse

proc ping*(p: PortalProtocol, dst: Node):
    Future[PortalResult[PongMessage]] {.async.} =
  let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
    dataRadius: p.dataRadius)

  trace "Send message request", dstId = dst.id, kind = MessageKind.ping
  return await reqResponse[PingMessage, PongMessage](p, dst, ping)

proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
    Future[PortalResult[NodesMessage]] {.async.} =
  let fn = FindNodeMessage(distances: distances)

  trace "Send message request", dstId = dst.id, kind = MessageKind.findnode
  # TODO: Add nodes validation
  return await reqResponse[FindNodeMessage, NodesMessage](p, dst, fn)

proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
    Future[PortalResult[FoundContentMessage]] {.async.} =
  let fc = FindContentMessage(contentKey: contentKey)

  trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent
  return await reqResponse[FindContentMessage, FoundContentMessage](p, dst, fc)

proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
    Future[PortalResult[AcceptMessage]] {.async.} =
  let offer = OfferMessage(contentKeys: contentKeys)

  trace "Send message request", dstId = dst.id, kind = MessageKind.offer

  return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)

  # TODO: Actually have to parse the offer message and get the uTP connection
  # id, and initiate an uTP stream with the given uTP connection id to get the
  # data out.

proc recordsFromBytes(rawRecords: List[ByteList, 32]): seq[Record] =
  var records: seq[Record]
  for r in rawRecords.asSeq():
    var record: Record
    if record.fromBytes(r.asSeq()):
      records.add(record)

  records

proc lookupWorker(p: PortalProtocol, destNode: Node, target: NodeId):
    Future[seq[Node]] {.async.} =
  var nodes: seq[Node]
  # TODO: Distances are not correct here. Fix + tests
  let distances = lookupDistances(target, destNode.id)

  let nodesMessage = await p.findNode(destNode, List[uint16, 256](distances))
  if nodesMessage.isOk():
    let records = recordsFromBytes(nodesMessage.get().enrs)
    # TODO: The distance function is wrong here, fix + tests
    let verifiedNodes = verifyNodesRecords(records, destNode, EnrsResultLimit, distances)
    nodes.add(verifiedNodes)

    # Attempt to add all nodes discovered
    for n in nodes:
      discard p.routingTable.addNode(n)

  return nodes

proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
  ## Perform a lookup for the given target, return the closest n nodes to the
  ## target. Maximum value for n is `BUCKET_SIZE`.
  # `closestNodes` holds the k closest nodes to target found, sorted by distance
  # Unvalidated nodes are used for requests as a form of validation.
  var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE,
    seenOnly = false)

  var asked, seen = initHashSet[NodeId]()
  asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
  seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
  for node in closestNodes:
    seen.incl(node.id)

  var pendingQueries = newSeqOfCap[Future[seq[Node]]](Alpha)

  while true:
    var i = 0
    # Doing `alpha` amount of requests at once as long as closer non-queried
    # nodes are discovered.
    while i < closestNodes.len and pendingQueries.len < Alpha:
      let n = closestNodes[i]
      if not asked.containsOrIncl(n.id):
        pendingQueries.add(p.lookupWorker(n, target))
      inc i

    trace "Pending lookup queries", total = pendingQueries.len

    if pendingQueries.len == 0:
      break

    let query = await one(pendingQueries)
    trace "Got lookup query response"

    let index = pendingQueries.find(query)
    if index != -1:
      pendingQueries.del(index)
    else:
      error "Resulting query should have been in the pending queries"

    let nodes = query.read
    # TODO: Remove node on timed-out query?
    for n in nodes:
      if not seen.containsOrIncl(n.id):
        # If it wasn't seen before, insert node while remaining sorted
        closestNodes.insert(n, closestNodes.lowerBound(n,
          proc(x: Node, n: Node): int =
            cmp(p.routingTable.distance(x.id, target),
              p.routingTable.distance(n.id, target))
        ))

        if closestNodes.len > BUCKET_SIZE:
          closestNodes.del(closestNodes.high())

  p.lastLookup = now(chronos.Moment)
  return closestNodes

proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage,
    dst: Node, nodes: var seq[Node]): LookupResult =
  if (m.enrs.len() != 0 and m.payload.len() == 0):
    let records = recordsFromBytes(m.enrs)
    let verifiedNodes = verifyNodesRecords(records, dst, EnrsResultLimit)
    nodes.add(verifiedNodes)

    # Attempt to add all nodes discovered
    for n in nodes:
      discard p.routingTable.addNode(n)

    return LookupResult(kind: Nodes, nodes: nodes)
  elif (m.payload.len() != 0 and m.enrs.len() == 0):
    return LookupResult(kind: Content, content: m.payload)
  elif ((m.payload.len() != 0 and m.enrs.len() != 0)):
    # Both payload and enrs are filled, which is a protocol breach. For now
    # just log the offending node to quickly identify it.
    warn "Invalid foundcontent response from node", uri = toURI(dst.record)
    return LookupResult(kind: Nodes, nodes: nodes)
  else:
    return LookupResult(kind: Nodes, nodes: nodes)

proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ByteList):
    Future[LookupResult] {.async.} =
  var nodes: seq[Node]

  let contentMessageResponse = await p.findContent(destNode, target)

  if contentMessageResponse.isOk():
    return handleFoundContentMessage(
      p, contentMessageResponse.get(), destNode, nodes)
  else:
    return LookupResult(kind: Nodes, nodes: nodes)

# TODO: ContentLookup and Lookup look almost exactly the same, also lookups in
# other networks will probably be very similar. Extract the lookup function to
# a separate module and make it more generic.
proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
    Future[Option[ByteList]] {.async.} =
  ## Perform a lookup for the given content key target, return the content if
  ## it is found during the lookup.
  # `closestNodes` holds the k closest nodes to target found, sorted by distance
  # Unvalidated nodes are used for requests as a form of validation.
  var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE,
    seenOnly = false)

  var asked, seen = initHashSet[NodeId]()
  asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
  seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
  for node in closestNodes:
    seen.incl(node.id)

  var pendingQueries = newSeqOfCap[Future[LookupResult]](Alpha)

  while true:
    var i = 0
    # Doing `alpha` amount of requests at once as long as closer non-queried
    # nodes are discovered.
    while i < closestNodes.len and pendingQueries.len < Alpha:
      let n = closestNodes[i]
      if not asked.containsOrIncl(n.id):
        pendingQueries.add(p.contentLookupWorker(n, target))
      inc i

    trace "Pending lookup queries", total = pendingQueries.len

    if pendingQueries.len == 0:
      break

    let query = await one(pendingQueries)
    trace "Got lookup query response"

    let index = pendingQueries.find(query)
    if index != -1:
      pendingQueries.del(index)
    else:
      error "Resulting query should have been in the pending queries"

    let lookupResult = query.read

    # TODO: Remove node on timed-out query? To handle failure better,
    # LookupResult should have a third enum option like failure.
    case lookupResult.kind
    of Nodes:
      for n in lookupResult.nodes:
        if not seen.containsOrIncl(n.id):
          # If it wasn't seen before, insert node while remaining sorted
          closestNodes.insert(n, closestNodes.lowerBound(n,
            proc(x: Node, n: Node): int =
              cmp(p.routingTable.distance(x.id, targetId),
                p.routingTable.distance(n.id, targetId))
          ))

          if closestNodes.len > BUCKET_SIZE:
            closestNodes.del(closestNodes.high())
    of Content:
      # Cancel any pending queries as we have found the content.
      for f in pendingQueries:
        f.cancel()

      return some(lookupResult.content)

  return none[ByteList]()

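# Illustrative usage sketch only (from within an async proc): `contentKey` is
# the serialized content key and `toContentId` is a hypothetical helper of the
# content network that derives the 256-bit content id from it.
#
#   let content = await p.contentLookup(contentKey, toContentId(contentKey))
#   if content.isSome():
#     processContent(content.get()) # hypothetical application callback
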
proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
    {.async.} =
  ## Query k nodes for the given target, returns all nodes found, including the
  ## nodes queried.
  ##
  ## This will take k nodes from the routing table closest to target and
  ## query them for nodes closest to target. If there are less than k nodes in
  ## the routing table, nodes returned by the first queries will be used.
  var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)

  var asked, seen = initHashSet[NodeId]()
  asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
  seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
  for node in queryBuffer:
    seen.incl(node.id)

  var pendingQueries = newSeqOfCap[Future[seq[Node]]](Alpha)

  while true:
    var i = 0
    while i < min(queryBuffer.len, k) and pendingQueries.len < Alpha:
      let n = queryBuffer[i]
      if not asked.containsOrIncl(n.id):
        pendingQueries.add(p.lookupWorker(n, target))
      inc i

    trace "Pending lookup queries", total = pendingQueries.len

    if pendingQueries.len == 0:
      break

    let query = await one(pendingQueries)
    trace "Got lookup query response"

    let index = pendingQueries.find(query)
    if index != -1:
      pendingQueries.del(index)
    else:
      error "Resulting query should have been in the pending queries"

    let nodes = query.read
    # TODO: Remove node on timed-out query?
    for n in nodes:
      if not seen.containsOrIncl(n.id):
        queryBuffer.add(n)

  p.lastLookup = now(chronos.Moment)
  return queryBuffer

proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
  ## Perform a query for a random target, return all nodes discovered.
  p.query(NodeId.random(p.baseProtocol.rng[]))

proc seedTable*(p: PortalProtocol) =
  ## Seed the table with nodes from the discv5 table and with specifically
  ## provided bootstrap nodes. The latter are then supposed to be nodes
  ## supporting the wire protocol for the specific content network.
  # Note: We allow replacing the bootstrap nodes in the routing table as it is
  # possible that some of these are not supporting the specific portal network.

  # TODO: Picking some nodes from discv5 routing table now. Should definitely
  # add supported Portal network info in a k:v pair in the ENRs and filter on
  # that.
  let closestNodes = p.baseProtocol.neighbours(
    NodeId.random(p.baseProtocol.rng[]), seenOnly = true)

  for node in closestNodes:
    if p.routingTable.addNode(node) == Added:
      debug "Added node from discv5 routing table", uri = toURI(node.record)
    else:
      debug "Node from discv5 routing table could not be added",
        uri = toURI(node.record)

  # Seed the table with bootstrap nodes.
  for record in p.bootstrapRecords:
    if p.addNode(record):
      debug "Added bootstrap node", uri = toURI(record),
        protocolId = p.protocolId
    else:
      error "Bootstrap node could not be added", uri = toURI(record),
        protocolId = p.protocolId

proc populateTable(p: PortalProtocol) {.async.} =
  ## Do a set of initial lookups to quickly populate the table.
  # Start with a self target query (neighbour nodes)
  let selfQuery = await p.query(p.baseProtocol.localNode.id)
  trace "Discovered nodes in self target query", nodes = selfQuery.len

  for i in 0..<InitialLookups:
    let randomQuery = await p.queryRandom()
    trace "Discovered nodes in random target query", nodes = randomQuery.len

  debug "Total nodes in routing table after populate",
    total = p.routingTable.len()

proc revalidateNode*(p: PortalProtocol, n: Node) {.async.} =
  let pong = await p.ping(n)

  if pong.isOK():
    let res = pong.get()
    if res.enrSeq > n.record.seqNum:
      # Request the new ENR
      let nodes = await p.findNode(n, List[uint16, 256](@[0'u16]))
      if nodes.isOk():
        let records = recordsFromBytes(nodes.get().enrs)
        # TODO: The distance function is wrong here, fix + tests
        let verifiedNodes = verifyNodesRecords(records, n, EnrsResultLimit, @[0'u16])
        if verifiedNodes.len > 0:
          discard p.routingTable.addNode(verifiedNodes[0])

proc revalidateLoop(p: PortalProtocol) {.async.} =
  ## Loop which revalidates the nodes in the routing table by sending the ping
  ## message.
  try:
    while true:
      await sleepAsync(milliseconds(p.baseProtocol.rng[].rand(RevalidateMax)))
      let n = p.routingTable.nodeToRevalidate()
      if not n.isNil:
        asyncSpawn p.revalidateNode(n)
  except CancelledError:
    trace "revalidateLoop canceled"

proc refreshLoop(p: PortalProtocol) {.async.} =
  ## Loop that refreshes the routing table by starting a random query in case
  ## no queries were done since `RefreshInterval` or more.
  ## It also refreshes the majority address voted for via pong responses.
  try:
    await p.populateTable()

    while true:
      let currentTime = now(chronos.Moment)
      if currentTime > (p.lastLookup + RefreshInterval):
        let randomQuery = await p.queryRandom()
        trace "Discovered nodes in random target query", nodes = randomQuery.len
        debug "Total nodes in routing table", total = p.routingTable.len()

      await sleepAsync(RefreshInterval)
  except CancelledError:
    trace "refreshLoop canceled"

proc start*(p: PortalProtocol) =
  p.seedTable()

  p.refreshLoop = refreshLoop(p)
  p.revalidateLoop = revalidateLoop(p)

proc stop*(p: PortalProtocol) =
  if not p.revalidateLoop.isNil:
    p.revalidateLoop.cancel()
  if not p.refreshLoop.isNil:
    p.refreshLoop.cancel()