Support for multiple content items over Portal stream (#1090)

Each content item sent gets prefixed with a varint (max u32), allowing
multiple content items to be sent, as requested by the offer/accept flow.
This commit is contained in:
Kim De Mey 2022-06-24 15:35:31 +02:00 committed by GitHub
parent d4d4e8c28f
commit cad74db423
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 179 additions and 70 deletions

View File

@ -12,8 +12,8 @@
import
std/[sequtils, sets, algorithm],
stew/[results, byteutils], chronicles, chronos, nimcrypto/hash, bearssl,
ssz_serialization, metrics,
stew/[results, byteutils, leb128], chronicles, chronos, nimcrypto/hash,
bearssl, ssz_serialization, metrics, faststreams,
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
nodes_verification, lru],
../../content_db,
@ -426,8 +426,8 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
@[]
proc processContent(
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
{.gcsafe, raises: [Defect].}
stream: PortalStream, contentKeys: ContentKeysList,
content: seq[seq[byte]]) {.gcsafe, raises: [Defect].}
proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
# Get the max value of the logRadius range
@ -614,7 +614,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
error "Trying to connect to node with unknown address",
id = dst.id
return err("Trying to connect to node with unknown address")
let connFuture = p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
@ -651,7 +651,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
# send a FIN and clean up the socket.
socket.close()
if await readFut.withTimeout(p.stream.readTimeout):
if await readFut.withTimeout(p.stream.contentReadTimeout):
let content = readFut.read
# socket received remote FIN and drained whole buffer, it can be
# safely destroyed without notifing remote
@ -666,10 +666,11 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
return err("Reading data from socket timed out, content request failed")
except CancelledError as exc:
# even though we already installed cancelCallback on readFut, it is worth
# catching CancelledError in case that withTimeout throws CancelledError
# catching CancelledError in case that withTimeout throws CancelledError
# but readFut have already finished.
debug "Socket read cancelled",
socketKey = socket.socketKey
socket.close()
raise exc
of contentType:
@ -759,17 +760,26 @@ proc offer(p: PortalProtocol, o: OfferRequest):
error = connectionResult.error
return err("Error connecting uTP socket")
let clientSocket = connectionResult.get()
let socket = connectionResult.get()
template lenu32(x: untyped): untyped =
uint32(len(x))
case o.kind
of Direct:
for i, b in m.contentKeys:
if b:
let dataWritten = await clientSocket.write(o.contentList[i].content)
let content = o.contentList[i].content
var output = memoryOutput()
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
let dataWritten = await socket.write(output.getOutput)
if dataWritten.isErr:
debug "Error writing requested data", error = dataWritten.error
# No point in trying to continue writing data
clientSocket.close()
socket.close()
return err("Error writing requested data")
of Database:
for i, b in m.contentKeys:
@ -779,16 +789,25 @@ proc offer(p: PortalProtocol, o: OfferRequest):
let
contentId = contentIdOpt.get()
maybeContent = p.contentDB.get(contentId)
var output = memoryOutput()
if maybeContent.isSome():
let content = maybeContent.get()
let dataWritten = await clientSocket.write(content)
if dataWritten.isErr:
debug "Error writing requested data", error = dataWritten.error
# No point in trying to continue writing data
clientSocket.close()
return err("Error writing requested data")
await clientSocket.closeWait()
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
else:
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())
let dataWritten = await socket.write(output.getOutput)
if dataWritten.isErr:
debug "Error writing requested data", error = dataWritten.error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
await socket.closeWait()
return ok()
else:
return err("No accept response")
@ -1062,14 +1081,21 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
p.query(NodeId.random(p.baseProtocol.rng[]))
proc neighborhoodGossip*(
p: PortalProtocol, contentKeys: ContentKeysList, content: seq[byte])
p: PortalProtocol, contentKeys: ContentKeysList, content: seq[seq[byte]])
{.async.} =
let
# for now only 1 item is considered
contentInfo = ContentInfo(contentKey: contentKeys[0], content: content)
contentList = List[ContentInfo, contentKeysLimit].init(@[contentInfo])
contentIdOpt = p.toContentId(contentInfo.contentKey)
if content.len() == 0:
return
var contentList = List[ContentInfo, contentKeysLimit].init(@[])
for i, contentItem in content:
let contentInfo =
ContentInfo(contentKey: contentKeys[i], content: contentItem)
discard contentList.add(contentInfo)
# Just taking the first content item as target id.
# TODO: come up with something better?
let contentIdOpt = p.toContentId(contentList[0].contentKey)
if contentIdOpt.isNone():
return
@ -1183,31 +1209,30 @@ proc storeContent*(p: PortalProtocol, key: ContentId, content: openArray[byte])
p.contentDB.put(key, content)
proc processContent(
    stream: PortalStream, contentKeys: ContentKeysList,
    content: seq[seq[byte]]) {.gcsafe, raises: [Defect].} =
  ## Validate and store every offered content item received over the stream.
  ## Only when all received items validate are they gossiped onwards to
  ## neighbouring nodes.
  let p = getUserData[PortalProtocol](stream)

  # content passed here can have less items than contentKeys, but not more.
  for i, contentItem in content:
    let contentKey = contentKeys[i]
    if p.validateContent(contentItem, contentKey):
      let contentIdOpt = p.toContentId(contentKey)
      if contentIdOpt.isNone():
        # Cannot map this key to a content id; nothing to store or gossip.
        return

      let contentId = contentIdOpt.get()
      p.storeContent(contentId, contentItem)

      info "Received valid offered content", contentKey
    else:
      error "Received invalid offered content", contentKey
      # On one invalid piece of content we drop all and don't forward any of it
      # TODO: Could also filter it out and still gossip the rest.
      return

  asyncSpawn neighborhoodGossip(p, contentKeys, content)
