mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-02 23:35:31 +00:00
Add uTP over discv5 in portal wire protocol (#922)
* Add uTP over discv5 in portal wire protocol
This commit is contained in:
parent
f47c710ada
commit
f19a64a2fe
@ -15,9 +15,11 @@ import
|
||||
eth/keys, eth/net/nat,
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
./conf, ./common/common_utils,
|
||||
./rpc/[rpc_eth_api, bridge_client, rpc_discovery_api, rpc_portal_api],
|
||||
./rpc/[rpc_eth_api, bridge_client, rpc_discovery_api, rpc_portal_api,
|
||||
rpc_portal_debug_api],
|
||||
./network/state/[state_network, state_content],
|
||||
./network/history/[history_network, history_content],
|
||||
./network/wire/portal_stream,
|
||||
./content_db
|
||||
|
||||
proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
|
||||
@ -65,13 +67,17 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
||||
# Store the database at contentdb prefixed with the first 8 chars of node id.
|
||||
# This is done because the content in the db is dependant on the `NodeId` and
|
||||
# the selected `Radius`.
|
||||
let db =
|
||||
ContentDB.new(config.dataDir / "db" / "contentdb_" &
|
||||
let
|
||||
db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
|
||||
d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex())
|
||||
|
||||
let
|
||||
stateNetwork = StateNetwork.new(d, db, bootstrapRecords = bootstrapRecords)
|
||||
historyNetwork = HistoryNetwork.new(d, db, bootstrapRecords = bootstrapRecords)
|
||||
# One instance of PortalStream and thus UtpDiscv5Protocol is shared over all
|
||||
# the Portal networks.
|
||||
portalStream = PortalStream.new(d)
|
||||
stateNetwork = StateNetwork.new(d, db, portalStream,
|
||||
bootstrapRecords = bootstrapRecords)
|
||||
historyNetwork = HistoryNetwork.new(d, db, portalStream,
|
||||
bootstrapRecords = bootstrapRecords)
|
||||
|
||||
# TODO: If no new network key is generated then we should first check if an
|
||||
# enr file exists, and in the case it does read out the seqNum from it and
|
||||
@ -100,6 +106,8 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
||||
rpcHttpServerWithProxy.installDiscoveryApiHandlers(d)
|
||||
rpcHttpServerWithProxy.installPortalApiHandlers(stateNetwork.portalProtocol, "state")
|
||||
rpcHttpServerWithProxy.installPortalApiHandlers(historyNetwork.portalProtocol, "history")
|
||||
rpcHttpServerWithProxy.installPortalDebugApiHandlers(stateNetwork.portalProtocol, "state")
|
||||
rpcHttpServerWithProxy.installPortalDebugApiHandlers(stateNetwork.portalProtocol, "history")
|
||||
# TODO for now we can only proxy to local node (or remote one without ssl) to make it possible
|
||||
# to call infura https://github.com/status-im/nim-json-rpc/pull/101 needs to get merged for http client to support https/
|
||||
waitFor rpcHttpServerWithProxy.start()
|
||||
|
@ -10,7 +10,7 @@ import
|
||||
stew/results, chronos,
|
||||
eth/p2p/discoveryv5/[protocol, enr],
|
||||
../../content_db,
|
||||
../wire/portal_protocol,
|
||||
../wire/[portal_protocol, portal_stream],
|
||||
./history_content
|
||||
|
||||
const
|
||||
@ -51,11 +51,12 @@ proc new*(
|
||||
T: type HistoryNetwork,
|
||||
baseProtocol: protocol.Protocol,
|
||||
contentDB: ContentDB,
|
||||
portalStream: PortalStream,
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openArray[Record] = []): T =
|
||||
let portalProtocol = PortalProtocol.new(
|
||||
baseProtocol, historyProtocolId, contentDB, toContentIdHandler,
|
||||
dataRadius, bootstrapRecords)
|
||||
portalStream, dataRadius, bootstrapRecords)
|
||||
|
||||
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
||||
|
||||
|
@ -10,7 +10,7 @@ import
|
||||
stew/results, chronos,
|
||||
eth/p2p/discoveryv5/[protocol, enr],
|
||||
../../content_db,
|
||||
../wire/portal_protocol,
|
||||
../wire/[portal_protocol, portal_stream],
|
||||
./state_content,
|
||||
./state_distance
|
||||
|
||||
@ -51,11 +51,12 @@ proc new*(
|
||||
T: type StateNetwork,
|
||||
baseProtocol: protocol.Protocol,
|
||||
contentDB: ContentDB,
|
||||
portalStream: PortalStream,
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openArray[Record] = []): T =
|
||||
let portalProtocol = PortalProtocol.new(
|
||||
baseProtocol, stateProtocolId, contentDB, toContentIdHandler,
|
||||
dataRadius, bootstrapRecords, stateDistanceCalculator)
|
||||
portalStream, dataRadius, bootstrapRecords, stateDistanceCalculator)
|
||||
|
||||
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
||||
|
||||
|
@ -14,8 +14,10 @@ import
|
||||
std/[sequtils, sets, algorithm],
|
||||
stew/results, chronicles, chronos, nimcrypto/hash, bearssl,
|
||||
ssz_serialization,
|
||||
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification],
|
||||
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
|
||||
nodes_verification],
|
||||
../../content_db,
|
||||
./portal_stream,
|
||||
./messages
|
||||
|
||||
export messages, routing_table
|
||||
@ -34,18 +36,6 @@ const
|
||||
initialLookups = 1 ## Amount of lookups done when populating the routing table
|
||||
|
||||
type
|
||||
ContentResultKind* = enum
|
||||
ContentFound, ContentMissing, ContentKeyValidationFailure
|
||||
|
||||
ContentResult* = object
|
||||
case kind*: ContentResultKind
|
||||
of ContentFound:
|
||||
content*: seq[byte]
|
||||
of ContentMissing:
|
||||
contentId*: Uint256
|
||||
of ContentKeyValidationFailure:
|
||||
error*: string
|
||||
|
||||
ToContentIdHandler* =
|
||||
proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.}
|
||||
|
||||
@ -62,18 +52,20 @@ type
|
||||
lastLookup: chronos.Moment
|
||||
refreshLoop: Future[void]
|
||||
revalidateLoop: Future[void]
|
||||
stream: PortalStream
|
||||
|
||||
PortalResult*[T] = Result[T, cstring]
|
||||
|
||||
LookupResultKind = enum
|
||||
Nodes, Content
|
||||
FoundContentKind* = enum
|
||||
Nodes,
|
||||
Content
|
||||
|
||||
LookupResult = object
|
||||
case kind: LookupResultKind
|
||||
of Nodes:
|
||||
nodes: seq[Node]
|
||||
FoundContent* = object
|
||||
case kind*: FoundContentKind
|
||||
of Content:
|
||||
content: ByteList
|
||||
content*: ByteList
|
||||
of Nodes:
|
||||
nodes*: seq[Node]
|
||||
|
||||
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
|
||||
p.routingTable.addNode(node)
|
||||
@ -127,7 +119,8 @@ func handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
|
||||
let enrs = List[ByteList, 32](@[])
|
||||
encodeMessage(NodesMessage(total: 1, enrs: enrs))
|
||||
|
||||
proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
|
||||
proc handleFindContent(
|
||||
p: PortalProtocol, fc: FindContentMessage, srcId: NodeId): seq[byte] =
|
||||
let contentIdOpt = p.toContentId(fc.contentKey)
|
||||
if contentIdOpt.isSome():
|
||||
let
|
||||
@ -142,8 +135,7 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
|
||||
encodeMessage(ContentMessage(
|
||||
contentMessageType: contentType, content: ByteList(content)))
|
||||
else:
|
||||
var connectionId: Bytes2
|
||||
brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId)
|
||||
let connectionId = p.stream.addContentRequest(srcId, ByteList(content))
|
||||
|
||||
encodeMessage(ContentMessage(
|
||||
contentMessageType: connectionIdType, connectionId: connectionId))
|
||||
@ -162,8 +154,9 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
|
||||
# discv5 layer.
|
||||
@[]
|
||||
|
||||
proc handleOffer(p: PortalProtocol, o: OfferMessage): seq[byte] =
|
||||
var contentKeys = ContentKeysBitList.init(o.contentKeys.len)
|
||||
proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
|
||||
var contentKeysBitList = ContentKeysBitList.init(o.contentKeys.len)
|
||||
var contentKeys = ContentKeysList.init(@[])
|
||||
# TODO: Do we need some protection against a peer offering lots (64x) of
|
||||
# content that fits our Radius but is actually bogus?
|
||||
# Additional TODO, but more of a specification clarification: What if we don't
|
||||
@ -175,16 +168,16 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage): seq[byte] =
|
||||
let contentId = contentIdOpt.get()
|
||||
if p.inRange(contentId):
|
||||
if not p.contentDB.contains(contentId):
|
||||
contentKeys.setBit(i)
|
||||
contentKeysBitList.setBit(i)
|
||||
discard contentKeys.add(contentKey)
|
||||
else:
|
||||
# Return empty response when content key validation fails
|
||||
return @[]
|
||||
|
||||
var connectionId: Bytes2
|
||||
brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId)
|
||||
let connectionId = p.stream.addContentOffer(srcId, contentKeys)
|
||||
|
||||
encodeMessage(
|
||||
AcceptMessage(connectionId: connectionId, contentKeys: contentKeys))
|
||||
AcceptMessage(connectionId: connectionId, contentKeys: contentKeysBitList))
|
||||
|
||||
# TODO: Neighborhood gossip
|
||||
# After data has been received and validated from an offer, we need to
|
||||
@ -220,9 +213,9 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
|
||||
of MessageKind.findnodes:
|
||||
p.handleFindNodes(message.findNodes)
|
||||
of MessageKind.findcontent:
|
||||
p.handleFindContent(message.findcontent)
|
||||
p.handleFindContent(message.findcontent, srcId)
|
||||
of MessageKind.offer:
|
||||
p.handleOffer(message.offer)
|
||||
p.handleOffer(message.offer, srcId)
|
||||
else:
|
||||
# This would mean a that Portal wire response message is being send over a
|
||||
# discv5 talkreq message.
|
||||
@ -237,6 +230,7 @@ proc new*(T: type PortalProtocol,
|
||||
protocolId: PortalProtocolId,
|
||||
contentDB: ContentDB,
|
||||
toContentId: ToContentIdHandler,
|
||||
stream: PortalStream,
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openArray[Record] = [],
|
||||
distanceCalculator: DistanceCalculator = XorDistanceCalculator
|
||||
@ -249,6 +243,7 @@ proc new*(T: type PortalProtocol,
|
||||
baseProtocol: baseProtocol,
|
||||
contentDB: contentDB,
|
||||
toContentId: toContentId,
|
||||
stream: stream,
|
||||
dataRadius: dataRadius,
|
||||
bootstrapRecords: @bootstrapRecords)
|
||||
|
||||
@ -306,14 +301,14 @@ proc findNodes*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
|
||||
# TODO Add nodes validation
|
||||
return await reqResponse[FindNodesMessage, NodesMessage](p, dst, fn)
|
||||
|
||||
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||
proc findContentImpl*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||
Future[PortalResult[ContentMessage]] {.async.} =
|
||||
let fc = FindContentMessage(contentKey: contentKey)
|
||||
|
||||
trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent
|
||||
return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc)
|
||||
|
||||
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||
proc offerImpl*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||
Future[PortalResult[AcceptMessage]] {.async.} =
|
||||
let offer = OfferMessage(contentKeys: contentKeys)
|
||||
|
||||
@ -321,10 +316,6 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||
|
||||
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
|
||||
|
||||
# TODO: Actually have to parse the accept message and get the uTP connection
|
||||
# id, and initiate an uTP stream with given uTP connection id to get the data
|
||||
# out.
|
||||
|
||||
proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]] =
|
||||
var records: seq[Record]
|
||||
for r in rawRecords.asSeq():
|
||||
@ -338,6 +329,96 @@ proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]
|
||||
|
||||
ok(records)
|
||||
|
||||
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||
Future[PortalResult[FoundContent]] {.async.} =
|
||||
let contentMessageResponse = await p.findContentImpl(dst, contentKey)
|
||||
|
||||
if contentMessageResponse.isOk():
|
||||
let m = contentMessageResponse.get()
|
||||
case m.contentMessageType:
|
||||
of connectionIdType:
|
||||
# uTP protocol uses BE for all values in the header, incl. connection id
|
||||
let socketRes = await p.stream.transport.connectTo(
|
||||
dst, uint16.fromBytesBE(m.connectionId))
|
||||
if socketRes.isErr():
|
||||
# TODO: get proper error mapped
|
||||
return err("Error connecting to uTP socket")
|
||||
let socket = socketRes.get()
|
||||
if not socket.isConnected():
|
||||
socket.close()
|
||||
return err("Portal uTP socket is not in connected state")
|
||||
|
||||
# Read all bytes from the socket
|
||||
# This will either end with a FIN, or because the read action times out.
|
||||
# A FIN does not necessarily mean that the data read is complete. Further
|
||||
# validation is required, using a length prefix here might be beneficial for
|
||||
# this.
|
||||
let readData = socket.read()
|
||||
if await readData.withTimeout(1.seconds):
|
||||
let content = readData.read
|
||||
await socket.closeWait()
|
||||
return ok(FoundContent(kind: Content, content: ByteList(content)))
|
||||
else:
|
||||
await socket.closeWait()
|
||||
return err("Reading data from socket timed out, content request failed")
|
||||
of contentType:
|
||||
return ok(FoundContent(kind: Content, content: m.content))
|
||||
of enrsType:
|
||||
let records = recordsFromBytes(m.enrs)
|
||||
if records.isOk():
|
||||
let verifiedNodes =
|
||||
verifyNodesRecords(records.get(), dst, enrsResultLimit)
|
||||
|
||||
return ok(FoundContent(kind: Nodes, nodes: verifiedNodes))
|
||||
else:
|
||||
return err("Content message returned invalid ENRs")
|
||||
|
||||
# TODO: Depending on how this gets used, it might be better not to request
|
||||
# the data from the database here, but pass it as parameter. (like, if it was
|
||||
# just received it and now needs to be forwarded)
|
||||
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||
Future[PortalResult[void]] {.async.} =
|
||||
let acceptMessageResponse = await p.offerImpl(dst, contentKeys)
|
||||
|
||||
if acceptMessageResponse.isOk():
|
||||
let m = acceptMessageResponse.get()
|
||||
|
||||
let clientSocketRes = await p.stream.transport.connectTo(
|
||||
dst, uint16.fromBytesBE(m.connectionId))
|
||||
if clientSocketRes.isErr():
|
||||
# TODO: get proper error mapped
|
||||
return err("Error connecting to uTP socket")
|
||||
let clientSocket = clientSocketRes.get()
|
||||
if not clientSocket.isConnected():
|
||||
clientSocket.close()
|
||||
return err("Portal uTP socket is not in connected state")
|
||||
|
||||
# Filter contentKeys with bitlist
|
||||
var requestedContentKeys: seq[ByteList]
|
||||
for i, b in m.contentKeys:
|
||||
if b:
|
||||
requestedContentKeys.add(contentKeys[i])
|
||||
|
||||
for contentKey in requestedContentKeys:
|
||||
let contentIdOpt = p.toContentId(contentKey)
|
||||
if contentIdOpt.isSome():
|
||||
let
|
||||
contentId = contentIdOpt.get()
|
||||
maybeContent = p.contentDB.get(contentId)
|
||||
if maybeContent.isSome():
|
||||
let content = maybeContent.get()
|
||||
let dataWritten = await clientSocket.write(content)
|
||||
if dataWritten.isErr:
|
||||
error "Error writing requested data", error = dataWritten.error
|
||||
# No point in trying to continue writing data
|
||||
clientSocket.close()
|
||||
return err("Error writing requested data")
|
||||
|
||||
await clientSocket.closeWait()
|
||||
return ok()
|
||||
else:
|
||||
return err("No accept response")
|
||||
|
||||
proc findNodesVerified*(
|
||||
p: PortalProtocol, dst: Node, distances: seq[uint16]):
|
||||
Future[PortalResult[seq[Node]]] {.async.} =
|
||||
@ -424,42 +505,6 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
|
||||
p.lastLookup = now(chronos.Moment)
|
||||
return closestNodes
|
||||
|
||||
proc handleFoundContentMessage(p: PortalProtocol, m: ContentMessage,
|
||||
dst: Node, nodes: var seq[Node]): LookupResult =
|
||||
case m.contentMessageType:
|
||||
of connectionIdType:
|
||||
# TODO: We'd have to get the data through uTP, or wrap some proc around
|
||||
# this call that does that.
|
||||
LookupResult(kind: Content)
|
||||
of contentType:
|
||||
LookupResult(kind: Content, content: m.content)
|
||||
of enrsType:
|
||||
let records = recordsFromBytes(m.enrs)
|
||||
if records.isOk():
|
||||
let verifiedNodes =
|
||||
verifyNodesRecords(records.get(), dst, enrsResultLimit)
|
||||
nodes.add(verifiedNodes)
|
||||
|
||||
for n in nodes:
|
||||
# Attempt to add all nodes discovered
|
||||
discard p.routingTable.addNode(n)
|
||||
|
||||
LookupResult(kind: Nodes, nodes: nodes)
|
||||
else:
|
||||
LookupResult(kind: Content)
|
||||
|
||||
proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ByteList):
|
||||
Future[LookupResult] {.async.} =
|
||||
var nodes: seq[Node]
|
||||
|
||||
let contentMessageResponse = await p.findContent(destNode, target)
|
||||
|
||||
if contentMessageResponse.isOk():
|
||||
return handleFoundContentMessage(
|
||||
p, contentMessageResponse.get(), destNode, nodes)
|
||||
else:
|
||||
return LookupResult(kind: Nodes, nodes: nodes)
|
||||
|
||||
# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other
|
||||
# networks will probably be very similar. Extract lookup function to separate module
|
||||
# and make it more generaic
|
||||
@ -478,7 +523,7 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
|
||||
for node in closestNodes:
|
||||
seen.incl(node.id)
|
||||
|
||||
var pendingQueries = newSeqOfCap[Future[LookupResult]](alpha)
|
||||
var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
|
||||
|
||||
while true:
|
||||
var i = 0
|
||||
@ -487,7 +532,7 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
|
||||
while i < closestNodes.len and pendingQueries.len < alpha:
|
||||
let n = closestNodes[i]
|
||||
if not asked.containsOrIncl(n.id):
|
||||
pendingQueries.add(p.contentLookupWorker(n, target))
|
||||
pendingQueries.add(p.findContent(n, target))
|
||||
inc i
|
||||
|
||||
trace "Pending lookup queries", total = pendingQueries.len
|
||||
@ -504,29 +549,35 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
|
||||
else:
|
||||
error "Resulting query should have been in the pending queries"
|
||||
|
||||
let lookupResult = query.read
|
||||
let contentResult = query.read
|
||||
|
||||
# TODO: Remove node on timed-out query? To handle failure better, LookUpResult
|
||||
# should have third enum option like failure.
|
||||
case lookupResult.kind
|
||||
of Nodes:
|
||||
for n in lookupResult.nodes:
|
||||
if not seen.containsOrIncl(n.id):
|
||||
# If it wasn't seen before, insert node while remaining sorted
|
||||
closestNodes.insert(n, closestNodes.lowerBound(n,
|
||||
proc(x: Node, n: Node): int =
|
||||
cmp(p.routingTable.distance(x.id, targetId),
|
||||
p.routingTable.distance(n.id, targetId))
|
||||
))
|
||||
if contentResult.isOk():
|
||||
let content = contentResult.get()
|
||||
|
||||
if closestNodes.len > BUCKET_SIZE:
|
||||
closestNodes.del(closestNodes.high())
|
||||
of Content:
|
||||
# cancel any pending queries as we have find the content
|
||||
for f in pendingQueries:
|
||||
f.cancel()
|
||||
case content.kind
|
||||
of Nodes:
|
||||
for n in content.nodes:
|
||||
if not seen.containsOrIncl(n.id):
|
||||
discard p.routingTable.addNode(n)
|
||||
# If it wasn't seen before, insert node while remaining sorted
|
||||
closestNodes.insert(n, closestNodes.lowerBound(n,
|
||||
proc(x: Node, n: Node): int =
|
||||
cmp(p.routingTable.distance(x.id, targetId),
|
||||
p.routingTable.distance(n.id, targetId))
|
||||
))
|
||||
|
||||
return some(lookupResult.content)
|
||||
if closestNodes.len > BUCKET_SIZE:
|
||||
closestNodes.del(closestNodes.high())
|
||||
of Content:
|
||||
# cancel any pending queries as we have find the content
|
||||
for f in pendingQueries:
|
||||
f.cancel()
|
||||
|
||||
return some(content.content)
|
||||
else:
|
||||
# TODO: Should we do something with the node that failed responding our
|
||||
# query?
|
||||
discard
|
||||
|
||||
return none[ByteList]()
|
||||
|
||||
|
180
fluffy/network/wire/portal_stream.nim
Normal file
180
fluffy/network/wire/portal_stream.nim
Normal file
@ -0,0 +1,180 @@
|
||||
# Nimbus
|
||||
# Copyright (c) 2022 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
import
|
||||
std/sequtils,
|
||||
chronos, stew/byteutils, chronicles,
|
||||
eth/utp/utp_discv5_protocol,
|
||||
# even though utp_discv5_protocol exports this, import is still needed,
|
||||
# perhaps protocol.Protocol type of usage?
|
||||
eth/p2p/discoveryv5/protocol,
|
||||
./messages
|
||||
|
||||
export utp_discv5_protocol
|
||||
|
||||
const
|
||||
utpProtocolId = "utp".toBytes()
|
||||
connectionTimeout = 5.seconds
|
||||
|
||||
type
|
||||
ContentRequest = object
|
||||
connectionId: uint16
|
||||
nodeId: NodeId
|
||||
content: ByteList
|
||||
timeout: Moment
|
||||
|
||||
ContentOffer = object
|
||||
connectionId: uint16
|
||||
nodeId: NodeId
|
||||
contentKeys: ContentKeysList
|
||||
timeout: Moment
|
||||
|
||||
PortalStream* = ref object
|
||||
transport*: UtpDiscv5Protocol
|
||||
# TODO:
|
||||
# Decide on what's the better collection to use and set some limits in them
|
||||
# on how many uTP transfers allowed to happen concurrently.
|
||||
# Either set some limit, and drop whatever comes next. Unsure how to
|
||||
# communicate that with the peer however. Or have some more async waiting
|
||||
# until a spot becomes free, like with an AsyncQueue. Although the latter
|
||||
# probably can not be used here directly. This system however does needs
|
||||
# some agreement on timeout values of how long a uTP socket may be
|
||||
# "listening" before it times out because of inactivity.
|
||||
# Or, depending on the direction, it might also depend on the time out
|
||||
# values of the discovery v5 talkresp message.
|
||||
# TODO: Should the content key also be stored to be able to validate the
|
||||
# received data?
|
||||
contentRequests: seq[ContentRequest]
|
||||
contentOffers: seq[ContentOffer]
|
||||
rng: ref BrHmacDrbgContext
|
||||
|
||||
proc addContentOffer*(
|
||||
stream: PortalStream, nodeId: NodeId, contentKeys: ContentKeysList): Bytes2 =
|
||||
var connectionId: Bytes2
|
||||
brHmacDrbgGenerate(stream.rng[], connectionId)
|
||||
|
||||
# uTP protocol uses BE for all values in the header, incl. connection id.
|
||||
let id = uint16.fromBytesBE(connectionId)
|
||||
let contentOffer = ContentOffer(
|
||||
connectionId: id,
|
||||
nodeId: nodeId,
|
||||
contentKeys: contentKeys,
|
||||
timeout: Moment.now() + connectionTimeout)
|
||||
stream.contentOffers.add(contentOffer)
|
||||
|
||||
return connectionId
|
||||
|
||||
proc addContentRequest*(
|
||||
stream: PortalStream, nodeId: NodeId, content: ByteList): Bytes2 =
|
||||
var connectionId: Bytes2
|
||||
brHmacDrbgGenerate(stream.rng[], connectionId)
|
||||
|
||||
# uTP protocol uses BE for all values in the header, incl. connection id.
|
||||
let id = uint16.fromBytesBE(connectionId)
|
||||
let contentRequest = ContentRequest(
|
||||
connectionId: id,
|
||||
nodeId: nodeId,
|
||||
content: content,
|
||||
timeout: Moment.now() + connectionTimeout)
|
||||
stream.contentRequests.add(contentRequest)
|
||||
|
||||
return connectionId
|
||||
|
||||
proc writeAndClose(socket: UtpSocket[Node], data: seq[byte]) {.async.} =
|
||||
let dataWritten = await socket.write(data)
|
||||
if dataWritten.isErr():
|
||||
debug "Error writing requested data", error = dataWritten.error
|
||||
|
||||
await socket.closeWait()
|
||||
|
||||
proc readAndClose(socket: UtpSocket[Node]) {.async.} =
|
||||
# Read all bytes from the socket
|
||||
# This will either end with a FIN, or because the read action times out.
|
||||
# A FIN does not necessarily mean that the data read is complete. Further
|
||||
# validation is required, using a length prefix here might be beneficial for
|
||||
# this.
|
||||
var readData = socket.read()
|
||||
if await readData.withTimeout(1.seconds):
|
||||
# TODO: Content needs to be validated, stored and also offered again as part
|
||||
# of the neighborhood gossip. This will require access to the specific
|
||||
# Portal wire protocol for the network it was received on. Some async event
|
||||
# will probably be required for this.
|
||||
let content = readData.read
|
||||
echo content.toHex()
|
||||
else:
|
||||
debug "Reading data from socket timed out, content request failed"
|
||||
|
||||
await socket.closeWait()
|
||||
|
||||
proc pruneAllowedConnections(stream: PortalStream) =
|
||||
# Prune requests and offers that didn't receive a connection request
|
||||
# before `connectionTimeout`.
|
||||
let now = Moment.now()
|
||||
stream.contentRequests.keepIf(proc(x: ContentRequest): bool =
|
||||
x.timeout > now)
|
||||
stream.contentOffers.keepIf(proc(x: ContentOffer): bool =
|
||||
x.timeout > now)
|
||||
|
||||
# TODO: I think I'd like it more if we weren't to capture the stream.
|
||||
proc registerIncomingSocketCallback(
|
||||
stream: PortalStream): AcceptConnectionCallback[Node] =
|
||||
return (
|
||||
proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] =
|
||||
# Note: Connection id of uTP SYN is different from other packets, it is
|
||||
# actually the peers `send_conn_id`, opposed to `receive_conn_id` for all
|
||||
# other packets.
|
||||
for i, request in stream.contentRequests:
|
||||
if request.connectionId == client.connectionId and
|
||||
request.nodeId == client.remoteAddress.id:
|
||||
let fut = client.writeAndClose(request.content.asSeq())
|
||||
stream.contentRequests.del(i)
|
||||
return fut
|
||||
|
||||
for i, offer in stream.contentOffers:
|
||||
if offer.connectionId == client.connectionId and
|
||||
offer.nodeId == client.remoteAddress.id:
|
||||
let fut = client.readAndClose()
|
||||
stream.contentOffers.del(i)
|
||||
return fut
|
||||
|
||||
# TODO: Is there a scenario where this can happen,
|
||||
# considering `allowRegisteredIdCallback`? If not, doAssert?
|
||||
var fut = newFuture[void]("fluffy.AcceptConnectionCallback")
|
||||
fut.complete()
|
||||
return fut
|
||||
)
|
||||
|
||||
proc allowRegisteredIdCallback(
|
||||
stream: PortalStream): AllowConnectionCallback[Node] =
|
||||
return (
|
||||
proc(r: UtpRouter[Node], remoteAddress: Node, connectionId: uint16): bool =
|
||||
# stream.pruneAllowedConnections()
|
||||
# `connectionId` is the connection id ofthe uTP SYN packet header, thus
|
||||
# the peers `send_conn_id`.
|
||||
return
|
||||
stream.contentRequests.any(
|
||||
proc (x: ContentRequest): bool =
|
||||
x.connectionId == connectionId and x.nodeId == remoteAddress.id) or
|
||||
stream.contentOffers.any(
|
||||
proc (x: ContentOffer): bool =
|
||||
x.connectionId == connectionId and x.nodeId == remoteAddress.id)
|
||||
)
|
||||
|
||||
proc new*(T: type PortalStream, baseProtocol: protocol.Protocol): T =
|
||||
let
|
||||
stream = PortalStream(rng: baseProtocol.rng)
|
||||
socketConfig = SocketConfig.init(
|
||||
incomingSocketReceiveTimeout = none(Duration))
|
||||
|
||||
stream.transport = UtpDiscv5Protocol.new(
|
||||
baseProtocol,
|
||||
utpProtocolId,
|
||||
registerIncomingSocketCallback(stream),
|
||||
allowRegisteredIdCallback(stream),
|
||||
socketConfig)
|
||||
|
||||
stream
|
@ -76,7 +76,7 @@ proc installPortalApiHandlers*(
|
||||
enrs: Option[seq[Record]]]:
|
||||
let
|
||||
node = toNodeWithAddress(enr)
|
||||
content = await p.findContent(
|
||||
content = await p.findContentImpl(
|
||||
node, ByteList.init(hexToSeqByte(contentKey)))
|
||||
|
||||
if content.isErr():
|
||||
@ -107,6 +107,40 @@ proc installPortalApiHandlers*(
|
||||
records.get(), node, enrsResultLimit).map(
|
||||
proc(n: Node): Record = n.record)))
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_findContentExt") do(
|
||||
enr: Record, contentKey: string) -> tuple[
|
||||
content: Option[string], enrs: Option[seq[Record]]]:
|
||||
let
|
||||
node = toNodeWithAddress(enr)
|
||||
foundContentResult = await p.findContent(
|
||||
node, ByteList.init(hexToSeqByte(contentKey)))
|
||||
|
||||
if foundContentResult.isErr():
|
||||
raise newException(ValueError, $foundContentResult.error)
|
||||
else:
|
||||
let foundContent = foundContentResult.get()
|
||||
case foundContent.kind:
|
||||
of Content:
|
||||
return (
|
||||
some("0x" & foundContent.content.asSeq().toHex()),
|
||||
none(seq[Record]))
|
||||
of Nodes:
|
||||
return (
|
||||
none(string),
|
||||
some(foundContent.nodes.map(proc(n: Node): Record = n.record)))
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_offerExt") do(
|
||||
enr: Record, contentKey: string) -> bool:
|
||||
# Only allow 1 content key for now
|
||||
let
|
||||
node = toNodeWithAddress(enr)
|
||||
contentKeys = ContentKeysList(@[ByteList.init(hexToSeqByte(contentKey))])
|
||||
accept = await p.offer(node, contentKeys)
|
||||
if accept.isErr():
|
||||
raise newException(ValueError, $accept.error)
|
||||
else:
|
||||
return true
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_recursiveFindNodes") do() -> seq[Record]:
|
||||
let discovered = await p.queryRandom()
|
||||
return discovered.map(proc(n: Node): Record = n.record)
|
||||
|
28
fluffy/rpc/rpc_portal_debug_api.nim
Normal file
28
fluffy/rpc/rpc_portal_debug_api.nim
Normal file
@ -0,0 +1,28 @@
|
||||
# Nimbus
|
||||
# Copyright (c) 2022 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
json_rpc/[rpcproxy, rpcserver], stew/byteutils,
|
||||
../network/wire/portal_protocol,
|
||||
../content_db
|
||||
|
||||
export rpcserver
|
||||
|
||||
# Some RPCs that are (currently) useful for testing & debugging
|
||||
proc installPortalDebugApiHandlers*(
|
||||
rpcServer: RpcServer|RpcProxy, p: PortalProtocol, network: static string)
|
||||
{.raises: [Defect, CatchableError].} =
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_store") do(
|
||||
contentId: string, content: string) -> bool:
|
||||
# Using content id as parameter to make it more easy to store. Might evolve
|
||||
# in using content key.
|
||||
p.contentDB.put(hexToSeqByte(contentId), hexToSeqByte(content))
|
||||
|
||||
return true
|
@ -11,7 +11,7 @@ import
|
||||
chronos, testutils/unittests, stew/shims/net,
|
||||
eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
../network/wire/portal_protocol,
|
||||
../network/wire/[portal_protocol, portal_stream],
|
||||
../content_db,
|
||||
./test_helpers
|
||||
|
||||
@ -41,8 +41,11 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
|
||||
db1 = ContentDB.new("", inMemory = true)
|
||||
db2 = ContentDB.new("", inMemory = true)
|
||||
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler)
|
||||
stream1 = PortalStream.new(node1)
|
||||
stream2 = PortalStream.new(node2)
|
||||
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, stream1)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, stream2)
|
||||
|
||||
Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2)
|
||||
|
||||
@ -128,7 +131,7 @@ procSuite "Portal Wire Protocol Tests":
|
||||
|
||||
# content does not exist so this should provide us with the closest nodes
|
||||
# to the content, which is the only node in the routing table.
|
||||
let content = await test.proto1.findContent(test.proto2.localNode,
|
||||
let content = await test.proto1.findContentImpl(test.proto2.localNode,
|
||||
contentKey)
|
||||
|
||||
check:
|
||||
@ -141,7 +144,7 @@ procSuite "Portal Wire Protocol Tests":
|
||||
let test = defaultTestCase(rng)
|
||||
let contentKeys = ContentKeysList(List(@[ByteList(@[byte 0x01, 0x02, 0x03])]))
|
||||
|
||||
let accept = await test.proto1.offer(
|
||||
let accept = await test.proto1.offerImpl(
|
||||
test.proto2.baseProtocol.localNode, contentKeys)
|
||||
|
||||
check:
|
||||
@ -198,9 +201,9 @@ procSuite "Portal Wire Protocol Tests":
|
||||
db2 = ContentDB.new("", inMemory = true)
|
||||
db3 = ContentDB.new("", inMemory = true)
|
||||
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler)
|
||||
proto3 = PortalProtocol.new(node3, protocolId, db3, testHandler)
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, nil)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, nil)
|
||||
proto3 = PortalProtocol.new(node3, protocolId, db3, testHandler, nil)
|
||||
|
||||
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
|
||||
check proto1.addNode(node2.localNode) == Added
|
||||
@ -228,8 +231,8 @@ procSuite "Portal Wire Protocol Tests":
|
||||
db1 = ContentDB.new("", inMemory = true)
|
||||
db2 = ContentDB.new("", inMemory = true)
|
||||
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler,
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, nil)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, nil,
|
||||
bootstrapRecords = [node1.localNode.record])
|
||||
|
||||
proto1.start()
|
||||
@ -251,7 +254,7 @@ procSuite "Portal Wire Protocol Tests":
|
||||
|
||||
db = ContentDB.new("", inMemory = true)
|
||||
# No portal protocol for node1, hence an invalid bootstrap node
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db, testHandler,
|
||||
proto2 = PortalProtocol.new(node2, protocolId, db, testHandler, nil,
|
||||
bootstrapRecords = [node1.localNode.record])
|
||||
|
||||
# seedTable to add node1 to the routing table
|
||||
|
@ -46,8 +46,8 @@ procSuite "State Content Network":
|
||||
node2 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true))
|
||||
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true))
|
||||
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true), nil)
|
||||
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true), nil)
|
||||
|
||||
check proto2.portalProtocol.addNode(node1.localNode) == Added
|
||||
|
||||
@ -102,9 +102,9 @@ procSuite "State Content Network":
|
||||
node3 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20304))
|
||||
|
||||
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true))
|
||||
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true))
|
||||
proto3 = StateNetwork.new(node3, ContentDB.new("", inMemory = true))
|
||||
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true), nil)
|
||||
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true), nil)
|
||||
proto3 = StateNetwork.new(node3, ContentDB.new("", inMemory = true), nil)
|
||||
|
||||
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
|
||||
check proto1.portalProtocol.addNode(node2.localNode) == Added
|
||||
@ -161,8 +161,8 @@ procSuite "State Content Network":
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
|
||||
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true))
|
||||
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true))
|
||||
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true), nil)
|
||||
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true), nil)
|
||||
|
||||
check (await node1.ping(node2.localNode)).isOk()
|
||||
check (await node2.ping(node1.localNode)).isOk()
|
||||
|
@ -15,7 +15,7 @@ import
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
../common/common_utils,
|
||||
../content_db,
|
||||
../network/wire/[messages, portal_protocol],
|
||||
../network/wire/[portal_protocol, portal_stream],
|
||||
../network/state/[state_content, state_network]
|
||||
|
||||
const
|
||||
@ -210,10 +210,11 @@ proc run(config: PortalCliConf) =
|
||||
|
||||
d.open()
|
||||
|
||||
let db = ContentDB.new("", inMemory = true)
|
||||
|
||||
let portal = PortalProtocol.new(d, config.protocolId, db, testHandler,
|
||||
bootstrapRecords = bootstrapRecords)
|
||||
let
|
||||
db = ContentDB.new("", inMemory = true)
|
||||
portalStream = PortalStream.new(d)
|
||||
portal = PortalProtocol.new(d, config.protocolId, db, testHandler,
|
||||
portalStream, bootstrapRecords = bootstrapRecords)
|
||||
|
||||
if config.metricsEnabled:
|
||||
let
|
||||
|
Loading…
x
Reference in New Issue
Block a user