Improved error handling; Simple test case for connecting 2 peers

This commit is contained in:
Zahary Karadjov 2019-06-11 02:20:18 +03:00
parent 877b22cfb8
commit 12e9367f78
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
6 changed files with 98 additions and 42 deletions

View File

@ -4,7 +4,7 @@ import
spec/[crypto, datatypes], time, version
export
defs
defs, enabledLogLevel
const
DEFAULT_NETWORK* {.strdefine.} = "testnet0"

View File

@ -183,7 +183,7 @@ else:
try:
await node.daemon.connect(bootstrapNode.peer, bootstrapNode.addresses)
let peer = node.getPeer(bootstrapNode.peer)
await peer.performProtocolHandshakes()
await initializeConnection(peer)
except PeerDisconnected:
error "Failed to connect to bootstrap node", node = bootstrapNode

View File

@ -33,6 +33,7 @@ type
DisconnectionReason* = enum
UselessPeer
BreachOfProtocol
FaultOrError
UntypedResponder = object
peer*: Peer
@ -105,17 +106,16 @@ proc init*(node: Eth2Node) {.async.} =
await node.daemon.addHandler(@[msg.libp2pProtocol], msg.thunk)
proc readMsg(stream: P2PStream, MsgType: type,
timeout = 10.seconds): Future[Option[MsgType]] {.async.} =
var timeout = sleepAsync timeout
deadline: Future[void]): Future[Option[MsgType]] {.async.} =
var sizePrefix: uint32
var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix))
await readSizePrefix or timeout
await readSizePrefix or deadline
if not readSizePrefix.finished: return
var msgBytes = newSeq[byte](sizePrefix.int + sizeof(sizePrefix))
copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix))
var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)], sizePrefix.int)
await readBody or timeout
await readBody or deadline
if not readBody.finished: return
let decoded = SSZ.decode(msgBytes, MsgType)
@ -137,11 +137,22 @@ proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} =
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout = 10.seconds): Future[Option[ResponseMsg]] {.async.} =
var stream = await peer.network.daemon.openStream(peer.id, @[protocolId])
# TODO how does openStream fail? Set a timeout here and handle it
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId])
await streamFut or deadline
if not streamFut.finished:
return none(ResponseMsg)
# Send the request
let stream = streamFut.read
let sent = await stream.transp.write(requestBytes)
# TODO: Should I check that `sent` is equal to the desired number of bytes
return await stream.readMsg(ResponseMsg, timeout)
if sent != requestBytes:
await disconnectAndRaise(peer, FaultOrError, "Incomplete send")
# Read the response
return await stream.readMsg(ResponseMsg, deadline)
proc p2pStreamName(MsgType: type): string =
mixin msgProtocol, protocolInfo, msgId
@ -167,22 +178,31 @@ template handshakeImpl(HandshakeTypeExpr: untyped,
type HandshakeType = type(HandshakeTypeExpr)
proc asyncStep(stream: P2PStream): Future[HandshakeType] {.async.} =
let deadline = sleepAsync timeout
var stream = stream
if stream == nil:
stream = await openStream(peer.network.daemon, peer.id,
@[p2pStreamName(HandshakeType)],
# TODO openStream should accept Duration
int milliseconds(timeout))
try: stream = await openStream(peer.network.daemon, peer.id,
@[p2pStreamName(HandshakeType)],
# TODO openStream should accept Duration
int milliseconds(timeout))
except CatchableError:
const errMsg = "Failed to open LIBP2P stream"
debug errMsg,
stream = p2pStreamName(HandshakeType),
err = getCurrentExceptionMsg()
await disconnectAndRaise(peer, FaultOrError, errMsg)
# Please pay attention that `lazySendCall` is evaluated lazily here.
# For this reason `handshakeImpl` must remain a template.
await lazySendCall
let response = await readMsg(stream, HandshakeType, timeout)
if response.isSome:
return response.get
else:
await disconnectAndRaise(peer, BreachOfProtocol, "Handshake not completed in time")
try:
# Please pay attention that `lazySendCall` is evaluated lazily here.
# For this reason `handshakeImpl` must remain a template.
await lazySendCall
let response = await readMsg(stream, HandshakeType, deadline)
if response.isSome:
return response.get
else:
await disconnectAndRaise(peer, BreachOfProtocol, "Handshake not completed in time")
except CatchableError:
await reraiseAsPeerDisconnected(peer, "Failure during handshake")
asyncStep(stream)
@ -213,15 +233,6 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} =
await all(subProtocolsHandshakes)
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer =
result = node.peers.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerId)
node.peers[peerId] = result
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer =
Eth2Node(daemon.userData).getPeer(stream.peer)
template getRecipient(peer: Peer): Peer =
peer
@ -328,6 +339,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
receivedMsg = ident "msg"
daemonVar = ident "daemon"
streamVar = ident "stream"
deadlineVar = ident "deadline"
await = ident "await"
p.useRequestIds = false
@ -369,7 +381,8 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msg.defineThunk quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} =
var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `requestDataTimeout`)
var `deadlineVar` = sleepAsync `requestDataTimeout`
var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `deadlineVar`)
if `receivedMsg`.isNone:
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
return

View File

@ -2,6 +2,17 @@
proc `$`*(peer: Peer): string = $peer.id
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peers.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerId)
node.peers[peerId] = result
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
Eth2Node(daemon.userData).getPeer(stream.peer)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
@ -22,6 +33,12 @@ proc disconnectAndRaise(peer: Peer,
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string,
reason = FaultOrError): auto =
const errMsg = errMsgExpr
debug errMsg, err = getCurrentExceptionMsg()
disconnectAndRaise(peer, reason, errMsg)
proc getCompressedMsgId*(MsgType: type): CompressedMsgId =
mixin msgId, msgProtocol, protocolInfo
(protocolIdx: MsgType.msgProtocol.protocolInfo.index,

View File

@ -192,21 +192,23 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream))
await all(subProtocolsHandshakes)
debug "All protocols initialized", peer
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peers.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerId)
node.peers[peerId] = result
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
Eth2Node(daemon.userData).getPeer(stream.peer)
proc initializeConnection*(peer: Peer) {.async.} =
let daemon = peer.network.daemon
try:
peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol])
await performProtocolHandshakes(peer)
except CatchableError:
await reraiseAsPeerDisconnected(peer, "Failed to perform handshake")
proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} =
let peer = daemon.peerFromStream(stream)
peer.rpcStream = stream
await performProtocolHandshakes(peer)
proc accepts(d: Dispatcher, methodId: uint16): bool =
methodId.int < d.messages.len

View File

@ -0,0 +1,24 @@
import
unittest, os,
chronos, confutils,
../beacon_chain/[conf, eth2_network]
template asyncTest*(name, body: untyped) =
test name:
proc scenario {.async.} = body
waitFor scenario()
asyncTest "connect two nodes":
let tempDir = getTempDir() / "peers_test"
var c1 = BeaconNodeConf.defaults
c1.dataDir = OutDir(tempDir / "node-1")
var n1 = await createEth2Node(c1)
var n1Address = getPersistenBootstrapAddr(c1, parseIpAddress("127.0.0.1"), Port 50000)
var c2 = BeaconNodeConf.defaults
c2.dataDir = OutDir(tempDir / "node-2")
var n2 = await createEth2Node(c2)
await n2.connectToNetwork(bootstrapNodes = @[n1Address])