proc seedTable*(p: PortalProtocol) =
## Seed the table with specifically provided Portal bootstrap nodes. These are

View File

@ -9,7 +9,7 @@
import
std/sequtils,
chronos, stew/byteutils, chronicles,
chronos, stew/[byteutils, leb128], chronicles,
eth/utp/utp_discv5_protocol,
# even though utp_discv5_protocol exports this, import is still needed,
# perhaps protocol.Protocol type of usage?
@ -24,7 +24,7 @@ logScope:
const
utpProtocolId* = "utp".toBytes()
defaultConnectionTimeout = 5.seconds
defaultReadTimeout = 2.seconds
defaultContentReadTimeout = 2.seconds
# TalkReq message is used as transport for uTP. It is assumed here that Portal
# protocol messages were exchanged before sending uTP over discv5 data. This
@ -56,8 +56,8 @@ type
timeout: Moment
ContentHandlerCallback* = proc(
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
{.gcsafe, raises: [Defect].}
stream: PortalStream, contentKeys: ContentKeysList,
content: seq[seq[byte]]) {.gcsafe, raises: [Defect].}
PortalStream* = ref object
transport: UtpDiscv5Protocol
@ -77,7 +77,7 @@ type
contentRequests: seq[ContentRequest]
contentOffers: seq[ContentOffer]
connectionTimeout: Duration
readTimeout*: Duration
contentReadTimeout*: Duration
rng: ref BrHmacDrbgContext
udata: pointer
contentHandler: ContentHandlerCallback
@ -177,7 +177,7 @@ proc connectTo*(
let socket = socketRes.get()
return ok(socket)
proc writeAndClose(
proc writeContentRequest(
socket: UtpSocket[NodeAddress], stream: PortalStream,
request: ContentRequest) {.async.} =
let dataWritten = await socket.write(request.content)
@ -186,37 +186,97 @@ proc writeAndClose(
await socket.closeWait()
proc readAndClose(
proc readVarint(socket: UtpSocket[NodeAddress]):
    Future[Opt[uint32]] {.async.} =
  ## Read a Leb128 varint length prefix (at most 5 bytes, so it fits a u32)
  ## from the socket, one byte at a time. Returns err() on EOF before a full
  ## varint was read, or on an invalid encoding.
  var
    buffer: array[5, byte]

  for i in 0..<len(buffer):
    # An empty read result means the socket reached EOF / was closed.
    let dataRead = await socket.read(1)
    if dataRead.len() == 0:
      return err()

    buffer[i] = dataRead[0]
    let (lenU32, bytesRead) = fromBytes(uint32, buffer.toOpenArray(0, i), Leb128)
    if bytesRead > 0:
      # A complete varint was decoded from the bytes read so far.
      return ok(lenU32)
    elif bytesRead == 0:
      # Varint incomplete: continuation bit set, need another byte.
      continue
    else:
      # Negative result: invalid encoding — presumably a value that would
      # overflow uint32 (TODO confirm against stew/leb128 semantics).
      return err()
proc readContentItem(socket: UtpSocket[NodeAddress]):
    Future[Opt[seq[byte]]] {.async.} =
  ## Read one length-prefixed content item: a Leb128 varint length followed
  ## by exactly that many bytes. Returns err() when the varint is invalid or
  ## when fewer bytes than announced could be read (e.g. early FIN).
  let len = await socket.readVarint()
  if len.isOk():
    let contentItem = await socket.read(len.get())
    if contentItem.len() == len.get().int:
      return ok(contentItem)
    else:
      # Short read: stream ended before the announced length arrived.
      return err()
  else:
    return err()
proc readContentOffer(
    socket: UtpSocket[NodeAddress], stream: PortalStream,
    offer: ContentOffer) {.async.} =
  ## Read the length-prefixed content items belonging to an accepted content
  ## offer from the uTP socket and pass whatever was successfully read on to
  ## the stream's content handler.
  # Read number of content items according to amount of ContentKeys accepted.
  # This will either end with a FIN, or because the read action times out or
  # because the number of expected items was read (if this happens and no FIN
  # was received yet, a FIN will be send from this side).
  # None of this means that the contentItems are valid, further validation is
  # required.
  # Socket will be closed when this call ends.

  # TODO: Currently reading from the socket 1 item at a time, and validating
  # items at later time. Uncertain what is best approach here (mostly from a
  # security PoV), e.g. other options such as reading all content from socket at
  # once, then processing the individual content items. Or reading and
  # validating one per time.
  let amount = offer.contentKeys.len()

  var contentItems: seq[seq[byte]]
  for i in 0..<amount:
    let contentItemFut = socket.readContentItem()
    if await contentItemFut.withTimeout(stream.contentReadTimeout):
      let contentItem = contentItemFut.read

      if contentItem.isOk():
        contentItems.add(contentItem.get())
      else:
        # Invalid data, stop reading content, but still process data received
        # so far.
        debug "Reading content item failed, content offer failed"
        break
    else:
      # Read timed out, stop further reading, but still process data received
      # so far.
      debug "Reading data from socket timed out, content offer failed"
      break

  if socket.atEof():
    # Destroy socket and not closing as we already received FIN. Closing would
    # send also a FIN from our side, see also:
    # https://github.com/status-im/nim-eth/blob/b2dab4be0839c95ca2564df9eacf81995bf57802/eth/utp/utp_socket.nim#L1223
    await socket.destroyWait()
  else:
    # This means FIN didn't arrive yet, perhaps it got dropped but it might also
    # be still in flight. Closing the socket (= sending FIN) ourselves.
    # Not waiting here for its ACK however, so no `closeWait`
    socket.close()

  if not stream.contentHandler.isNil():
    stream.contentHandler(stream, offer.contentKeys, contentItems)
proc new*(
T: type PortalStream,
contentHandler: ContentHandlerCallback,
udata: ref,
connectionTimeout = defaultConnectionTimeout,
readTimeout = defaultReadTimeout,
contentReadTimeout = defaultContentReadTimeout,
rng = newRng()): T =
GC_ref(udata)
let
@ -224,7 +284,7 @@ proc new*(
contentHandler: contentHandler,
udata: cast[pointer](udata),
connectionTimeout: connectionTimeout,
readTimeout: readTimeout,
contentReadTimeout: contentReadTimeout,
rng: rng)
stream
@ -236,22 +296,22 @@ func setTransport*(stream: PortalStream, transport: UtpDiscv5Protocol) =
proc registerIncomingSocketCallback*(
streams: seq[PortalStream]): AcceptConnectionCallback[NodeAddress] =
return (
proc(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] =
proc(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] =
for stream in streams:
# Note: Connection id of uTP SYN is different from other packets, it is
# actually the peers `send_conn_id`, opposed to `receive_conn_id` for all
# other packets.
for i, request in stream.contentRequests:
if request.connectionId == client.connectionId and
request.nodeId == client.remoteAddress.nodeId:
let fut = client.writeAndClose(stream, request)
if request.connectionId == socket.connectionId and
request.nodeId == socket.remoteAddress.nodeId:
let fut = socket.writeContentRequest(stream, request)
stream.contentRequests.del(i)
return fut
for i, offer in stream.contentOffers:
if offer.connectionId == client.connectionId and
offer.nodeId == client.remoteAddress.nodeId:
let fut = client.readAndClose(stream, offer)
if offer.connectionId == socket.connectionId and
offer.nodeId == socket.remoteAddress.nodeId:
let fut = socket.readContentOffer(stream, offer)
stream.contentOffers.del(i)
return fut

View File

@ -185,7 +185,7 @@ proc historyPropagate*(
while true:
let (keys, content) = await gossipQueue.popFirst()
await p.neighborhoodGossip(keys, content)
await p.neighborhoodGossip(keys, @[content])
for i in 0 ..< concurrentGossips:
gossipWorkers.add(gossipWorker(p))
@ -230,7 +230,7 @@ proc historyPropagateBlock*(
let contentId = history_content.toContentId(value[0])
p.storeContent(contentId, value[1])
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), value[1])
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]])
return ok()
else:

View File

@ -192,7 +192,7 @@ procSuite "Portal testnet tests":
# because the data needs to propagate over the nodes. What one could do is
# add a json-rpc debug proc that returns whether the offer queue is empty or
# not. And then poll every node until all nodes have an empty queue.
await sleepAsync(15.seconds)
await sleepAsync(20.seconds)
let blockData = readBlockDataTable(dataFile)
check blockData.isOk()

View File

@ -14,7 +14,8 @@ import
proc localAddress*(port: int): Address =
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
proc initDiscoveryNode*(rng: ref BrHmacDrbgContext,
proc initDiscoveryNode*(
rng: ref BrHmacDrbgContext,
privKey: PrivateKey,
address: Address,
bootstrapRecords: openArray[Record] = [],

View File

@ -8,7 +8,7 @@
{.used.}
import
std/algorithm,
std/[algorithm, sequtils],
chronos, testutils/unittests, stew/shims/net,
eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
eth/p2p/discoveryv5/protocol as discv5_protocol,
@ -170,6 +170,29 @@ procSuite "Portal Wire Protocol Tests":
await proto1.stopPortalProtocol()
await proto2.stopPortalProtocol()
asyncTest "Offer/Accept/Stream":
let (proto1, proto2) = defaultTestSetup(rng)
var content: seq[ContentInfo]
for i in 0..<contentKeysLimit:
let contentItem = ContentInfo(
contentKey: ByteList(@[byte i]), content: repeat(byte i, 5000))
content.add(contentItem)
let res = await proto1.offer(proto2.baseProtocol.localNode, content)
check:
res.isOk()
for contentInfo in content:
let receivedContent = proto2.contentDB.get(
toContentId(contentInfo.contentKey).get())
check:
receivedContent.isSome()
receivedContent.get() == contentInfo.content
await proto1.stopPortalProtocol()
await proto2.stopPortalProtocol()
asyncTest "Correctly mark node as seen after request":
let (proto1, proto2) = defaultTestSetup(rng)