Fluffy Portal RPC Client (#2649)

* Portal rpc client implementation.
This commit is contained in:
bhartnett 2024-10-02 12:23:13 +08:00 committed by GitHub
parent 216604d0d6
commit 44fea2348b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 449 additions and 6 deletions

View File

@ -6,7 +6,146 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
import import
std/strutils,
chronos,
stew/byteutils,
results,
eth/common/eth_types,
json_rpc/rpcclient, json_rpc/rpcclient,
../common/common_types,
../network/history/[history_content, history_network],
./rpc_calls/[rpc_discovery_calls, rpc_portal_calls, rpc_portal_debug_calls] ./rpc_calls/[rpc_discovery_calls, rpc_portal_calls, rpc_portal_debug_calls]
export rpcclient, rpc_discovery_calls, rpc_portal_calls, rpc_portal_debug_calls export
rpcclient, rpc_discovery_calls, rpc_portal_calls, rpc_portal_debug_calls, results,
eth_types
type
PortalRpcClient* = distinct RpcClient
PortalRpcError* = enum
ContentNotFound
InvalidContentKey
InvalidContentValue
ContentValidationFailed
proc init*(T: type PortalRpcClient, rpcClient: RpcClient): T =
T(rpcClient)
func toPortalRpcError(e: ref CatchableError): PortalRpcError =
# TODO: Update this to parse the error message json
if e.msg.contains("-39001"):
ContentNotFound
elif e.msg.contains("-32602"):
InvalidContentKey
else:
raiseAssert(e.msg) # Shouldn't happen
proc historyLocalContent(
client: PortalRpcClient, contentKey: string
): Future[Result[string, PortalRpcError]] {.async: (raises: []).} =
try:
let content = await RpcClient(client).portal_historyLocalContent(contentKey)
ok(content)
except CatchableError as e:
err(e.toPortalRpcError())
proc historyRecursiveFindContent(
client: PortalRpcClient, contentKey: string
): Future[Result[string, PortalRpcError]] {.async: (raises: []).} =
try:
let contentInfo =
await RpcClient(client).portal_historyRecursiveFindContent(contentKey)
ok(contentInfo.content)
except CatchableError as e:
err(e.toPortalRpcError())
template toBytes(content: string): seq[byte] =
try:
hexToSeqByte(content)
except ValueError as e:
raiseAssert(e.msg)
template valueOrErr[T](res: Result[T, string], error: PortalRpcError): auto =
if res.isOk():
ok(res.value)
else:
err(error)
proc historyGetContent(
client: PortalRpcClient, contentKey: string
): Future[Result[string, PortalRpcError]] {.async: (raises: []).} =
# Look up the content from the local db before trying to get it from the network
let content = (await client.historyLocalContent(contentKey)).valueOr:
if error == ContentNotFound:
?await client.historyRecursiveFindContent(contentKey)
else:
return err(error)
ok(content)
proc historyGetBlockHeader*(
client: PortalRpcClient, blockHash: BlockHash, validateContent = true
): Future[Result[BlockHeader, PortalRpcError]] {.async: (raises: []).} =
## Fetches the block header for the given hash from the Portal History Network.
## The data is first looked up in the node's local database before trying to
## fetch it from the network.
##
## Note: This does not validate that the returned header is part of the canonical
## chain, it only validates that the header matches the block hash. For example,
## a malicious portal node could return a valid but non-canonical header such
## as an uncle block that matches the block hash. For this reason the caller
## needs to use another method to verify the header is part of the canonical chain.
let
contentKey = blockHeaderContentKey(blockHash).encode().asSeq().to0xHex()
content = ?await client.historyGetContent(contentKey)
headerWithProof = decodeSsz(content.toBytes(), BlockHeaderWithProof).valueOr:
return err(InvalidContentValue)
headerBytes = headerWithProof.header.asSeq()
if validateContent:
validateBlockHeaderBytes(headerBytes, blockHash).valueOrErr(ContentValidationFailed)
else:
decodeRlp(headerBytes, BlockHeader).valueOrErr(InvalidContentValue)
proc historyGetBlockBody*(
client: PortalRpcClient, blockHash: BlockHash, validateContent = true
): Future[Result[BlockBody, PortalRpcError]] {.async: (raises: []).} =
## Fetches the block body for the given block hash from the Portal History
## Network. The data is first looked up in the node's local database before
## trying to fetch it from the network. If validateContent is true, the
## block header is fetched first in order to run the content validation.
let
contentKey = blockBodyContentKey(blockHash).encode().asSeq().to0xHex()
content = ?await client.historyGetContent(contentKey)
if validateContent:
let blockHeader = ?await client.historyGetBlockHeader(blockHash)
validateBlockBodyBytes(content.toBytes(), blockHeader).valueOrErr(
ContentValidationFailed
)
else:
decodeBlockBodyBytes(content.toBytes()).valueOrErr(InvalidContentValue)
proc historyGetReceipts*(
client: PortalRpcClient, blockHash: BlockHash, validateContent = true
): Future[Result[seq[Receipt], PortalRpcError]] {.async: (raises: []).} =
## Fetches the receipts for the given block hash from the Portal History
## Network. The data is first looked up in the node's local database before
## trying to fetch it from the network. If validateContent is true, the
## block header is fetched first in order to run the content validation.
let
contentKey = receiptsContentKey(blockHash).encode().asSeq().to0xHex()
content = ?await client.historyGetContent(contentKey)
if validateContent:
let blockHeader = ?await client.historyGetBlockHeader(blockHash)
validateReceiptsBytes(content.toBytes(), blockHeader.receiptsRoot).valueOrErr(
ContentValidationFailed
)
else:
let receipts = decodeSsz(content.toBytes(), PortalReceipts).valueOr:
return err(InvalidContentValue)
seq[Receipt].fromPortalReceipts(receipts).valueOrErr(InvalidContentValue)

View File

@ -9,8 +9,8 @@
import import
./test_content_db, ./test_content_db,
./test_discovery_rpc,
./wire_protocol_tests/all_wire_protocol_tests, ./wire_protocol_tests/all_wire_protocol_tests,
./history_network_tests/all_history_network_tests, ./history_network_tests/all_history_network_tests,
./beacon_network_tests/all_beacon_network_tests, ./beacon_network_tests/all_beacon_network_tests,
./state_network_tests/all_state_network_tests ./state_network_tests/all_state_network_tests,
./rpc_tests/all_rpc_tests

View File

@ -0,0 +1,10 @@
# Nimbus
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.warning[UnusedImport]: off.}
import ./test_discovery_rpc, ./test_portal_rpc_client

