diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index 4f78e3580..d11b4388c 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -15,9 +15,11 @@ import eth/keys, eth/net/nat, eth/p2p/discoveryv5/protocol as discv5_protocol, ./conf, ./common/common_utils, - ./rpc/[rpc_eth_api, bridge_client, rpc_discovery_api, rpc_portal_api], + ./rpc/[rpc_eth_api, bridge_client, rpc_discovery_api, rpc_portal_api, + rpc_portal_debug_api], ./network/state/[state_network, state_content], ./network/history/[history_network, history_content], + ./network/wire/portal_stream, ./content_db proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] = @@ -65,13 +67,17 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = # Store the database at contentdb prefixed with the first 8 chars of node id. # This is done because the content in the db is dependant on the `NodeId` and # the selected `Radius`. - let db = - ContentDB.new(config.dataDir / "db" / "contentdb_" & + let + db = ContentDB.new(config.dataDir / "db" / "contentdb_" & d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex()) - let - stateNetwork = StateNetwork.new(d, db, bootstrapRecords = bootstrapRecords) - historyNetwork = HistoryNetwork.new(d, db, bootstrapRecords = bootstrapRecords) + # One instance of PortalStream and thus UtpDiscv5Protocol is shared over all + # the Portal networks. + portalStream = PortalStream.new(d) + stateNetwork = StateNetwork.new(d, db, portalStream, + bootstrapRecords = bootstrapRecords) + historyNetwork = HistoryNetwork.new(d, db, portalStream, + bootstrapRecords = bootstrapRecords) # TODO: If no new network key is generated then we should first check if an # enr file exists, and in the case it does read out the seqNum from it and @@ -100,6 +106,8 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = rpcHttpServerWithProxy.installDiscoveryApiHandlers(d) rpcHttpServerWithProxy.installPortalApiHandlers(stateNetwork.portalProtocol, "state") rpcHttpServerWithProxy.installPortalApiHandlers(historyNetwork.portalProtocol, "history") + rpcHttpServerWithProxy.installPortalDebugApiHandlers(stateNetwork.portalProtocol, "state") + rpcHttpServerWithProxy.installPortalDebugApiHandlers(stateNetwork.portalProtocol, "history") # TODO for now we can only proxy to local node (or remote one without ssl) to make it possible # to call infura https://github.com/status-im/nim-json-rpc/pull/101 needs to get merged for http client to support https/ waitFor rpcHttpServerWithProxy.start() diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 8ecf9975f..15b9cbb82 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -10,7 +10,7 @@ import stew/results, chronos, eth/p2p/discoveryv5/[protocol, enr], ../../content_db, - ../wire/portal_protocol, + ../wire/[portal_protocol, portal_stream], ./history_content const @@ -51,11 +51,12 @@ proc new*( T: type HistoryNetwork, baseProtocol: protocol.Protocol, contentDB: ContentDB, + portalStream: PortalStream, dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = []): T = let portalProtocol = PortalProtocol.new( baseProtocol, historyProtocolId, contentDB, toContentIdHandler, - dataRadius, bootstrapRecords) + portalStream, dataRadius, bootstrapRecords) return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 903e2fd5c..966989c39 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -10,7 +10,7 @@ import stew/results, chronos, eth/p2p/discoveryv5/[protocol, enr], ../../content_db, - ../wire/portal_protocol, + ../wire/[portal_protocol, portal_stream], ./state_content, ./state_distance @@ -51,11 +51,12 @@ proc new*( T: type StateNetwork, baseProtocol: protocol.Protocol, contentDB: ContentDB, + portalStream: PortalStream, dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = []): T = let portalProtocol = PortalProtocol.new( baseProtocol, stateProtocolId, contentDB, toContentIdHandler, - dataRadius, bootstrapRecords, stateDistanceCalculator) + portalStream, dataRadius, bootstrapRecords, stateDistanceCalculator) return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 32097ce2d..3aa86569e 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -14,8 +14,10 @@ import std/[sequtils, sets, algorithm], stew/results, chronicles, chronos, nimcrypto/hash, bearssl, ssz_serialization, - eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification], + eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, + nodes_verification], ../../content_db, + ./portal_stream, ./messages export messages, routing_table @@ -34,18 +36,6 @@ const initialLookups = 1 ## Amount of lookups done when populating the routing table type - ContentResultKind* = enum - ContentFound, ContentMissing, ContentKeyValidationFailure - - ContentResult* = object - case kind*: ContentResultKind - of ContentFound: - content*: seq[byte] - of ContentMissing: - contentId*: Uint256 - of ContentKeyValidationFailure: - error*: string - ToContentIdHandler* = proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.} @@ -62,18 +52,20 @@ type lastLookup: chronos.Moment refreshLoop: Future[void] revalidateLoop: Future[void] + stream: PortalStream PortalResult*[T] = Result[T, cstring] - LookupResultKind = enum - Nodes, Content + FoundContentKind* = enum + Nodes, + Content - LookupResult = object - case kind: LookupResultKind - of Nodes: - nodes: seq[Node] + FoundContent* = object + case kind*: FoundContentKind of Content: - content: ByteList + content*: ByteList + of Nodes: + nodes*: seq[Node] proc addNode*(p: PortalProtocol, node: Node): NodeStatus = p.routingTable.addNode(node) @@ -127,7 +119,8 @@ func handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] = let enrs = List[ByteList, 32](@[]) encodeMessage(NodesMessage(total: 1, enrs: enrs)) -proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = +proc handleFindContent( + p: PortalProtocol, fc: FindContentMessage, srcId: NodeId): seq[byte] = let contentIdOpt = p.toContentId(fc.contentKey) if contentIdOpt.isSome(): let @@ -142,8 +135,7 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = encodeMessage(ContentMessage( contentMessageType: contentType, content: ByteList(content))) else: - var connectionId: Bytes2 - brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId) + let connectionId = p.stream.addContentRequest(srcId, ByteList(content)) encodeMessage(ContentMessage( contentMessageType: connectionIdType, connectionId: connectionId)) @@ -162,8 +154,9 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = # discv5 layer. @[] -proc handleOffer(p: PortalProtocol, o: OfferMessage): seq[byte] = - var contentKeys = ContentKeysBitList.init(o.contentKeys.len) +proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] = + var contentKeysBitList = ContentKeysBitList.init(o.contentKeys.len) + var contentKeys = ContentKeysList.init(@[]) # TODO: Do we need some protection against a peer offering lots (64x) of # content that fits our Radius but is actually bogus? # Additional TODO, but more of a specification clarification: What if we don't @@ -175,16 +168,16 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage): seq[byte] = let contentId = contentIdOpt.get() if p.inRange(contentId): if not p.contentDB.contains(contentId): - contentKeys.setBit(i) + contentKeysBitList.setBit(i) + discard contentKeys.add(contentKey) else: # Return empty response when content key validation fails return @[] - var connectionId: Bytes2 - brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId) + let connectionId = p.stream.addContentOffer(srcId, contentKeys) encodeMessage( - AcceptMessage(connectionId: connectionId, contentKeys: contentKeys)) + AcceptMessage(connectionId: connectionId, contentKeys: contentKeysBitList)) # TODO: Neighborhood gossip # After data has been received and validated from an offer, we need to @@ -220,9 +213,9 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte], of MessageKind.findnodes: p.handleFindNodes(message.findNodes) of MessageKind.findcontent: - p.handleFindContent(message.findcontent) + p.handleFindContent(message.findcontent, srcId) of MessageKind.offer: - p.handleOffer(message.offer) + p.handleOffer(message.offer, srcId) else: # This would mean a that Portal wire response message is being send over a # discv5 talkreq message. @@ -237,6 +230,7 @@ proc new*(T: type PortalProtocol, protocolId: PortalProtocolId, contentDB: ContentDB, toContentId: ToContentIdHandler, + stream: PortalStream, dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = [], distanceCalculator: DistanceCalculator = XorDistanceCalculator @@ -249,6 +243,7 @@ proc new*(T: type PortalProtocol, baseProtocol: baseProtocol, contentDB: contentDB, toContentId: toContentId, + stream: stream, dataRadius: dataRadius, bootstrapRecords: @bootstrapRecords) @@ -306,14 +301,14 @@ proc findNodes*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]): # TODO Add nodes validation return await reqResponse[FindNodesMessage, NodesMessage](p, dst, fn) -proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): +proc findContentImpl*(p: PortalProtocol, dst: Node, contentKey: ByteList): Future[PortalResult[ContentMessage]] {.async.} = let fc = FindContentMessage(contentKey: contentKey) trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc) -proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): +proc offerImpl*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): Future[PortalResult[AcceptMessage]] {.async.} = let offer = OfferMessage(contentKeys: contentKeys) @@ -321,10 +316,6 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer) - # TODO: Actually have to parse the accept message and get the uTP connection - # id, and initiate an uTP stream with given uTP connection id to get the data - # out. - proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]] = var records: seq[Record] for r in rawRecords.asSeq(): @@ -338,6 +329,96 @@ proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record] ok(records) +proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): + Future[PortalResult[FoundContent]] {.async.} = + let contentMessageResponse = await p.findContentImpl(dst, contentKey) + + if contentMessageResponse.isOk(): + let m = contentMessageResponse.get() + case m.contentMessageType: + of connectionIdType: + # uTP protocol uses BE for all values in the header, incl. connection id + let socketRes = await p.stream.transport.connectTo( + dst, uint16.fromBytesBE(m.connectionId)) + if socketRes.isErr(): + # TODO: get proper error mapped + return err("Error connecting to uTP socket") + let socket = socketRes.get() + if not socket.isConnected(): + socket.close() + return err("Portal uTP socket is not in connected state") + + # Read all bytes from the socket + # This will either end with a FIN, or because the read action times out. + # A FIN does not necessarily mean that the data read is complete. Further + # validation is required, using a length prefix here might be beneficial for + # this. + let readData = socket.read() + if await readData.withTimeout(1.seconds): + let content = readData.read + await socket.closeWait() + return ok(FoundContent(kind: Content, content: ByteList(content))) + else: + await socket.closeWait() + return err("Reading data from socket timed out, content request failed") + of contentType: + return ok(FoundContent(kind: Content, content: m.content)) + of enrsType: + let records = recordsFromBytes(m.enrs) + if records.isOk(): + let verifiedNodes = + verifyNodesRecords(records.get(), dst, enrsResultLimit) + + return ok(FoundContent(kind: Nodes, nodes: verifiedNodes)) + else: + return err("Content message returned invalid ENRs") + +# TODO: Depending on how this gets used, it might be better not to request +# the data from the database here, but pass it as parameter. (like, if it was +# just received it and now needs to be forwarded) +proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): + Future[PortalResult[void]] {.async.} = + let acceptMessageResponse = await p.offerImpl(dst, contentKeys) + + if acceptMessageResponse.isOk(): + let m = acceptMessageResponse.get() + + let clientSocketRes = await p.stream.transport.connectTo( + dst, uint16.fromBytesBE(m.connectionId)) + if clientSocketRes.isErr(): + # TODO: get proper error mapped + return err("Error connecting to uTP socket") + let clientSocket = clientSocketRes.get() + if not clientSocket.isConnected(): + clientSocket.close() + return err("Portal uTP socket is not in connected state") + + # Filter contentKeys with bitlist + var requestedContentKeys: seq[ByteList] + for i, b in m.contentKeys: + if b: + requestedContentKeys.add(contentKeys[i]) + + for contentKey in requestedContentKeys: + let contentIdOpt = p.toContentId(contentKey) + if contentIdOpt.isSome(): + let + contentId = contentIdOpt.get() + maybeContent = p.contentDB.get(contentId) + if maybeContent.isSome(): + let content = maybeContent.get() + let dataWritten = await clientSocket.write(content) + if dataWritten.isErr: + error "Error writing requested data", error = dataWritten.error + # No point in trying to continue writing data + clientSocket.close() + return err("Error writing requested data") + + await clientSocket.closeWait() + return ok() + else: + return err("No accept response") + proc findNodesVerified*( p: PortalProtocol, dst: Node, distances: seq[uint16]): Future[PortalResult[seq[Node]]] {.async.} = @@ -424,42 +505,6 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} = p.lastLookup = now(chronos.Moment) return closestNodes -proc handleFoundContentMessage(p: PortalProtocol, m: ContentMessage, - dst: Node, nodes: var seq[Node]): LookupResult = - case m.contentMessageType: - of connectionIdType: - # TODO: We'd have to get the data through uTP, or wrap some proc around - # this call that does that. - LookupResult(kind: Content) - of contentType: - LookupResult(kind: Content, content: m.content) - of enrsType: - let records = recordsFromBytes(m.enrs) - if records.isOk(): - let verifiedNodes = - verifyNodesRecords(records.get(), dst, enrsResultLimit) - nodes.add(verifiedNodes) - - for n in nodes: - # Attempt to add all nodes discovered - discard p.routingTable.addNode(n) - - LookupResult(kind: Nodes, nodes: nodes) - else: - LookupResult(kind: Content) - -proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ByteList): - Future[LookupResult] {.async.} = - var nodes: seq[Node] - - let contentMessageResponse = await p.findContent(destNode, target) - - if contentMessageResponse.isOk(): - return handleFoundContentMessage( - p, contentMessageResponse.get(), destNode, nodes) - else: - return LookupResult(kind: Nodes, nodes: nodes) - # TODO ContentLookup and Lookup look almost exactly the same, also lookups in other # networks will probably be very similar. Extract lookup function to separate module # and make it more generaic @@ -478,7 +523,7 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): for node in closestNodes: seen.incl(node.id) - var pendingQueries = newSeqOfCap[Future[LookupResult]](alpha) + var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha) while true: var i = 0 @@ -487,7 +532,7 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): while i < closestNodes.len and pendingQueries.len < alpha: let n = closestNodes[i] if not asked.containsOrIncl(n.id): - pendingQueries.add(p.contentLookupWorker(n, target)) + pendingQueries.add(p.findContent(n, target)) inc i trace "Pending lookup queries", total = pendingQueries.len @@ -504,29 +549,35 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): else: error "Resulting query should have been in the pending queries" - let lookupResult = query.read + let contentResult = query.read - # TODO: Remove node on timed-out query? To handle failure better, LookUpResult - # should have third enum option like failure. - case lookupResult.kind - of Nodes: - for n in lookupResult.nodes: - if not seen.containsOrIncl(n.id): - # If it wasn't seen before, insert node while remaining sorted - closestNodes.insert(n, closestNodes.lowerBound(n, - proc(x: Node, n: Node): int = - cmp(p.routingTable.distance(x.id, targetId), - p.routingTable.distance(n.id, targetId)) - )) + if contentResult.isOk(): + let content = contentResult.get() - if closestNodes.len > BUCKET_SIZE: - closestNodes.del(closestNodes.high()) - of Content: - # cancel any pending queries as we have find the content - for f in pendingQueries: - f.cancel() + case content.kind + of Nodes: + for n in content.nodes: + if not seen.containsOrIncl(n.id): + discard p.routingTable.addNode(n) + # If it wasn't seen before, insert node while remaining sorted + closestNodes.insert(n, closestNodes.lowerBound(n, + proc(x: Node, n: Node): int = + cmp(p.routingTable.distance(x.id, targetId), + p.routingTable.distance(n.id, targetId)) + )) - return some(lookupResult.content) + if closestNodes.len > BUCKET_SIZE: + closestNodes.del(closestNodes.high()) + of Content: + # cancel any pending queries as we have find the content + for f in pendingQueries: + f.cancel() + + return some(content.content) + else: + # TODO: Should we do something with the node that failed responding our + # query? + discard return none[ByteList]() diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim new file mode 100644 index 000000000..478bef0ed --- /dev/null +++ b/fluffy/network/wire/portal_stream.nim @@ -0,0 +1,180 @@ +# Nimbus +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + std/sequtils, + chronos, stew/byteutils, chronicles, + eth/utp/utp_discv5_protocol, + # even though utp_discv5_protocol exports this, import is still needed, + # perhaps protocol.Protocol type of usage? + eth/p2p/discoveryv5/protocol, + ./messages + +export utp_discv5_protocol + +const + utpProtocolId = "utp".toBytes() + connectionTimeout = 5.seconds + +type + ContentRequest = object + connectionId: uint16 + nodeId: NodeId + content: ByteList + timeout: Moment + + ContentOffer = object + connectionId: uint16 + nodeId: NodeId + contentKeys: ContentKeysList + timeout: Moment + + PortalStream* = ref object + transport*: UtpDiscv5Protocol + # TODO: + # Decide on what's the better collection to use and set some limits in them + # on how many uTP transfers allowed to happen concurrently. + # Either set some limit, and drop whatever comes next. Unsure how to + # communicate that with the peer however. Or have some more async waiting + # until a spot becomes free, like with an AsyncQueue. Although the latter + # probably can not be used here directly. This system however does needs + # some agreement on timeout values of how long a uTP socket may be + # "listening" before it times out because of inactivity. + # Or, depending on the direction, it might also depend on the time out + # values of the discovery v5 talkresp message. + # TODO: Should the content key also be stored to be able to validate the + # received data? + contentRequests: seq[ContentRequest] + contentOffers: seq[ContentOffer] + rng: ref BrHmacDrbgContext + +proc addContentOffer*( + stream: PortalStream, nodeId: NodeId, contentKeys: ContentKeysList): Bytes2 = + var connectionId: Bytes2 + brHmacDrbgGenerate(stream.rng[], connectionId) + + # uTP protocol uses BE for all values in the header, incl. connection id. + let id = uint16.fromBytesBE(connectionId) + let contentOffer = ContentOffer( + connectionId: id, + nodeId: nodeId, + contentKeys: contentKeys, + timeout: Moment.now() + connectionTimeout) + stream.contentOffers.add(contentOffer) + + return connectionId + +proc addContentRequest*( + stream: PortalStream, nodeId: NodeId, content: ByteList): Bytes2 = + var connectionId: Bytes2 + brHmacDrbgGenerate(stream.rng[], connectionId) + + # uTP protocol uses BE for all values in the header, incl. connection id. + let id = uint16.fromBytesBE(connectionId) + let contentRequest = ContentRequest( + connectionId: id, + nodeId: nodeId, + content: content, + timeout: Moment.now() + connectionTimeout) + stream.contentRequests.add(contentRequest) + + return connectionId + +proc writeAndClose(socket: UtpSocket[Node], data: seq[byte]) {.async.} = + let dataWritten = await socket.write(data) + if dataWritten.isErr(): + debug "Error writing requested data", error = dataWritten.error + + await socket.closeWait() + +proc readAndClose(socket: UtpSocket[Node]) {.async.} = + # Read all bytes from the socket + # This will either end with a FIN, or because the read action times out. + # A FIN does not necessarily mean that the data read is complete. Further + # validation is required, using a length prefix here might be beneficial for + # this. + var readData = socket.read() + if await readData.withTimeout(1.seconds): + # TODO: Content needs to be validated, stored and also offered again as part + # of the neighborhood gossip. This will require access to the specific + # Portal wire protocol for the network it was received on. Some async event + # will probably be required for this. + let content = readData.read + echo content.toHex() + else: + debug "Reading data from socket timed out, content request failed" + + await socket.closeWait() + +proc pruneAllowedConnections(stream: PortalStream) = + # Prune requests and offers that didn't receive a connection request + # before `connectionTimeout`. + let now = Moment.now() + stream.contentRequests.keepIf(proc(x: ContentRequest): bool = + x.timeout > now) + stream.contentOffers.keepIf(proc(x: ContentOffer): bool = + x.timeout > now) + +# TODO: I think I'd like it more if we weren't to capture the stream. +proc registerIncomingSocketCallback( + stream: PortalStream): AcceptConnectionCallback[Node] = + return ( + proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] = + # Note: Connection id of uTP SYN is different from other packets, it is + # actually the peers `send_conn_id`, opposed to `receive_conn_id` for all + # other packets. + for i, request in stream.contentRequests: + if request.connectionId == client.connectionId and + request.nodeId == client.remoteAddress.id: + let fut = client.writeAndClose(request.content.asSeq()) + stream.contentRequests.del(i) + return fut + + for i, offer in stream.contentOffers: + if offer.connectionId == client.connectionId and + offer.nodeId == client.remoteAddress.id: + let fut = client.readAndClose() + stream.contentOffers.del(i) + return fut + + # TODO: Is there a scenario where this can happen, + # considering `allowRegisteredIdCallback`? If not, doAssert? + var fut = newFuture[void]("fluffy.AcceptConnectionCallback") + fut.complete() + return fut + ) + +proc allowRegisteredIdCallback( + stream: PortalStream): AllowConnectionCallback[Node] = + return ( + proc(r: UtpRouter[Node], remoteAddress: Node, connectionId: uint16): bool = + # stream.pruneAllowedConnections() + # `connectionId` is the connection id ofthe uTP SYN packet header, thus + # the peers `send_conn_id`. + return + stream.contentRequests.any( + proc (x: ContentRequest): bool = + x.connectionId == connectionId and x.nodeId == remoteAddress.id) or + stream.contentOffers.any( + proc (x: ContentOffer): bool = + x.connectionId == connectionId and x.nodeId == remoteAddress.id) + ) + +proc new*(T: type PortalStream, baseProtocol: protocol.Protocol): T = + let + stream = PortalStream(rng: baseProtocol.rng) + socketConfig = SocketConfig.init( + incomingSocketReceiveTimeout = none(Duration)) + + stream.transport = UtpDiscv5Protocol.new( + baseProtocol, + utpProtocolId, + registerIncomingSocketCallback(stream), + allowRegisteredIdCallback(stream), + socketConfig) + + stream diff --git a/fluffy/rpc/rpc_portal_api.nim b/fluffy/rpc/rpc_portal_api.nim index 93a3b966d..cfb9a3c71 100644 --- a/fluffy/rpc/rpc_portal_api.nim +++ b/fluffy/rpc/rpc_portal_api.nim @@ -76,7 +76,7 @@ proc installPortalApiHandlers*( enrs: Option[seq[Record]]]: let node = toNodeWithAddress(enr) - content = await p.findContent( + content = await p.findContentImpl( node, ByteList.init(hexToSeqByte(contentKey))) if content.isErr(): @@ -107,6 +107,40 @@ proc installPortalApiHandlers*( records.get(), node, enrsResultLimit).map( proc(n: Node): Record = n.record))) + rpcServer.rpc("portal_" & network & "_findContentExt") do( + enr: Record, contentKey: string) -> tuple[ + content: Option[string], enrs: Option[seq[Record]]]: + let + node = toNodeWithAddress(enr) + foundContentResult = await p.findContent( + node, ByteList.init(hexToSeqByte(contentKey))) + + if foundContentResult.isErr(): + raise newException(ValueError, $foundContentResult.error) + else: + let foundContent = foundContentResult.get() + case foundContent.kind: + of Content: + return ( + some("0x" & foundContent.content.asSeq().toHex()), + none(seq[Record])) + of Nodes: + return ( + none(string), + some(foundContent.nodes.map(proc(n: Node): Record = n.record))) + + rpcServer.rpc("portal_" & network & "_offerExt") do( + enr: Record, contentKey: string) -> bool: + # Only allow 1 content key for now + let + node = toNodeWithAddress(enr) + contentKeys = ContentKeysList(@[ByteList.init(hexToSeqByte(contentKey))]) + accept = await p.offer(node, contentKeys) + if accept.isErr(): + raise newException(ValueError, $accept.error) + else: + return true + rpcServer.rpc("portal_" & network & "_recursiveFindNodes") do() -> seq[Record]: let discovered = await p.queryRandom() return discovered.map(proc(n: Node): Record = n.record) diff --git a/fluffy/rpc/rpc_portal_debug_api.nim b/fluffy/rpc/rpc_portal_debug_api.nim new file mode 100644 index 000000000..1ff56322c --- /dev/null +++ b/fluffy/rpc/rpc_portal_debug_api.nim @@ -0,0 +1,28 @@ +# Nimbus +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + json_rpc/[rpcproxy, rpcserver], stew/byteutils, + ../network/wire/portal_protocol, + ../content_db + +export rpcserver + +# Some RPCs that are (currently) useful for testing & debugging +proc installPortalDebugApiHandlers*( + rpcServer: RpcServer|RpcProxy, p: PortalProtocol, network: static string) + {.raises: [Defect, CatchableError].} = + + rpcServer.rpc("portal_" & network & "_store") do( + contentId: string, content: string) -> bool: + # Using content id as parameter to make it more easy to store. Might evolve + # in using content key. + p.contentDB.put(hexToSeqByte(contentId), hexToSeqByte(content)) + + return true diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index e7333785c..2725e968f 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -11,7 +11,7 @@ import chronos, testutils/unittests, stew/shims/net, eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2], eth/p2p/discoveryv5/protocol as discv5_protocol, - ../network/wire/portal_protocol, + ../network/wire/[portal_protocol, portal_stream], ../content_db, ./test_helpers @@ -41,8 +41,11 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = db1 = ContentDB.new("", inMemory = true) db2 = ContentDB.new("", inMemory = true) - proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler) - proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler) + stream1 = PortalStream.new(node1) + stream2 = PortalStream.new(node2) + + proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, stream1) + proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, stream2) Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2) @@ -128,7 +131,7 @@ procSuite "Portal Wire Protocol Tests": # content does not exist so this should provide us with the closest nodes # to the content, which is the only node in the routing table. - let content = await test.proto1.findContent(test.proto2.localNode, + let content = await test.proto1.findContentImpl(test.proto2.localNode, contentKey) check: @@ -141,7 +144,7 @@ procSuite "Portal Wire Protocol Tests": let test = defaultTestCase(rng) let contentKeys = ContentKeysList(List(@[ByteList(@[byte 0x01, 0x02, 0x03])])) - let accept = await test.proto1.offer( + let accept = await test.proto1.offerImpl( test.proto2.baseProtocol.localNode, contentKeys) check: @@ -198,9 +201,9 @@ procSuite "Portal Wire Protocol Tests": db2 = ContentDB.new("", inMemory = true) db3 = ContentDB.new("", inMemory = true) - proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler) - proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler) - proto3 = PortalProtocol.new(node3, protocolId, db3, testHandler) + proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, nil) + proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, nil) + proto3 = PortalProtocol.new(node3, protocolId, db3, testHandler, nil) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content check proto1.addNode(node2.localNode) == Added @@ -228,8 +231,8 @@ procSuite "Portal Wire Protocol Tests": db1 = ContentDB.new("", inMemory = true) db2 = ContentDB.new("", inMemory = true) - proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler) - proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, + proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, nil) + proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, nil, bootstrapRecords = [node1.localNode.record]) proto1.start() @@ -251,7 +254,7 @@ procSuite "Portal Wire Protocol Tests": db = ContentDB.new("", inMemory = true) # No portal protocol for node1, hence an invalid bootstrap node - proto2 = PortalProtocol.new(node2, protocolId, db, testHandler, + proto2 = PortalProtocol.new(node2, protocolId, db, testHandler, nil, bootstrapRecords = [node1.localNode.record]) # seedTable to add node1 to the routing table diff --git a/fluffy/tests/test_state_network.nim b/fluffy/tests/test_state_network.nim index f7431c3a3..424c6e78c 100644 --- a/fluffy/tests/test_state_network.nim +++ b/fluffy/tests/test_state_network.nim @@ -46,8 +46,8 @@ procSuite "State Content Network": node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true), nil) + proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true), nil) check proto2.portalProtocol.addNode(node1.localNode) == Added @@ -102,9 +102,9 @@ procSuite "State Content Network": node3 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20304)) - proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true)) - proto3 = StateNetwork.new(node3, ContentDB.new("", inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true), nil) + proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true), nil) + proto3 = StateNetwork.new(node3, ContentDB.new("", inMemory = true), nil) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content check proto1.portalProtocol.addNode(node2.localNode) == Added @@ -161,8 +161,8 @@ procSuite "State Content Network": rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true), nil) + proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true), nil) check (await node1.ping(node2.localNode)).isOk() check (await node2.ping(node1.localNode)).isOk() diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index a2b5d64f9..fa4ba25d1 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -15,7 +15,7 @@ import eth/p2p/discoveryv5/protocol as discv5_protocol, ../common/common_utils, ../content_db, - ../network/wire/[messages, portal_protocol], + ../network/wire/[portal_protocol, portal_stream], ../network/state/[state_content, state_network] const @@ -210,10 +210,11 @@ proc run(config: PortalCliConf) = d.open() - let db = ContentDB.new("", inMemory = true) - - let portal = PortalProtocol.new(d, config.protocolId, db, testHandler, - bootstrapRecords = bootstrapRecords) + let + db = ContentDB.new("", inMemory = true) + portalStream = PortalStream.new(d) + portal = PortalProtocol.new(d, config.protocolId, db, testHandler, + portalStream, bootstrapRecords = bootstrapRecords) if config.metricsEnabled: let