diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index e92374ba2..13ffce7ca 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -12,8 +12,8 @@ import std/[sequtils, sets, algorithm], - stew/[results, byteutils], chronicles, chronos, nimcrypto/hash, bearssl, - ssz_serialization, metrics, + stew/[results, byteutils, leb128], chronicles, chronos, nimcrypto/hash, + bearssl, ssz_serialization, metrics, faststreams, eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification, lru], ../../content_db, @@ -426,8 +426,8 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte], @[] proc processContent( - stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte]) - {.gcsafe, raises: [Defect].} + stream: PortalStream, contentKeys: ContentKeysList, + content: seq[seq[byte]]) {.gcsafe, raises: [Defect].} proc fromLogRadius(T: type UInt256, logRadius: uint16): T = # Get the max value of the logRadius range @@ -614,7 +614,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): error "Trying to connect to node with unknown address", id = dst.id return err("Trying to connect to node with unknown address") - + let connFuture = p.stream.connectTo( nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId) @@ -651,7 +651,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): # send a FIN and clean up the socket. socket.close() - if await readFut.withTimeout(p.stream.readTimeout): + if await readFut.withTimeout(p.stream.contentReadTimeout): let content = readFut.read # socket received remote FIN and drained whole buffer, it can be # safely destroyed without notifing remote @@ -666,10 +666,11 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): return err("Reading data from socket timed out, content request failed") except CancelledError as exc: # even though we already installed cancelCallback on readFut, it is worth - # catching CancelledError in case that withTimeout throws CancelledError + # catching CancelledError in case that withTimeout throws CancelledError # but readFut have already finished. debug "Socket read cancelled", socketKey = socket.socketKey + socket.close() raise exc of contentType: @@ -759,17 +760,26 @@ proc offer(p: PortalProtocol, o: OfferRequest): error = connectionResult.error return err("Error connecting uTP socket") - let clientSocket = connectionResult.get() + let socket = connectionResult.get() + + template lenu32(x: untyped): untyped = + uint32(len(x)) case o.kind of Direct: for i, b in m.contentKeys: if b: - let dataWritten = await clientSocket.write(o.contentList[i].content) + let content = o.contentList[i].content + var output = memoryOutput() + + output.write(toBytes(content.lenu32, Leb128).toOpenArray()) + output.write(content) + + let dataWritten = await socket.write(output.getOutput) if dataWritten.isErr: debug "Error writing requested data", error = dataWritten.error # No point in trying to continue writing data - clientSocket.close() + socket.close() return err("Error writing requested data") of Database: for i, b in m.contentKeys: @@ -779,16 +789,25 @@ proc offer(p: PortalProtocol, o: OfferRequest): let contentId = contentIdOpt.get() maybeContent = p.contentDB.get(contentId) + + var output = memoryOutput() if maybeContent.isSome(): let content = maybeContent.get() - let dataWritten = await clientSocket.write(content) - if dataWritten.isErr: - debug "Error writing requested data", error = dataWritten.error - # No point in trying to continue writing data - clientSocket.close() - return err("Error writing requested data") - await clientSocket.closeWait() + output.write(toBytes(content.lenu32, Leb128).toOpenArray()) + output.write(content) + else: + # When data turns out missing, add a 0 size varint + output.write(toBytes(0'u8, Leb128).toOpenArray()) + + let dataWritten = await socket.write(output.getOutput) + if dataWritten.isErr: + debug "Error writing requested data", error = dataWritten.error + # No point in trying to continue writing data + socket.close() + return err("Error writing requested data") + + await socket.closeWait() return ok() else: return err("No accept response") @@ -1062,14 +1081,21 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] = p.query(NodeId.random(p.baseProtocol.rng[])) proc neighborhoodGossip*( - p: PortalProtocol, contentKeys: ContentKeysList, content: seq[byte]) + p: PortalProtocol, contentKeys: ContentKeysList, content: seq[seq[byte]]) {.async.} = - let - # for now only 1 item is considered - contentInfo = ContentInfo(contentKey: contentKeys[0], content: content) - contentList = List[ContentInfo, contentKeysLimit].init(@[contentInfo]) - contentIdOpt = p.toContentId(contentInfo.contentKey) + if content.len() == 0: + return + var contentList = List[ContentInfo, contentKeysLimit].init(@[]) + for i, contentItem in content: + let contentInfo = + ContentInfo(contentKey: contentKeys[i], content: contentItem) + + discard contentList.add(contentInfo) + + # Just taking the first content item as target id. + # TODO: come up with something better? + let contentIdOpt = p.toContentId(contentList[0].contentKey) if contentIdOpt.isNone(): return @@ -1183,31 +1209,30 @@ proc storeContent*(p: PortalProtocol, key: ContentId, content: openArray[byte]) p.contentDB.put(key, content) proc processContent( - stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte]) - {.gcsafe, raises: [Defect].} = + stream: PortalStream, contentKeys: ContentKeysList, + content: seq[seq[byte]]) {.gcsafe, raises: [Defect].} = let p = getUserData[PortalProtocol](stream) - # TODO: - # - Implement a way to discern different content items (e.g. length prefixed) - # - Check amount of content items according to ContentKeysList - # - The above could also live in `PortalStream` - # For now we only consider 1 item being offered - if contentKeys.len() == 1: - let contentKey = contentKeys[0] - if p.validateContent(content, contentKey): + # content passed here can have less items then contentKeys, but not more. + for i, contentItem in content: + let contentKey = contentKeys[i] + if p.validateContent(contentItem, contentKey): let contentIdOpt = p.toContentId(contentKey) if contentIdOpt.isNone(): return let contentId = contentIdOpt.get() - p.storeContent(contentId, content) + p.storeContent(contentId, contentItem) info "Received valid offered content", contentKey - - asyncSpawn neighborhoodGossip(p, contentKeys, content) else: error "Received invalid offered content", contentKey + # On one invalid piece of content we drop all and don't forward any of it + # TODO: Could also filter it out and still gossip the rest. + return + + asyncSpawn neighborhoodGossip(p, contentKeys, content) proc seedTable*(p: PortalProtocol) = ## Seed the table with specifically provided Portal bootstrap nodes. These are diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim index d7b5cbece..f57ae96ee 100644 --- a/fluffy/network/wire/portal_stream.nim +++ b/fluffy/network/wire/portal_stream.nim @@ -9,7 +9,7 @@ import std/sequtils, - chronos, stew/byteutils, chronicles, + chronos, stew/[byteutils, leb128], chronicles, eth/utp/utp_discv5_protocol, # even though utp_discv5_protocol exports this, import is still needed, # perhaps protocol.Protocol type of usage? @@ -24,7 +24,7 @@ logScope: const utpProtocolId* = "utp".toBytes() defaultConnectionTimeout = 5.seconds - defaultReadTimeout = 2.seconds + defaultContentReadTimeout = 2.seconds # TalkReq message is used as transport for uTP. It is assumed here that Portal # protocol messages were exchanged before sending uTP over discv5 data. This @@ -56,8 +56,8 @@ type timeout: Moment ContentHandlerCallback* = proc( - stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte]) - {.gcsafe, raises: [Defect].} + stream: PortalStream, contentKeys: ContentKeysList, + content: seq[seq[byte]]) {.gcsafe, raises: [Defect].} PortalStream* = ref object transport: UtpDiscv5Protocol @@ -77,7 +77,7 @@ type contentRequests: seq[ContentRequest] contentOffers: seq[ContentOffer] connectionTimeout: Duration - readTimeout*: Duration + contentReadTimeout*: Duration rng: ref BrHmacDrbgContext udata: pointer contentHandler: ContentHandlerCallback @@ -177,7 +177,7 @@ proc connectTo*( let socket = socketRes.get() return ok(socket) -proc writeAndClose( +proc writeContentRequest( socket: UtpSocket[NodeAddress], stream: PortalStream, request: ContentRequest) {.async.} = let dataWritten = await socket.write(request.content) @@ -186,37 +186,97 @@ proc writeAndClose( await socket.closeWait() -proc readAndClose( +proc readVarint(socket: UtpSocket[NodeAddress]): + Future[Opt[uint32]] {.async.} = + var + buffer: array[5, byte] + + for i in 0.. 0: + return ok(lenU32) + elif bytesRead == 0: + continue + else: + return err() + +proc readContentItem(socket: UtpSocket[NodeAddress]): + Future[Opt[seq[byte]]] {.async.} = + let len = await socket.readVarint() + + if len.isOk(): + let contentItem = await socket.read(len.get()) + if contentItem.len() == len.get().int: + return ok(contentItem) + else: + return err() + else: + return err() + +proc readContentOffer( socket: UtpSocket[NodeAddress], stream: PortalStream, offer: ContentOffer) {.async.} = - # Read all bytes from the socket - # This will either end with a FIN, or because the read action times out. - # A FIN does not necessarily mean that the data read is complete. Further - # validation is required, using a length prefix here might be beneficial for - # this. - # TODO: Should also limit the amount of data to read and/or total time. - var readData = socket.read() - if await readData.withTimeout(stream.readTimeout): - let content = readData.read - if not stream.contentHandler.isNil(): - stream.contentHandler(stream, offer.contentKeys, content) + # Read number of content items according to amount of ContentKeys accepted. + # This will either end with a FIN, or because the read action times out or + # because the number of expected items was read (if this happens and no FIN + # was received yet, a FIN will be send from this side). + # None of this means that the contentItems are valid, further validation is + # required. + # Socket will be closed when this call ends. + # TODO: Currently reading from the socket 1 item at a time, and validating + # items at later time. Uncertain what is best approach here (mostly from a + # security PoV), e.g. other options such as reading all content from socket at + # once, then processing the individual content items. Or reading and + # validating one per time. + + let amount = offer.contentKeys.len() + + var contentItems: seq[seq[byte]] + for i in 0..