fix req/resp protocol (#1621)
per spec, we must half-close the request stream - not doing so may prevent the other end from ever starting to process our request, leading to timeouts. In particular, this fixes many sync problems that have been seen on medalla.

* remove safeClose - close no longer raises
* use per-chunk timeouts in request processing
parent 67ba595ce4
commit 07e7916b30
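Why the half-close matters: per the commit message, the responder waits for the end of the request stream before it starts processing, so a requester that never closes its write side never gets an answer. Below is a minimal, self-contained Nim sketch of that failure mode - ToyStream, writeChunk, halfClose, serveRequest and request are made-up stand-ins (not nimbus-eth2 or libp2p APIs; the writeChunk here is unrelated to the one in the diff), while newFuture, complete, withTimeout, milliseconds and waitFor are real chronos calls.

import chronos

type ToyStream = ref object
  requestDone: Future[void]   # completed when the requester half-closes
  data: seq[byte]

proc newToyStream(): ToyStream =
  ToyStream(requestDone: newFuture[void]("toy.request"))

proc writeChunk(s: ToyStream, chunk: seq[byte]) =
  s.data.add(chunk)

proc halfClose(s: ToyStream) =
  # marks "no more request bytes" - the analogue of closing the write side
  s.requestDone.complete()

proc serveRequest(s: ToyStream): Future[int] {.async.} =
  await s.requestDone          # the responder only starts after end-of-request
  return s.data.len

proc request(withHalfClose: bool): Future[int] {.async.} =
  let s = newToyStream()
  s.writeChunk(@[1'u8, 2, 3])
  let resp = serveRequest(s)
  if withHalfClose:
    s.halfClose()
  if not await resp.withTimeout(100.milliseconds):
    return -1                  # responder never started - the request "times out"
  return resp.read()

echo waitFor(request(withHalfClose = false))  # -1: no half-close, only a timeout
echo waitFor(request(withHalfClose = true))   # 3: half-close lets the response flow

The real change in makeEth2Request below does the equivalent with stream.close() right after the request chunk is written.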
@@ -250,10 +250,6 @@ declarePublicCounter nbc_timeout_dials,
 declarePublicGauge nbc_peers,
   "Number of active libp2p peers"

-proc safeClose(conn: Connection) {.async.} =
-  if not conn.closed:
-    await close(conn)
-
 const
   snappy_implementation {.strdefine.} = "libp2p"

@@ -470,7 +466,7 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
   try:
     await stream.writeChunk(none ResponseCode, requestBytes)
   finally:
-    await safeClose(stream)
+    await stream.close()

 proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes) {.async.} =
   inc response.writtenChunks
@@ -505,14 +501,16 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
   try:
     # Send the request
     await stream.writeChunk(none ResponseCode, requestBytes)
+    # Half-close the stream to mark the end of the request - if this is not
+    # done, the other peer might never send us the response.
+    await stream.close()

     # Read the response
-    return awaitWithTimeout(
-      readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
-                   peer, ResponseMsg),
-      deadline, neterr(ReadResponseTimeout))
+    return
+      await readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
+                         peer, ResponseMsg, timeout)
   finally:
-    await safeClose(stream)
+    await stream.close()

 proc init*[MsgType](T: type MultipleChunksResponse[MsgType],
                     peer: Peer, conn: Connection): T =
@@ -676,7 +674,7 @@ proc handleIncomingStream(network: Eth2Node,
     debug "Error processing an incoming request", err = err.msg, msgName

   finally:
-    await safeClose(conn)
+    await conn.close()

 proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
   let network = peer.network
@@ -155,17 +155,34 @@ proc readResponseChunk(conn: Connection, peer: Peer,
     return neterr UnexpectedEOF

 proc readResponse(conn: Connection, peer: Peer,
-                  MsgType: type): Future[NetRes[MsgType]] {.gcsafe, async.} =
+                  MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.gcsafe, async.} =
   when MsgType is seq:
     type E = ElemType(MsgType)
     var results: MsgType
     while true:
-      let nextRes = await conn.readResponseChunk(peer, E)
+      # Because we interleave networking with response processing, it may
+      # happen that reading all chunks takes longer than a strict deadline
+      # timeout would allow, so we allow each chunk a new timeout instead.
+      # The problem is exacerbated by the large number of round-trips to the
+      # poll loop that each future along the way causes.
+      trace "reading chunk", conn
+      let nextFut = conn.readResponseChunk(peer, E)
+      if not await nextFut.withTimeout(timeout):
+        return neterr(ReadResponseTimeout)
+      let nextRes = nextFut.read()
       if nextRes.isErr:
         if nextRes.error.kind == PotentiallyExpectedEOF:
+          trace "EOF chunk", conn, err = nextRes.error
+
           return ok results
+        trace "Error chunk", conn, err = nextRes.error
+
         return err nextRes.error
       else:
+        trace "Got chunk", conn
         results.add nextRes.value
   else:
-    return await conn.readResponseChunk(peer, MsgType)
+    let nextFut = conn.readResponseChunk(peer, MsgType)
+    if not await nextFut.withTimeout(timeout):
+      return neterr(ReadResponseTimeout)
+    return nextFut.read()
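The comment added above explains why a single overall deadline was replaced by a per-chunk timeout: response processing is interleaved with networking, so reading many chunks can legitimately take longer than any single chunk should. A minimal chronos sketch of the difference (readAllChunks and chunkTimeout are invented names and sleepAsync merely stands in for readResponseChunk; withTimeout, waitFor and milliseconds are real chronos calls):

import chronos

proc readAllChunks(chunkTimeout: Duration): Future[int] {.async.} =
  var count = 0
  for _ in 0 ..< 10:
    # stand-in for readResponseChunk: each chunk alone is quick, but the ten
    # of them together exceed what a single 100 ms deadline would allow
    let chunkFut = sleepAsync(50.milliseconds)
    if not await chunkFut.withTimeout(chunkTimeout):
      return count             # timed out waiting for this particular chunk
    inc count
  return count

echo waitFor(readAllChunks(100.milliseconds))  # prints 10: each chunk fits its own timeout

Ten 50 ms chunks complete comfortably under a 100 ms per-chunk timeout, whereas a single 100 ms deadline over the whole read would have expired around the second chunk.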
@@ -633,7 +633,7 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
     debug "Error, while reading getBlocks response",
           peer = peer, slot = req.slot, count = req.count,
           step = req.step, peer_speed = peer.netKbps(),
-          topics = "syncman", error = res.error()
+          topics = "syncman", error = $res.error()
   result = res

 template headAge(): uint64 =