Improved error handling in the networking code

This commit is contained in:
Zahary Karadjov 2019-03-26 12:01:13 +02:00
parent d94d4f2606
commit c0fa229254
3 changed files with 63 additions and 39 deletions

View File

@ -11,8 +11,8 @@ type
StartUpCommand* = enum StartUpCommand* = enum
noCommand noCommand
createTestnet
importValidator importValidator
createTestnet
updateTestnet updateTestnet
BeaconNodeConf* = object BeaconNodeConf* = object

View File

@ -24,6 +24,10 @@ proc initProtocolState*(peer: GossipSubPeer, _: Peer) =
peer.sentMessages = initSet[string]() peer.sentMessages = initSet[string]()
peer.subscribedFor = initSet[string]() peer.subscribedFor = initSet[string]()
proc trySubscribing(peer: Peer, topic: string) {.gcsafe.}
proc tryEmitting(peer: Peer, topic: string,
msgId: string, msg: string): Future[void] {.gcsafe.}
p2pProtocol GossipSub(version = 1, p2pProtocol GossipSub(version = 1,
shortName = "gss", shortName = "gss",
peerState = GossipSubPeer, peerState = GossipSubPeer,
@ -35,14 +39,10 @@ p2pProtocol GossipSub(version = 1,
info "GossipSub Peer connected", peer info "GossipSub Peer connected", peer
let gossipNet = peer.networkState let gossipNet = peer.networkState
for topic, _ in gossipNet.topicSubscribers: for topic, _ in gossipNet.topicSubscribers:
asyncCheck peer.subscribeFor(topic) peer.trySubscribing(topic)
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason): onPeerDisconnected do (peer: Peer, reason: DisconnectionReason):
info "GossipSub Peer disconnected", peer, reason info "GossipSub Peer disconnected", peer, reason
# TODO, add stacktraces support to nim-chronicles
debug "Debugging stacktrace"
writeStyledStackTrace()
debug "Continuing ..."
proc subscribeFor(peer: Peer, topic: string) = proc subscribeFor(peer: Peer, topic: string) =
peer.state.subscribedFor.incl topic peer.state.subscribedFor.incl topic
@ -57,14 +57,14 @@ p2pProtocol GossipSub(version = 1,
for p in peer.network.peers(GossipSub): for p in peer.network.peers(GossipSub):
if msgId notin p.state.sentMessages and topic in p.state.subscribedFor: if msgId notin p.state.sentMessages and topic in p.state.subscribedFor:
p.state.sentMessages.incl msgId p.state.sentMessages.incl msgId
asyncCheck p.emit(topic, msgId, msg) asyncDiscard p.tryEmitting(topic, msgId, msg)
{.gcsafe.}: {.gcsafe.}:
let handler = peer.networkState.topicSubscribers.getOrDefault(topic) let handler = peer.networkState.topicSubscribers.getOrDefault(topic)
if handler != nil: if handler != nil:
handler(msg) handler(msg)
proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} = proc broadcastIMPL(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} =
var randBytes: array[10, byte]; var randBytes: array[10, byte];
if randomBytes(randBytes) != 10: if randomBytes(randBytes) != 10:
warn "Failed to generate random message id" warn "Failed to generate random message id"
@ -74,7 +74,21 @@ proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[v
for peer in node.peers(GossipSub): for peer in node.peers(GossipSub):
if topic in peer.state(GossipSub).subscribedFor: if topic in peer.state(GossipSub).subscribedFor:
result.add peer.emit(topic, msgId, msg) result.add peer.tryEmitting(topic, msgId, msg)
proc trySubscribing(peer: Peer, topic: string) =
var fut = peer.subscribeFor(topic)
fut.addCallback do (arg: pointer):
if fut.failed:
warn "Failed to subscribe to topic with GossipSub peer", topic, peer
proc tryEmitting(peer: Peer, topic: string,
msgId: string, msg: string): Future[void] =
var fut = peer.emit(topic, msgId, msg)
fut.addCallback do (arg: pointer):
if fut.failed:
warn "GossipSub message not delivered to Peer", peer
return fut
proc subscribe*[MsgType](node: EthereumNode, proc subscribe*[MsgType](node: EthereumNode,
topic: string, topic: string,
@ -84,8 +98,11 @@ proc subscribe*[MsgType](node: EthereumNode,
userHandler Json.decode(msg, MsgType) userHandler Json.decode(msg, MsgType)
for peer in node.peers(GossipSub): for peer in node.peers(GossipSub):
discard peer.subscribeFor(topic) peer.trySubscribing(topic)
proc broadcast*(node: EthereumNode, topic: string, msg: auto) {.async.} = proc broadcast*(node: EthereumNode, topic: string, msg: auto) {.async.} =
await all(node.broadcastImpl(topic, Json.encode(msg))) # We are intentionally using `yield` here, so the broadcast call can
# never fail. Please note that errors are logged through a callback
# set in `tryEmitting`
yield all(node.broadcastIMPL(topic, Json.encode(msg)))

View File

@ -84,7 +84,7 @@ p2pProtocol BeaconSync(version = 1,
bestRoot: Eth2Digest # TODO bestRoot: Eth2Digest # TODO
bestSlot = latestState.slot bestSlot = latestState.slot
let m = await handshake(peer, timeout = 500.milliseconds, let m = await handshake(peer, timeout = 10.seconds,
status(networkId, latestFinalizedRoot, status(networkId, latestFinalizedRoot,
latestFinalizedEpoch, bestRoot, bestSlot)) latestFinalizedEpoch, bestRoot, bestSlot))
@ -92,34 +92,41 @@ p2pProtocol BeaconSync(version = 1,
await peer.disconnect(UselessPeer) await peer.disconnect(UselessPeer)
return return
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) # TODO: onPeerConnected runs unconditionally for every connected peer, but we
if bestDiff == 0: # don't need to sync with everybody. The beacon node should detect a situation
# Nothing to do? # where it needs to sync and it should execute the sync algorithm with a certain
trace "Nothing to sync", peer = peer.remote # number of randomly selected peers. The algorithm itself must be extracted in a proc.
else: try:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
# connection if it's too big. if bestDiff == 0:
# Nothing to do?
if bestDiff > 0: trace "Nothing to sync", peer = peer.remote
# Send roots
# TODO: Currently we send all block roots in one "packet". Maybe
# they should be split to multiple packets.
type Root = (Eth2Digest, Slot)
var roots = newSeqOfCap[Root](128)
for i in int(m.bestSlot) + 1 .. int(bestSlot):
for r in blockPool.blockRootsForSlot(i.Slot):
roots.add((r, i.Slot))
await peer.beaconBlockRoots(roots)
else: else:
# Receive roots # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) # connection if it's too big.
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0)
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) if bestDiff > 0:
for r in roots.roots: # Send roots
bodiesRequest.add(r[0]) # TODO: Currently we send all block roots in one "packet". Maybe
let bodies = await peer.getBeaconBlockBodies(bodiesRequest) # they should be split to multiple packets.
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) type Root = (Eth2Digest, Slot)
var roots = newSeqOfCap[Root](128)
for i in int(m.bestSlot) + 1 .. int(bestSlot):
for r in blockPool.blockRootsForSlot(i.Slot):
roots.add((r, i.Slot))
await peer.beaconBlockRoots(roots)
else:
# Receive roots
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots)
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0)
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len)
for r in roots.roots:
bodiesRequest.add(r[0])
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies)
except CatchableError:
warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg()
proc status( proc status(
peer: Peer, peer: Peer,