From a083fb30fc7ef9e496a0dda6a103d46de4c99c96 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Fri, 3 Sep 2021 10:57:19 +0200 Subject: [PATCH] Generalize network layer for portal (#814) * Generalize netork layer for portal * Make messages free from any content references * Use portal network in main fluffy module * Fix cli * Use lookup in portal network * Avoid using result --- fluffy/fluffy.nim | 5 +- fluffy/network/portalcli.nim | 16 +++--- fluffy/network/state/content.nim | 4 ++ fluffy/network/state/messages.nim | 10 ++-- fluffy/network/state/portal_network.nim | 41 ++++++++++++++ fluffy/network/state/portal_protocol.nim | 68 ++++++++++++++++-------- fluffy/tests/test_content_network.nim | 43 +++++++-------- fluffy/tests/test_portal.nim | 16 +++--- fluffy/tests/test_portal_encoding.nim | 10 +--- 9 files changed, 138 insertions(+), 75 deletions(-) create mode 100644 fluffy/network/state/portal_network.nim diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index e8dc44fa3..361ab1181 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -13,7 +13,8 @@ import json_rpc/rpcproxy, eth/keys, eth/net/nat, eth/p2p/discoveryv5/protocol as discv5_protocol, - ./conf, ./network/state/portal_protocol, ./rpc/[eth_api, bridge_client, discovery_api] + ./conf, ./rpc/[eth_api, bridge_client, discovery_api], + ./network/state/[portal_network, content] proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] = try: @@ -52,7 +53,7 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = d.open() - let portal = PortalProtocol.new(d) + let portal = PortalNetwork.new(d, newEmptyInMemoryStorage()) if config.metricsEnabled: let diff --git a/fluffy/network/portalcli.nim b/fluffy/network/portalcli.nim index fe6a4ab94..4b1385d71 100644 --- a/fluffy/network/portalcli.nim +++ b/fluffy/network/portalcli.nim @@ -9,6 +9,7 @@ import std/[options, strutils, tables], confutils, confutils/std/net, chronicles, chronicles/topics_registry, chronos, metrics, metrics/chronos_httpserver, stew/byteutils, + nimcrypto/[hash, sha2], eth/[keys, net/nat], eth/p2p/discoveryv5/[enr, node], eth/p2p/discoveryv5/protocol as discv5_protocol, @@ -146,6 +147,11 @@ proc discover(d: discv5_protocol.Protocol) {.async.} = info "Lookup finished", nodes = discovered.len await sleepAsync(30.seconds) +# TODO for now just return some random id +proc testHandler(contentKey: ByteList): ContentResult = + let id = sha256.digest("test") + ContentResult(kind: ContentMissing, contentId: id) + proc run(config: DiscoveryConf) = let rng = newRng() @@ -164,7 +170,7 @@ proc run(config: DiscoveryConf) = d.open() - let portal = PortalProtocol.new(d) + let portal = PortalProtocol.new(d, testHandler) if config.metricsEnabled: let @@ -200,11 +206,9 @@ proc run(config: DiscoveryConf) = key - # For now just zeroes content node hash - var nodeHash: NodeHash - let contentKey = ContentKey(networkId: 0'u16, - contentType: messages.ContentType.Account, - nodeHash: nodeHash) + # For now just some random bytes + let contentKey = List.init(@[1'u8], 2048) + let foundContent = waitFor portal.findContent(config.findContentTarget, contentKey) diff --git a/fluffy/network/state/content.nim b/fluffy/network/state/content.nim index 29c772984..c6b7fca9e 100644 --- a/fluffy/network/state/content.nim +++ b/fluffy/network/state/content.nim @@ -107,3 +107,7 @@ proc getContent*(storage: ContentStorage, key: ContentKey): Option[seq[byte]] = proc getContent*(storage: ContentStorage, contentKey: ByteList): Option[seq[byte]] = decodeKey(contentKey).flatMap((key: ContentKey) => getContent(storage, key)) + +proc newEmptyInMemoryStorage*(): ContentStorage = + let trie = initHexaryTrie(newMemoryDb()) + ContentStorage(trie: trie) diff --git a/fluffy/network/state/messages.nim b/fluffy/network/state/messages.nim index 0031775dd..f2c4c754b 100644 --- a/fluffy/network/state/messages.nim +++ b/fluffy/network/state/messages.nim @@ -13,12 +13,13 @@ import options, stint, stew/[results, objects], - eth/ssz/ssz_serialization, - ./content + eth/ssz/ssz_serialization -export ssz_serialization, stint, content +export ssz_serialization, stint type + ByteList* = List[byte, 2048] + MessageKind* = enum unused = 0x00 @@ -104,6 +105,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind = template toSszType*(x: UInt256): array[32, byte] = toBytesLE(x) +template toSszType*(x: auto): auto = + x + func fromSszBytes*(T: type UInt256, data: openArray[byte]): T {.raises: [MalformedSszError, Defect].} = if data.len != sizeof(result): diff --git a/fluffy/network/state/portal_network.nim b/fluffy/network/state/portal_network.nim new file mode 100644 index 000000000..745402462 --- /dev/null +++ b/fluffy/network/state/portal_network.nim @@ -0,0 +1,41 @@ +import + std/[options, sugar], + stew/results, + eth/p2p/discoveryv5/[protocol, node], + ./content, ./portal_protocol + +# TODO expose function in domain specific way i.e operating od state network objects i.e +# nodes, tries, hashes +type PortalNetwork* = ref object + storage: ContentStorage + portalProtocol*: PortalProtocol + +proc getHandler(storage: ContentStorage): ContentHandler = + return (proc (contentKey: content.ByteList): ContentResult = + let maybeContent = storage.getContent(contentKey) + if (maybeContent.isSome()): + ContentResult(kind: ContentFound, content: maybeContent.unsafeGet()) + else: + ContentResult(kind: ContentMissing, contentId: toContentId(contentKey))) + +# Further improvements which may be necessary: +# 1. Return proper domain types instead of bytes +# 2. First check if item is in storage instead of doing lookup +# 3. Put item into storage (if in radius) after succesful lookup +proc getContent*(p:PortalNetwork, key: ContentKey): Future[Option[seq[byte]]] {.async.} = + let keyAsBytes = encodeKeyAsList(key) + let id = contentIdAsUint256(toContentId(keyAsBytes)) + let result = await p.portalProtocol.contentLookup(keyAsBytes, id) + # for now returning bytes, ultimatly it would be nice to return proper domain types from here + return result.map(x => x.asSeq()) + +proc new*(T: type PortalNetwork, baseProtocol: protocol.Protocol, storage: ContentStorage , dataRadius = UInt256.high()): T = + let portalProto = PortalProtocol.new(baseProtocol, getHandler(storage), dataRadius) + return PortalNetwork(storage: storage, portalProtocol: portalProto) + +proc start*(p: PortalNetwork) = + p.portalProtocol.start() + +proc stop*(p: PortalNetwork) = + p.portalProtocol.stop() + diff --git a/fluffy/network/state/portal_protocol.nim b/fluffy/network/state/portal_protocol.nim index 1df1c4dd6..d1a1892eb 100644 --- a/fluffy/network/state/portal_protocol.nim +++ b/fluffy/network/state/portal_protocol.nim @@ -9,9 +9,9 @@ import std/[sequtils, sets, algorithm], - stew/[results, byteutils], chronicles, chronos, + stew/[results, byteutils], chronicles, chronos, nimcrypto/hash, eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2], - ./content, ./messages + ./messages export messages @@ -30,11 +30,27 @@ const InitialLookups = 1 ## Amount of lookups done when populating the routing table type + ContentResultKind* = enum + ContentFound, ContentMissing, ContentKeyValidationFailure + + ContentResult* = object + case kind*: ContentResultKind + of ContentFound: + content*: seq[byte] + of ContentMissing: + contentId*: MDigest[32 * 8] + of ContentKeyValidationFailure: + error*: string + + # Treating Result as typed union type. If content is present handler should return + # it, if not it should return content id so that closest neighbours can be localized. + ContentHandler* = proc (contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.} + PortalProtocol* = ref object of TalkProtocol routingTable: RoutingTable baseProtocol*: protocol.Protocol dataRadius*: UInt256 - contentStorage*: ContentStorage + handleContentRequest: ContentHandler lastLookup: chronos.Moment refreshLoop: Future[void] revalidateLoop: Future[void] @@ -49,7 +65,7 @@ type of Nodes: nodes: seq[Node] of Content: - content: ByteList + content: ByteList proc addNode*(p: PortalProtocol, node: Node): NodeStatus = p.routingTable.addNode(node) @@ -96,25 +112,33 @@ proc handleFindNode(p: PortalProtocol, fn: FindNodeMessage): seq[byte] = encodeMessage(NodesMessage(total: 1, enrs: enrs)) proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = + let contentHandlingResult = p.handleContentRequest(fc.contentKey) # TODO: Need to check networkId, type, trie path - let - # TODO: Should we first do a simple check on ContentId versus Radius? - contentId = toContentId(fc.contentKey) - content = p.contentStorage.getContent(fc.contentKey) - if content.isSome(): + case contentHandlingResult.kind + of ContentFound: + let content = contentHandlingResult.content let enrs = List[ByteList, 32](@[]) # Empty enrs when payload is send encodeMessage(FoundContentMessage( - enrs: enrs, payload: ByteList(content.get()))) - else: + enrs: enrs, payload: ByteList(content))) + of ContentMissing: let + contentId = contentHandlingResult.contentId + # TODO: Should we first do a simple check on ContentId versus Radius? closestNodes = p.routingTable.neighbours( NodeId(readUintBE[256](contentId.data)), seenOnly = true) payload = ByteList(@[]) # Empty payload when enrs are send enrs = - closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw)) + closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw)) encodeMessage(FoundContentMessage( enrs: List[ByteList, 32](List(enrs)), payload: payload)) + of ContentKeyValidationFailure: + # Retrun empty response when content key validation fail + let content = ByteList(@[]) + let enrs = List[ByteList, 32](@[]) # Empty enrs when payload is send + encodeMessage(FoundContentMessage( + enrs: enrs, payload: content)) + proc handleAdvertise(p: PortalProtocol, a: AdvertiseMessage): seq[byte] = # TODO: Not implemented let @@ -147,11 +171,13 @@ proc messageHandler*(protocol: TalkProtocol, request: seq[byte]): seq[byte] = @[] proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol, + contentHandler: ContentHandler, dataRadius = UInt256.high()): T = let proto = PortalProtocol( protocolHandler: messageHandler, baseProtocol: baseProtocol, - dataRadius: dataRadius) + dataRadius: dataRadius, + handleContentRequest: contentHandler) proto.routingTable.init(baseProtocol.localNode, DefaultBitsPerHop, DefaultTableIpLimits, baseProtocol.rng) @@ -199,12 +225,9 @@ proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]): # TODO Add nodes validation return await reqResponse[FindNodeMessage, NodesMessage](p, dst, PortalProtocolId, fn) -# TODO It maybe better to accept bytelist, to not tie network layer to particular -# content -proc findContent*(p: PortalProtocol, dst: Node, contentKey: ContentKey): +proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): Future[PortalResult[FoundContentMessage]] {.async.} = - let encKey = encodeKeyAsList(contentKey) - let fc = FindContentMessage(contentKey: encKey) + let fc = FindContentMessage(contentKey: contentKey) trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent return await reqResponse[FindContentMessage, FoundContentMessage](p, dst, PortalProtocolId, fc) @@ -322,14 +345,14 @@ proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage, dst: N elif (m.payload.len() != 0 and m.enrs.len() == 0): return LookupResult(kind: Content, content: m.payload) elif ((m.payload.len() != 0 and m.enrs.len() != 0)): - # Both payload and enrs are filled, which means protocol breach. For now + # Both payload and enrs are filled, which means protocol breach. For now # just logging offending node to quickly identify it warn "Invalid foundcontent response form node ", uri = toURI(dst.record) return LookupResult(kind: Nodes, nodes: nodes) else: return LookupResult(kind: Nodes, nodes: nodes) -proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ContentKey): +proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ByteList): Future[LookupResult] {.async.} = var nodes: seq[Node] @@ -343,8 +366,7 @@ proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ContentKey): # TODO ContentLookup and Lookup look almost exactly the same, also lookups in other # networks will probably be very similar. Extract lookup function to separate module # and make it more generaic -proc contentLookup*(p: PortalProtocol, target: ContentKey): Future[Option[ByteList]] {.async.} = - let targetId = contentIdAsUint256(toContentId(target)) +proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): Future[Option[ByteList]] {.async.} = ## Perform a lookup for the given target, return the closest n nodes to the ## target. Maximum value for n is `BUCKET_SIZE`. # `closestNodes` holds the k closest nodes to target found, sorted by distance @@ -406,7 +428,7 @@ proc contentLookup*(p: PortalProtocol, target: ContentKey): Future[Option[ByteLi f.cancel() return some(lookupResult.content) - + return none[ByteList]() proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] diff --git a/fluffy/tests/test_content_network.nim b/fluffy/tests/test_content_network.nim index c229995bf..9bbdd29a6 100644 --- a/fluffy/tests/test_content_network.nim +++ b/fluffy/tests/test_content_network.nim @@ -11,7 +11,7 @@ import eth/[keys, trie/db, trie/hexary, ssz/ssz_serialization], eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table, ../../nimbus/[genesis, chain_config, db/db_chain], - ../network/state/portal_protocol, ../network/state/content, + ../network/state/portal_protocol, ../network/state/content, ../network/state/portal_network, ./test_helpers proc genesisToTrie(filePath: string): HexaryTrie = @@ -37,18 +37,17 @@ procSuite "Content Network": let rng = newRng() asyncTest "Test Share Full State": let + trie = genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json") + node1 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20302)) node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = PortalProtocol.new(node1) - proto2 = PortalProtocol.new(node2) + proto1 = PortalNetwork.new(node1, ContentStorage(trie: trie)) + proto2 = PortalNetwork.new(node2, ContentStorage(trie: trie)) - let trie = - genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json") - - proto1.contentStorage = ContentStorage(trie: trie) + check proto2.portalProtocol.addNode(node1.localNode) == Added var keys: seq[seq[byte]] for k, v in trie.replicate: @@ -64,15 +63,12 @@ procSuite "Content Network": contentType: content.ContentType.Account, nodeHash: nodeHash) - let foundContent = await proto2.findContent(proto1.baseProtocol.localNode, - contentKey) + let foundContent = await proto2.getContent(contentKey) check: - foundContent.isOk() - foundContent.get().payload.len() != 0 - foundContent.get().enrs.len() == 0 + foundContent.isSome() - let hash = hexary.keccak(foundContent.get().payload.asSeq()) + let hash = hexary.keccak(foundContent.get()) check hash.data == key await node1.closeWait() @@ -80,6 +76,7 @@ procSuite "Content Network": asyncTest "Find content in the network via content lookup": let + trie = genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json") node1 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20302)) node2 = initDiscoveryNode( @@ -88,20 +85,16 @@ procSuite "Content Network": rng, PrivateKey.random(rng[]), localAddress(20304)) - proto1 = PortalProtocol.new(node1) - proto2 = PortalProtocol.new(node2) - proto3 = PortalProtocol.new(node3) + proto1 = PortalNetwork.new(node1, ContentStorage(trie: trie)) + proto2 = PortalNetwork.new(node2, ContentStorage(trie: trie)) + proto3 = PortalNetwork.new(node3, ContentStorage(trie: trie)) - let trie = - genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json") - - proto3.contentStorage = ContentStorage(trie: trie) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content - check proto1.addNode(proto2.baseProtocol.localNode) == Added - check proto2.addNode(proto3.baseProtocol.localNode) == Added + check proto1.portalProtocol.addNode(node2.localNode) == Added + check proto2.portalProtocol.addNode(node3.localNode) == Added - check (await proto2.ping(proto3.baseProtocol.localNode)).isOk() + check (await proto2.portalProtocol.ping(node3.localNode)).isOk() var keys: seq[seq[byte]] for k, v in trie.replicate: @@ -117,12 +110,12 @@ procSuite "Content Network": contentType: content.ContentType.Account, nodeHash: nodeHash) - let foundContent = await proto1.contentLookup(contentKey) + let foundContent = await proto1.getContent(contentKey) check: foundContent.isSome() - let hash = hexary.keccak(foundContent.get().asSeq()) + let hash = hexary.keccak(foundContent.get()) check hash.data == firstKey diff --git a/fluffy/tests/test_portal.nim b/fluffy/tests/test_portal.nim index 51d374190..540d1c669 100644 --- a/fluffy/tests/test_portal.nim +++ b/fluffy/tests/test_portal.nim @@ -9,7 +9,7 @@ import chronos, testutils/unittests, stew/shims/net, - eth/keys, eth/p2p/discoveryv5/routing_table, + eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2], eth/p2p/discoveryv5/protocol as discv5_protocol, ../network/state/portal_protocol, ./test_helpers @@ -20,6 +20,10 @@ type Default2NodeTest = ref object proto1: PortalProtocol proto2: PortalProtocol +proc testHandler(contentKey: ByteList): ContentResult = + let id = sha256.digest("test") + ContentResult(kind: ContentMissing, contentId: id) + proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = let node1 = initDiscoveryNode( @@ -27,8 +31,8 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = PortalProtocol.new(node1) - proto2 = PortalProtocol.new(node2) + proto1 = PortalProtocol.new(node1, testHandler) + proto2 = PortalProtocol.new(node2, testHandler) Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2) @@ -137,11 +141,7 @@ procSuite "Portal Tests": # table. test.proto2.start() - var nodeHash: NodeHash - - let contentKey = ContentKey(networkId: 0'u16, - contentType: ContentType.Account, - nodeHash: nodeHash) + let contentKey = List.init(@[1'u8], 2048) # content does not exist so this should provide us with the closest nodes # to the content, which is the only node in the routing table. diff --git a/fluffy/tests/test_portal_encoding.nim b/fluffy/tests/test_portal_encoding.nim index bed2bd194..9048b1c8d 100644 --- a/fluffy/tests/test_portal_encoding.nim +++ b/fluffy/tests/test_portal_encoding.nim @@ -107,19 +107,13 @@ suite "Portal Protocol Message Encodings": message.nodes.enrs[1] == ByteList(e2.raw) test "FindContent Request": - var nodeHash: NodeHash # zeroes hash let - contentKey = ContentKey( - networkId: 0'u16, - contentType: ContentType.Account, - nodeHash: nodeHash) - - contentEncoded: ByteList = encodeKeyAsList(contentKey) + contentEncoded: ByteList = List.init(@[1'u8], 2048) fn = FindContentMessage(contentKey: contentEncoded) let encoded = encodeMessage(fn) - check encoded.toHex == "05040000000000010000000000000000000000000000000000000000000000000000000000000000" + check encoded.toHex == "050400000001" let decoded = decodeMessage(encoded) check decoded.isOk()