Fix testnet tests (#1176)

* Testnet improvements

Increase the uTP content read timeout.
Add more logs.
The offer endpoint can fail due to a talkReq timeout; to avoid
test failures, retry it a few times until it succeeds.
KonradStaniec 2022-07-29 14:24:07 +02:00 committed by GitHub
parent e3cabaff7f
commit a1253c67bd
4 changed files with 76 additions and 38 deletions
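The retry added for the offer call follows a simple exponential-backoff combinator: an async callback is re-invoked until a check on its result passes or the retry budget is exhausted. Below is a minimal, self-contained sketch of that pattern, not the exact helpers added in this commit; the real ones additionally take a failure message and the index of the node being queried.

import chronos

type
  FutureCallback[A] = proc (): Future[A] {.gcsafe.}
  CheckCallback[A] = proc (a: A): bool {.gcsafe.}

proc withRetries[A](
    f: FutureCallback[A],
    check: CheckCallback[A],
    numRetries: int,
    initialWait: Duration): Future[A] {.async.} =
  # Retry `f` until `check` accepts its result, doubling the wait between
  # attempts (1, 2, 4, ... seconds for an initial wait of 1 second).
  var tries = 0
  var currentWait = initialWait
  while true:
    try:
      let res = await f()
      if check(res):
        return res
      else:
        raise newException(ValueError, "check failed")
    except CatchableError as exc:
      if tries > numRetries:
        # Out of retries: surface the last failure to the caller.
        raise exc
    inc tries
    # Wait before the next attempt, then double the wait.
    await sleepAsync(currentWait)
    currentWait = currentWait + currentWait

In the test changes below this is wrapped by a retryUntil helper that fixes the retry budget and initial wait, so a flaky JSON-RPC call (for example the offer call that can hit a talkReq timeout) only fails the test after all retries are used up.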

File 1 of 4

@@ -280,21 +280,21 @@ proc getBlockHeader*(
   let headerFromDb = n.getContentFromDb(BlockHeader, contentId)
   if headerFromDb.isSome():
-    info "Fetched block header from database", hash
+    info "Fetched block header from database", hash, contentKey = keyEncoded
     return headerFromDb
   for i in 0..<requestRetries:
     let headerContentLookup =
       await n.portalProtocol.contentLookup(keyEncoded, contentId)
     if headerContentLookup.isNone():
-      warn "Failed fetching block header from the network", hash
+      warn "Failed fetching block header from the network", hash, contentKey = keyEncoded
       return none(BlockHeader)
     let headerContent = headerContentLookup.unsafeGet()
     let res = validateBlockHeaderBytes(headerContent.content, hash)
     if res.isOk():
-      info "Fetched block header from the network", hash
+      info "Fetched block header from the network", hash, contentKey = keyEncoded
       # Content is valid we can propagate it to interested peers
       n.portalProtocol.triggerPoke(
         headerContent.nodesInterestedInContent,
@@ -306,7 +306,7 @@ proc getBlockHeader*(
       return some(res.get())
     else:
-      warn "Validation of block header failed", err = res.error, hash
+      warn "Validation of block header failed", err = res.error, hash, contentKey = keyEncoded
   # Headers were requested `requestRetries` times and all failed on validation
   return none(BlockHeader)
@@ -324,7 +324,7 @@ proc getBlockBody*(
     bodyFromDb = n.getContentFromDb(BlockBody, contentId)
   if bodyFromDb.isSome():
-    info "Fetched block body from database", hash
+    info "Fetched block body from database", hash, contentKey = keyEncoded
     return bodyFromDb
   for i in 0..<requestRetries:
@@ -332,7 +332,7 @@ proc getBlockBody*(
       await n.portalProtocol.contentLookup(keyEncoded, contentId)
     if bodyContentLookup.isNone():
-      warn "Failed fetching block body from the network", hash
+      warn "Failed fetching block body from the network", hash, contentKey = keyEncoded
       return none(BlockBody)
     let bodyContent = bodyContentLookup.unsafeGet()
@@ -340,7 +340,7 @@ proc getBlockBody*(
     let res = validateBlockBodyBytes(
       bodyContent.content, header.txRoot, header.ommersHash)
     if res.isOk():
-      info "Fetched block body from the network", hash
+      info "Fetched block body from the network", hash, contentKey = keyEncoded
       # body is valid, propagate it to interested peers
       n.portalProtocol.triggerPoke(
@@ -353,15 +353,18 @@ proc getBlockBody*(
       return some(res.get())
     else:
-      warn "Validation of block body failed", err = res.error, hash
+      warn "Validation of block body failed", err = res.error, hash, contentKey = keyEncoded
   return none(BlockBody)
 proc getBlock*(
     n: HistoryNetwork, chainId: uint16, hash: BlockHash):
     Future[Option[Block]] {.async.} =
+  debug "Trying to retrieve block with hash", hash
   let headerOpt = await n.getBlockHeader(chainId, hash)
   if headerOpt.isNone():
+    warn "Failed to get header when getting block with hash", hash
     # Cannot validate block without header.
     return none(Block)
@@ -370,6 +373,7 @@ proc getBlock*(
   let bodyOpt = await n.getBlockBody(chainId, hash, header)
   if bodyOpt.isNone():
+    warn "Failed to get body when getting block with hash", hash
     return none(Block)
   let body = bodyOpt.unsafeGet()
@@ -397,14 +401,14 @@ proc getReceipts*(
     let receiptsContentLookup =
       await n.portalProtocol.contentLookup(keyEncoded, contentId)
     if receiptsContentLookup.isNone():
-      warn "Failed fetching receipts from the network", hash
+      warn "Failed fetching receipts from the network", hash, contentKey = keyEncoded
       return none(seq[Receipt])
     let receiptsContent = receiptsContentLookup.unsafeGet()
     let res = validateReceiptsBytes(receiptsContent.content, header.receiptRoot)
     if res.isOk():
-      info "Fetched receipts from the network", hash
+      info "Fetched receipts from the network", hash, contentKey = keyEncoded
       let receipts = res.get()
@@ -419,7 +423,7 @@ proc getReceipts*(
       return some(res.get())
     else:
-      warn "Validation of receipts failed", err = res.error, hash
+      warn "Validation of receipts failed", err = res.error, hash, contentKey = keyEncoded
   return none(seq[Receipt])

File 2 of 4

@@ -588,6 +588,7 @@ proc findNodes*(
 proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
     Future[PortalResult[FoundContent]] {.async.} =
   let contentMessageResponse = await p.findContentImpl(dst, contentKey)
   if contentMessageResponse.isOk():
@@ -672,6 +673,10 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
         return ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes))
       else:
         return err("Content message returned invalid ENRs")
+  else:
+    warn "FindContent failed due to find content request failure ", error = contentMessageResponse.error, contentKey = contentKey
+    return err("No content response")
 proc getContentKeys(o: OfferRequest): ContentKeysList =
   case o.kind
@@ -703,6 +708,9 @@ proc offer(p: PortalProtocol, o: OfferRequest):
   ## by the cleanup process before it will be transferred, so this way does not
   ## guarantee content transfer.
   let contentKeys = getContentKeys(o)
+  debug "Offering content", contentKeys = contentKeys
   portal_content_keys_offered.observe(contentKeys.len().int64)
   let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys)
@@ -726,6 +734,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
     let acceptedKeysAmount = m.contentKeys.countOnes()
     portal_content_keys_accepted.observe(acceptedKeysAmount.int64)
     if acceptedKeysAmount == 0:
+      debug "No content accepted", contentKeys = contentKeys
       # Don't open an uTP stream if no content was requested
       return ok()
@@ -745,7 +754,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
     if connectionResult.isErr():
       debug "Utp connection error while trying to offer content",
-        error = connectionResult.error
+        error = connectionResult.error, contentKeys = contentKeys
       return err("Error connecting uTP socket")
     let socket = connectionResult.get()
@@ -765,7 +774,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
           let dataWritten = await socket.write(output.getOutput)
           if dataWritten.isErr:
-            debug "Error writing requested data", error = dataWritten.error
+            debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys
             # No point in trying to continue writing data
             socket.close()
             return err("Error writing requested data")
@@ -790,14 +799,17 @@ proc offer(p: PortalProtocol, o: OfferRequest):
           let dataWritten = await socket.write(output.getOutput)
           if dataWritten.isErr:
-            debug "Error writing requested data", error = dataWritten.error
+            debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys
             # No point in trying to continue writing data
             socket.close()
             return err("Error writing requested data")
+    debug "Content successfully offered", contentKeys = contentKeys
     await socket.closeWait()
     return ok()
   else:
+    warn "Offer failed due to accept request failure ", error = acceptMessageResponse.error, contentKeys = contentKeys
     return err("No accept response")
 proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
@@ -1096,7 +1108,6 @@ proc neighborhoodGossip*(
   for i, contentItem in content:
     let contentInfo =
       ContentInfo(contentKey: contentKeys[i], content: contentItem)
     discard contentList.add(contentInfo)
   # Just taking the first content item as target id.
@@ -1142,7 +1153,6 @@ proc neighborhoodGossip*(
   else: # use looked up nodes for gossip
     portal_gossip_with_lookup.inc(labelValues = [$p.protocolId])
     let closestNodes = await p.lookup(NodeId(contentId))
     for node in closestNodes[0..<min(closestNodes.len, maxGossipNodes)]:
       # Note: opportunistically not checking if the radius of the node is known
       # and thus if the node is in radius with the content. Reason is, these

File 3 of 4

@@ -24,7 +24,7 @@ logScope:
 const
   utpProtocolId* = "utp".toBytes()
   defaultConnectionTimeout = 5.seconds
-  defaultContentReadTimeout = 2.seconds
+  defaultContentReadTimeout = 15.seconds
   # TalkReq message is used as transport for uTP. It is assumed here that Portal
   # protocol messages were exchanged before sending uTP over discv5 data. This
@@ -102,6 +102,9 @@ proc addContentOffer*(
   # uTP protocol uses BE for all values in the header, incl. connection id.
   let id = uint16.fromBytesBE(connectionId)
+  debug "Register new incoming offer", contentKeys
   let contentOffer = ContentOffer(
     connectionId: id,
     nodeId: nodeId,
@@ -244,12 +247,12 @@ proc readContentOffer(
       else:
         # Invalid data, stop reading content, but still process data received
         # so far.
-        debug "Reading content item failed, content offer failed"
+        debug "Reading content item failed, content offer failed", contentKeys = offer.contentKeys
         break
     else:
       # Read timed out, stop further reading, but still process data received
       # so far.
-      debug "Reading data from socket timed out, content offer failed"
+      debug "Reading data from socket timed out, content offer failed", contentKeys = offer.contentKeys
       break
   if socket.atEof():

File 4 of 4

@@ -9,6 +9,7 @@ import
   os,
   std/sequtils,
   unittest2, testutils, confutils, chronos,
+  stew/byteutils,
   eth/p2p/discoveryv5/random2, eth/keys,
   ../../nimbus/rpc/[hexstrings, rpc_types],
   ../rpc/portal_rpc_client,
@@ -52,7 +53,8 @@ proc withRetries[A](
     check: CheckCallback[A],
     numRetries: int,
     initialWait: Duration,
-    checkFailMessage: string): Future[A] {.async.} =
+    checkFailMessage: string,
+    nodeIdx: int): Future[A] {.async.} =
   ## Retries given future callback until either:
   ## it returns successfuly and given check is true
   ## or
@@ -71,7 +73,8 @@ proc withRetries[A](
     except CatchableError as exc:
       if tries > numRetries:
         # if we reached max number of retries fail
-        raise exc
+        let msg = "Call failed with msg: " & exc.msg & ", for node with idx: " & $nodeIdx
+        raise newException(ValueError, msg)
     inc tries
     # wait before new retry
@@ -81,12 +84,13 @@ proc withRetries[A](
 # Sometimes we need to wait till data will be propagated over the network.
 # To avoid long sleeps, this combinator can be used to retry some calls until
 # success or until some condition hold (or both)
-proc retryUntilDataPropagated[A](
+proc retryUntil[A](
     f: FutureCallback[A],
     c: CheckCallback[A],
-    checkFailMessage: string): Future[A] =
-  # some reasonable limits, which will cause waits as: 1, 2, 4, 8, 16 seconds
-  return withRetries(f, c, 5, seconds(1), checkFailMessage)
+    checkFailMessage: string,
+    nodeIdx: int): Future[A] =
+  # some reasonable limits, which will cause waits as: 1, 2, 4, 8, 16, 32 seconds
+  return withRetries(f, c, 6, seconds(1), checkFailMessage, nodeIdx)
 # Note:
 # When doing json-rpc requests following `RpcPostError` can occur:
@@ -237,7 +241,7 @@ procSuite "Portal testnet tests":
     let blockData = readBlockDataTable(dataFile)
     check blockData.isOk()
-    for client in clients:
+    for i, client in clients:
       # Note: Once there is the Canonical Indices Network, we don't need to
       # access this file anymore here for the block hashes.
      for hash in blockData.get().blockHashes():
@@ -246,7 +250,7 @@ procSuite "Portal testnet tests":
         # add a json-rpc debug proc that returns whether the offer queue is empty or
         # not. And then poll every node until all nodes have an empty queue.
-        let content = await retryUntilDataPropagated(
+        let content = await retryUntil(
           proc (): Future[Option[BlockObject]] {.async.} =
             try:
               let res = await client.eth_getBlockByHash(hash.ethHashStr(), false)
@@ -257,7 +261,8 @@ procSuite "Portal testnet tests":
               raise exc
           ,
           proc (mc: Option[BlockObject]): bool = return mc.isSome(),
-          "Did not receive expected Block with hash " & $hash
+          "Did not receive expected Block with hash " & hash.data.toHex(),
+          i
         )
         check content.isSome()
         let blockObj = content.get()
@@ -272,7 +277,7 @@ procSuite "Portal testnet tests":
           blockHash: some(hash)
         )
-        let logs = await retryUntilDataPropagated(
+        let logs = await retryUntil(
           proc (): Future[seq[FilterLog]] {.async.} =
             try:
               let res = await client.eth_getLogs(filterOptions)
@@ -283,7 +288,8 @@ procSuite "Portal testnet tests":
               raise exc
           ,
           proc (mc: seq[FilterLog]): bool = return true,
-          ""
+          "",
+          i
         )
         for l in logs:
@@ -334,14 +340,27 @@ procSuite "Portal testnet tests":
       # offer content to node 1..63
       for i in 1..lastNodeIdx:
         let receipientId = nodeInfos[i].nodeId
-        check (await clients[0].portal_history_offerContentInNodeRange(tempDbPath, receipientId, 64, 0))
-        await clients[0].close()
+        let offerResponse = await retryUntil(
+          proc (): Future[bool] {.async.} =
+            try:
+              let res = await clients[0].portal_history_offerContentInNodeRange(tempDbPath, receipientId, 64, 0)
+              await clients[0].close()
+              return res
+            except CatchableError as exc:
+              await clients[0].close()
+              raise exc
+          ,
+          proc (os: bool): bool = return os,
+          "Offer failed",
+          i
+        )
+        check offerResponse
-      for client in clients:
+      for i, client in clients:
         # Note: Once there is the Canonical Indices Network, we don't need to
         # access this file anymore here for the block hashes.
         for hash in bd.blockHashes():
-          let content = await retryUntilDataPropagated(
+          let content = await retryUntil(
             proc (): Future[Option[BlockObject]] {.async.} =
               try:
                 let res = await client.eth_getBlockByHash(hash.ethHashStr(), false)
@@ -352,7 +371,8 @@ procSuite "Portal testnet tests":
                 raise exc
             ,
             proc (mc: Option[BlockObject]): bool = return mc.isSome(),
-            "Did not receive expected Block with hash " & $hash
+            "Did not receive expected Block with hash " & hash.data.toHex(),
+            i
           )
           check content.isSome()
@@ -403,11 +423,11 @@ procSuite "Portal testnet tests":
      check (await clients[0].portal_history_depthContentPropagate(tempDbPath, 64))
      await clients[0].close()
-      for client in clients:
+      for i, client in clients:
        # Note: Once there is the Canonical Indices Network, we don't need to
        # access this file anymore here for the block hashes.
        for hash in bd.blockHashes():
-          let content = await retryUntilDataPropagated(
+          let content = await retryUntil(
             proc (): Future[Option[BlockObject]] {.async.} =
               try:
                 let res = await client.eth_getBlockByHash(hash.ethHashStr(), false)
@@ -418,7 +438,8 @@ procSuite "Portal testnet tests":
                raise exc
            ,
            proc (mc: Option[BlockObject]): bool = return mc.isSome(),
-            "Did not receive expected Block with hash " & $hash
+            "Did not receive expected Block with hash " & hash.data.toHex(),
+            i
          )
          check content.isSome()