From 27e1625d341bffa5a87f6578d35821dd15e232ca Mon Sep 17 00:00:00 2001 From: tersec Date: Wed, 15 Jun 2022 08:14:47 +0000 Subject: [PATCH] check for and log gossip broadcast failure (#3737) * check for and log gossip broadcast failure * switch notices to warns; update LC variables regardless * don't both return a Result and log sending error * add metrics counter for failed-due-to-no-peers and removed unnecessary async * don't report failure of sync committee messages * remove redundant metric * document metric being incremented --- beacon_chain/networking/eth2_network.nim | 172 +++++++++---------- beacon_chain/rpc/rest_beacon_api.nim | 6 +- beacon_chain/validators/validator_duties.nim | 144 ++++++++++------ 3 files changed, 177 insertions(+), 145 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 904665f30..49ecf9ab2 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -46,6 +46,7 @@ type Bytes = seq[byte] ErrorMsg = List[byte, 256] + SendResult* = Result[void, cstring] # TODO: This is here only to eradicate a compiler # warning about unused import (rpc/messages). @@ -216,7 +217,7 @@ func phase0metadata*(node: Eth2Node): phase0.MetaData = seq_number: node.metadata.seq_number, attnets: node.metadata.attnets) -func toAltairMetadata*(phase0: phase0.MetaData): altair.MetaData = +func toAltairMetadata(phase0: phase0.MetaData): altair.MetaData = altair.MetaData( seq_number: phase0.seq_number, attnets: phase0.attnets) @@ -315,9 +316,6 @@ func shortLog*(peer: Peer): string = shortLog(peer.peerId) chronicles.formatIt(Peer): shortLog(it) chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet()) -template remote*(peer: Peer): untyped = - peer.peerId - proc openStream(node: Eth2Node, peer: Peer, protocolId: string): Future[Connection] {.async.} = @@ -331,7 +329,7 @@ proc openStream(node: Eth2Node, return conn -proc init*(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} +proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} func peerId*(node: Eth2Node): PeerId = node.switch.peerInfo.peerId @@ -339,7 +337,7 @@ func peerId*(node: Eth2Node): PeerId = func enrRecord*(node: Eth2Node): Record = node.discovery.localNode.record -proc getPeer*(node: Eth2Node, peerId: PeerId): Peer = +proc getPeer(node: Eth2Node, peerId: PeerId): Peer = node.peers.withValue(peerId, peer) do: return peer[] do: @@ -350,33 +348,33 @@ proc peerFromStream(network: Eth2Node, conn: Connection): Peer = result = network.getPeer(conn.peerId) result.peerId = conn.peerId -proc getKey*(peer: Peer): PeerId {.inline.} = +func getKey*(peer: Peer): PeerId {.inline.} = peer.peerId -proc getFuture*(peer: Peer): Future[void] {.inline.} = +proc getFuture(peer: Peer): Future[void] {.inline.} = if isNil(peer.disconnectedFut): peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") peer.disconnectedFut -proc getScore*(a: Peer): int = +func getScore*(a: Peer): int = ## Returns current score value for peer ``peer``. a.score -proc updateScore*(peer: Peer, score: int) {.inline.} = +func updateScore*(peer: Peer, score: int) {.inline.} = ## Update peer's ``peer`` score with value ``score``. peer.score = peer.score + score if peer.score > PeerScoreHighLimit: peer.score = PeerScoreHighLimit -proc calcThroughput(dur: Duration, value: uint64): float = +func calcThroughput(dur: Duration, value: uint64): float = let secs = float(chronos.seconds(1).nanoseconds) if isZero(dur): 0.0 else: float(value) * (secs / float(dur.nanoseconds)) -proc updateNetThroughput*(peer: Peer, dur: Duration, - bytesCount: uint64) {.inline.} = +func updateNetThroughput(peer: Peer, dur: Duration, + bytesCount: uint64) {.inline.} = ## Update peer's ``peer`` network throughput. let bytesPerSecond = calcThroughput(dur, bytesCount) let a = peer.netThroughput.average @@ -384,19 +382,11 @@ proc updateNetThroughput*(peer: Peer, dur: Duration, peer.netThroughput.average = a + (bytesPerSecond - a) / float(n + 1) inc(peer.netThroughput.count) -proc netBps*(peer: Peer): float {.inline.} = - ## Returns current network throughput average value in Bps for peer ``peer``. - round((peer.netThroughput.average * 10_000) / 10_000) - -proc netKbps*(peer: Peer): float {.inline.} = +func netKbps*(peer: Peer): float {.inline.} = ## Returns current network throughput average value in Kbps for peer ``peer``. round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) -proc netMbps*(peer: Peer): float {.inline.} = - ## Returns current network throughput average value in Mbps for peer ``peer``. - round(((peer.netThroughput.average / (1024 * 1024)) * 10_000) / 10_000) - -proc `<`*(a, b: Peer): bool = +func `<`(a, b: Peer): bool = ## Comparison function, which first checks peer's scores, and if the peers' ## score is equal it compares peers' network throughput. if a.score < b.score: @@ -428,7 +418,7 @@ template awaitNonNegativeRequestQuota*(peer: Peer) = func allowedOpsPerSecondCost*(n: int): float = (replenishRate * 1000000000'f / n.float) -proc isSeen*(network: Eth2Node, peerId: PeerId): bool = +proc isSeen(network: Eth2Node, peerId: PeerId): bool = ## Returns ``true`` if ``peerId`` present in SeenTable and time period is not ## yet expired. let currentTime = now(chronos.Moment) @@ -444,7 +434,7 @@ proc isSeen*(network: Eth2Node, peerId: PeerId): bool = else: true -proc addSeen*(network: Eth2Node, peerId: PeerId, +proc addSeen(network: Eth2Node, peerId: PeerId, period: chronos.Duration) = ## Adds peer with PeerId ``peerId`` to SeenTable and timeout ``period``. let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period) @@ -560,7 +550,7 @@ proc isLightClientRequestProto(fn: NimNode): NimNode = return newLit(false) -proc writeChunkSZ*( +proc writeChunkSZ( conn: Connection, responseCode: Option[ResponseCode], uncompressedLen: uint64, payloadSZ: openArray[byte], contextBytes: openArray[byte] = []): Future[void] = @@ -581,10 +571,10 @@ proc writeChunkSZ*( conn.write(output.getOutput) -proc writeChunk*(conn: Connection, - responseCode: Option[ResponseCode], - payload: openArray[byte], - contextBytes: openArray[byte] = []): Future[void] = +proc writeChunk(conn: Connection, + responseCode: Option[ResponseCode], + payload: openArray[byte], + contextBytes: openArray[byte] = []): Future[void] = var output = memoryOutput() try: @@ -653,7 +643,7 @@ proc sendResponseChunkBytes( inc response.writtenChunks response.stream.writeChunk(some Success, payload, contextBytes) -proc sendResponseChunk*( +proc sendResponseChunk( response: UntypedResponse, val: auto, contextBytes: openArray[byte] = []): Future[void] = sendResponseChunkBytes(response, SSZ.encode(val), contextBytes) @@ -667,8 +657,8 @@ template sendUserHandlerResultAsChunkImpl*(stream: Connection, handlerResult: auto): untyped = writeChunk(stream, some Success, SSZ.encode(handlerResult)) -proc uncompressFramedStream*(conn: Connection, - expectedSize: int): Future[Result[seq[byte], cstring]] +proc uncompressFramedStream(conn: Connection, + expectedSize: int): Future[Result[seq[byte], cstring]] {.async.} = var header: array[framingHeader.len, byte] try: @@ -916,7 +906,7 @@ template send*[M]( doAssert UntypedResponse(r).writtenChunks == 0 sendResponseChunk(UntypedResponse(r), val, contextBytes) -proc performProtocolHandshakes*(peer: Peer, incoming: bool) {.async.} = +proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} = # Loop down serially because it's easier to reason about the connection state # when there are fewer async races, specially during setup for protocol in allProtocols: @@ -1153,7 +1143,7 @@ proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool = else: true -proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} = +proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} = ## Establish connection with remote peer identified by address ``peerAddr``. logScope: peer = peerAddr.peerId @@ -1209,7 +1199,7 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] = let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol) ok(peerAddr) -proc isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool = +func isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool = if discoveryForkId.fork_digest == peerForkId.fork_digest: if discoveryForkId.next_fork_version < peerForkId.next_fork_version: # Peer knows about a fork and we don't @@ -1445,7 +1435,7 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) = default(SyncnetBits) ) -proc runDiscoveryLoop*(node: Eth2Node) {.async.} = +proc runDiscoveryLoop(node: Eth2Node) {.async.} = debug "Starting discovery loop" while true: @@ -1648,14 +1638,14 @@ proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} = peer = peerId, peer_state = peer.connectionState peer.connectionState = Disconnected -proc new*(T: type Eth2Node, - config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig, - enrForkId: ENRForkID, discoveryForkId: ENRForkID, - forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, - switch: Switch, pubsub: GossipSub, - ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port], - privKey: keys.PrivateKey, discovery: bool, - rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} = +proc new(T: type Eth2Node, + config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig, + enrForkId: ENRForkID, discoveryForkId: ENRForkID, + forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, + switch: Switch, pubsub: GossipSub, + ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port], + privKey: keys.PrivateKey, discovery: bool, + rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} = when not defined(local_testnet): let connectTimeout = chronos.minutes(1) @@ -1742,7 +1732,7 @@ proc new*(T: type Eth2Node, node -template publicKey*(node: Eth2Node): keys.PublicKey = +template publicKey(node: Eth2Node): keys.PublicKey = node.discovery.privKey.toPublicKey proc startListening*(node: Eth2Node) {.async.} = @@ -1810,7 +1800,7 @@ proc stop*(node: Eth2Node) {.async.} = trace "Eth2Node.stop(): timeout reached", timeout, futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) -proc init*(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = +proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = let res = Peer( peerId: peerId, network: network, @@ -2030,13 +2020,10 @@ proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} = await sleepAsync(1.seconds div max(1, excessPeers)) -func asLibp2pKey*(key: keys.PublicKey): PublicKey = - PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key)) - func asEthKey*(key: PrivateKey): keys.PrivateKey = keys.PrivateKey(key.skkey) -proc initAddress*(T: type MultiAddress, str: string): T = +proc initAddress(T: type MultiAddress, str: string): T = let address = MultiAddress.init(str) if IPFS.match(address) and matchPartial(multiaddress.TCP, address): result = address @@ -2189,9 +2176,9 @@ func gossipId( messageDigest.data[0..19] -proc newBeaconSwitch*(config: BeaconNodeConf | LightClientConf, - seckey: PrivateKey, address: MultiAddress, - rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} = +proc newBeaconSwitch(config: BeaconNodeConf | LightClientConf, + seckey: PrivateKey, address: MultiAddress, + rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} = SwitchBuilder .new() .withPrivateKey(seckey) @@ -2340,11 +2327,11 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, node -proc announcedENR*(node: Eth2Node): enr.Record = +func announcedENR*(node: Eth2Node): enr.Record = doAssert node.discovery != nil, "The Eth2Node must be initialized" node.discovery.localNode.record -proc shortForm*(id: NetKeyPair): string = +func shortForm*(id: NetKeyPair): string = $PeerId.init(id.pubkey) proc subscribe*( @@ -2431,18 +2418,8 @@ proc addAsyncValidator*[MsgType](node: Eth2Node, proc unsubscribe*(node: Eth2Node, topic: string) = node.pubsub.unsubscribeAll(topic) -proc traceMessage(fut: FutureBase, topic: string) = - fut.addCallback do (arg: pointer): - if not(fut.failed): - trace "Outgoing pubsub message sent" - elif fut.error != nil: - debug "Gossip message not sent", - topic, err = fut.error.msg - else: - debug "Unexpected future state for gossip", - topic, state = fut.state - -proc broadcast*(node: Eth2Node, topic: string, msg: auto) = +proc broadcast(node: Eth2Node, topic: string, msg: auto): + Future[Result[void, cstring]] {.async.} = try: let uncompressed = SSZ.encode(msg) @@ -2455,10 +2432,15 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) = except InputTooLarge: raiseAssert "More than 4gb? not likely.." - inc nbc_gossip_messages_sent + let peers = await node.pubsub.publish(topic, compressed) - var futSnappy = node.pubsub.publish(topic, compressed) - traceMessage(futSnappy, topic) + # TODO remove workaround for sync committee BN/VC log spam + if peers > 0 or find(topic, "sync_committee_") != -1: + inc nbc_gossip_messages_sent + return ok() + else: + # Increments libp2p_gossipsub_failed_publish metric + return err("No peers on libp2p topic") except IOError as exc: raiseAssert exc.msg # TODO in-memory compression shouldn't fail @@ -2541,75 +2523,85 @@ func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest = proc getWallEpoch(node: Eth2Node): Epoch = node.getBeaconTime().slotOrZero.epoch -proc broadcastAttestation*(node: Eth2Node, subnet_id: SubnetId, - attestation: Attestation) = +proc broadcastAttestation*( + node: Eth2Node, subnet_id: SubnetId, attestation: Attestation): + Future[SendResult] = # Regardless of the contents of the attestation, # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/altair/p2p-interface.md#transitioning-the-gossip # implies that pre-fork, messages using post-fork digests might be # ignored, whilst post-fork, there is effectively a seen_ttl-based # timer unsubscription point that means no new pre-fork-forkdigest # should be sent. - let forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch) - let topic = getAttestationTopic(forkPrefix, subnet_id) + let + forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch) + topic = getAttestationTopic(forkPrefix, subnet_id) node.broadcast(topic, attestation) -proc broadcastVoluntaryExit*(node: Eth2Node, exit: SignedVoluntaryExit) = - let topic = getVoluntaryExitsTopic( - node.forkDigestAtEpoch(node.getWallEpoch)) +proc broadcastVoluntaryExit*( + node: Eth2Node, exit: SignedVoluntaryExit): Future[SendResult] = + let topic = getVoluntaryExitsTopic(node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, exit) -proc broadcastAttesterSlashing*(node: Eth2Node, slashing: AttesterSlashing) = +proc broadcastAttesterSlashing*( + node: Eth2Node, slashing: AttesterSlashing): Future[SendResult] = let topic = getAttesterSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, slashing) -proc broadcastProposerSlashing*(node: Eth2Node, slashing: ProposerSlashing) = +proc broadcastProposerSlashing*( + node: Eth2Node, slashing: ProposerSlashing): Future[SendResult] = let topic = getProposerSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, slashing) -proc broadcastAggregateAndProof*(node: Eth2Node, - proof: SignedAggregateAndProof) = +proc broadcastAggregateAndProof*( + node: Eth2Node, proof: SignedAggregateAndProof): Future[SendResult] = let topic = getAggregateAndProofsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, proof) -proc broadcastBeaconBlock*(node: Eth2Node, blck: phase0.SignedBeaconBlock) = +proc broadcastBeaconBlock*( + node: Eth2Node, blck: phase0.SignedBeaconBlock): Future[SendResult] = let topic = getBeaconBlocksTopic(node.forkDigests.phase0) node.broadcast(topic, blck) -proc broadcastBeaconBlock*(node: Eth2Node, blck: altair.SignedBeaconBlock) = +proc broadcastBeaconBlock*( + node: Eth2Node, blck: altair.SignedBeaconBlock): Future[SendResult] = let topic = getBeaconBlocksTopic(node.forkDigests.altair) node.broadcast(topic, blck) -proc broadcastBeaconBlock*(node: Eth2Node, blck: bellatrix.SignedBeaconBlock) = +proc broadcastBeaconBlock*( + node: Eth2Node, blck: bellatrix.SignedBeaconBlock): Future[SendResult] = let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix) node.broadcast(topic, blck) -proc broadcastBeaconBlock*(node: Eth2Node, forked: ForkedSignedBeaconBlock) = +proc broadcastBeaconBlock*( + node: Eth2Node, forked: ForkedSignedBeaconBlock): Future[SendResult] = withBlck(forked): node.broadcastBeaconBlock(blck) proc broadcastSyncCommitteeMessage*( node: Eth2Node, msg: SyncCommitteeMessage, - subcommitteeIdx: SyncSubcommitteeIndex) = + subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] = let topic = getSyncCommitteeTopic( node.forkDigestAtEpoch(node.getWallEpoch), subcommitteeIdx) node.broadcast(topic, msg) proc broadcastSignedContributionAndProof*( - node: Eth2Node, msg: SignedContributionAndProof) = + node: Eth2Node, msg: SignedContributionAndProof): Future[SendResult] = let topic = getSyncCommitteeContributionAndProofTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, msg) proc broadcastLightClientFinalityUpdate*( - node: Eth2Node, msg: altair.LightClientFinalityUpdate) = + node: Eth2Node, msg: altair.LightClientFinalityUpdate): + Future[SendResult] = let topic = getLightClientFinalityUpdateTopic( node.forkDigestAtEpoch(msg.attested_header.slot.epoch)) node.broadcast(topic, msg) proc broadcastLightClientOptimisticUpdate*( - node: Eth2Node, msg: altair.LightClientOptimisticUpdate) = + node: Eth2Node, msg: altair.LightClientOptimisticUpdate): + Future[SendResult] = let topic = getLightClientOptimisticUpdateTopic( node.forkDigestAtEpoch(msg.attested_header.slot.epoch)) node.broadcast(topic, msg) diff --git a/beacon_chain/rpc/rest_beacon_api.nim b/beacon_chain/rpc/rest_beacon_api.nim index c2e2ef2fa..009d5813a 100644 --- a/beacon_chain/rpc/rest_beacon_api.nim +++ b/beacon_chain/rpc/rest_beacon_api.nim @@ -950,7 +950,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = InvalidAttesterSlashingObjectError, $dres.error()) dres.get() - let res = node.sendAttesterSlashing(slashing) + let res = await node.sendAttesterSlashing(slashing) if res.isErr(): return RestApiResponse.jsonError(Http400, AttesterSlashingValidationError, @@ -982,7 +982,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = InvalidProposerSlashingObjectError, $dres.error()) dres.get() - let res = node.sendProposerSlashing(slashing) + let res = await node.sendProposerSlashing(slashing) if res.isErr(): return RestApiResponse.jsonError(Http400, ProposerSlashingValidationError, @@ -1045,7 +1045,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = InvalidVoluntaryExitObjectError, $dres.error()) dres.get() - let res = node.sendVoluntaryExit(exit) + let res = await node.sendVoluntaryExit(exit) if res.isErr(): return RestApiResponse.jsonError(Http400, VoluntaryExitValidationError, diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 21cc67286..5ab3f24ef 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -83,7 +83,6 @@ declarePublicGauge(attached_validator_balance_total, logScope: topics = "beacval" type - SendResult* = Result[void, cstring] SendBlockResult* = Result[bool, cstring] ForkedBlockResult* = Result[ForkedBeaconBlock, string] @@ -224,7 +223,7 @@ func isGoodForSending(validationResult: ValidationRes): bool = # to ensure that the message will reach as many peers as possible. validationResult.isOk() or validationResult.error[0] == ValidationResult.Ignore -proc sendAttestation*( +proc sendAttestation( node: BeaconNode, attestation: Attestation, subnet_id: SubnetId, checkSignature: bool): Future[SendResult] {.async.} = # Validate attestation before sending it via gossip - validation will also @@ -236,9 +235,11 @@ proc sendAttestation*( return if res.isGoodForSending: - node.network.broadcastAttestation(subnet_id, attestation) - beacon_attestations_sent.inc() - ok() + let sendResult = + await node.network.broadcastAttestation(subnet_id, attestation) + if sendResult.isOk: + beacon_attestations_sent.inc() + sendResult else: notice "Produced attestation failed validation", attestation = shortLog(attestation), @@ -268,18 +269,33 @@ proc handleLightClientUpdates(node: BeaconNode, slot: Slot) {.async.} = let finalized_slot = latest.finalized_header.slot if finalized_slot > node.lightClientPool[].latestForwardedFinalitySlot: template msg(): auto = latest - node.network.broadcastLightClientFinalityUpdate(msg) + let sendResult = await node.network.broadcastLightClientFinalityUpdate(msg) + + # Optimization for message with ephemeral validity, whether sent or not node.lightClientPool[].latestForwardedFinalitySlot = finalized_slot - beacon_light_client_finality_updates_sent.inc() - notice "LC finality update sent", message = shortLog(msg) + + if sendResult.isOk: + beacon_light_client_finality_updates_sent.inc() + notice "LC finality update sent", message = shortLog(msg) + else: + warn "LC finality update failed to send", + error = sendResult.error() let attested_slot = latest.attested_header.slot if attested_slot > node.lightClientPool[].latestForwardedOptimisticSlot: let msg = latest.toOptimistic - node.network.broadcastLightClientOptimisticUpdate(msg) + let sendResult = + await node.network.broadcastLightClientOptimisticUpdate(msg) + + # Optimization for message with ephemeral validity, whether sent or not node.lightClientPool[].latestForwardedOptimisticSlot = attested_slot - beacon_light_client_optimistic_updates_sent.inc() - notice "LC optimistic update sent", message = shortLog(msg) + + if sendResult.isOk: + beacon_light_client_optimistic_updates_sent.inc() + notice "LC optimistic update sent", message = shortLog(msg) + else: + warn "LC optimistic update failed to send", + error = sendResult.error() proc scheduleSendingLightClientUpdates(node: BeaconNode, slot: Slot) = if not node.config.lightClientDataServe.get: @@ -308,10 +324,12 @@ proc sendSyncCommitteeMessage( return if res.isGoodForSending: - node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx) - beacon_sync_committee_messages_sent.inc() - node.scheduleSendingLightClientUpdates(msg.slot) - SendResult.ok() + let sendResult = + await node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx) + if sendResult.isOk: + beacon_sync_committee_messages_sent.inc() + node.scheduleSendingLightClientUpdates(msg.slot) + sendResult else: notice "Sync committee message failed validation", msg, error = res.error() @@ -409,9 +427,11 @@ proc sendSyncCommitteeContribution*( return if res.isGoodForSending: - node.network.broadcastSignedContributionAndProof(msg) - beacon_sync_committee_contributions_sent.inc() - ok() + let sendResult = + await node.network.broadcastSignedContributionAndProof(msg) + if sendResult.isOk: + beacon_sync_committee_contributions_sent.inc() + sendResult else: notice "Sync committee contribution failed validation", msg, error = res.error() @@ -439,7 +459,11 @@ proc createAndSendAttestation(node: BeaconNode, let res = await node.sendAttestation( attestation, subnet_id, checkSignature = false) - if not res.isOk(): # Logged in sendAttestation + if not res.isOk(): + warn "Attestation failed", + validator = shortLog(validator), + attestation = shortLog(attestation), + error = res.error() return if node.config.dumpEnabled: @@ -766,7 +790,15 @@ proc proposeBlock(node: BeaconNode, # example a delay in signing. # We'll start broadcasting it before integrating fully in the chaindag # so that it can start propagating through the network ASAP. - node.network.broadcastBeaconBlock(signedBlock) + let sendResult = await node.network.broadcastBeaconBlock(signedBlock) + + if sendResult.isErr: + warn "Block failed to send", + blockRoot = shortLog(blockRoot), blck = shortLog(blck), + signature = shortLog(signature), validator = shortLog(validator), + error = sendResult.error() + + return head let wallTime = node.beaconClock.now() @@ -901,7 +933,8 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode, let res = await node.sendSyncCommitteeMessage( msg, subcommitteeIdx, checkSignature = false) if res.isErr(): - # Logged in sendSyncCommitteeMessage + warn "Sync committee message failed", + error = res.error() return if node.config.dumpEnabled: @@ -1151,7 +1184,12 @@ proc sendAggregatedAttestations( signedAP = SignedAggregateAndProof( message: aggregateAndProof, signature: sig) - node.network.broadcastAggregateAndProof(signedAP) + let sendResult = await node.network.broadcastAggregateAndProof(signedAP) + + if sendResult.isErr: + warn "Aggregated attestation failed to send", + error = sendResult.error() + return # The subnet on which the attestations (should have) arrived let @@ -1351,7 +1389,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} = proc sendAttestation*(node: BeaconNode, attestation: Attestation): Future[SendResult] {.async.} = - # REST/JSON-RPC API helper procedure. + # REST helper procedure. let target = node.dag.getBlockRef(attestation.data.target.root).valueOr: notice "Attempt to send attestation for unknown target", @@ -1392,67 +1430,67 @@ proc sendAttestation*(node: BeaconNode, proc sendAggregateAndProof*(node: BeaconNode, proof: SignedAggregateAndProof): Future[SendResult] {. async.} = - # REST/JSON-RPC API helper procedure. + # REST helper procedure. let res = await node.processor.aggregateValidator(MsgSource.api, proof) return if res.isGoodForSending: - node.network.broadcastAggregateAndProof(proof) + let sendResult = await node.network.broadcastAggregateAndProof(proof) - notice "Aggregated attestation sent", - attestation = shortLog(proof.message.aggregate), - aggregator_index = proof.message.aggregator_index, - signature = shortLog(proof.signature) + if sendResult.isOk: + notice "Aggregated attestation sent", + attestation = shortLog(proof.message.aggregate), + aggregator_index = proof.message.aggregator_index, + signature = shortLog(proof.signature) - ok() + sendResult else: - notice "Aggregate and proof failed validation", - proof = shortLog(proof.message.aggregate), error = res.error() + notice "Aggregated attestation failed validation", + proof = shortLog(proof.message.aggregate), error = res.error() err(res.error()[1]) -proc sendVoluntaryExit*(node: BeaconNode, - exit: SignedVoluntaryExit): SendResult = - # REST/JSON-RPC API helper procedure. +proc sendVoluntaryExit*( + node: BeaconNode, exit: SignedVoluntaryExit): + Future[SendResult] {.async.} = + # REST helper procedure. let res = node.processor[].voluntaryExitValidator(MsgSource.api, exit) if res.isGoodForSending: - node.network.broadcastVoluntaryExit(exit) - ok() + return await node.network.broadcastVoluntaryExit(exit) else: notice "Voluntary exit request failed validation", exit = shortLog(exit.message), error = res.error() - err(res.error()[1]) + return err(res.error()[1]) -proc sendAttesterSlashing*(node: BeaconNode, - slashing: AttesterSlashing): SendResult = - # REST/JSON-RPC API helper procedure. +proc sendAttesterSlashing*( + node: BeaconNode, slashing: AttesterSlashing): Future[SendResult] {.async.} = + # REST helper procedure. let res = node.processor[].attesterSlashingValidator(MsgSource.api, slashing) if res.isGoodForSending: - node.network.broadcastAttesterSlashing(slashing) - ok() + return await node.network.broadcastAttesterSlashing(slashing) else: notice "Attester slashing request failed validation", slashing = shortLog(slashing), error = res.error() - err(res.error()[1]) + return err(res.error()[1]) -proc sendProposerSlashing*(node: BeaconNode, - slashing: ProposerSlashing): SendResult = - # REST/JSON-RPC API helper procedure. +proc sendProposerSlashing*( + node: BeaconNode, slashing: ProposerSlashing): Future[SendResult] + {.async.} = + # REST helper procedure. let res = node.processor[].proposerSlashingValidator(MsgSource.api, slashing) if res.isGoodForSending: - node.network.broadcastProposerSlashing(slashing) - ok() + return await node.network.broadcastProposerSlashing(slashing) else: notice "Proposer slashing request failed validation", slashing = shortLog(slashing), error = res.error() - err(res.error()[1]) + return err(res.error()[1]) proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock ): Future[SendBlockResult] {.async.} = - # REST/JSON-RPC API helper procedure. + # REST helper procedure. block: # Start with a quick gossip validation check such that broadcasting the # block doesn't get the node into trouble @@ -1465,7 +1503,9 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock # The block passed basic gossip validation - we can "safely" broadcast it now. # In fact, per the spec, we should broadcast it even if it later fails to # apply to our state. - node.network.broadcastBeaconBlock(forked) + let sendResult = await node.network.broadcastBeaconBlock(forked) + if sendResult.isErr: + return SendBlockResult.err(sendResult.error()) let wallTime = node.beaconClock.now()