Generalize network layer for portal (#814)

* Generalize netork layer for portal

* Make messages free from any content references

* Use portal network in main fluffy module

* Fix cli

* Use lookup in portal network

* Avoid using result
This commit is contained in:
KonradStaniec 2021-09-03 10:57:19 +02:00 committed by GitHub
parent df3e7bb368
commit a083fb30fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 138 additions and 75 deletions

View File

@ -13,7 +13,8 @@ import
json_rpc/rpcproxy, json_rpc/rpcproxy,
eth/keys, eth/net/nat, eth/keys, eth/net/nat,
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
./conf, ./network/state/portal_protocol, ./rpc/[eth_api, bridge_client, discovery_api] ./conf, ./rpc/[eth_api, bridge_client, discovery_api],
./network/state/[portal_network, content]
proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] = proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
try: try:
@ -52,7 +53,7 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
d.open() d.open()
let portal = PortalProtocol.new(d) let portal = PortalNetwork.new(d, newEmptyInMemoryStorage())
if config.metricsEnabled: if config.metricsEnabled:
let let

View File

@ -9,6 +9,7 @@ import
std/[options, strutils, tables], std/[options, strutils, tables],
confutils, confutils/std/net, chronicles, chronicles/topics_registry, confutils, confutils/std/net, chronicles, chronicles/topics_registry,
chronos, metrics, metrics/chronos_httpserver, stew/byteutils, chronos, metrics, metrics/chronos_httpserver, stew/byteutils,
nimcrypto/[hash, sha2],
eth/[keys, net/nat], eth/[keys, net/nat],
eth/p2p/discoveryv5/[enr, node], eth/p2p/discoveryv5/[enr, node],
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
@ -146,6 +147,11 @@ proc discover(d: discv5_protocol.Protocol) {.async.} =
info "Lookup finished", nodes = discovered.len info "Lookup finished", nodes = discovered.len
await sleepAsync(30.seconds) await sleepAsync(30.seconds)
# TODO for now just return some random id
proc testHandler(contentKey: ByteList): ContentResult =
let id = sha256.digest("test")
ContentResult(kind: ContentMissing, contentId: id)
proc run(config: DiscoveryConf) = proc run(config: DiscoveryConf) =
let let
rng = newRng() rng = newRng()
@ -164,7 +170,7 @@ proc run(config: DiscoveryConf) =
d.open() d.open()
let portal = PortalProtocol.new(d) let portal = PortalProtocol.new(d, testHandler)
if config.metricsEnabled: if config.metricsEnabled:
let let
@ -200,11 +206,9 @@ proc run(config: DiscoveryConf) =
key key
# For now just zeroes content node hash # For now just some random bytes
var nodeHash: NodeHash let contentKey = List.init(@[1'u8], 2048)
let contentKey = ContentKey(networkId: 0'u16,
contentType: messages.ContentType.Account,
nodeHash: nodeHash)
let foundContent = waitFor portal.findContent(config.findContentTarget, let foundContent = waitFor portal.findContent(config.findContentTarget,
contentKey) contentKey)

View File

@ -107,3 +107,7 @@ proc getContent*(storage: ContentStorage, key: ContentKey): Option[seq[byte]] =
proc getContent*(storage: ContentStorage, contentKey: ByteList): Option[seq[byte]] = proc getContent*(storage: ContentStorage, contentKey: ByteList): Option[seq[byte]] =
decodeKey(contentKey).flatMap((key: ContentKey) => getContent(storage, key)) decodeKey(contentKey).flatMap((key: ContentKey) => getContent(storage, key))
proc newEmptyInMemoryStorage*(): ContentStorage =
let trie = initHexaryTrie(newMemoryDb())
ContentStorage(trie: trie)

View File

@ -13,12 +13,13 @@
import import
options, options,
stint, stew/[results, objects], stint, stew/[results, objects],
eth/ssz/ssz_serialization, eth/ssz/ssz_serialization
./content
export ssz_serialization, stint, content export ssz_serialization, stint
type type
ByteList* = List[byte, 2048]
MessageKind* = enum MessageKind* = enum
unused = 0x00 unused = 0x00
@ -104,6 +105,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind =
template toSszType*(x: UInt256): array[32, byte] = template toSszType*(x: UInt256): array[32, byte] =
toBytesLE(x) toBytesLE(x)
template toSszType*(x: auto): auto =
x
func fromSszBytes*(T: type UInt256, data: openArray[byte]): func fromSszBytes*(T: type UInt256, data: openArray[byte]):
T {.raises: [MalformedSszError, Defect].} = T {.raises: [MalformedSszError, Defect].} =
if data.len != sizeof(result): if data.len != sizeof(result):

View File

@ -0,0 +1,41 @@
import
std/[options, sugar],
stew/results,
eth/p2p/discoveryv5/[protocol, node],
./content, ./portal_protocol
# TODO expose function in domain specific way i.e operating od state network objects i.e
# nodes, tries, hashes
type PortalNetwork* = ref object
storage: ContentStorage
portalProtocol*: PortalProtocol
proc getHandler(storage: ContentStorage): ContentHandler =
return (proc (contentKey: content.ByteList): ContentResult =
let maybeContent = storage.getContent(contentKey)
if (maybeContent.isSome()):
ContentResult(kind: ContentFound, content: maybeContent.unsafeGet())
else:
ContentResult(kind: ContentMissing, contentId: toContentId(contentKey)))
# Further improvements which may be necessary:
# 1. Return proper domain types instead of bytes
# 2. First check if item is in storage instead of doing lookup
# 3. Put item into storage (if in radius) after succesful lookup
proc getContent*(p:PortalNetwork, key: ContentKey): Future[Option[seq[byte]]] {.async.} =
let keyAsBytes = encodeKeyAsList(key)
let id = contentIdAsUint256(toContentId(keyAsBytes))
let result = await p.portalProtocol.contentLookup(keyAsBytes, id)
# for now returning bytes, ultimatly it would be nice to return proper domain types from here
return result.map(x => x.asSeq())
proc new*(T: type PortalNetwork, baseProtocol: protocol.Protocol, storage: ContentStorage , dataRadius = UInt256.high()): T =
let portalProto = PortalProtocol.new(baseProtocol, getHandler(storage), dataRadius)
return PortalNetwork(storage: storage, portalProtocol: portalProto)
proc start*(p: PortalNetwork) =
p.portalProtocol.start()
proc stop*(p: PortalNetwork) =
p.portalProtocol.stop()

View File

@ -9,9 +9,9 @@
import import
std/[sequtils, sets, algorithm], std/[sequtils, sets, algorithm],
stew/[results, byteutils], chronicles, chronos, stew/[results, byteutils], chronicles, chronos, nimcrypto/hash,
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2], eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2],
./content, ./messages ./messages
export messages export messages
@ -30,11 +30,27 @@ const
InitialLookups = 1 ## Amount of lookups done when populating the routing table InitialLookups = 1 ## Amount of lookups done when populating the routing table
type type
ContentResultKind* = enum
ContentFound, ContentMissing, ContentKeyValidationFailure
ContentResult* = object
case kind*: ContentResultKind
of ContentFound:
content*: seq[byte]
of ContentMissing:
contentId*: MDigest[32 * 8]
of ContentKeyValidationFailure:
error*: string
# Treating Result as typed union type. If content is present handler should return
# it, if not it should return content id so that closest neighbours can be localized.
ContentHandler* = proc (contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.}
PortalProtocol* = ref object of TalkProtocol PortalProtocol* = ref object of TalkProtocol
routingTable: RoutingTable routingTable: RoutingTable
baseProtocol*: protocol.Protocol baseProtocol*: protocol.Protocol
dataRadius*: UInt256 dataRadius*: UInt256
contentStorage*: ContentStorage handleContentRequest: ContentHandler
lastLookup: chronos.Moment lastLookup: chronos.Moment
refreshLoop: Future[void] refreshLoop: Future[void]
revalidateLoop: Future[void] revalidateLoop: Future[void]
@ -96,17 +112,18 @@ proc handleFindNode(p: PortalProtocol, fn: FindNodeMessage): seq[byte] =
encodeMessage(NodesMessage(total: 1, enrs: enrs)) encodeMessage(NodesMessage(total: 1, enrs: enrs))
proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
let contentHandlingResult = p.handleContentRequest(fc.contentKey)
# TODO: Need to check networkId, type, trie path # TODO: Need to check networkId, type, trie path
let case contentHandlingResult.kind
# TODO: Should we first do a simple check on ContentId versus Radius? of ContentFound:
contentId = toContentId(fc.contentKey) let content = contentHandlingResult.content
content = p.contentStorage.getContent(fc.contentKey)
if content.isSome():
let enrs = List[ByteList, 32](@[]) # Empty enrs when payload is send let enrs = List[ByteList, 32](@[]) # Empty enrs when payload is send
encodeMessage(FoundContentMessage( encodeMessage(FoundContentMessage(
enrs: enrs, payload: ByteList(content.get()))) enrs: enrs, payload: ByteList(content)))
else: of ContentMissing:
let let
contentId = contentHandlingResult.contentId
# TODO: Should we first do a simple check on ContentId versus Radius?
closestNodes = p.routingTable.neighbours( closestNodes = p.routingTable.neighbours(
NodeId(readUintBE[256](contentId.data)), seenOnly = true) NodeId(readUintBE[256](contentId.data)), seenOnly = true)
payload = ByteList(@[]) # Empty payload when enrs are send payload = ByteList(@[]) # Empty payload when enrs are send
@ -115,6 +132,13 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
encodeMessage(FoundContentMessage( encodeMessage(FoundContentMessage(
enrs: List[ByteList, 32](List(enrs)), payload: payload)) enrs: List[ByteList, 32](List(enrs)), payload: payload))
of ContentKeyValidationFailure:
# Retrun empty response when content key validation fail
let content = ByteList(@[])
let enrs = List[ByteList, 32](@[]) # Empty enrs when payload is send
encodeMessage(FoundContentMessage(
enrs: enrs, payload: content))
proc handleAdvertise(p: PortalProtocol, a: AdvertiseMessage): seq[byte] = proc handleAdvertise(p: PortalProtocol, a: AdvertiseMessage): seq[byte] =
# TODO: Not implemented # TODO: Not implemented
let let
@ -147,11 +171,13 @@ proc messageHandler*(protocol: TalkProtocol, request: seq[byte]): seq[byte] =
@[] @[]
proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol, proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol,
contentHandler: ContentHandler,
dataRadius = UInt256.high()): T = dataRadius = UInt256.high()): T =
let proto = PortalProtocol( let proto = PortalProtocol(
protocolHandler: messageHandler, protocolHandler: messageHandler,
baseProtocol: baseProtocol, baseProtocol: baseProtocol,
dataRadius: dataRadius) dataRadius: dataRadius,
handleContentRequest: contentHandler)
proto.routingTable.init(baseProtocol.localNode, DefaultBitsPerHop, proto.routingTable.init(baseProtocol.localNode, DefaultBitsPerHop,
DefaultTableIpLimits, baseProtocol.rng) DefaultTableIpLimits, baseProtocol.rng)
@ -199,12 +225,9 @@ proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
# TODO Add nodes validation # TODO Add nodes validation
return await reqResponse[FindNodeMessage, NodesMessage](p, dst, PortalProtocolId, fn) return await reqResponse[FindNodeMessage, NodesMessage](p, dst, PortalProtocolId, fn)
# TODO It maybe better to accept bytelist, to not tie network layer to particular proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
# content
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ContentKey):
Future[PortalResult[FoundContentMessage]] {.async.} = Future[PortalResult[FoundContentMessage]] {.async.} =
let encKey = encodeKeyAsList(contentKey) let fc = FindContentMessage(contentKey: contentKey)
let fc = FindContentMessage(contentKey: encKey)
trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent
return await reqResponse[FindContentMessage, FoundContentMessage](p, dst, PortalProtocolId, fc) return await reqResponse[FindContentMessage, FoundContentMessage](p, dst, PortalProtocolId, fc)
@ -329,7 +352,7 @@ proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage, dst: N
else: else:
return LookupResult(kind: Nodes, nodes: nodes) return LookupResult(kind: Nodes, nodes: nodes)
proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ContentKey): proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ByteList):
Future[LookupResult] {.async.} = Future[LookupResult] {.async.} =
var nodes: seq[Node] var nodes: seq[Node]
@ -343,8 +366,7 @@ proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ContentKey):
# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other # TODO ContentLookup and Lookup look almost exactly the same, also lookups in other
# networks will probably be very similar. Extract lookup function to separate module # networks will probably be very similar. Extract lookup function to separate module
# and make it more generaic # and make it more generaic
proc contentLookup*(p: PortalProtocol, target: ContentKey): Future[Option[ByteList]] {.async.} = proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): Future[Option[ByteList]] {.async.} =
let targetId = contentIdAsUint256(toContentId(target))
## Perform a lookup for the given target, return the closest n nodes to the ## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`. ## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance # `closestNodes` holds the k closest nodes to target found, sorted by distance

