From eb84eb08547097e8a09d8bf3167bd7339c384d22 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 11 Apr 2022 11:25:36 +0200 Subject: [PATCH] Improve content offer (#1050) * Improve content offer --- fluffy/network/history/history_network.nim | 33 ++--- fluffy/network/wire/portal_protocol.nim | 150 +++++++++++++++------ 2 files changed, 123 insertions(+), 60 deletions(-) diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index a33f584cf..706bedc09 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -132,20 +132,18 @@ proc getBlockHeader*( let maybeHeader = validateHeaderBytes(headerContent.content, hash) - # content is in range and valid, put into db - if maybeHeader.isSome() and h.portalProtocol.inRange(contentId): - # TODO this bit is quite troubling, currently we may trigger offer/accept - # only when content is in our db and we save content only when is in our range - # which means we cannot propagate content which is not in our range, but maybe - # in range of other nodes. - h.contentDB.put(contentId, headerContent.content) - # content is valid and in the db, it may be propagated it through the network + if maybeHeader.isSome(): + # Content is valid we can propagate it to interested peers h.portalProtocol.triggerPoke( headerContent.nodesInterestedInContent, keyEncoded, - contentId + headerContent.content ) + if h.portalProtocol.inRange(contentId): + # content is valid and in our range, save it into our db + h.contentDB.put(contentId, headerContent.content) + return maybeHeader proc getBlock*( @@ -181,19 +179,16 @@ proc getBlock*( let blockBody = maybeBody.unsafeGet() + # body is valid, propagate it to interested peers + h.portalProtocol.triggerPoke( + bodyContent.nodesInterestedInContent, + keyEncoded, + bodyContent.content + ) + # content is in range and valid, put into db if h.portalProtocol.inRange(contentId): - # TODO this bit is quite troubling, currently we may trigger offer/accept - # only when content is in our db and we save content only when is in our range - # which means we cannot propagate content which is not in our range, but maybe - # in range of other nodes. h.contentDB.put(contentId, bodyContent.content) - # content is valid and in db we may propagate it through the network - h.portalProtocol.triggerPoke( - bodyContent.nodesInterestedInContent, - keyEncoded, - contentId - ) return some[Block]((header, blockBody)) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 18a6731fa..6cefdebae 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -120,6 +120,21 @@ type PortalProtocolId* = array[2, byte] RadiusCache* = LRUCache[NodeId, UInt256] + + ContentInfo* = object + contentKey*: ByteList + content*: seq[byte] + + OfferRequestType = enum + Direct, Database + + OfferRequest = object + dst: Node + case kind: OfferRequestType + of Direct: + contentList: List[ContentInfo, contentKeysLimit] + of Database: + contentKeys: ContentKeysList PortalProtocol* = ref object of TalkProtocol protocolId*: PortalProtocolId @@ -134,7 +149,7 @@ type revalidateLoop: Future[void] stream*: PortalStream radiusCache: RadiusCache - offerQueue: AsyncQueue[(Node, ContentKeysList)] + offerQueue: AsyncQueue[OfferRequest] offerWorkers: seq[Future[void]] PortalResult*[T] = Result[T, cstring] @@ -157,6 +172,15 @@ type # content is in their range nodesInterestedInContent*: seq[Node] +proc init*( + T: type ContentInfo, + contentKey: ByteList, + content: seq[byte]): T = + ContentInfo( + contentKey: contentKey, + content: content + ) + proc init*( T: type ContentLookupResult, content: seq[byte], @@ -408,7 +432,7 @@ proc new*(T: type PortalProtocol, dataRadius: dataRadius, bootstrapRecords: @bootstrapRecords, radiusCache: RadiusCache.init(256), - offerQueue: newAsyncQueue[(Node, ContentKeysList)](concurrentOffers)) + offerQueue: newAsyncQueue[OfferRequest](concurrentOffers)) proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect( "Only one protocol should have this id") @@ -588,35 +612,53 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): else: return err("Content message returned invalid ENRs") -# TODO: Depending on how this gets used, it might be better not to request -# the data from the database here, but pass it as parameter. (like, if it was -# just received it and now needs to be forwarded) -proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): - Future[PortalResult[void]] {.async.} = - let acceptMessageResponse = await p.offerImpl(dst, contentKeys) - portal_content_keys_offered.observe(contentKeys.len().int64) +proc getContentKeys(o: OfferRequest): ContentKeysList = + case o.kind + of Direct: + var contentKeys:ContentKeysList + for info in o.contentList: + discard contentKeys.add(info.contentKey) + return contentKeys + of Database: + return o.contentKeys + +proc offer(p: PortalProtocol, o: OfferRequest): + Future[PortalResult[void]] {.async.} = + ## Offer triggers offer-accept interaction with one peer + ## Whole flow has two phases: + ## 1. Come to an agreement on what content to transfer, by using offer and accept + ## messages. + ## 2. Open uTP stream from content provider to content receiver and transfer + ## agreed content. + ## There are two types of possible offer requests: + ## Direct - when caller provides content to transfer. This way, content is + ## guaranteed to be transferred as it stays in memory until whole transfer + ## is completed. + ## Database - when caller provides keys of content to be transferred. This + ## way content is provided from database just before it is transferred through + ## uTP socket. This is useful when there is a lot of content to be transferred + ## to many peers, and keeping it all in memory could exhaust node resources. + ## Main drawback is that content may be deleted from the node database + ## by the cleanup process before it will be transferred, so this way does not + ## guarantee content transfer + let contentKeys = getContentKeys(o) + + let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys) if acceptMessageResponse.isOk(): let m = acceptMessageResponse.get() - - # Filter contentKeys with bitlist - var requestedContentKeys: seq[ByteList] - for i, b in m.contentKeys: - if b: - requestedContentKeys.add(contentKeys[i]) - - let contentKeysAmount = requestedContentKeys.len() - portal_content_keys_accepted.observe(contentKeysAmount.int64) - if contentKeysAmount == 0: + let acceptedKeysAmount = m.contentKeys.countOnes() + portal_content_keys_accepted.observe(acceptedKeysAmount.int64) + if acceptedKeysAmount == 0: # Don't open an uTP stream if no content was requested return ok() - let nodeAddress = NodeAddress.init(dst) + let nodeAddress = NodeAddress.init(o.dst) if nodeAddress.isNone(): # It should not happen as we are already after succesfull talkreq/talkresp # cycle error "Trying to connect to node with unknown address", - id = dst.id + id = o.dst.id return err("Trying to connect to node with unknown address") let connectionResult = @@ -632,31 +674,59 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): let clientSocket = connectionResult.get() - for contentKey in requestedContentKeys: - let contentIdOpt = p.toContentId(contentKey) - if contentIdOpt.isSome(): - let - contentId = contentIdOpt.get() - maybeContent = p.contentDB.get(contentId) - if maybeContent.isSome(): - let content = maybeContent.get() - let dataWritten = await clientSocket.write(content) + case o.kind + of Direct: + for i, b in m.contentKeys: + if b: + let dataWritten = await clientSocket.write(o.contentList[i].content) if dataWritten.isErr: error "Error writing requested data", error = dataWritten.error # No point in trying to continue writing data clientSocket.close() return err("Error writing requested data") + of Database: + for i, b in m.contentKeys: + if b: + let contentIdOpt = p.toContentId(o.contentKeys[i]) + if contentIdOpt.isSome(): + let + contentId = contentIdOpt.get() + maybeContent = p.contentDB.get(contentId) + if maybeContent.isSome(): + let content = maybeContent.get() + let dataWritten = await clientSocket.write(content) + if dataWritten.isErr: + error "Error writing requested data", error = dataWritten.error + # No point in trying to continue writing data + clientSocket.close() + return err("Error writing requested data") await clientSocket.closeWait() return ok() else: return err("No accept response") +proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): + Future[PortalResult[void]] {.async.} = + let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys) + let res = await p.offer(req) + return res + +proc offer*(p: PortalProtocol, dst: Node, content: seq[ContentInfo]): + Future[PortalResult[void]] {.async.} = + if len(content) > contentKeysLimit: + return err("Cannot offer more than 64 content items") + + let contentList = List[ContentInfo, contentKeysLimit].init(content) + let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList) + let res = await p.offer(req) + return res + proc offerWorker(p: PortalProtocol) {.async.} = while true: - let (node, contentKeys) = await p.offerQueue.popFirst() + let req = await p.offerQueue.popFirst() - let res = await p.offer(node, contentKeys) + let res = await p.offer(req) if res.isOk(): portal_gossip_offers_successful.inc(labelValues = [$p.protocolId]) else: @@ -681,7 +751,8 @@ proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.asyn NodeId(contentId), k = 6, seenOnly = false) for node in closestNodes: - await p.offerQueue.addLast((node, contentKeys)) + let req = OfferRequest(dst: node, kind: Database, contentKeys: contentKeys) + await p.offerQueue.addLast(req) proc processContent( stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte]) @@ -785,19 +856,16 @@ proc triggerPoke*( p: PortalProtocol, nodes: seq[Node], contentKey: ByteList, - contentId: ContentId) = + content: seq[byte]) = ## Triggers asynchronous offer-accept interaction to provided nodes. ## Provided content should be in range of provided nodes - ## Provided content should be in database - ## TODO Related to todo in `proc offer` it maybe better to pass content to - ## offer directly to avoid potential problems when content is not really in database - ## this will be especially important when we introduce deleting content - ## from database - let keys = ContentKeysList.init(@[contentKey]) for node in nodes: if not p.offerQueue.full(): try: - p.offerQueue.putNoWait((node, keys)) + let ci = ContentInfo(contentKey: contentKey, content: content) + let list = List[ContentInfo, contentKeysLimit].init(@[ci]) + let req = OfferRequest(dst: node, kind: Direct, contentList: list) + p.offerQueue.putNoWait(req) except AsyncQueueFullError as e: # should not happen as we always check is full before putting element to the queue raiseAssert(e.msg)