diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 876d03a45..7c7bc3c62 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -11,8 +11,8 @@ type StartUpCommand* = enum noCommand - createTestnet importValidator + createTestnet updateTestnet BeaconNodeConf* = object diff --git a/beacon_chain/gossipsub_protocol.nim b/beacon_chain/gossipsub_protocol.nim index 09d4eb9eb..208a9cc78 100644 --- a/beacon_chain/gossipsub_protocol.nim +++ b/beacon_chain/gossipsub_protocol.nim @@ -24,6 +24,10 @@ proc initProtocolState*(peer: GossipSubPeer, _: Peer) = peer.sentMessages = initSet[string]() peer.subscribedFor = initSet[string]() +proc trySubscribing(peer: Peer, topic: string) {.gcsafe.} +proc tryEmitting(peer: Peer, topic: string, + msgId: string, msg: string): Future[void] {.gcsafe.} + p2pProtocol GossipSub(version = 1, shortName = "gss", peerState = GossipSubPeer, @@ -35,14 +39,10 @@ p2pProtocol GossipSub(version = 1, info "GossipSub Peer connected", peer let gossipNet = peer.networkState for topic, _ in gossipNet.topicSubscribers: - asyncCheck peer.subscribeFor(topic) + peer.trySubscribing(topic) onPeerDisconnected do (peer: Peer, reason: DisconnectionReason): info "GossipSub Peer disconnected", peer, reason - # TODO, add stacktraces support to nim-chronicles - debug "Debugging stacktrace" - writeStyledStackTrace() - debug "Continuing ..." proc subscribeFor(peer: Peer, topic: string) = peer.state.subscribedFor.incl topic @@ -57,14 +57,14 @@ p2pProtocol GossipSub(version = 1, for p in peer.network.peers(GossipSub): if msgId notin p.state.sentMessages and topic in p.state.subscribedFor: p.state.sentMessages.incl msgId - asyncCheck p.emit(topic, msgId, msg) + asyncDiscard p.tryEmitting(topic, msgId, msg) {.gcsafe.}: let handler = peer.networkState.topicSubscribers.getOrDefault(topic) if handler != nil: handler(msg) -proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} = +proc broadcastIMPL(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} = var randBytes: array[10, byte]; if randomBytes(randBytes) != 10: warn "Failed to generate random message id" @@ -74,7 +74,21 @@ proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[v for peer in node.peers(GossipSub): if topic in peer.state(GossipSub).subscribedFor: - result.add peer.emit(topic, msgId, msg) + result.add peer.tryEmitting(topic, msgId, msg) + +proc trySubscribing(peer: Peer, topic: string) = + var fut = peer.subscribeFor(topic) + fut.addCallback do (arg: pointer): + if fut.failed: + warn "Failed to subscribe to topic with GossipSub peer", topic, peer + +proc tryEmitting(peer: Peer, topic: string, + msgId: string, msg: string): Future[void] = + var fut = peer.emit(topic, msgId, msg) + fut.addCallback do (arg: pointer): + if fut.failed: + warn "GossipSub message not delivered to Peer", peer + return fut proc subscribe*[MsgType](node: EthereumNode, topic: string, @@ -84,8 +98,11 @@ proc subscribe*[MsgType](node: EthereumNode, userHandler Json.decode(msg, MsgType) for peer in node.peers(GossipSub): - discard peer.subscribeFor(topic) + peer.trySubscribing(topic) proc broadcast*(node: EthereumNode, topic: string, msg: auto) {.async.} = - await all(node.broadcastImpl(topic, Json.encode(msg))) + # We are intentionally using `yield` here, so the broadcast call can + # never fail. Please note that errors are logged through a callback + # set in `tryEmitting` + yield all(node.broadcastIMPL(topic, Json.encode(msg))) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 6b6484edd..5c51cfd02 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -84,7 +84,7 @@ p2pProtocol BeaconSync(version = 1, bestRoot: Eth2Digest # TODO bestSlot = latestState.slot - let m = await handshake(peer, timeout = 500.milliseconds, + let m = await handshake(peer, timeout = 10.seconds, status(networkId, latestFinalizedRoot, latestFinalizedEpoch, bestRoot, bestSlot)) @@ -92,34 +92,41 @@ p2pProtocol BeaconSync(version = 1, await peer.disconnect(UselessPeer) return - let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) - if bestDiff == 0: - # Nothing to do? - trace "Nothing to sync", peer = peer.remote - else: - # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the - # connection if it's too big. - - if bestDiff > 0: - # Send roots - # TODO: Currently we send all block roots in one "packet". Maybe - # they should be split to multiple packets. - type Root = (Eth2Digest, Slot) - var roots = newSeqOfCap[Root](128) - for i in int(m.bestSlot) + 1 .. int(bestSlot): - for r in blockPool.blockRootsForSlot(i.Slot): - roots.add((r, i.Slot)) - - await peer.beaconBlockRoots(roots) + # TODO: onPeerConnected runs unconditionally for every connected peer, but we + # don't need to sync with everybody. The beacon node should detect a situation + # where it needs to sync and it should execute the sync algorithm with a certain + # number of randomly selected peers. The algorithm itself must be extracted in a proc. + try: + let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) + if bestDiff == 0: + # Nothing to do? + trace "Nothing to sync", peer = peer.remote else: - # Receive roots - let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) - let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) - var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) - for r in roots.roots: - bodiesRequest.add(r[0]) - let bodies = await peer.getBeaconBlockBodies(bodiesRequest) - node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) + # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the + # connection if it's too big. + + if bestDiff > 0: + # Send roots + # TODO: Currently we send all block roots in one "packet". Maybe + # they should be split to multiple packets. + type Root = (Eth2Digest, Slot) + var roots = newSeqOfCap[Root](128) + for i in int(m.bestSlot) + 1 .. int(bestSlot): + for r in blockPool.blockRootsForSlot(i.Slot): + roots.add((r, i.Slot)) + + await peer.beaconBlockRoots(roots) + else: + # Receive roots + let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) + let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) + var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) + for r in roots.roots: + bodiesRequest.add(r[0]) + let bodies = await peer.getBeaconBlockBodies(bodiesRequest) + node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) + except CatchableError: + warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() proc status( peer: Peer,