Improve content offer (#1050)

* Improve content offer
This commit is contained in:
KonradStaniec 2022-04-11 11:25:36 +02:00 committed by GitHub
parent ded38128b5
commit eb84eb0854
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 123 additions and 60 deletions

View File

@ -132,20 +132,18 @@ proc getBlockHeader*(
let maybeHeader = validateHeaderBytes(headerContent.content, hash)
# content is in range and valid, put into db
if maybeHeader.isSome() and h.portalProtocol.inRange(contentId):
# TODO this bit is quite troubling, currently we may trigger offer/accept
# only when content is in our db and we save content only when is in our range
# which means we cannot propagate content which is not in our range, but maybe
# in range of other nodes.
h.contentDB.put(contentId, headerContent.content)
# content is valid and in the db, it may be propagated it through the network
if maybeHeader.isSome():
# Content is valid we can propagate it to interested peers
h.portalProtocol.triggerPoke(
headerContent.nodesInterestedInContent,
keyEncoded,
contentId
headerContent.content
)
if h.portalProtocol.inRange(contentId):
# content is valid and in our range, save it into our db
h.contentDB.put(contentId, headerContent.content)
return maybeHeader
proc getBlock*(
@ -181,19 +179,16 @@ proc getBlock*(
let blockBody = maybeBody.unsafeGet()
# body is valid, propagate it to interested peers
h.portalProtocol.triggerPoke(
bodyContent.nodesInterestedInContent,
keyEncoded,
bodyContent.content
)
# content is in range and valid, put into db
if h.portalProtocol.inRange(contentId):
# TODO this bit is quite troubling, currently we may trigger offer/accept
# only when content is in our db and we save content only when is in our range
# which means we cannot propagate content which is not in our range, but maybe
# in range of other nodes.
h.contentDB.put(contentId, bodyContent.content)
# content is valid and in db we may propagate it through the network
h.portalProtocol.triggerPoke(
bodyContent.nodesInterestedInContent,
keyEncoded,
contentId
)
return some[Block]((header, blockBody))

View File

@ -120,6 +120,21 @@ type
PortalProtocolId* = array[2, byte]
RadiusCache* = LRUCache[NodeId, UInt256]
ContentInfo* = object
contentKey*: ByteList
content*: seq[byte]
OfferRequestType = enum
Direct, Database
OfferRequest = object
dst: Node
case kind: OfferRequestType
of Direct:
contentList: List[ContentInfo, contentKeysLimit]
of Database:
contentKeys: ContentKeysList
PortalProtocol* = ref object of TalkProtocol
protocolId*: PortalProtocolId
@ -134,7 +149,7 @@ type
revalidateLoop: Future[void]
stream*: PortalStream
radiusCache: RadiusCache
offerQueue: AsyncQueue[(Node, ContentKeysList)]
offerQueue: AsyncQueue[OfferRequest]
offerWorkers: seq[Future[void]]
PortalResult*[T] = Result[T, cstring]
@ -157,6 +172,15 @@ type
# content is in their range
nodesInterestedInContent*: seq[Node]
proc init*(
T: type ContentInfo,
contentKey: ByteList,
content: seq[byte]): T =
ContentInfo(
contentKey: contentKey,
content: content
)
proc init*(
T: type ContentLookupResult,
content: seq[byte],
@ -408,7 +432,7 @@ proc new*(T: type PortalProtocol,
dataRadius: dataRadius,
bootstrapRecords: @bootstrapRecords,
radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[(Node, ContentKeysList)](concurrentOffers))
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers))
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
"Only one protocol should have this id")
@ -588,35 +612,53 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
else:
return err("Content message returned invalid ENRs")
# TODO: Depending on how this gets used, it might be better not to request
# the data from the database here, but pass it as parameter. (like, if it was
# just received it and now needs to be forwarded)
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
Future[PortalResult[void]] {.async.} =
let acceptMessageResponse = await p.offerImpl(dst, contentKeys)
portal_content_keys_offered.observe(contentKeys.len().int64)
proc getContentKeys(o: OfferRequest): ContentKeysList =
case o.kind
of Direct:
var contentKeys:ContentKeysList
for info in o.contentList:
discard contentKeys.add(info.contentKey)
return contentKeys
of Database:
return o.contentKeys
proc offer(p: PortalProtocol, o: OfferRequest):
Future[PortalResult[void]] {.async.} =
## Offer triggers offer-accept interaction with one peer
## Whole flow has two phases:
## 1. Come to an agreement on what content to transfer, by using offer and accept
## messages.
## 2. Open uTP stream from content provider to content receiver and transfer
## agreed content.
## There are two types of possible offer requests:
## Direct - when caller provides content to transfer. This way, content is
## guaranteed to be transferred as it stays in memory until whole transfer
## is completed.
## Database - when caller provides keys of content to be transferred. This
## way content is provided from database just before it is transferred through
## uTP socket. This is useful when there is a lot of content to be transferred
## to many peers, and keeping it all in memory could exhaust node resources.
## Main drawback is that content may be deleted from the node database
## by the cleanup process before it will be transferred, so this way does not
## guarantee content transfer
let contentKeys = getContentKeys(o)
let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys)
if acceptMessageResponse.isOk():
let m = acceptMessageResponse.get()
# Filter contentKeys with bitlist
var requestedContentKeys: seq[ByteList]
for i, b in m.contentKeys:
if b:
requestedContentKeys.add(contentKeys[i])
let contentKeysAmount = requestedContentKeys.len()
portal_content_keys_accepted.observe(contentKeysAmount.int64)
if contentKeysAmount == 0:
let acceptedKeysAmount = m.contentKeys.countOnes()
portal_content_keys_accepted.observe(acceptedKeysAmount.int64)
if acceptedKeysAmount == 0:
# Don't open an uTP stream if no content was requested
return ok()
let nodeAddress = NodeAddress.init(dst)
let nodeAddress = NodeAddress.init(o.dst)
if nodeAddress.isNone():
# It should not happen as we are already after succesfull talkreq/talkresp
# cycle
error "Trying to connect to node with unknown address",
id = dst.id
id = o.dst.id
return err("Trying to connect to node with unknown address")
let connectionResult =
@ -632,31 +674,59 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
let clientSocket = connectionResult.get()
for contentKey in requestedContentKeys:
let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isSome():
let
contentId = contentIdOpt.get()
maybeContent = p.contentDB.get(contentId)
if maybeContent.isSome():
let content = maybeContent.get()
let dataWritten = await clientSocket.write(content)
case o.kind
of Direct:
for i, b in m.contentKeys:
if b:
let dataWritten = await clientSocket.write(o.contentList[i].content)
if dataWritten.isErr:
error "Error writing requested data", error = dataWritten.error
# No point in trying to continue writing data
clientSocket.close()
return err("Error writing requested data")
of Database:
for i, b in m.contentKeys:
if b:
let contentIdOpt = p.toContentId(o.contentKeys[i])
if contentIdOpt.isSome():
let
contentId = contentIdOpt.get()
maybeContent = p.contentDB.get(contentId)
if maybeContent.isSome():
let content = maybeContent.get()
let dataWritten = await clientSocket.write(content)
if dataWritten.isErr:
error "Error writing requested data", error = dataWritten.error
# No point in trying to continue writing data
clientSocket.close()
return err("Error writing requested data")
await clientSocket.closeWait()
return ok()
else:
return err("No accept response")
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
Future[PortalResult[void]] {.async.} =
let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys)
let res = await p.offer(req)
return res
proc offer*(p: PortalProtocol, dst: Node, content: seq[ContentInfo]):
Future[PortalResult[void]] {.async.} =
if len(content) > contentKeysLimit:
return err("Cannot offer more than 64 content items")
let contentList = List[ContentInfo, contentKeysLimit].init(content)
let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList)
let res = await p.offer(req)
return res
proc offerWorker(p: PortalProtocol) {.async.} =
while true:
let (node, contentKeys) = await p.offerQueue.popFirst()
let req = await p.offerQueue.popFirst()
let res = await p.offer(node, contentKeys)
let res = await p.offer(req)
if res.isOk():
portal_gossip_offers_successful.inc(labelValues = [$p.protocolId])
else:
@ -681,7 +751,8 @@ proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.asyn
NodeId(contentId), k = 6, seenOnly = false)
for node in closestNodes:
await p.offerQueue.addLast((node, contentKeys))
let req = OfferRequest(dst: node, kind: Database, contentKeys: contentKeys)
await p.offerQueue.addLast(req)
proc processContent(
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
@ -785,19 +856,16 @@ proc triggerPoke*(
p: PortalProtocol,
nodes: seq[Node],
contentKey: ByteList,
contentId: ContentId) =
content: seq[byte]) =
## Triggers asynchronous offer-accept interaction to provided nodes.
## Provided content should be in range of provided nodes
## Provided content should be in database
## TODO Related to todo in `proc offer` it maybe better to pass content to
## offer directly to avoid potential problems when content is not really in database
## this will be especially important when we introduce deleting content
## from database
let keys = ContentKeysList.init(@[contentKey])
for node in nodes:
if not p.offerQueue.full():
try:
p.offerQueue.putNoWait((node, keys))
let ci = ContentInfo(contentKey: contentKey, content: content)
let list = List[ContentInfo, contentKeysLimit].init(@[ci])
let req = OfferRequest(dst: node, kind: Direct, contentList: list)
p.offerQueue.putNoWait(req)
except AsyncQueueFullError as e:
# should not happen as we always check is full before putting element to the queue
raiseAssert(e.msg)