View File

@ -16,8 +16,8 @@ import
eth/p2p/discoveryv5/enr, eth/p2p/discoveryv5/enr,
eth/keys, eth/keys,
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
../rpc/rpc_discovery_api, ../../rpc/rpc_discovery_api,
./test_helpers ../test_helpers
type TestCase = ref object type TestCase = ref object
localDiscovery: discv5_protocol.Protocol localDiscovery: discv5_protocol.Protocol
@ -30,7 +30,7 @@ proc setupTest(rng: ref HmacDrbgContext): Future[TestCase] {.async.} =
localSrvPort = 0 # let the OS choose a port localSrvPort = 0 # let the OS choose a port
ta = initTAddress(localSrvAddress, localSrvPort) ta = initTAddress(localSrvAddress, localSrvPort)
localDiscoveryNode = localDiscoveryNode =
initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(20302)) initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(20332))
client = newRpcHttpClient() client = newRpcHttpClient()
let rpcHttpServer = RpcHttpServer.new() let rpcHttpServer = RpcHttpServer.new()

View File

@ -0,0 +1,294 @@
# Nimbus - Portal Network
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
chronos,
testutils/unittests,
json_rpc/rpcserver,
json_rpc/clients/httpclient,
stint,
eth/p2p/discoveryv5/enr,
eth/keys,
eth/p2p/discoveryv5/protocol as discv5_protocol,
../../network/wire/[portal_protocol, portal_stream, portal_protocol_config],
../../network/history/
[history_network, history_content, validation/historical_hashes_accumulator],
../../database/content_db,
../../rpc/[portal_rpc_client, rpc_portal_api],
../test_helpers
type HistoryNode = ref object
discoveryProtocol*: discv5_protocol.Protocol
historyNetwork*: HistoryNetwork
proc newHistoryNode(rng: ref HmacDrbgContext, port: int): HistoryNode =
let
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port))
db = ContentDB.new(
"", uint32.high, RadiusConfig(kind: Dynamic), node.localNode.id, inMemory = true
)
streamManager = StreamManager.new(node)
historyNetwork = HistoryNetwork.new(
PortalNetwork.none, node, db, streamManager, FinishedHistoricalHashesAccumulator()
)
return HistoryNode(discoveryProtocol: node, historyNetwork: historyNetwork)
proc portalProtocol(hn: HistoryNode): PortalProtocol =
hn.historyNetwork.portalProtocol
proc localNode(hn: HistoryNode): Node =
hn.discoveryProtocol.localNode
proc start(hn: HistoryNode) =
hn.historyNetwork.start()
proc stop(hn: HistoryNode) {.async.} =
discard hn.historyNetwork.stop()
await hn.discoveryProtocol.closeWait()
proc containsId(hn: HistoryNode, contentId: ContentId): bool =
return hn.historyNetwork.contentDB.get(contentId).isSome()
proc store*(hn: HistoryNode, blockHash: BlockHash, blockHeader: BlockHeader) =
let
headerRlp = rlp.encode(blockHeader)
blockHeaderWithProof = BlockHeaderWithProof(
header: ByteList[2048].init(headerRlp), proof: BlockHeaderProof.init()
)
contentKeyBytes = blockHeaderContentKey(blockHash).encode()
contentId = history_content.toContentId(contentKeyBytes)
hn.portalProtocol().storeContent(
contentKeyBytes, contentId, SSZ.encode(blockHeaderWithProof)
)
proc store*(hn: HistoryNode, blockHash: BlockHash, blockBody: BlockBody) =
let
contentKeyBytes = blockBodyContentKey(blockHash).encode()
contentId = history_content.toContentId(contentKeyBytes)
hn.portalProtocol().storeContent(contentKeyBytes, contentId, blockBody.encode())
proc store*(hn: HistoryNode, blockHash: BlockHash, receipts: seq[Receipt]) =
let
contentKeyBytes = receiptsContentKey(blockHash).encode()
contentId = history_content.toContentId(contentKeyBytes)
hn.portalProtocol().storeContent(contentKeyBytes, contentId, receipts.encode())
type TestCase = ref object
historyNode: HistoryNode
server: RpcHttpServer
client: PortalRpcClient
proc setupTest(rng: ref HmacDrbgContext): Future[TestCase] {.async.} =
let
localSrvAddress = "127.0.0.1"
localSrvPort = 0 # let the OS choose a port
ta = initTAddress(localSrvAddress, localSrvPort)
client = newRpcHttpClient()
historyNode = newHistoryNode(rng, 20333)
let rpcHttpServer = RpcHttpServer.new()
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)
rpcHttpServer.installPortalApiHandlers(
historyNode.historyNetwork.portalProtocol, "history"
)
rpcHttpServer.start()
await client.connect(localSrvAddress, rpcHttpServer.localAddress[0].port, false)
return TestCase(
historyNode: historyNode,
server: rpcHttpServer,
client: PortalRpcClient.init(client),
)
proc stop(testCase: TestCase) {.async.} =
await testCase.server.stop()
await testCase.server.closeWait()
await testCase.historyNode.stop()
procSuite "Portal RPC Client":
let rng = newRng()
asyncTest "Test historyGetBlockHeader with validation":
let
tc = await setupTest(rng)
blockHeader = BlockHeader(number: 100)
blockHash = blockHeader.blockHash()
# Test content not found
block:
let blockHeaderRes =
await tc.client.historyGetBlockHeader(blockHash, validateContent = true)
check:
blockHeaderRes.isErr()
blockHeaderRes.error() == ContentNotFound
# Test content found
block:
tc.historyNode.store(blockHash, blockHeader)
let blockHeaderRes =
await tc.client.historyGetBlockHeader(blockHash, validateContent = true)
check:
blockHeaderRes.isOk()
blockHeaderRes.value() == blockHeader
# Test content validation failed
block:
tc.historyNode.store(blockHash, BlockHeader()) # bad header
let blockHeaderRes =
await tc.client.historyGetBlockHeader(blockHash, validateContent = true)
check:
blockHeaderRes.isErr()
blockHeaderRes.error() == ContentValidationFailed
waitFor tc.stop()
asyncTest "Test historyGetBlockHeader without validation":
let
tc = await setupTest(rng)
blockHeader = BlockHeader(number: 200)
blockHash = blockHeader.blockHash()
# Test content not found
block:
let blockHeaderRes =
await tc.client.historyGetBlockHeader(blockHash, validateContent = false)
check:
blockHeaderRes.isErr()
blockHeaderRes.error() == ContentNotFound
tc.historyNode.store(blockHash, blockHeader)
# Test content found
block:
let blockHeaderRes =
await tc.client.historyGetBlockHeader(blockHash, validateContent = false)
check:
blockHeaderRes.isOk()
blockHeaderRes.value() == blockHeader
waitFor tc.stop()
asyncTest "Test historyGetBlockBody with validation":
let
tc = await setupTest(rng)
blockHeader = BlockHeader(number: 300)
blockBody = BlockBody()
blockHash = blockHeader.blockHash()
# Test content not found
block:
let blockBodyRes =
await tc.client.historyGetBlockBody(blockHash, validateContent = true)
check:
blockBodyRes.isErr()
blockBodyRes.error() == ContentNotFound
# Test content validation failed
block:
tc.historyNode.store(blockHash, blockHeader)
tc.historyNode.store(blockHash, blockBody)
let blockBodyRes =
await tc.client.historyGetBlockBody(blockHash, validateContent = true)
check:
blockBodyRes.isErr()
blockBodyRes.error() == ContentValidationFailed
waitFor tc.stop()
asyncTest "Test historyGetBlockBody without validation":
let
tc = await setupTest(rng)
blockHeader = BlockHeader(number: 300)
blockBody = BlockBody()
blockHash = blockHeader.blockHash()
# Test content not found
block:
let blockBodyRes =
await tc.client.historyGetBlockBody(blockHash, validateContent = false)
check:
blockBodyRes.isErr()
blockBodyRes.error() == ContentNotFound
# Test content found
block:
tc.historyNode.store(blockHash, blockHeader)
tc.historyNode.store(blockHash, blockBody)
let blockBodyRes =
await tc.client.historyGetBlockBody(blockHash, validateContent = false)
check:
blockBodyRes.isOk()
blockBodyRes.value() == blockBody
waitFor tc.stop()
asyncTest "Test historyGetReceipts with validation":
let
tc = await setupTest(rng)
blockHeader = BlockHeader(number: 300)
receipts = @[Receipt()]
blockHash = blockHeader.blockHash()
# Test content not found
block:
let receiptsRes =
await tc.client.historyGetReceipts(blockHash, validateContent = true)
check:
receiptsRes.isErr()
receiptsRes.error() == ContentNotFound
# Test content validation failed
block:
tc.historyNode.store(blockHash, blockHeader)
tc.historyNode.store(blockHash, receipts)
let receiptsRes =
await tc.client.historyGetReceipts(blockHash, validateContent = true)
check:
receiptsRes.isErr()
receiptsRes.error() == ContentValidationFailed
waitFor tc.stop()
asyncTest "Test historyGetReceipts without validation":
let
tc = await setupTest(rng)
blockHeader = BlockHeader(number: 300)
receipts = @[Receipt()]
blockHash = blockHeader.blockHash()
# Test content not found
block:
let receiptsRes =
await tc.client.historyGetReceipts(blockHash, validateContent = false)
check:
receiptsRes.isErr()
receiptsRes.error() == ContentNotFound
# Test content found
block:
tc.historyNode.store(blockHash, blockHeader)
tc.historyNode.store(blockHash, receipts)
let receiptsRes =
await tc.client.historyGetReceipts(blockHash, validateContent = false)
check:
receiptsRes.isOk()
receiptsRes.value() == receipts
waitFor tc.stop()