Inital poke implementation (#1037)

* Initial poke implementation
This commit is contained in:
KonradStaniec 2022-04-06 13:47:23 +02:00 committed by GitHub
parent 16ecbeb902
commit 203d8e8b70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 153 additions and 16 deletions

View File

@ -130,11 +130,21 @@ proc getBlockHeader*(
let headerContent = maybeHeaderContent.unsafeGet() let headerContent = maybeHeaderContent.unsafeGet()
let maybeHeader = validateHeaderBytes(headerContent, hash) let maybeHeader = validateHeaderBytes(headerContent.content, hash)
# content is in range and valid, put into db # content is in range and valid, put into db
if maybeHeader.isSome() and h.portalProtocol.inRange(contentId): if maybeHeader.isSome() and h.portalProtocol.inRange(contentId):
h.contentDB.put(contentId, headerContent) # TODO this bit is quite troubling, currently we may trigger offer/accept
# only when content is in our db and we save content only when is in our range
# which means we cannot propagate content which is not in our range, but maybe
# in range of other nodes.
h.contentDB.put(contentId, headerContent.content)
# content is valid and in the db, it may be propagated it through the network
h.portalProtocol.triggerPoke(
headerContent.nodesInterestedInContent,
keyEncoded,
contentId
)
return maybeHeader return maybeHeader
@ -164,7 +174,7 @@ proc getBlock*(
let bodyContent = maybeBodyContent.unsafeGet() let bodyContent = maybeBodyContent.unsafeGet()
let maybeBody = validateBodyBytes(bodyContent, header.txRoot, header.ommersHash) let maybeBody = validateBodyBytes(bodyContent.content, header.txRoot, header.ommersHash)
if maybeBody.isNone(): if maybeBody.isNone():
return none(Block) return none(Block)
@ -173,7 +183,17 @@ proc getBlock*(
# content is in range and valid, put into db # content is in range and valid, put into db
if h.portalProtocol.inRange(contentId): if h.portalProtocol.inRange(contentId):
h.contentDB.put(contentId, bodyContent) # TODO this bit is quite troubling, currently we may trigger offer/accept
# only when content is in our db and we save content only when is in our range
# which means we cannot propagate content which is not in our range, but maybe
# in range of other nodes.
h.contentDB.put(contentId, bodyContent.content)
# content is valid and in db we may propagate it through the network
h.portalProtocol.triggerPoke(
bodyContent.nodesInterestedInContent,
keyEncoded,
contentId
)
return some[Block]((header, blockBody)) return some[Block]((header, blockBody))

View File

@ -42,13 +42,19 @@ proc getContent*(n: StateNetwork, key: ContentKey):
let content = await n.portalProtocol.contentLookup(keyEncoded, contentId) let content = await n.portalProtocol.contentLookup(keyEncoded, contentId)
if content.isNone():
return none[seq[byte]]()
let contentResult = content.get()
# When content is found on the network and is in the radius range, store it. # When content is found on the network and is in the radius range, store it.
if content.isSome() and contentInRange: if content.isSome() and contentInRange:
n.contentDB.put(contentId, content.get()) # TODO Add poke when working on state network
n.contentDB.put(contentId, contentResult.content)
# TODO: for now returning bytes, ultimately it would be nice to return proper # TODO: for now returning bytes, ultimately it would be nice to return proper
# domain types. # domain types.
return content return some(contentResult.content)
proc new*( proc new*(
T: type StateNetwork, T: type StateNetwork,

View File

@ -19,7 +19,7 @@ import
export ssz_serialization, stint, common_types export ssz_serialization, stint, common_types
const const
contentKeysLimit = 64 contentKeysLimit* = 64
type type
ContentKeysList* = List[ByteList, contentKeysLimit] ContentKeysList* = List[ByteList, contentKeysLimit]

View File

@ -144,12 +144,28 @@ type
Content Content
FoundContent* = object FoundContent* = object
src*: Node
case kind*: FoundContentKind case kind*: FoundContentKind
of Content: of Content:
content*: seq[byte] content*: seq[byte]
of Nodes: of Nodes:
nodes*: seq[Node] nodes*: seq[Node]
ContentLookupResult* = object
content*: seq[byte]
# List of nodes which do not have requested content, and for which
# content is in their range
nodesInterestedInContent*: seq[Node]
proc init*(
T: type ContentLookupResult,
content: seq[byte],
nodesInterestedInContent: seq[Node]): T =
ContentLookupResult(
content: content,
nodesInterestedInContent: nodesInterestedInContent
)
func `$`(id: PortalProtocolId): string = func `$`(id: PortalProtocolId): string =
id.toHex() id.toHex()
@ -168,9 +184,16 @@ func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] = func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
p.routingTable.neighbours(id = id, seenOnly = seenOnly) p.routingTable.neighbours(id = id, seenOnly = seenOnly)
proc inRange(
p: PortalProtocol,
nodeId: NodeId,
nodeRadius: Uint256,
contentId: ContentId): bool =
let distance = p.routingTable.distance(nodeId, contentId)
distance <= nodeRadius
func inRange*(p: PortalProtocol, contentId: ContentId): bool = func inRange*(p: PortalProtocol, contentId: ContentId): bool =
let distance = p.routingTable.distance(p.localNode.id, contentId) p.inRange(p.localNode.id, p.dataRadius, contentId)
distance <= p.dataRadius
func truncateEnrs( func truncateEnrs(
nodes: seq[Node], maxSize: int, enrOverhead: int): List[ByteList, 32] = nodes: seq[Node], maxSize: int, enrOverhead: int): List[ByteList, 32] =
@ -549,19 +572,19 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
if await readData.withTimeout(p.stream.readTimeout): if await readData.withTimeout(p.stream.readTimeout):
let content = readData.read let content = readData.read
await socket.destroyWait() await socket.destroyWait()
return ok(FoundContent(kind: Content, content: content)) return ok(FoundContent(src: dst, kind: Content, content: content))
else: else:
socket.close() socket.close()
return err("Reading data from socket timed out, content request failed") return err("Reading data from socket timed out, content request failed")
of contentType: of contentType:
return ok(FoundContent(kind: Content, content: m.content.asSeq())) return ok(FoundContent(src: dst, kind: Content, content: m.content.asSeq()))
of enrsType: of enrsType:
let records = recordsFromBytes(m.enrs) let records = recordsFromBytes(m.enrs)
if records.isOk(): if records.isOk():
let verifiedNodes = let verifiedNodes =
verifyNodesRecords(records.get(), dst, enrsResultLimit) verifyNodesRecords(records.get(), dst, enrsResultLimit)
return ok(FoundContent(kind: Nodes, nodes: verifiedNodes)) return ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes))
else: else:
return err("Content message returned invalid ENRs") return err("Content message returned invalid ENRs")
@ -758,11 +781,35 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
p.lastLookup = now(chronos.Moment) p.lastLookup = now(chronos.Moment)
return closestNodes return closestNodes
proc triggerPoke*(
p: PortalProtocol,
nodes: seq[Node],
contentKey: ByteList,
contentId: ContentId) =
## Triggers asynchronous offer-accept interaction to provided nodes.
## Provided content should be in range of provided nodes
## Provided content should be in database
## TODO Related to todo in `proc offer` it maybe better to pass content to
## offer directly to avoid potential problems when content is not really in database
## this will be especially important when we introduce deleting content
## from database
let keys = ContentKeysList.init(@[contentKey])
for node in nodes:
if not p.offerQueue.full():
try:
p.offerQueue.putNoWait((node, keys))
except AsyncQueueFullError as e:
# should not happen as we always check is full before putting element to the queue
raiseAssert(e.msg)
else:
# offer queue full, do not start more offer offer-accept interactions
return
# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other # TODO ContentLookup and Lookup look almost exactly the same, also lookups in other
# networks will probably be very similar. Extract lookup function to separate module # networks will probably be very similar. Extract lookup function to separate module
# and make it more generaic # and make it more generaic
proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
Future[Option[seq[byte]]] {.async.} = Future[Option[ContentLookupResult]] {.async.} =
## Perform a lookup for the given target, return the closest n nodes to the ## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`. ## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance # `closestNodes` holds the k closest nodes to target found, sorted by distance
@ -779,6 +826,8 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha) var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
var requestAmount = 0'i64 var requestAmount = 0'i64
var nodesWithoutContent: seq[Node] = newSeq[Node]()
while true: while true:
var i = 0 var i = 0
# Doing `alpha` amount of requests at once as long as closer non queried # Doing `alpha` amount of requests at once as long as closer non queried
@ -811,6 +860,13 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
case content.kind case content.kind
of Nodes: of Nodes:
let maybeRadius = p.radiusCache.get(content.src.id)
if maybeRadius.isSome() and p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
# Only return nodes which may be interested in content.
# No need to check for duplicates in nodesWithoutContent
# as requests are never made two times to the same node.
nodesWithoutContent.add(content.src)
for n in content.nodes: for n in content.nodes:
if not seen.containsOrIncl(n.id): if not seen.containsOrIncl(n.id):
discard p.routingTable.addNode(n) discard p.routingTable.addNode(n)
@ -823,20 +879,20 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
if closestNodes.len > BUCKET_SIZE: if closestNodes.len > BUCKET_SIZE:
closestNodes.del(closestNodes.high()) closestNodes.del(closestNodes.high())
of Content: of Content:
# cancel any pending queries as we have find the content # cancel any pending queries as we have find the content
for f in pendingQueries: for f in pendingQueries:
f.cancel() f.cancel()
portal_lookup_content_requests.observe(requestAmount) portal_lookup_content_requests.observe(requestAmount)
return some(content.content) return some(ContentLookupResult.init(content.content, nodesWithoutContent))
else: else:
# TODO: Should we do something with the node that failed responding our # TODO: Should we do something with the node that failed responding our
# query? # query?
discard discard
portal_lookup_content_failures.inc() portal_lookup_content_failures.inc()
return none[seq[byte]]() return none[ContentLookupResult]()
proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
{.async.} = {.async.} =

View File

@ -31,6 +31,14 @@ proc testHandler(contentKey: ByteList): Option[ContentId] =
let idHash = sha256.digest("test") let idHash = sha256.digest("test")
some(readUintBE[256](idHash.data)) some(readUintBE[256](idHash.data))
proc testHandlerSha256(contentKey: ByteList): Option[ContentId] =
# Note: Returning a static content id here, as in practice this depends
# on the content key to content id derivation, which is different for the
# different content networks. And we want these tests to be independent from
# that.
let idHash = sha256.digest(contentKey.asSeq())
some(readUintBE[256](idHash.data))
proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
let let
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
@ -218,6 +226,53 @@ procSuite "Portal Wire Protocol Tests":
await node2.closeWait() await node2.closeWait()
await node3.closeWait() await node3.closeWait()
asyncTest "Content lookup should return info about nodes interested in content":
let
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
node3 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20304))
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
db3 = ContentDB.new("", inMemory = true)
proto1 = PortalProtocol.new(node1, protocolId, db1, testHandlerSha256)
proto2 = PortalProtocol.new(node2, protocolId, db2, testHandlerSha256)
proto3 = PortalProtocol.new(node3, protocolId, db3, testHandlerSha256)
content = @[byte 1, 2]
contentList = List[byte, 2048].init(content)
contentId = readUintBE[256](sha256.digest(content).data)
# Only node3 have content
db3.put(contentId, content)
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
# Node1 needs to known Node2 radius to determine if node2 is interested in content
check proto1.addNode(node2.localNode) == Added
check proto2.addNode(node3.localNode) == Added
check (await proto1.ping(node2.localNode)).isOk()
check (await proto2.ping(node3.localNode)).isOk()
let lookupResult = await proto1.contentLookup(contentList, contentId)
check:
lookupResult.isSome()
let res = lookupResult.unsafeGet()
check:
res.content == content
res.nodesInterestedInContent.contains(node2.localNode)
await node1.closeWait()
await node2.closeWait()
await node3.closeWait()
asyncTest "Valid Bootstrap Node": asyncTest "Valid Bootstrap Node":
let let
node1 = initDiscoveryNode( node1 = initDiscoveryNode(