From 79647e55802611939dac03f4923f68cd9dc8431a Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Thu, 6 Jan 2022 09:06:05 +0100 Subject: [PATCH] Allow access to contentDB from portal wire protocol (#920) - Allow access to contentDB from portal wire protocol - Use this to do the db.get in `handleFindContent` directly - Use this to check the `contentKeys` list in `handleOffer` --- fluffy/common/common_types.nim | 2 + fluffy/network/history/history_content.nim | 2 - fluffy/network/history/history_network.nim | 36 +++---- fluffy/network/state/state_content.nim | 2 - fluffy/network/state/state_network.nim | 39 +++---- fluffy/network/wire/portal_protocol.nim | 113 ++++++++++++--------- fluffy/tests/test_discovery_rpc.nim | 7 +- fluffy/tests/test_helpers.nim | 6 -- fluffy/tests/test_portal_wire_protocol.nim | 45 ++++---- fluffy/tools/portalcli.nim | 22 ++-- 10 files changed, 139 insertions(+), 135 deletions(-) diff --git a/fluffy/common/common_types.nim b/fluffy/common/common_types.nim index d0b1ccfb7..e164df74e 100644 --- a/fluffy/common/common_types.nim +++ b/fluffy/common/common_types.nim @@ -14,3 +14,5 @@ type ByteList* = List[byte, 2048] Bytes2* = array[2, byte] Bytes32* = array[32, byte] + + ContentId* = Uint256 diff --git a/fluffy/network/history/history_content.nim b/fluffy/network/history/history_content.nim index a4c7ad277..3a09af7bf 100644 --- a/fluffy/network/history/history_content.nim +++ b/fluffy/network/history/history_content.nim @@ -38,8 +38,6 @@ type of receipts: receiptsKey*: ContentKeyType - ContentId* = Uint256 - func encode*(contentKey: ContentKey): ByteList = ByteList.init(SSZ.encode(contentKey)) diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index b150d3329..8ecf9975f 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -8,7 +8,7 @@ import std/[options, sugar], stew/results, chronos, - eth/p2p/discoveryv5/[protocol, node, enr], + eth/p2p/discoveryv5/[protocol, enr], ../../content_db, ../wire/portal_protocol, ./history_content @@ -21,47 +21,41 @@ type HistoryNetwork* = ref object portalProtocol*: PortalProtocol contentDB*: ContentDB -proc getHandler(contentDB: ContentDB): ContentHandler = - return (proc (contentKey: history_content.ByteList): ContentResult = - let contentId = toContentId(contentKey) - let maybeContent = contentDB.get(contentId) - if (maybeContent.isSome()): - ContentResult(kind: ContentFound, content: maybeContent.unsafeGet()) - else: - ContentResult(kind: ContentMissing, contentId: contentId)) +proc toContentIdHandler(contentKey: ByteList): Option[ContentId] = + some(toContentId(contentKey)) proc getContent*(n: HistoryNetwork, key: ContentKey): Future[Option[seq[byte]]] {.async.} = let keyEncoded = encode(key) contentId = toContentId(keyEncoded) + contentInRange = n.portalProtocol.inRange(contentId) - let nodeId = n.portalProtocol.localNode.id - - let distance = n.portalProtocol.routingTable.distance(nodeId, contentId) - let inRange = distance <= n.portalProtocol.dataRadius - - # When the content id is in our radius range, try to look it up in our db. - if inRange: + # When the content id is in the radius range, try to look it up in the db. + if contentInRange: let contentFromDB = n.contentDB.get(contentId) if contentFromDB.isSome(): return contentFromDB let content = await n.portalProtocol.contentLookup(keyEncoded, contentId) - if content.isSome() and inRange: + # When content is found and is in the radius range, store it. + if content.isSome() and contentInRange: n.contentDB.put(contentId, content.get().asSeq()) # TODO: for now returning bytes, ultimately it would be nice to return proper # domain types. return content.map(x => x.asSeq()) -proc new*(T: type HistoryNetwork, baseProtocol: protocol.Protocol, - contentDB: ContentDB , dataRadius = UInt256.high(), +proc new*( + T: type HistoryNetwork, + baseProtocol: protocol.Protocol, + contentDB: ContentDB, + dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = []): T = let portalProtocol = PortalProtocol.new( - baseProtocol, historyProtocolId, getHandler(contentDB), dataRadius, - bootstrapRecords) + baseProtocol, historyProtocolId, contentDB, toContentIdHandler, + dataRadius, bootstrapRecords) return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB) diff --git a/fluffy/network/state/state_content.nim b/fluffy/network/state/state_content.nim index 86762823f..ff581c87d 100644 --- a/fluffy/network/state/state_content.nim +++ b/fluffy/network/state/state_content.nim @@ -67,8 +67,6 @@ type of contractBytecode: contractBytecodeKey*: ContractBytecodeKey - ContentId* = Uint256 - func encode*(contentKey: ContentKey): ByteList = ByteList.init(SSZ.encode(contentKey)) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 7fb4a23c7..903e2fd5c 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -8,7 +8,7 @@ import std/[options, sugar], stew/results, chronos, - eth/p2p/discoveryv5/[protocol, node, enr], + eth/p2p/discoveryv5/[protocol, enr], ../../content_db, ../wire/portal_protocol, ./state_content, @@ -21,50 +21,41 @@ type StateNetwork* = ref object portalProtocol*: PortalProtocol contentDB*: ContentDB -proc getHandler(contentDB: ContentDB): ContentHandler = - return (proc (contentKey: state_content.ByteList): ContentResult = - let contentId = toContentId(contentKey) - if contentId.isSome(): - let maybeContent = contentDB.get(contentId.get()) - if (maybeContent.isSome()): - ContentResult(kind: ContentFound, content: maybeContent.unsafeGet()) - else: - ContentResult(kind: ContentMissing, contentId: contentId.get()) - else: - ContentResult(kind: ContentKeyValidationFailure, error: "")) +proc toContentIdHandler(contentKey: ByteList): Option[ContentId] = + toContentId(contentKey) proc getContent*(n: StateNetwork, key: ContentKey): Future[Option[seq[byte]]] {.async.} = let keyEncoded = encode(key) contentId = toContentId(key) + contentInRange = n.portalProtocol.inRange(contentId) - let nodeId = n.portalProtocol.localNode.id - - let distance = n.portalProtocol.routingTable.distance(nodeId, contentId) - let inRange = distance <= n.portalProtocol.dataRadius - - # When the content id is in our radius range, try to look it up in our db. - if inRange: + # When the content id is in the radius range, try to look it up in the db. + if contentInRange: let contentFromDB = n.contentDB.get(contentId) if contentFromDB.isSome(): return contentFromDB let content = await n.portalProtocol.contentLookup(keyEncoded, contentId) - if content.isSome() and inRange: + # When content is found on the network and is in the radius range, store it. + if content.isSome() and contentInRange: n.contentDB.put(contentId, content.get().asSeq()) # TODO: for now returning bytes, ultimately it would be nice to return proper # domain types. return content.map(x => x.asSeq()) -proc new*(T: type StateNetwork, baseProtocol: protocol.Protocol, - contentDB: ContentDB , dataRadius = UInt256.high(), +proc new*( + T: type StateNetwork, + baseProtocol: protocol.Protocol, + contentDB: ContentDB, + dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = []): T = let portalProtocol = PortalProtocol.new( - baseProtocol, stateProtocolId, getHandler(contentDB), dataRadius, - bootstrapRecords, stateDistanceCalculator) + baseProtocol, stateProtocolId, contentDB, toContentIdHandler, + dataRadius, bootstrapRecords, stateDistanceCalculator) return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 8dc6e8e70..32097ce2d 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -15,6 +15,7 @@ import stew/results, chronicles, chronos, nimcrypto/hash, bearssl, ssz_serialization, eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification], + ../../content_db, ./messages export messages, routing_table @@ -45,11 +46,8 @@ type of ContentKeyValidationFailure: error*: string - # Treating Result as typed union type. If the content is present the handler - # should return it, if not it should return the content id so that closest - # neighbours can be localized. - ContentHandler* = - proc(contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.} + ToContentIdHandler* = + proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.} PortalProtocolId* = array[2, byte] @@ -57,8 +55,9 @@ type protocolId: PortalProtocolId routingTable*: RoutingTable baseProtocol*: protocol.Protocol + contentDB*: ContentDB + toContentId: ToContentIdHandler dataRadius*: UInt256 - handleContentRequest: ContentHandler bootstrapRecords*: seq[Record] lastLookup: chronos.Moment refreshLoop: Future[void] @@ -91,6 +90,10 @@ func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] = p.routingTable.neighbours(id = id, seenOnly = seenOnly) +func inRange*(p: PortalProtocol, contentId: ContentId): bool = + let distance = p.routingTable.distance(p.localNode.id, contentId) + distance <= p.dataRadius + func handlePing(p: PortalProtocol, ping: PingMessage): seq[byte] = let customPayload = CustomPayload(dataRadius: p.dataRadius) let p = PongMessage(enrSeq: p.baseProtocol.localNode.record.seqNum, @@ -125,49 +128,61 @@ func handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] = encodeMessage(NodesMessage(total: 1, enrs: enrs)) proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = - # TODO: Should we first do a simple check on ContentId versus Radius? - # That would needs access to specific toContentId call, or we need to move it - # to handleContentRequest, which would need access to the Radius value. - let contentHandlingResult = p.handleContentRequest(fc.contentKey) - case contentHandlingResult.kind - of ContentFound: - # TODO: Need to provide uTP connectionId when content is too large for a - # single response. - let content = contentHandlingResult.content - encodeMessage(ContentMessage( - contentMessageType: contentType, content: ByteList(content))) - of ContentMissing: + let contentIdOpt = p.toContentId(fc.contentKey) + if contentIdOpt.isSome(): let - contentId = contentHandlingResult.contentId - closestNodes = p.routingTable.neighbours( - NodeId(contentId), seenOnly = true) - enrs = - closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw)) - encodeMessage(ContentMessage( - contentMessageType: enrsType, enrs: List[ByteList, 32](List(enrs)))) + contentId = contentIdOpt.get() + # TODO: Should we first do a simple check on ContentId versus Radius + # before accessing the database? + maybeContent = p.contentDB.get(contentId) + if maybeContent.isSome(): + let content = maybeContent.get() + # TODO: properly calculate max content size + if content.len <= 1000: + encodeMessage(ContentMessage( + contentMessageType: contentType, content: ByteList(content))) + else: + var connectionId: Bytes2 + brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId) - of ContentKeyValidationFailure: - # Return empty content response when content key validation fails - # TODO: Better would be to return no message at all, or we need to add a - # None type or so. - let content = ByteList(@[]) - encodeMessage(ContentMessage( - contentMessageType: contentType, content: content)) + encodeMessage(ContentMessage( + contentMessageType: connectionIdType, connectionId: connectionId)) + else: + # Don't have the content, send closest neighbours to content id. + let + closestNodes = p.routingTable.neighbours( + NodeId(contentId), seenOnly = true) + enrs = + closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw)) + encodeMessage(ContentMessage( + contentMessageType: enrsType, enrs: List[ByteList, 32](List(enrs)))) + else: + # Return empty response when content key validation fails + # TODO: Better would be to return no message at all, needs changes on + # discv5 layer. + @[] + +proc handleOffer(p: PortalProtocol, o: OfferMessage): seq[byte] = + var contentKeys = ContentKeysBitList.init(o.contentKeys.len) + # TODO: Do we need some protection against a peer offering lots (64x) of + # content that fits our Radius but is actually bogus? + # Additional TODO, but more of a specification clarification: What if we don't + # want any of the content? Reply with empty bitlist and a connectionId of + # all zeroes but don't actually allow an uTP connection? + for i, contentKey in o.contentKeys: + let contentIdOpt = p.toContentId(contentKey) + if contentIdOpt.isSome(): + let contentId = contentIdOpt.get() + if p.inRange(contentId): + if not p.contentDB.contains(contentId): + contentKeys.setBit(i) + else: + # Return empty response when content key validation fails + return @[] -func handleOffer(p: PortalProtocol, a: OfferMessage): seq[byte] = - let - # TODO: Not implemented: Based on the content radius and the content that is - # already stored, interest in provided content keys needs to be indicated - # by setting bits in this BitList. - # Do we need some protection here on a peer offering lots (64x) of content - # that fits our Radius but is actually bogus? - contentKeys = ContentKeysBitList.init(a.contentKeys.len) - # TODO: What if we don't want any of the content? Reply with empty bitlist - # and a connectionId of all zeroes? var connectionId: Bytes2 brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId) - # TODO: Random connection ID needs to be stored and linked with the uTP - # session that needs to be set up (start listening). + encodeMessage( AcceptMessage(connectionId: connectionId, contentKeys: contentKeys)) @@ -176,7 +191,7 @@ func handleOffer(p: PortalProtocol, a: OfferMessage): seq[byte] = # get the closest neighbours of that data from our routing table, select a # random subset and offer the same data to them. -proc messageHandler*(protocol: TalkProtocol, request: seq[byte], +proc messageHandler(protocol: TalkProtocol, request: seq[byte], srcId: NodeId, srcUdpAddress: Address): seq[byte] = doAssert(protocol of PortalProtocol) @@ -220,7 +235,8 @@ proc messageHandler*(protocol: TalkProtocol, request: seq[byte], proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol, protocolId: PortalProtocolId, - contentHandler: ContentHandler, + contentDB: ContentDB, + toContentId: ToContentIdHandler, dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = [], distanceCalculator: DistanceCalculator = XorDistanceCalculator @@ -231,14 +247,15 @@ proc new*(T: type PortalProtocol, routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop, DefaultTableIpLimits, baseProtocol.rng, distanceCalculator), baseProtocol: baseProtocol, + contentDB: contentDB, + toContentId: toContentId, dataRadius: dataRadius, - handleContentRequest: contentHandler, bootstrapRecords: @bootstrapRecords) proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect( "Only one protocol should have this id") - return proto + proto # Sends the discv5 talkreq nessage with provided Portal message, awaits and # validates the proper response, and updates the Portal Network routing table. diff --git a/fluffy/tests/test_discovery_rpc.nim b/fluffy/tests/test_discovery_rpc.nim index f337f109a..c360d8904 100644 --- a/fluffy/tests/test_discovery_rpc.nim +++ b/fluffy/tests/test_discovery_rpc.nim @@ -12,15 +12,16 @@ import json_rpc/[rpcproxy, rpcserver], json_rpc/clients/httpclient, stint,eth/p2p/discoveryv5/enr, eth/keys, eth/p2p/discoveryv5/protocol as discv5_protocol, - ../rpc/rpc_discovery_api, ./test_helpers + ../rpc/rpc_discovery_api, + ./test_helpers type TestCase = ref object - localDiscovery: discv5_protocol.Protocol + localDiscovery: discv5_protocol.Protocol server: RpcProxy client: RpcHttpClient proc setupTest(rng: ref BrHmacDrbgContext): Future[TestCase] {.async.} = - let + let localSrvAddress = "127.0.0.1" localSrvPort = 8545 ta = initTAddress(localSrvAddress, localSrvPort) diff --git a/fluffy/tests/test_helpers.nim b/fluffy/tests/test_helpers.nim index 1048ae88d..9fe8bdf8f 100644 --- a/fluffy/tests/test_helpers.nim +++ b/fluffy/tests/test_helpers.nim @@ -34,9 +34,3 @@ proc initDiscoveryNode*(rng: ref BrHmacDrbgContext, rng = rng) result.open() - -proc random*(T: type UInt256, rng: var BrHmacDrbgContext): T = - var key: UInt256 - brHmacDrbgGenerate(addr rng, addr key, csize_t(sizeof(key))) - - key diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index 695b8f988..e7333785c 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -12,6 +12,7 @@ import eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2], eth/p2p/discoveryv5/protocol as discv5_protocol, ../network/wire/portal_protocol, + ../content_db, ./test_helpers const protocolId = [byte 0x50, 0x00] @@ -22,16 +23,13 @@ type Default2NodeTest = ref object proto1: PortalProtocol proto2: PortalProtocol -proc testHandler(contentKey: ByteList): ContentResult = - let - idHash = sha256.digest("test") - id = readUintBE[256](idHash.data) - # TODO: Ideally we can return here a more valid content id. But that depends +proc testHandler(contentKey: ByteList): Option[ContentId] = + # Note: Returning a static content id here, as in practice this depends # on the content key to content id derivation, which is different for the # different content networks. And we want these tests to be independent from - # that. Could do something specifically for these tests, when there is a test - # case that would actually test this. - ContentResult(kind: ContentMissing, contentId: id) + # that. + let idHash = sha256.digest("test") + some(readUintBE[256](idHash.data)) proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = let @@ -40,8 +38,11 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = PortalProtocol.new(node1, protocolId, testHandler) - proto2 = PortalProtocol.new(node2, protocolId, testHandler) + db1 = ContentDB.new("", inMemory = true) + db2 = ContentDB.new("", inMemory = true) + + proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler) + proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler) Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2) @@ -70,7 +71,7 @@ procSuite "Portal Wire Protocol Tests": asyncTest "FindNodes/Nodes": let test = defaultTestCase(rng) - + block: # Find itself let nodes = await test.proto1.findNodes(test.proto2.localNode, List[uint16, 256](@[0'u16])) @@ -109,7 +110,7 @@ procSuite "Portal Wire Protocol Tests": nodes.isOk() nodes.get().total == 1'u8 nodes.get().enrs.len() == 1 - + await test.stopTest() asyncTest "FindContent/Content - send enrs": @@ -193,9 +194,13 @@ procSuite "Portal Wire Protocol Tests": node3 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20304)) - proto1 = PortalProtocol.new(node1, protocolId, testHandler) - proto2 = PortalProtocol.new(node2, protocolId, testHandler) - proto3 = PortalProtocol.new(node3, protocolId, testHandler) + db1 = ContentDB.new("", inMemory = true) + db2 = ContentDB.new("", inMemory = true) + db3 = ContentDB.new("", inMemory = true) + + proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler) + proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler) + proto3 = PortalProtocol.new(node3, protocolId, db3, testHandler) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content check proto1.addNode(node2.localNode) == Added @@ -220,8 +225,11 @@ procSuite "Portal Wire Protocol Tests": node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = PortalProtocol.new(node1, protocolId, testHandler) - proto2 = PortalProtocol.new(node2, protocolId, testHandler, + db1 = ContentDB.new("", inMemory = true) + db2 = ContentDB.new("", inMemory = true) + + proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler) + proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler, bootstrapRecords = [node1.localNode.record]) proto1.start() @@ -241,8 +249,9 @@ procSuite "Portal Wire Protocol Tests": node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) + db = ContentDB.new("", inMemory = true) # No portal protocol for node1, hence an invalid bootstrap node - proto2 = PortalProtocol.new(node2, protocolId, testHandler, + proto2 = PortalProtocol.new(node2, protocolId, db, testHandler, bootstrapRecords = [node1.localNode.record]) # seedTable to add node1 to the routing table diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index 765a19d55..a2b5d64f9 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -14,6 +14,7 @@ import eth/p2p/discoveryv5/[enr, node], eth/p2p/discoveryv5/protocol as discv5_protocol, ../common/common_utils, + ../content_db, ../network/wire/[messages, portal_protocol], ../network/state/[state_content, state_network] @@ -178,16 +179,13 @@ proc discover(d: discv5_protocol.Protocol) {.async.} = info "Lookup finished", nodes = discovered.len await sleepAsync(30.seconds) -proc testHandler(contentKey: ByteList): ContentResult = - # Note: We don't incorperate storage in this tool so we always return - # missing content. For now we are using the state network derivation but it - # could be selective based on the network the tool is used for. - let contentId = toContentId(contentKey) - if contentId.isSome(): - ContentResult(kind: ContentMissing, contentId: contentId.get()) - else: - ContentResult(kind: ContentKeyValidationFailure, - error: "Failed decoding content key") +proc testHandler(contentKey: ByteList): Option[ContentId] = + # Note: Returning a static content id here, as in practice this depends + # on the content key to content id derivation, which is different for the + # different content networks. And we want these tests to be independent from + # that. + let idHash = sha256.digest("test") + some(readUintBE[256](idHash.data)) proc run(config: PortalCliConf) = let @@ -212,7 +210,9 @@ proc run(config: PortalCliConf) = d.open() - let portal = PortalProtocol.new(d, config.protocolId, testHandler, + let db = ContentDB.new("", inMemory = true) + + let portal = PortalProtocol.new(d, config.protocolId, db, testHandler, bootstrapRecords = bootstrapRecords) if config.metricsEnabled: