Adhere to transport limits in seed methods (#1186)
* Add means to limit offered content to fit talkreq
* Add test for history network limits
* Change seed method api to return num of items offered
parent 4f8d10f338
commit f0cd340163
@@ -18,6 +18,12 @@ import
 export ssz_serialization, common_types, hash
 
 ## Types and calls for history network content keys
+const
+  # Maximum content key size comes from:
+  # 34 bytes for ssz serialized BlockKey
+  # 1 byte for contentType
+  # TODO it would be nice to calculate it somehow from the object definition (macro?)
+  maxContentKeySize* = 35
 
 type
   ContentType* = enum
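For reference, the 35 bytes break down as 34 bytes for the SSZ-serialized BlockKey plus 1 byte for the content type selector. The BlockKey used later in this diff holds a 2-byte chainId (uint16) and a 32-byte blockHash, both fixed-size fields, so the arithmetic can be checked directly; a minimal sketch, with the size names below introduced purely for illustration:

  const
    chainIdSize = 2      # BlockKey.chainId is an uint16
    blockHashSize = 32   # BlockKey.blockHash is a 32-byte digest

  doAssert chainIdSize + blockHashSize == 34      # SSZ-serialized BlockKey
  doAssert chainIdSize + blockHashSize + 1 == 35  # + 1 byte contentType selector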
@@ -8,9 +8,11 @@
 {.push raises: [Defect].}
 
 import
+  std/math,
   chronos,
   eth/p2p/discoveryv5/[node, random2],
   ./wire/portal_protocol,
+  ./history/[history_content, history_network],
   ../seed_db
 
 # Experimental module which implements different content seeding strategies.
@@ -27,6 +29,19 @@ import
 # and creating a few procs which would start/stop a given seed task or even a
 # few seed tasks
 
+const
+  # TODO: currently we are using the value for the history network, but this
+  # should be calculated on a per-network basis
+  maxItemsPerOfferBySize = getMaxOfferedContentKeys(
+    uint32(len(history_network.historyProtocolId)),
+    uint32(history_content.maxContentKeySize)
+  )
+
+  # Offering is restricted to max 64 items
+  maxItemPerOfferByLen = 64
+
+  maxItemsPerOffer = min(maxItemsPerOfferBySize, maxItemPerOfferByLen)
+
 proc depthContentPropagate*(
     p: PortalProtocol, seedDbPath: string, maxClosestNodes: uint32):
     Future[Result[void, string]] {.async.} =
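The effective cap is simply the stricter of the two limits. A minimal sketch of how they combine (the 29 below is the illustrative size-based result derived for the history network further down in this diff, not a constant from the code):

  # Hypothetical numbers: size-based limit of 29 vs. the 64-item message limit.
  const
    exampleBySize = 29
    exampleByLen = 64

  doAssert min(exampleBySize, exampleByLen) == 29  # the size limit wins here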
@@ -35,7 +50,7 @@ proc depthContentPropagate*(
   ## offer as much content as possible in their range from seed db. Offers are
   ## made concurrently with at most one offer per peer at a time.
 
-  const batchSize = 64
+  const batchSize = maxItemsPerOffer
 
   var gossipWorkers: seq[Future[void]]
@@ -148,7 +163,7 @@ proc breadthContentPropagate*(
 
   let
     (dbPath, dbName) = maybePathAndDbName.unsafeGet()
-    batchSize = 64
+    batchSize = maxItemsPerOffer
     db = SeedDb.new(path = dbPath, name = dbName)
     target = p.localNode.id
@@ -156,7 +171,7 @@ proc breadthContentPropagate*(
 
   while true:
     # Setting radius to `UInt256.high` and using batchSize and offset means
-    # we will iterate over the whole database in batches of 64 items
+    # we will iterate over the whole database in batches of `maxItemsPerOffer` items
     var contentData = db.getContentInRange(target, UInt256.high, batchSize, offset)
 
     if len(contentData) == 0:
@@ -187,14 +202,17 @@ proc offerContentInNodeRange*(
     seedDbPath: string,
     nodeId: NodeId,
     max: uint32,
-    starting: uint32): Future[PortalResult[void]] {.async.} =
+    starting: uint32): Future[PortalResult[int]] {.async.} =
   ## Offers `max` closest elements starting from `starting` index to the peer
   ## with given `nodeId`.
-  ## Maximum value of `max` is 64, as this is the limit for a single offer.
+  ## Maximum value of `max` is 64, as this is the limit for a single offer. The
   ## `starting` argument is needed as seed_db is read only, so if there is
   ## more content in peer range than max, then to offer the 64 closest elements
-  # it needs to be set to 0. To offer next 64 elements it need to be set to
-  # 64 etc.
+  ## it needs to be set to 0. To offer the next 64 elements it needs to be set
+  ## to 64, etc.
+  ## Returns the number of items actually offered to the remote peer.
+
+  let numberToOffer = min(int(max), maxItemsPerOffer)
 
   let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
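Because the seed database is read only and a single offer carries at most `maxItemsPerOffer` items, a caller that wants to push a peer's whole range pages through it with the `starting` offset. A hypothetical driver loop under those semantics (offerWholeRange is not part of this commit, and it assumes the PortalResult error type stringifies with `$`):

  proc offerWholeRange(p: PortalProtocol, dbPath: string, nodeId: NodeId):
      Future[Result[void, string]] {.async.} =
    var starting: uint32 = 0
    while true:
      # offerContentInNodeRange itself clamps the 64 down to maxItemsPerOffer.
      let res = await p.offerContentInNodeRange(dbPath, nodeId, 64, starting)
      if res.isErr():
        return err($res.error)
      let offered = res.get()
      if offered == 0:
        # nothing left in the peer's range
        return ok()
      starting += uint32(offered)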
@@ -211,7 +229,7 @@ proc offerContentInNodeRange*(
   let
     db = SeedDb.new(path = dbPath, name = dbName)
     (node, radius) = maybeNodeAndRadius.unsafeGet()
-    content = db.getContentInRange(node.id, radius, int64(max), int64(starting))
+    content = db.getContentInRange(node.id, radius, int64(numberToOffer), int64(starting))
 
   # We got all we wanted from seed_db, it can be closed now.
   db.close()
@@ -223,11 +241,15 @@ proc offerContentInNodeRange*(
     let info = ContentInfo(contentKey: k, content: cont.content)
     ci.add(info)
 
-  let offerResult = await p.offer(node, ci)
-
-  # waiting for offer result, by the end of this call remote node should
-  # have received offered content
-  return offerResult
+  let offerResult = await p.offer(node, ci)
+
+  if offerResult.isOk():
+    return ok(len(content))
+  else:
+    return err(offerResult.error)
 
 
 proc storeContentInNodeRange*(
     p: PortalProtocol,
@@ -20,6 +20,13 @@ export ssz_serialization, stint, common_types
 
 const
   contentKeysLimit* = 64
+  # overhead of the content message is a result of 1 byte for the kind enum,
+  # and 4 bytes for the offset in ssz serialization
+  offerMessageOverhead* = 5
+
+  # each key in ContentKeysList has an uint32 offset which results in 4 bytes
+  # per key of overhead when serialized
+  perContentKeyOverhead* = 4
 
 type
   ContentKeysList* = List[ByteList, contentKeysLimit]
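Together these constants give the serialized size of an offer message as offerMessageOverhead + n * (perContentKeyOverhead + keySize) when all n keys share one size. A quick sanity check with 35-byte history keys (offerMessageSize is a hypothetical helper that assumes the two constants above are in scope; 1280 is the discv5 packet limit):

  func offerMessageSize(n, keySize: int): int =
    # 1 byte kind + 4-byte list offset, then a 4-byte offset plus key bytes per entry
    offerMessageOverhead + n * (perContentKeyOverhead + keySize)

  doAssert offerMessageSize(1, 35) == 44
  doAssert offerMessageSize(64, 35) == 2501  # 64 max-size keys far exceed 1280 bytes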
@@ -166,3 +173,18 @@ func optToResult*[T, E](opt: Option[T], e: E): Result[T, E] =
 
 func getInnerMessageResult*[T: SomeMessage](m: Message, errMessage: cstring): Result[T, cstring] =
   optToResult(getInnnerMessage[T](m), errMessage)
+
+func getTalkReqOverhead*(protocolIdLen: int): int =
+  return (
+    16 + # IV size
+    55 + # header size
+    1 + # talkReq msg id
+    3 + # rlp encoding outer list, max length will be encoded in 2 bytes
+    9 + # request id (max = 8) + 1 byte from rlp encoding byte string
+    protocolIdLen + 1 + # + 1 is necessary due to rlp encoding of byte string
+    3 + # rlp encoding response byte string, max length in 2 bytes
+    16 # HMAC
+  )
+
+func getTalkReqOverhead*(protocolId: openArray[byte]): int =
+  return getTalkReqOverhead(len(protocolId))
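Summing the components for a 2-byte protocol id gives 16 + 55 + 1 + 3 + 9 + (2 + 1) + 3 + 16 = 106 bytes of fixed overhead per talkReq; a 3-byte id adds one more. The id lengths here are assumptions for illustration (Portal sub-protocol ids are 2 bytes, the uTP id is 3):

  doAssert getTalkReqOverhead(2) == 106  # e.g. a 2-byte Portal sub-protocol id
  doAssert getTalkReqOverhead(3) == 107  # e.g. the 3-byte "utp" protocol id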
@@ -692,6 +692,23 @@ proc getContentKeys(o: OfferRequest): ContentKeysList =
   of Database:
     return o.contentKeys
 
+func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int =
+  ## Calculates how many ContentKeys will fit in one offer message which
+  ## will be small enough to fit into the discv5 limit.
+  ## This is necessary as contentKeysLimit (64) is sometimes too big, and even
+  ## half of this can be too much to fit into discv5 limits.
+
+  let maxTalkReqPayload = maxDiscv5PacketSize - getTalkReqOverhead(int(protocolIdLen))
+  # To calculate how many bytes `n` content keys of size `maxKeySize` will take,
+  # we can use the following equation:
+  # bytes = (n * (maxKeySize + perContentKeyOverhead)) + offerMessageOverhead
+  # To calculate the maximal number of keys which will fit in the given space,
+  # this can be transformed to:
+  # n = trunc((bytes - offerMessageOverhead) / (maxKeySize + perContentKeyOverhead))
+  return (
+    (maxTalkReqPayload - offerMessageOverhead) div (int(maxKeySize) + perContentKeyOverhead)
+  )
+
 proc offer(p: PortalProtocol, o: OfferRequest):
     Future[PortalResult[void]] {.async.} =
   ## Offer triggers offer-accept interaction with one peer
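Plugging in the history-network numbers makes the limit concrete: assuming maxDiscv5PacketSize = 1280, a 2-byte protocol id (106 bytes of talkReq overhead, see above) and 35-byte keys, n = (1280 − 106 − 5) div (35 + 4) = 1169 div 39 = 29 keys, well below the 64-key message limit — exactly the gap the seed methods have to respect:

  # Assumes maxDiscv5PacketSize = 1280 together with the overheads shown above.
  doAssert getMaxOfferedContentKeys(2, 35) == 29
  # 29 keys need 5 + 29 * 39 = 1136 <= 1174 payload bytes; a 30th would need 1175.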
@@ -30,15 +30,7 @@
 # protocol messages were exchanged before sending uTP over discv5 data. This
 # means that a session is established and that the discv5 messages sent are
 # discv5 ordinary message packets, for which the below calculation applies.
-  talkReqOverhead =
-    16 + # IV size
-    55 + # header size
-    1 + # talkReq msg id
-    3 + # rlp encoding outer list, max length will be encoded in 2 bytes
-    9 + # request id (max = 8) + 1 byte from rlp encoding byte string
-    len(utpProtocolId) + 1 + # + 1 is necessary due to rlp encoding of byte string
-    3 + # rlp encoding response byte string, max length in 2 bytes
-    16 # HMAC
+  talkReqOverhead = getTalkReqOverhead(utpProtocolId)
   utpHeaderOverhead = 20
   maxUtpPayloadSize* = maxDiscv5PacketSize - talkReqOverhead - utpHeaderOverhead
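Assuming utpProtocolId is the 3-byte "utp" identifier, the overhead comes to 107 bytes, leaving each uTP-over-discv5 packet with 1280 − 107 − 20 = 1153 bytes of payload:

  doAssert maxUtpPayloadSize == maxDiscv5PacketSize - getTalkReqOverhead(3) - 20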
@@ -10,7 +10,7 @@ proc portal_history_propagateEpochAccumulator(
 proc portal_history_storeContentInNodeRange(
   dbPath: string, max: uint32, starting: uint32): bool
 proc portal_history_offerContentInNodeRange(
-  dbPath: string, nodeId: NodeId, max: uint32, starting: uint32): bool
+  dbPath: string, nodeId: NodeId, max: uint32, starting: uint32): int
 proc portal_history_depthContentPropagate(
   dbPath: string, max: uint32): bool
 proc portal_history_breadthContentPropagate(
@@ -88,13 +88,13 @@ proc installPortalDebugApiHandlers*(
       dbPath: string,
       nodeId: NodeId,
       max: uint32,
-      starting: uint32) -> bool:
+      starting: uint32) -> int:
+    # waiting for offer result, by the end of this call remote node should
+    # have received offered content
     let offerResult = await p.offerContentInNodeRange(dbPath, nodeId, max, starting)
 
     if offerResult.isOk():
-      return true
+      return offerResult.get()
     else:
       raise newException(ValueError, $offerResult.error)
@@ -346,7 +346,7 @@ procSuite "Portal testnet tests":
     for i in 1..lastNodeIdx:
       let recipientId = nodeInfos[i].nodeId
       let offerResponse = await retryUntil(
-        proc (): Future[bool] {.async.} =
+        proc (): Future[int] {.async.} =
           try:
             let res = await clients[0].portal_history_offerContentInNodeRange(tempDbPath, recipientId, 64, 0)
             await clients[0].close()
@@ -355,11 +355,12 @@ procSuite "Portal testnet tests":
             await clients[0].close()
             raise exc
         ,
-        proc (os: bool): bool = return os,
+        proc (os: int): bool = return true,
         "Offer failed",
         i
       )
-      check offerResponse
+      check:
+        offerResponse > 0
 
     for i, client in clients:
       # Note: Once there is the Canonical Indices Network, we don't need to
@@ -17,7 +17,6 @@ import
   ../content_db,
   ./test_helpers
 
-
 type HistoryNode = ref object
   discoveryProtocol*: discv5_protocol.Protocol
   historyNetwork*: HistoryNetwork
@@ -56,6 +55,32 @@ proc stop(hn: HistoryNode) {.async.} =
   hn.historyNetwork.stop()
   await hn.discoveryProtocol.closeWait()
 
+proc containsId(hn: HistoryNode, contentId: ContentId): bool =
+  return hn.historyNetwork.contentDb.get(contentId).isSome()
+
+proc createEmptyHeaders(fromNum: int, toNum: int): seq[BlockHeader] =
+  var headers: seq[BlockHeader]
+  for i in fromNum..toNum:
+    var bh = BlockHeader()
+    bh.blockNumber = u256(i)
+    bh.difficulty = u256(i)
+    # empty so that we won't care about creating fake block bodies
+    bh.ommersHash = EMPTY_UNCLE_HASH
+    bh.txRoot = BLANK_ROOT_HASH
+    headers.add(bh)
+  return headers
+
+proc headersToContentInfo(headers: seq[BlockHeader]): seq[ContentInfo] =
+  var contentInfos: seq[ContentInfo]
+  for h in headers:
+    let headerHash = h.blockHash()
+    let bk = BlockKey(chainId: 1'u16, blockHash: headerHash)
+    let ck = encode(ContentKey(contentType: blockHeader, blockHeaderKey: bk))
+    let headerEncoded = rlp.encode(h)
+    let ci = ContentInfo(contentKey: ck, content: headerEncoded)
+    contentInfos.add(ci)
+  return contentInfos
+
 procSuite "History Content Network":
   let rng = newRng()
 
   asyncTest "Get block by block number":
@@ -65,15 +90,7 @@ procSuite "History Content Network":
 
       # enough headers so there will be at least two epochs
       let numHeaders = 9000
-      var headers: seq[BlockHeader]
-      for i in 0..numHeaders:
-        var bh = BlockHeader()
-        bh.blockNumber = u256(i)
-        bh.difficulty = u256(i)
-        # empty so that we won't care about creating fake block bodies
-        bh.ommersHash = EMPTY_UNCLE_HASH
-        bh.txRoot = BLANK_ROOT_HASH
-        headers.add(bh)
+      var headers: seq[BlockHeader] = createEmptyHeaders(0, numHeaders)
 
       let masterAccumulator = buildAccumulator(headers)
       let epochAccumulators = buildAccumulatorData(headers)
@@ -121,3 +138,60 @@ procSuite "History Content Network":
 
     await historyNode1.stop()
     await historyNode2.stop()
+
+  asyncTest "Offer maximum amount of content in one offer message":
+    let
+      historyNode1 = newHistoryNode(rng, 20302)
+      historyNode2 = newHistoryNode(rng, 20303)
+
+    check historyNode1.portalWireProtocol().addNode(historyNode2.localNodeInfo()) == Added
+    check historyNode2.portalWireProtocol().addNode(historyNode1.localNodeInfo()) == Added
+
+    check (await historyNode1.portalWireProtocol().ping(historyNode2.localNodeInfo())).isOk()
+    check (await historyNode2.portalWireProtocol().ping(historyNode1.localNodeInfo())).isOk()
+
+    let maxOfferedHistoryContent = getMaxOfferedContentKeys(
+      uint32(len(historyProtocolId)), maxContentKeySize)
+
+    # one header too many to fit the offer message; a talkReq with this many
+    # headers will fail
+    let headers = createEmptyHeaders(0, maxOfferedHistoryContent)
+    let masterAccumulator = buildAccumulator(headers)
+
+    await historyNode1.historyNetwork.initMasterAccumulator(some(masterAccumulator))
+    await historyNode2.historyNetwork.initMasterAccumulator(some(masterAccumulator))
+
+    let contentInfos = headersToContentInfo(headers)
+
+    # node 1 will offer the content, so it needs to have it in its database
+    for ci in contentInfos:
+      let id = toContentId(ci.contentKey)
+      historyNode1.portalWireProtocol.storeContent(id, ci.content)
+
+    let offerResultTooMany = await historyNode1.portalWireProtocol.offer(
+      historyNode2.localNodeInfo(),
+      contentInfos
+    )
+
+    check:
+      # fails due to timeout, as the remote side won't respond to too large
+      # discv5 packets
+      offerResultTooMany.isErr()
+
+    for ci in contentInfos:
+      let id = toContentId(ci.contentKey)
+      check:
+        historyNode2.containsId(id) == false
+
+    # one content key less should make the offer go through
+    let correctInfos = contentInfos[0..<len(contentInfos)-1]
+
+    let offerResultCorrect = await historyNode1.portalWireProtocol.offer(
+      historyNode2.localNodeInfo(),
+      correctInfos
+    )
+
+    check:
+      offerResultCorrect.isOk()
+
+    await historyNode1.stop()
+    await historyNode2.stop()