Naive implementation of recursive gossip (#2024)

This commit is contained in:
Daniel Sobol 2024-02-15 18:37:05 +03:00 committed by GitHub
parent 5642662850
commit 3305c02856
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 172 additions and 17 deletions

View File

@ -157,7 +157,7 @@ func packNibbles*(nibbles: seq[byte]): Nibbles =
Nibbles(output)
func unpackNibbles*(nibbles: Nibbles): seq[byte] =
doAssert(nibbles.len() <= MAX_PACKED_NIBBLES_LEN, "Can't unpack more than 32 nibbles")
doAssert(nibbles.len() <= MAX_PACKED_NIBBLES_LEN, "Packed nibbles length is too long")
var output = newSeq[byte]()

View File

@ -7,6 +7,8 @@
import
stew/results, chronos, chronicles,
eth/common/eth_hash,
eth/common,
eth/p2p/discoveryv5/[protocol, enr],
../../database/content_db,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
@ -58,31 +60,49 @@ proc getContent*(n: StateNetwork, key: ContentKey):
# domain types.
return Opt.some(contentResult.content)
proc validateContent(
proc validateAccountTrieNode(key: ContentKey, contentValue: seq[byte]): bool =
let value = decodeSsz(contentValue, AccountTrieNodeOffer).valueOr:
warn "Received invalid account trie proof", error
return false
true
proc validateContractTrieNode(key: ContentKey, contentValue: seq[byte]): bool =
let value = decodeSsz(contentValue, ContractTrieNodeOffer).valueOr:
warn "Received invalid contract trie proof", error
return false
true
proc validateContractCode(key: ContentKey, contentValue: seq[byte]): bool =
let value = decodeSsz(contentValue, ContractCodeOffer).valueOr:
warn "Received invalid contract code", error
return false
true
proc validateContent*(
n: StateNetwork,
contentKey: ByteList,
contentValue: seq[byte]): Future[bool] {.async.} =
contentValue: seq[byte]): bool =
let key = contentKey.decode().valueOr:
return false
case key.contentType:
of unused:
warn "Received content with unused content type"
false
of accountTrieNode:
true
validateAccountTrieNode(key, contentValue)
of contractTrieNode:
true
validateContractTrieNode(key, contentValue)
of contractCode:
true
validateContractCode(key, contentValue)
proc validateContent(
n: StateNetwork,
contentKeys: ContentKeysList,
contentValues: seq[seq[byte]]): Future[bool] {.async.} =
contentValues: seq[seq[byte]]): bool =
for i, contentValue in contentValues:
let contentKey = contentKeys[i]
if await n.validateContent(contentKey, contentValue):
if n.validateContent(contentKey, contentValue):
let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
error "Received offered content with invalid content key", contentKey
return false
@ -94,6 +114,64 @@ proc validateContent(
error "Received offered content failed validation", contentKey
return false
proc recursiveGossipAccountTrieNode(
p: PortalProtocol,
maybeSrcNodeId: Opt[NodeId],
decodedKey: ContentKey,
contentValue: seq[byte]
): Future[void] {.async.} =
var nibbles = decodedKey.accountTrieNodeKey.path.unpackNibbles()
let decodedValue = decodeSsz(contentValue, AccountTrieNodeOffer).valueOr:
raiseAssert "Received offered content failed validation"
var proof = decodedValue.proof
discard nibbles.pop()
discard (distinctBase proof).pop()
let
updatedValue = AccountTrieNodeOffer(
proof: proof,
blockHash: decodedValue.blockHash,
)
updatedNodeHash = keccakHash(distinctBase proof[^1])
encodedValue = SSZ.encode(updatedValue)
updatedKey = AccountTrieNodeKey(path: nibbles.packNibbles(), nodeHash: updatedNodeHash)
encodedKey = ContentKey(accountTrieNodeKey: updatedKey, contentType: accountTrieNode).encode()
await neighborhoodGossipDiscardPeers(
p, maybeSrcNodeId, ContentKeysList.init(@[encodedKey]), @[encodedValue]
)
proc recursiveGossipContractTrieNode(
p: PortalProtocol,
maybeSrcNodeId: Opt[NodeId],
decodedKey: ContentKey,
contentValue: seq[byte]
): Future[void] {.async.} =
return
proc gossipContent*(
p: PortalProtocol,
maybeSrcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
contentValues: seq[seq[byte]]
): Future[void] {.async.} =
for i, contentValue in contentValues:
let
contentKey = contentKeys[i]
decodedKey = contentKey.decode().valueOr:
raiseAssert "Received offered content with invalid content key"
case decodedKey.contentType:
of unused:
warn("Gossiping content with unused content type")
continue
of accountTrieNode:
await recursiveGossipAccountTrieNode(p, maybeSrcNodeId, decodedKey, contentValue)
of contractTrieNode:
await recursiveGossipContractTrieNode(p, maybeSrcNodeId, decodedKey, contentValue)
of contractCode:
await neighborhoodGossipDiscardPeers(
p, maybeSrcNodeId, ContentKeysList.init(@[contentKey]), @[contentValue]
)
proc new*(
T: type StateNetwork,
baseProtocol: protocol.Protocol,
@ -122,11 +200,14 @@ proc new*(
proc processContentLoop(n: StateNetwork) {.async.} =
try:
while true:
let (maybeContentId, contentKeys, contentValues) = await n.contentQueue.popFirst()
if await n.validateContent(contentKeys, contentValues):
asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers(
maybeContentId, contentKeys, contentValues
)
let (maybeSrcNodeId, contentKeys, contentValues) = await n.contentQueue.popFirst()
if n.validateContent(contentKeys, contentValues):
await gossipContent(
n.portalProtocol,
maybeSrcNodeId,
contentKeys,
contentValues
)
except CancelledError:
trace "processContentLoop canceled"

View File

@ -11,6 +11,7 @@ import
./test_portal_wire_protocol,
./state_network_tests/test_state_content_keys,
./state_network_tests/test_state_content_values,
./state_network_tests/test_state_network_gossip,
./test_state_proof_verification,
./test_accumulator,
./test_history_network,

View File

@ -5,9 +5,6 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
../../network/state/state_content
type JsonBlockInfo* = object
number*: uint64
block_hash*: string
@ -46,3 +43,9 @@ type JsonContractBytecode* = object
content_id*: string
content_value_offer*: string
content_value_retrieval*: string
type JsonGossipKVPair* = object
content_key*: string
content_value*: string
type JsonRecursiveGossip* = seq[JsonGossipKVPair]

View File

@ -0,0 +1,70 @@
# Fluffy
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
stew/[byteutils, results],
testutils/unittests,
chronos,
eth/p2p/discoveryv5/protocol as discv5_protocol,
eth/p2p/discoveryv5/routing_table,
./helpers,
../../network/wire/[portal_protocol, portal_stream],
../../network/state/[state_content, state_network],
../../database/content_db,
.././test_helpers,
../../eth_data/history_data_json_store
const testVectorDir = "./vendor/portal-spec-tests/tests/mainnet/state/"
procSuite "State Network Gossip":
let rng = newRng()
asyncTest "Test Gossip of Account Trie Node Offer":
let
recursiveGossipSteps = readJsonType(testVectorDir & "recursive_gossip.json", JsonRecursiveGossip).valueOr:
raiseAssert "Cannot read test vector: " & error
numOfClients = recursiveGossipSteps.len() - 1
var clients: seq[StateNetwork]
for i in 0..numOfClients:
let
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(20400 + i))
sm = StreamManager.new(node)
proto = StateNetwork.new(node, ContentDB.new("", uint32.high, inMemory = true), sm)
proto.start()
clients.add(proto)
for i, pair in recursiveGossipSteps[0..^2]:
let
currentNode = clients[i]
nextNode = clients[i+1]
key = ByteList.init(pair.content_key.hexToSeqByte())
decodedKey = key.decode().valueOr:
raiseAssert "Cannot decode key"
nextKey = ByteList.init(recursiveGossipSteps[1].content_key.hexToSeqByte())
decodedNextKey = nextKey.decode().valueOr:
raiseAssert "Cannot decode key"
value = pair.content_value.hexToSeqByte()
nextValue = recursiveGossipSteps[1].content_value.hexToSeqByte()
check:
currentNode.portalProtocol.addNode(nextNode.portalProtocol.localNode) == Added
(await currentNode.portalProtocol.ping(nextNode.portalProtocol.localNode)).isOk()
await currentNode.portalProtocol.gossipContent(Opt.none(NodeId), ContentKeysList.init(@[key]), @[value])
await sleepAsync(100.milliseconds)
let gossipedValue = await nextNode.getContent(decodedNextKey)
check:
gossipedValue.isSome()
gossipedValue.get() == nextValue
for i in 0..numOfClients:
await clients[i].portalProtocol.baseProtocol.closeWait()
# TODO Add tests for Contract Trie Node Offer & Contract Code Offer