Allow access to contentDB from portal wire protocol (#920)

- Allow access to contentDB from portal wire protocol
- Use this to do the db.get in `handleFindContent` directly
- Use this to check the `contentKeys` list in `handleOffer`
This commit is contained in:
Kim De Mey 2022-01-06 09:06:05 +01:00 committed by GitHub
parent a8640fe57c
commit 79647e5580
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 139 additions and 135 deletions

View File

@ -14,3 +14,5 @@ type
ByteList* = List[byte, 2048]
Bytes2* = array[2, byte]
Bytes32* = array[32, byte]
ContentId* = Uint256

View File

@ -38,8 +38,6 @@ type
of receipts:
receiptsKey*: ContentKeyType
ContentId* = Uint256
func encode*(contentKey: ContentKey): ByteList =
ByteList.init(SSZ.encode(contentKey))

View File

@ -8,7 +8,7 @@
import
std/[options, sugar],
stew/results, chronos,
eth/p2p/discoveryv5/[protocol, node, enr],
eth/p2p/discoveryv5/[protocol, enr],
../../content_db,
../wire/portal_protocol,
./history_content
@ -21,47 +21,41 @@ type HistoryNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
proc getHandler(contentDB: ContentDB): ContentHandler =
return (proc (contentKey: history_content.ByteList): ContentResult =
let contentId = toContentId(contentKey)
let maybeContent = contentDB.get(contentId)
if (maybeContent.isSome()):
ContentResult(kind: ContentFound, content: maybeContent.unsafeGet())
else:
ContentResult(kind: ContentMissing, contentId: contentId))
proc toContentIdHandler(contentKey: ByteList): Option[ContentId] =
some(toContentId(contentKey))
proc getContent*(n: HistoryNetwork, key: ContentKey):
Future[Option[seq[byte]]] {.async.} =
let
keyEncoded = encode(key)
contentId = toContentId(keyEncoded)
contentInRange = n.portalProtocol.inRange(contentId)
let nodeId = n.portalProtocol.localNode.id
let distance = n.portalProtocol.routingTable.distance(nodeId, contentId)
let inRange = distance <= n.portalProtocol.dataRadius
# When the content id is in our radius range, try to look it up in our db.
if inRange:
# When the content id is in the radius range, try to look it up in the db.
if contentInRange:
let contentFromDB = n.contentDB.get(contentId)
if contentFromDB.isSome():
return contentFromDB
let content = await n.portalProtocol.contentLookup(keyEncoded, contentId)
if content.isSome() and inRange:
# When content is found and is in the radius range, store it.
if content.isSome() and contentInRange:
n.contentDB.put(contentId, content.get().asSeq())
# TODO: for now returning bytes, ultimately it would be nice to return proper
# domain types.
return content.map(x => x.asSeq())
proc new*(T: type HistoryNetwork, baseProtocol: protocol.Protocol,
contentDB: ContentDB , dataRadius = UInt256.high(),
proc new*(
T: type HistoryNetwork,
baseProtocol: protocol.Protocol,
contentDB: ContentDB,
dataRadius = UInt256.high(),
bootstrapRecords: openArray[Record] = []): T =
let portalProtocol = PortalProtocol.new(
baseProtocol, historyProtocolId, getHandler(contentDB), dataRadius,
bootstrapRecords)
baseProtocol, historyProtocolId, contentDB, toContentIdHandler,
dataRadius, bootstrapRecords)
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)

View File

@ -67,8 +67,6 @@ type
of contractBytecode:
contractBytecodeKey*: ContractBytecodeKey
ContentId* = Uint256
func encode*(contentKey: ContentKey): ByteList =
ByteList.init(SSZ.encode(contentKey))

View File

