From 12e9367f78bfd8a47739974334759d2448eb8d4f Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Tue, 11 Jun 2019 02:20:18 +0300 Subject: [PATCH] Improved error handling; Simple test case for connecting 2 peers --- beacon_chain/conf.nim | 2 +- beacon_chain/eth2_network.nim | 2 +- beacon_chain/libp2p_backend.nim | 75 +++++++++++++++---------- beacon_chain/libp2p_backends_common.nim | 17 ++++++ beacon_chain/libp2p_spec_backend.nim | 20 ++++--- tests/test_peer_connection.nim | 24 ++++++++ 6 files changed, 98 insertions(+), 42 deletions(-) create mode 100644 tests/test_peer_connection.nim diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 7e9311a4c..403c53e25 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -4,7 +4,7 @@ import spec/[crypto, datatypes], time, version export - defs + defs, enabledLogLevel const DEFAULT_NETWORK* {.strdefine.} = "testnet0" diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index d6eba1eb1..7f660b435 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -183,7 +183,7 @@ else: try: await node.daemon.connect(bootstrapNode.peer, bootstrapNode.addresses) let peer = node.getPeer(bootstrapNode.peer) - await peer.performProtocolHandshakes() + await initializeConnection(peer) except PeerDisconnected: error "Failed to connect to bootstrap node", node = bootstrapNode diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 677ca3937..ac2bf3efc 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -33,6 +33,7 @@ type DisconnectionReason* = enum UselessPeer BreachOfProtocol + FaultOrError UntypedResponder = object peer*: Peer @@ -105,17 +106,16 @@ proc init*(node: Eth2Node) {.async.} = await node.daemon.addHandler(@[msg.libp2pProtocol], msg.thunk) proc readMsg(stream: P2PStream, MsgType: type, - timeout = 10.seconds): Future[Option[MsgType]] {.async.} = - var timeout = sleepAsync timeout + deadline: Future[void]): Future[Option[MsgType]] {.async.} = var sizePrefix: uint32 var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix)) - await readSizePrefix or timeout + await readSizePrefix or deadline if not readSizePrefix.finished: return var msgBytes = newSeq[byte](sizePrefix.int + sizeof(sizePrefix)) copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix)) var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)], sizePrefix.int) - await readBody or timeout + await readBody or deadline if not readBody.finished: return let decoded = SSZ.decode(msgBytes, MsgType) @@ -137,11 +137,22 @@ proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} = proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, timeout = 10.seconds): Future[Option[ResponseMsg]] {.async.} = - var stream = await peer.network.daemon.openStream(peer.id, @[protocolId]) - # TODO how does openStream fail? Set a timeout here and handle it + var deadline = sleepAsync timeout + + # Open a new LibP2P stream + var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId]) + await streamFut or deadline + if not streamFut.finished: + return none(ResponseMsg) + + # Send the request + let stream = streamFut.read let sent = await stream.transp.write(requestBytes) - # TODO: Should I check that `sent` is equal to the desired number of bytes - return await stream.readMsg(ResponseMsg, timeout) + if sent != requestBytes: + await disconnectAndRaise(peer, FaultOrError, "Incomplete send") + + # Read the response + return await stream.readMsg(ResponseMsg, deadline) proc p2pStreamName(MsgType: type): string = mixin msgProtocol, protocolInfo, msgId @@ -167,22 +178,31 @@ template handshakeImpl(HandshakeTypeExpr: untyped, type HandshakeType = type(HandshakeTypeExpr) proc asyncStep(stream: P2PStream): Future[HandshakeType] {.async.} = + let deadline = sleepAsync timeout var stream = stream if stream == nil: - stream = await openStream(peer.network.daemon, peer.id, - @[p2pStreamName(HandshakeType)], - # TODO openStream should accept Duration - int milliseconds(timeout)) + try: stream = await openStream(peer.network.daemon, peer.id, + @[p2pStreamName(HandshakeType)], + # TODO openStream should accept Duration + int milliseconds(timeout)) + except CatchableError: + const errMsg = "Failed to open LIBP2P stream" + debug errMsg, + stream = p2pStreamName(HandshakeType), + err = getCurrentExceptionMsg() + await disconnectAndRaise(peer, FaultOrError, errMsg) - # Please pay attention that `lazySendCall` is evaluated lazily here. - # For this reason `handshakeImpl` must remain a template. - await lazySendCall - - let response = await readMsg(stream, HandshakeType, timeout) - if response.isSome: - return response.get - else: - await disconnectAndRaise(peer, BreachOfProtocol, "Handshake not completed in time") + try: + # Please pay attention that `lazySendCall` is evaluated lazily here. + # For this reason `handshakeImpl` must remain a template. + await lazySendCall + let response = await readMsg(stream, HandshakeType, deadline) + if response.isSome: + return response.get + else: + await disconnectAndRaise(peer, BreachOfProtocol, "Handshake not completed in time") + except CatchableError: + await reraiseAsPeerDisconnected(peer, "Failure during handshake") asyncStep(stream) @@ -213,15 +233,6 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} = await all(subProtocolsHandshakes) -proc getPeer*(node: Eth2Node, peerId: PeerID): Peer = - result = node.peers.getOrDefault(peerId) - if result == nil: - result = Peer.init(node, peerId) - node.peers[peerId] = result - -proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer = - Eth2Node(daemon.userData).getPeer(stream.peer) - template getRecipient(peer: Peer): Peer = peer @@ -328,6 +339,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = receivedMsg = ident "msg" daemonVar = ident "daemon" streamVar = ident "stream" + deadlineVar = ident "deadline" await = ident "await" p.useRequestIds = false @@ -369,7 +381,8 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = msg.defineThunk quote do: proc `thunkName`(`daemonVar`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} = - var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `requestDataTimeout`) + var `deadlineVar` = sleepAsync `requestDataTimeout` + var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `deadlineVar`) if `receivedMsg`.isNone: # TODO: This peer is misbehaving, perhaps we should penalize him somehow return diff --git a/beacon_chain/libp2p_backends_common.nim b/beacon_chain/libp2p_backends_common.nim index 24a4e6919..24318727c 100644 --- a/beacon_chain/libp2p_backends_common.nim +++ b/beacon_chain/libp2p_backends_common.nim @@ -2,6 +2,17 @@ proc `$`*(peer: Peer): string = $peer.id +proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.} + +proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} = + result = node.peers.getOrDefault(peerId) + if result == nil: + result = Peer.init(node, peerId) + node.peers[peerId] = result + +proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} = + Eth2Node(daemon.userData).getPeer(stream.peer) + proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = # TODO: How should we notify the other peer? if peer.connectionState notin {Disconnecting, Disconnected}: @@ -22,6 +33,12 @@ proc disconnectAndRaise(peer: Peer, await peer.disconnect(r) raisePeerDisconnected(msg, r) +template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string, + reason = FaultOrError): auto = + const errMsg = errMsgExpr + debug errMsg, err = getCurrentExceptionMsg() + disconnectAndRaise(peer, reason, errMsg) + proc getCompressedMsgId*(MsgType: type): CompressedMsgId = mixin msgId, msgProtocol, protocolInfo (protocolIdx: MsgType.msgProtocol.protocolInfo.index, diff --git a/beacon_chain/libp2p_spec_backend.nim b/beacon_chain/libp2p_spec_backend.nim index 3d57db53b..eca716db7 100644 --- a/beacon_chain/libp2p_spec_backend.nim +++ b/beacon_chain/libp2p_spec_backend.nim @@ -192,21 +192,23 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} = var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len) for protocol in allProtocols: if protocol.handshake != nil: - subProtocolsHandshakes.add((protocol.handshake)(peer, nil)) + subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream)) await all(subProtocolsHandshakes) + debug "All protocols initialized", peer -proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} = - result = node.peers.getOrDefault(peerId) - if result == nil: - result = Peer.init(node, peerId) - node.peers[peerId] = result - -proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} = - Eth2Node(daemon.userData).getPeer(stream.peer) +proc initializeConnection*(peer: Peer) {.async.} = + let daemon = peer.network.daemon + try: + peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol]) + await performProtocolHandshakes(peer) + except CatchableError: + await reraiseAsPeerDisconnected(peer, "Failed to perform handshake") proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} = let peer = daemon.peerFromStream(stream) + peer.rpcStream = stream + await performProtocolHandshakes(peer) proc accepts(d: Dispatcher, methodId: uint16): bool = methodId.int < d.messages.len diff --git a/tests/test_peer_connection.nim b/tests/test_peer_connection.nim new file mode 100644 index 000000000..d5d923c29 --- /dev/null +++ b/tests/test_peer_connection.nim @@ -0,0 +1,24 @@ +import + unittest, os, + chronos, confutils, + ../beacon_chain/[conf, eth2_network] + +template asyncTest*(name, body: untyped) = + test name: + proc scenario {.async.} = body + waitFor scenario() + +asyncTest "connect two nodes": + let tempDir = getTempDir() / "peers_test" + + var c1 = BeaconNodeConf.defaults + c1.dataDir = OutDir(tempDir / "node-1") + var n1 = await createEth2Node(c1) + var n1Address = getPersistenBootstrapAddr(c1, parseIpAddress("127.0.0.1"), Port 50000) + + var c2 = BeaconNodeConf.defaults + c2.dataDir = OutDir(tempDir / "node-2") + var n2 = await createEth2Node(c2) + + await n2.connectToNetwork(bootstrapNodes = @[n1Address]) +