View File

@ -11,7 +11,7 @@ import
eth/[keys, trie/db, trie/hexary, ssz/ssz_serialization], eth/[keys, trie/db, trie/hexary, ssz/ssz_serialization],
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table, eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
../../nimbus/[genesis, chain_config, db/db_chain], ../../nimbus/[genesis, chain_config, db/db_chain],
../network/state/portal_protocol, ../network/state/content, ../network/state/portal_protocol, ../network/state/content, ../network/state/portal_network,
./test_helpers ./test_helpers
proc genesisToTrie(filePath: string): HexaryTrie = proc genesisToTrie(filePath: string): HexaryTrie =
@ -37,18 +37,17 @@ procSuite "Content Network":
let rng = newRng() let rng = newRng()
asyncTest "Test Share Full State": asyncTest "Test Share Full State":
let let
trie = genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json")
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302)) rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303)) rng, PrivateKey.random(rng[]), localAddress(20303))
proto1 = PortalProtocol.new(node1) proto1 = PortalNetwork.new(node1, ContentStorage(trie: trie))
proto2 = PortalProtocol.new(node2) proto2 = PortalNetwork.new(node2, ContentStorage(trie: trie))
let trie = check proto2.portalProtocol.addNode(node1.localNode) == Added
genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json")
proto1.contentStorage = ContentStorage(trie: trie)
var keys: seq[seq[byte]] var keys: seq[seq[byte]]
for k, v in trie.replicate: for k, v in trie.replicate:
@ -64,15 +63,12 @@ procSuite "Content Network":
contentType: content.ContentType.Account, contentType: content.ContentType.Account,
nodeHash: nodeHash) nodeHash: nodeHash)
let foundContent = await proto2.findContent(proto1.baseProtocol.localNode, let foundContent = await proto2.getContent(contentKey)
contentKey)
check: check:
foundContent.isOk() foundContent.isSome()
foundContent.get().payload.len() != 0
foundContent.get().enrs.len() == 0
let hash = hexary.keccak(foundContent.get().payload.asSeq()) let hash = hexary.keccak(foundContent.get())
check hash.data == key check hash.data == key
await node1.closeWait() await node1.closeWait()
@ -80,6 +76,7 @@ procSuite "Content Network":
asyncTest "Find content in the network via content lookup": asyncTest "Find content in the network via content lookup":
let let
trie = genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json")
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302)) rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
@ -88,20 +85,16 @@ procSuite "Content Network":
rng, PrivateKey.random(rng[]), localAddress(20304)) rng, PrivateKey.random(rng[]), localAddress(20304))
proto1 = PortalProtocol.new(node1) proto1 = PortalNetwork.new(node1, ContentStorage(trie: trie))
proto2 = PortalProtocol.new(node2) proto2 = PortalNetwork.new(node2, ContentStorage(trie: trie))
proto3 = PortalProtocol.new(node3) proto3 = PortalNetwork.new(node3, ContentStorage(trie: trie))
let trie =
genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json")
proto3.contentStorage = ContentStorage(trie: trie)
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content # Node1 knows about Node2, and Node2 knows about Node3 which hold all content
check proto1.addNode(proto2.baseProtocol.localNode) == Added check proto1.portalProtocol.addNode(node2.localNode) == Added
check proto2.addNode(proto3.baseProtocol.localNode) == Added check proto2.portalProtocol.addNode(node3.localNode) == Added
check (await proto2.ping(proto3.baseProtocol.localNode)).isOk() check (await proto2.portalProtocol.ping(node3.localNode)).isOk()
var keys: seq[seq[byte]] var keys: seq[seq[byte]]
for k, v in trie.replicate: for k, v in trie.replicate:
@ -117,12 +110,12 @@ procSuite "Content Network":
contentType: content.ContentType.Account, contentType: content.ContentType.Account,
nodeHash: nodeHash) nodeHash: nodeHash)
let foundContent = await proto1.contentLookup(contentKey) let foundContent = await proto1.getContent(contentKey)
check: check:
foundContent.isSome() foundContent.isSome()
let hash = hexary.keccak(foundContent.get().asSeq()) let hash = hexary.keccak(foundContent.get())
check hash.data == firstKey check hash.data == firstKey