@ -8,7 +8,7 @@
import
std/[options, sugar],
stew/results, chronos,
eth/p2p/discoveryv5/[protocol, node, enr],
eth/p2p/discoveryv5/[protocol, enr],
../../content_db,
../wire/portal_protocol,
./state_content,
@ -21,50 +21,41 @@ type StateNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
proc getHandler(contentDB: ContentDB): ContentHandler =
return (proc (contentKey: state_content.ByteList): ContentResult =
let contentId = toContentId(contentKey)
if contentId.isSome():
let maybeContent = contentDB.get(contentId.get())
if (maybeContent.isSome()):
ContentResult(kind: ContentFound, content: maybeContent.unsafeGet())
else:
ContentResult(kind: ContentMissing, contentId: contentId.get())
else:
ContentResult(kind: ContentKeyValidationFailure, error: ""))
proc toContentIdHandler(contentKey: ByteList): Option[ContentId] =
toContentId(contentKey)
proc getContent*(n: StateNetwork, key: ContentKey):
Future[Option[seq[byte]]] {.async.} =
let
keyEncoded = encode(key)
contentId = toContentId(key)
contentInRange = n.portalProtocol.inRange(contentId)
let nodeId = n.portalProtocol.localNode.id
let distance = n.portalProtocol.routingTable.distance(nodeId, contentId)
let inRange = distance <= n.portalProtocol.dataRadius
# When the content id is in our radius range, try to look it up in our db.
if inRange:
# When the content id is in the radius range, try to look it up in the db.
if contentInRange:
let contentFromDB = n.contentDB.get(contentId)
if contentFromDB.isSome():
return contentFromDB
let content = await n.portalProtocol.contentLookup(keyEncoded, contentId)
if content.isSome() and inRange:
# When content is found on the network and is in the radius range, store it.
if content.isSome() and contentInRange:
n.contentDB.put(contentId, content.get().asSeq())
# TODO: for now returning bytes, ultimately it would be nice to return proper
# domain types.
return content.map(x => x.asSeq())
proc new*(T: type StateNetwork, baseProtocol: protocol.Protocol,
contentDB: ContentDB , dataRadius = UInt256.high(),
proc new*(
T: type StateNetwork,
baseProtocol: protocol.Protocol,
contentDB: ContentDB,
dataRadius = UInt256.high(),
bootstrapRecords: openArray[Record] = []): T =
let portalProtocol = PortalProtocol.new(
baseProtocol, stateProtocolId, getHandler(contentDB), dataRadius,
bootstrapRecords, stateDistanceCalculator)
baseProtocol, stateProtocolId, contentDB, toContentIdHandler,
dataRadius, bootstrapRecords, stateDistanceCalculator)
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)

View File

