Handle gracefully a pre-mature closing of a libp2p stream by another peer
This commit is contained in:
parent
b5fad0c9e8
commit
37043f0d91
|
@ -126,6 +126,10 @@ proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
|
||||||
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
|
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
|
||||||
Eth2Node(daemon.userData).getPeer(stream.peer)
|
Eth2Node(daemon.userData).getPeer(stream.peer)
|
||||||
|
|
||||||
|
proc safeClose(stream: P2PStream) {.async.} =
|
||||||
|
if P2PStreamFlags.Closed notin stream.flags:
|
||||||
|
await close(stream)
|
||||||
|
|
||||||
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
||||||
# TODO: How should we notify the other peer?
|
# TODO: How should we notify the other peer?
|
||||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||||
|
@ -327,7 +331,7 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
||||||
|
|
||||||
let stream = streamFut.read
|
let stream = streamFut.read
|
||||||
defer:
|
defer:
|
||||||
await close(stream)
|
await safeClose(stream)
|
||||||
|
|
||||||
var s = init OutputStream
|
var s = init OutputStream
|
||||||
s.appendVarint requestBytes.len.uint64
|
s.appendVarint requestBytes.len.uint64
|
||||||
|
@ -407,7 +411,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||||
|
|
||||||
let stream = streamFut.read
|
let stream = streamFut.read
|
||||||
defer:
|
defer:
|
||||||
await close(stream)
|
await safeClose(stream)
|
||||||
|
|
||||||
# Send the request
|
# Send the request
|
||||||
var s = init OutputStream
|
var s = init OutputStream
|
||||||
|
@ -589,7 +593,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
proc `thunkName`(`daemonVar`: `DaemonAPI`,
|
proc `thunkName`(`daemonVar`: `DaemonAPI`,
|
||||||
`streamVar`: `P2PStream`) {.async, gcsafe.} =
|
`streamVar`: `P2PStream`) {.async, gcsafe.} =
|
||||||
defer:
|
defer:
|
||||||
`await` close(`streamVar`)
|
`await` safeClose(`streamVar`)
|
||||||
|
|
||||||
let
|
let
|
||||||
`deadlineVar` = sleepAsync RESP_TIMEOUT
|
`deadlineVar` = sleepAsync RESP_TIMEOUT
|
||||||
|
|
Loading…
Reference in New Issue