View File

@ -9,7 +9,7 @@
import import
chronos, testutils/unittests, stew/shims/net, chronos, testutils/unittests, stew/shims/net,
eth/keys, eth/p2p/discoveryv5/routing_table, eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
../network/state/portal_protocol, ../network/state/portal_protocol,
./test_helpers ./test_helpers
@ -20,6 +20,10 @@ type Default2NodeTest = ref object
proto1: PortalProtocol proto1: PortalProtocol
proto2: PortalProtocol proto2: PortalProtocol
proc testHandler(contentKey: ByteList): ContentResult =
let id = sha256.digest("test")
ContentResult(kind: ContentMissing, contentId: id)
proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
let let
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
@ -27,8 +31,8 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303)) rng, PrivateKey.random(rng[]), localAddress(20303))
proto1 = PortalProtocol.new(node1) proto1 = PortalProtocol.new(node1, testHandler)
proto2 = PortalProtocol.new(node2) proto2 = PortalProtocol.new(node2, testHandler)
Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2) Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2)
@ -137,11 +141,7 @@ procSuite "Portal Tests":
# table. # table.
test.proto2.start() test.proto2.start()
var nodeHash: NodeHash let contentKey = List.init(@[1'u8], 2048)
let contentKey = ContentKey(networkId: 0'u16,
contentType: ContentType.Account,
nodeHash: nodeHash)
# content does not exist so this should provide us with the closest nodes # content does not exist so this should provide us with the closest nodes
# to the content, which is the only node in the routing table. # to the content, which is the only node in the routing table.

View File

@ -107,19 +107,13 @@ suite "Portal Protocol Message Encodings":
message.nodes.enrs[1] == ByteList(e2.raw) message.nodes.enrs[1] == ByteList(e2.raw)
test "FindContent Request": test "FindContent Request":
var nodeHash: NodeHash # zeroes hash
let let
contentKey = ContentKey( contentEncoded: ByteList = List.init(@[1'u8], 2048)
networkId: 0'u16,
contentType: ContentType.Account,
nodeHash: nodeHash)
contentEncoded: ByteList = encodeKeyAsList(contentKey)
fn = FindContentMessage(contentKey: contentEncoded) fn = FindContentMessage(contentKey: contentEncoded)
let encoded = encodeMessage(fn) let encoded = encodeMessage(fn)
check encoded.toHex == "05040000000000010000000000000000000000000000000000000000000000000000000000000000" check encoded.toHex == "050400000001"
let decoded = decodeMessage(encoded) let decoded = decodeMessage(encoded)
check decoded.isOk() check decoded.isOk()