@ -15,6 +15,7 @@ import
stew/results, chronicles, chronos, nimcrypto/hash, bearssl,
ssz_serialization,
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification],
../../content_db,
./messages
export messages, routing_table
@ -45,11 +46,8 @@ type
of ContentKeyValidationFailure:
error*: string
# Treating Result as typed union type. If the content is present the handler
# should return it, if not it should return the content id so that closest
# neighbours can be localized.
ContentHandler* =
proc(contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.}
ToContentIdHandler* =
proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.}
PortalProtocolId* = array[2, byte]
@ -57,8 +55,9 @@ type
protocolId: PortalProtocolId
routingTable*: RoutingTable
baseProtocol*: protocol.Protocol
contentDB*: ContentDB
toContentId: ToContentIdHandler
dataRadius*: UInt256
handleContentRequest: ContentHandler
bootstrapRecords*: seq[Record]
lastLookup: chronos.Moment
refreshLoop: Future[void]
@ -91,6 +90,10 @@ func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
func inRange*(p: PortalProtocol, contentId: ContentId): bool =
let distance = p.routingTable.distance(p.localNode.id, contentId)
distance <= p.dataRadius
func handlePing(p: PortalProtocol, ping: PingMessage): seq[byte] =
let customPayload = CustomPayload(dataRadius: p.dataRadius)
let p = PongMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
@ -125,49 +128,61 @@ func handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
encodeMessage(NodesMessage(total: 1, enrs: enrs))
proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
# TODO: Should we first do a simple check on ContentId versus Radius?
# That would needs access to specific toContentId call, or we need to move it
# to handleContentRequest, which would need access to the Radius value.
let contentHandlingResult = p.handleContentRequest(fc.contentKey)
case contentHandlingResult.kind
of ContentFound:
# TODO: Need to provide uTP connectionId when content is too large for a
# single response.
let content = contentHandlingResult.content
encodeMessage(ContentMessage(
contentMessageType: contentType, content: ByteList(content)))
of ContentMissing:
let contentIdOpt = p.toContentId(fc.contentKey)
if contentIdOpt.isSome():
let
contentId = contentHandlingResult.contentId
closestNodes = p.routingTable.neighbours(
NodeId(contentId), seenOnly = true)
enrs =
closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw))
encodeMessage(ContentMessage(
contentMessageType: enrsType, enrs: List[ByteList, 32](List(enrs))))
contentId = contentIdOpt.get()
# TODO: Should we first do a simple check on ContentId versus Radius
# before accessing the database?
maybeContent = p.contentDB.get(contentId)
if maybeContent.isSome():
let content = maybeContent.get()
# TODO: properly calculate max content size
if content.len <= 1000:
encodeMessage(ContentMessage(
contentMessageType: contentType, content: ByteList(content)))
else:
var connectionId: Bytes2
brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId)
of ContentKeyValidationFailure:
# Return empty content response when content key validation fails
# TODO: Better would be to return no message at all, or we need to add a
# None type or so.
let content = ByteList(@[])
encodeMessage(ContentMessage(
contentMessageType: contentType, content: content))
encodeMessage(ContentMessage(
contentMessageType: connectionIdType, connectionId: connectionId))
else:
# Don't have the content, send closest neighbours to content id.
let
closestNodes = p.routingTable.neighbours(
NodeId(contentId), seenOnly = true)
enrs =
closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw))
encodeMessage(ContentMessage(
contentMessageType: enrsType, enrs: List[ByteList, 32](List(enrs))))
else:
# Return empty response when content key validation fails
# TODO: Better would be to return no message at all, needs changes on
# discv5 layer.
@[]
proc handleOffer(p: PortalProtocol, o: OfferMessage): seq[byte] =
var contentKeys = ContentKeysBitList.init(o.contentKeys.len)
# TODO: Do we need some protection against a peer offering lots (64x) of
# content that fits our Radius but is actually bogus?
# Additional TODO, but more of a specification clarification: What if we don't
# want any of the content? Reply with empty bitlist and a connectionId of
# all zeroes but don't actually allow an uTP connection?
for i, contentKey in o.contentKeys:
let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isSome():
let contentId = contentIdOpt.get()
if p.inRange(contentId):
if not p.contentDB.contains(contentId):
contentKeys.setBit(i)
else:
# Return empty response when content key validation fails
return @[]
func handleOffer(p: PortalProtocol, a: OfferMessage): seq[byte] =
let
# TODO: Not implemented: Based on the content radius and the content that is
# already stored, interest in provided content keys needs to be indicated
# by setting bits in this BitList.
# Do we need some protection here on a peer offering lots (64x) of content
# that fits our Radius but is actually bogus?
contentKeys = ContentKeysBitList.init(a.contentKeys.len)
# TODO: What if we don't want any of the content? Reply with empty bitlist
# and a connectionId of all zeroes?
var connectionId: Bytes2
brHmacDrbgGenerate(p.baseProtocol.rng[], connectionId)
# TODO: Random connection ID needs to be stored and linked with the uTP
# session that needs to be set up (start listening).
encodeMessage(
AcceptMessage(connectionId: connectionId, contentKeys: contentKeys))
@ -176,7 +191,7 @@ func handleOffer(p: PortalProtocol, a: OfferMessage): seq[byte] =
# get the closest neighbours of that data from our routing table, select a
# random subset and offer the same data to them.
proc messageHandler*(protocol: TalkProtocol, request: seq[byte],
proc messageHandler(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
doAssert(protocol of PortalProtocol)
@ -220,7 +235,8 @@ proc messageHandler*(protocol: TalkProtocol, request: seq[byte],
proc new*(T: type PortalProtocol,
baseProtocol: protocol.Protocol,
protocolId: PortalProtocolId,
contentHandler: ContentHandler,
contentDB: ContentDB,
toContentId: ToContentIdHandler,
dataRadius = UInt256.high(),
bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator
@ -231,14 +247,15 @@ proc new*(T: type PortalProtocol,
routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop,
DefaultTableIpLimits, baseProtocol.rng, distanceCalculator),
baseProtocol: baseProtocol,
contentDB: contentDB,
toContentId: toContentId,
dataRadius: dataRadius,
handleContentRequest: contentHandler,
bootstrapRecords: @bootstrapRecords)
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
"Only one protocol should have this id")
return proto
proto
# Sends the discv5 talkreq nessage with provided Portal message, awaits and
# validates the proper response, and updates the Portal Network routing table.

View File

@ -12,15 +12,16 @@ import
json_rpc/[rpcproxy, rpcserver], json_rpc/clients/httpclient,
stint,eth/p2p/discoveryv5/enr, eth/keys,
eth/p2p/discoveryv5/protocol as discv5_protocol,
../rpc/rpc_discovery_api, ./test_helpers
../rpc/rpc_discovery_api,
./test_helpers
type TestCase = ref object
localDiscovery: discv5_protocol.Protocol
localDiscovery: discv5_protocol.Protocol
server: RpcProxy
client: RpcHttpClient
proc setupTest(rng: ref BrHmacDrbgContext): Future[TestCase] {.async.} =
let
let
localSrvAddress = "127.0.0.1"
localSrvPort = 8545
ta = initTAddress(localSrvAddress, localSrvPort)

View File

@ -34,9 +34,3 @@ proc initDiscoveryNode*(rng: ref BrHmacDrbgContext,
rng = rng)
result.open()
proc random*(T: type UInt256, rng: var BrHmacDrbgContext): T =
var key: UInt256
brHmacDrbgGenerate(addr rng, addr key, csize_t(sizeof(key)))
key

View File

@ -12,6 +12,7 @@ import
eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
eth/p2p/discoveryv5/protocol as discv5_protocol,
../network/wire/portal_protocol,
../content_db,
./test_helpers
const protocolId = [byte 0x50, 0x00]
@ -22,16 +23,13 @@ type Default2NodeTest = ref object
proto1: PortalProtocol
proto2: PortalProtocol
proc testHandler(contentKey: ByteList): ContentResult =
let
idHash = sha256.digest("test")
id = readUintBE[256](idHash.data)
# TODO: Ideally we can return here a more valid content id. But that depends
proc testHandler(contentKey: ByteList): Option[ContentId] =
# Note: Returning a static content id here, as in practice this depends
# on the content key to content id derivation, which is different for the
# different content networks. And we want these tests to be independent from
# that. Could do something specifically for these tests, when there is a test
# case that would actually test this.
ContentResult(kind: ContentMissing, contentId: id)
# that.
let idHash = sha256.digest("test")
some(readUintBE[256](idHash.data))
proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
let
@ -40,8 +38,11 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
proto1 = PortalProtocol.new(node1, protocolId, testHandler)
proto2 = PortalProtocol.new(node2, protocolId, testHandler)
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler)
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler)
Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2)
@ -70,7 +71,7 @@ procSuite "Portal Wire Protocol Tests":
asyncTest "FindNodes/Nodes":
let test = defaultTestCase(rng)
block: # Find itself
let nodes = await test.proto1.findNodes(test.proto2.localNode,
List[uint16, 256](@[0'u16]))
@ -109,7 +110,7 @@ procSuite "Portal Wire Protocol Tests":
nodes.isOk()
nodes.get().total == 1'u8
nodes.get().enrs.len() == 1
await test.stopTest()
asyncTest "FindContent/Content - send enrs":
@ -193,9 +194,13 @@ procSuite "Portal Wire Protocol Tests":
node3 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20304))
proto1 = PortalProtocol.new(node1, protocolId, testHandler)
proto2 = PortalProtocol.new(node2, protocolId, testHandler)
proto3 = PortalProtocol.new(node3, protocolId, testHandler)
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
db3 = ContentDB.new("", inMemory = true)
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler)
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler)
proto3 = PortalProtocol.new(node3, protocolId, db3, testHandler)
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
check proto1.addNode(node2.localNode) == Added
@ -220,8 +225,11 @@ procSuite "Portal Wire Protocol Tests":
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
proto1 = PortalProtocol.new(node1, protocolId, testHandler)
proto2 = PortalProtocol.new(node2, protocolId, testHandler,
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler)
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandler,
bootstrapRecords = [node1.localNode.record])
proto1.start()
@ -241,8 +249,9 @@ procSuite "Portal Wire Protocol Tests":
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
db = ContentDB.new("", inMemory = true)
# No portal protocol for node1, hence an invalid bootstrap node
proto2 = PortalProtocol.new(node2, protocolId, testHandler,
proto2 = PortalProtocol.new(node2, protocolId, db, testHandler,
bootstrapRecords = [node1.localNode.record])
# seedTable to add node1 to the routing table

View File

@ -14,6 +14,7 @@ import
eth/p2p/discoveryv5/[enr, node],
eth/p2p/discoveryv5/protocol as discv5_protocol,
../common/common_utils,
../content_db,
../network/wire/[messages, portal_protocol],
../network/state/[state_content, state_network]
@ -178,16 +179,13 @@ proc discover(d: discv5_protocol.Protocol) {.async.} =
info "Lookup finished", nodes = discovered.len
await sleepAsync(30.seconds)
proc testHandler(contentKey: ByteList): ContentResult =
# Note: We don't incorperate storage in this tool so we always return
# missing content. For now we are using the state network derivation but it
# could be selective based on the network the tool is used for.
let contentId = toContentId(contentKey)
if contentId.isSome():
ContentResult(kind: ContentMissing, contentId: contentId.get())
else:
ContentResult(kind: ContentKeyValidationFailure,
error: "Failed decoding content key")
proc testHandler(contentKey: ByteList): Option[ContentId] =
# Note: Returning a static content id here, as in practice this depends
# on the content key to content id derivation, which is different for the
# different content networks. And we want these tests to be independent from
# that.
let idHash = sha256.digest("test")
some(readUintBE[256](idHash.data))
proc run(config: PortalCliConf) =
let
@ -212,7 +210,9 @@ proc run(config: PortalCliConf) =
d.open()
let portal = PortalProtocol.new(d, config.protocolId, testHandler,
let db = ContentDB.new("", inMemory = true)
let portal = PortalProtocol.new(d, config.protocolId, db, testHandler,
bootstrapRecords = bootstrapRecords)
if config.metricsEnabled: