check for and log gossip broadcast failure (#3737)

* check for and log gossip broadcast failure

* switch notices to warns; update LC variables regardless

* don't both return a Result and log sending error

* add metrics counter for failed-due-to-no-peers and removed unnecessary async

* don't report failure of sync committee messages

* remove redundant metric

* document metric being incremented
This commit is contained in:
tersec 2022-06-15 08:14:47 +00:00 committed by GitHub
parent 694b653757
commit 27e1625d34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 177 additions and 145 deletions

View File

@ -46,6 +46,7 @@ type
Bytes = seq[byte] Bytes = seq[byte]
ErrorMsg = List[byte, 256] ErrorMsg = List[byte, 256]
SendResult* = Result[void, cstring]
# TODO: This is here only to eradicate a compiler # TODO: This is here only to eradicate a compiler
# warning about unused import (rpc/messages). # warning about unused import (rpc/messages).
@ -216,7 +217,7 @@ func phase0metadata*(node: Eth2Node): phase0.MetaData =
seq_number: node.metadata.seq_number, seq_number: node.metadata.seq_number,
attnets: node.metadata.attnets) attnets: node.metadata.attnets)
func toAltairMetadata*(phase0: phase0.MetaData): altair.MetaData = func toAltairMetadata(phase0: phase0.MetaData): altair.MetaData =
altair.MetaData( altair.MetaData(
seq_number: phase0.seq_number, seq_number: phase0.seq_number,
attnets: phase0.attnets) attnets: phase0.attnets)
@ -315,9 +316,6 @@ func shortLog*(peer: Peer): string = shortLog(peer.peerId)
chronicles.formatIt(Peer): shortLog(it) chronicles.formatIt(Peer): shortLog(it)
chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet()) chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet())
template remote*(peer: Peer): untyped =
peer.peerId
proc openStream(node: Eth2Node, proc openStream(node: Eth2Node,
peer: Peer, peer: Peer,
protocolId: string): Future[Connection] {.async.} = protocolId: string): Future[Connection] {.async.} =
@ -331,7 +329,7 @@ proc openStream(node: Eth2Node,
return conn return conn
proc init*(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.}
func peerId*(node: Eth2Node): PeerId = func peerId*(node: Eth2Node): PeerId =
node.switch.peerInfo.peerId node.switch.peerInfo.peerId
@ -339,7 +337,7 @@ func peerId*(node: Eth2Node): PeerId =
func enrRecord*(node: Eth2Node): Record = func enrRecord*(node: Eth2Node): Record =
node.discovery.localNode.record node.discovery.localNode.record
proc getPeer*(node: Eth2Node, peerId: PeerId): Peer = proc getPeer(node: Eth2Node, peerId: PeerId): Peer =
node.peers.withValue(peerId, peer) do: node.peers.withValue(peerId, peer) do:
return peer[] return peer[]
do: do:
@ -350,33 +348,33 @@ proc peerFromStream(network: Eth2Node, conn: Connection): Peer =
result = network.getPeer(conn.peerId) result = network.getPeer(conn.peerId)
result.peerId = conn.peerId result.peerId = conn.peerId
proc getKey*(peer: Peer): PeerId {.inline.} = func getKey*(peer: Peer): PeerId {.inline.} =
peer.peerId peer.peerId
proc getFuture*(peer: Peer): Future[void] {.inline.} = proc getFuture(peer: Peer): Future[void] {.inline.} =
if isNil(peer.disconnectedFut): if isNil(peer.disconnectedFut):
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut")
peer.disconnectedFut peer.disconnectedFut
proc getScore*(a: Peer): int = func getScore*(a: Peer): int =
## Returns current score value for peer ``peer``. ## Returns current score value for peer ``peer``.
a.score a.score
proc updateScore*(peer: Peer, score: int) {.inline.} = func updateScore*(peer: Peer, score: int) {.inline.} =
## Update peer's ``peer`` score with value ``score``. ## Update peer's ``peer`` score with value ``score``.
peer.score = peer.score + score peer.score = peer.score + score
if peer.score > PeerScoreHighLimit: if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit peer.score = PeerScoreHighLimit
proc calcThroughput(dur: Duration, value: uint64): float = func calcThroughput(dur: Duration, value: uint64): float =
let secs = float(chronos.seconds(1).nanoseconds) let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur): if isZero(dur):
0.0 0.0
else: else:
float(value) * (secs / float(dur.nanoseconds)) float(value) * (secs / float(dur.nanoseconds))
proc updateNetThroughput*(peer: Peer, dur: Duration, func updateNetThroughput(peer: Peer, dur: Duration,
bytesCount: uint64) {.inline.} = bytesCount: uint64) {.inline.} =
## Update peer's ``peer`` network throughput. ## Update peer's ``peer`` network throughput.
let bytesPerSecond = calcThroughput(dur, bytesCount) let bytesPerSecond = calcThroughput(dur, bytesCount)
let a = peer.netThroughput.average let a = peer.netThroughput.average
@ -384,19 +382,11 @@ proc updateNetThroughput*(peer: Peer, dur: Duration,
peer.netThroughput.average = a + (bytesPerSecond - a) / float(n + 1) peer.netThroughput.average = a + (bytesPerSecond - a) / float(n + 1)
inc(peer.netThroughput.count) inc(peer.netThroughput.count)
proc netBps*(peer: Peer): float {.inline.} = func netKbps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Bps for peer ``peer``.
round((peer.netThroughput.average * 10_000) / 10_000)
proc netKbps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Kbps for peer ``peer``. ## Returns current network throughput average value in Kbps for peer ``peer``.
round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) round(((peer.netThroughput.average / 1024) * 10_000) / 10_000)
proc netMbps*(peer: Peer): float {.inline.} = func `<`(a, b: Peer): bool =
## Returns current network throughput average value in Mbps for peer ``peer``.
round(((peer.netThroughput.average / (1024 * 1024)) * 10_000) / 10_000)
proc `<`*(a, b: Peer): bool =
## Comparison function, which first checks peer's scores, and if the peers' ## Comparison function, which first checks peer's scores, and if the peers'
## score is equal it compares peers' network throughput. ## score is equal it compares peers' network throughput.
if a.score < b.score: if a.score < b.score:
@ -428,7 +418,7 @@ template awaitNonNegativeRequestQuota*(peer: Peer) =
func allowedOpsPerSecondCost*(n: int): float = func allowedOpsPerSecondCost*(n: int): float =
(replenishRate * 1000000000'f / n.float) (replenishRate * 1000000000'f / n.float)
proc isSeen*(network: Eth2Node, peerId: PeerId): bool = proc isSeen(network: Eth2Node, peerId: PeerId): bool =
## Returns ``true`` if ``peerId`` present in SeenTable and time period is not ## Returns ``true`` if ``peerId`` present in SeenTable and time period is not
## yet expired. ## yet expired.
let currentTime = now(chronos.Moment) let currentTime = now(chronos.Moment)
@ -444,7 +434,7 @@ proc isSeen*(network: Eth2Node, peerId: PeerId): bool =
else: else:
true true
proc addSeen*(network: Eth2Node, peerId: PeerId, proc addSeen(network: Eth2Node, peerId: PeerId,
period: chronos.Duration) = period: chronos.Duration) =
## Adds peer with PeerId ``peerId`` to SeenTable and timeout ``period``. ## Adds peer with PeerId ``peerId`` to SeenTable and timeout ``period``.
let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period) let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period)
@ -560,7 +550,7 @@ proc isLightClientRequestProto(fn: NimNode): NimNode =
return newLit(false) return newLit(false)
proc writeChunkSZ*( proc writeChunkSZ(
conn: Connection, responseCode: Option[ResponseCode], conn: Connection, responseCode: Option[ResponseCode],
uncompressedLen: uint64, payloadSZ: openArray[byte], uncompressedLen: uint64, payloadSZ: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
@ -581,10 +571,10 @@ proc writeChunkSZ*(
conn.write(output.getOutput) conn.write(output.getOutput)
proc writeChunk*(conn: Connection, proc writeChunk(conn: Connection,
responseCode: Option[ResponseCode], responseCode: Option[ResponseCode],
payload: openArray[byte], payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
var output = memoryOutput() var output = memoryOutput()
try: try:
@ -653,7 +643,7 @@ proc sendResponseChunkBytes(
inc response.writtenChunks inc response.writtenChunks
response.stream.writeChunk(some Success, payload, contextBytes) response.stream.writeChunk(some Success, payload, contextBytes)
proc sendResponseChunk*( proc sendResponseChunk(
response: UntypedResponse, val: auto, response: UntypedResponse, val: auto,
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
sendResponseChunkBytes(response, SSZ.encode(val), contextBytes) sendResponseChunkBytes(response, SSZ.encode(val), contextBytes)
@ -667,8 +657,8 @@ template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResult: auto): untyped = handlerResult: auto): untyped =
writeChunk(stream, some Success, SSZ.encode(handlerResult)) writeChunk(stream, some Success, SSZ.encode(handlerResult))
proc uncompressFramedStream*(conn: Connection, proc uncompressFramedStream(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]] expectedSize: int): Future[Result[seq[byte], cstring]]
{.async.} = {.async.} =
var header: array[framingHeader.len, byte] var header: array[framingHeader.len, byte]
try: try:
@ -916,7 +906,7 @@ template send*[M](
doAssert UntypedResponse(r).writtenChunks == 0 doAssert UntypedResponse(r).writtenChunks == 0
sendResponseChunk(UntypedResponse(r), val, contextBytes) sendResponseChunk(UntypedResponse(r), val, contextBytes)
proc performProtocolHandshakes*(peer: Peer, incoming: bool) {.async.} = proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} =
# Loop down serially because it's easier to reason about the connection state # Loop down serially because it's easier to reason about the connection state
# when there are fewer async races, specially during setup # when there are fewer async races, specially during setup
for protocol in allProtocols: for protocol in allProtocols:
@ -1153,7 +1143,7 @@ proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool =
else: else:
true true
proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} = proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
## Establish connection with remote peer identified by address ``peerAddr``. ## Establish connection with remote peer identified by address ``peerAddr``.
logScope: logScope:
peer = peerAddr.peerId peer = peerAddr.peerId
@ -1209,7 +1199,7 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] =
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol) let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr) ok(peerAddr)
proc isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool = func isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool =
if discoveryForkId.fork_digest == peerForkId.fork_digest: if discoveryForkId.fork_digest == peerForkId.fork_digest:
if discoveryForkId.next_fork_version < peerForkId.next_fork_version: if discoveryForkId.next_fork_version < peerForkId.next_fork_version:
# Peer knows about a fork and we don't # Peer knows about a fork and we don't
@ -1445,7 +1435,7 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
default(SyncnetBits) default(SyncnetBits)
) )
proc runDiscoveryLoop*(node: Eth2Node) {.async.} = proc runDiscoveryLoop(node: Eth2Node) {.async.} =
debug "Starting discovery loop" debug "Starting discovery loop"
while true: while true:
@ -1648,14 +1638,14 @@ proc onConnEvent(node: Eth2Node, peerId: PeerId, event: ConnEvent) {.async.} =
peer = peerId, peer_state = peer.connectionState peer = peerId, peer_state = peer.connectionState
peer.connectionState = Disconnected peer.connectionState = Disconnected
proc new*(T: type Eth2Node, proc new(T: type Eth2Node,
config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig, config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig,
enrForkId: ENRForkID, discoveryForkId: ENRForkID, enrForkId: ENRForkID, discoveryForkId: ENRForkID,
forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn,
switch: Switch, pubsub: GossipSub, switch: Switch, pubsub: GossipSub,
ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port], ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port],
privKey: keys.PrivateKey, discovery: bool, privKey: keys.PrivateKey, discovery: bool,
rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} = rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} =
when not defined(local_testnet): when not defined(local_testnet):
let let
connectTimeout = chronos.minutes(1) connectTimeout = chronos.minutes(1)
@ -1742,7 +1732,7 @@ proc new*(T: type Eth2Node,
node node
template publicKey*(node: Eth2Node): keys.PublicKey = template publicKey(node: Eth2Node): keys.PublicKey =
node.discovery.privKey.toPublicKey node.discovery.privKey.toPublicKey
proc startListening*(node: Eth2Node) {.async.} = proc startListening*(node: Eth2Node) {.async.} =
@ -1810,7 +1800,7 @@ proc stop*(node: Eth2Node) {.async.} =
trace "Eth2Node.stop(): timeout reached", timeout, trace "Eth2Node.stop(): timeout reached", timeout,
futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)
proc init*(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer =
let res = Peer( let res = Peer(
peerId: peerId, peerId: peerId,
network: network, network: network,
@ -2030,13 +2020,10 @@ proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} =
await sleepAsync(1.seconds div max(1, excessPeers)) await sleepAsync(1.seconds div max(1, excessPeers))
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
func asEthKey*(key: PrivateKey): keys.PrivateKey = func asEthKey*(key: PrivateKey): keys.PrivateKey =
keys.PrivateKey(key.skkey) keys.PrivateKey(key.skkey)
proc initAddress*(T: type MultiAddress, str: string): T = proc initAddress(T: type MultiAddress, str: string): T =
let address = MultiAddress.init(str) let address = MultiAddress.init(str)
if IPFS.match(address) and matchPartial(multiaddress.TCP, address): if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
result = address result = address
@ -2189,9 +2176,9 @@ func gossipId(
messageDigest.data[0..19] messageDigest.data[0..19]
proc newBeaconSwitch*(config: BeaconNodeConf | LightClientConf, proc newBeaconSwitch(config: BeaconNodeConf | LightClientConf,
seckey: PrivateKey, address: MultiAddress, seckey: PrivateKey, address: MultiAddress,
rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} = rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} =
SwitchBuilder SwitchBuilder
.new() .new()
.withPrivateKey(seckey) .withPrivateKey(seckey)
@ -2340,11 +2327,11 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
node node
proc announcedENR*(node: Eth2Node): enr.Record = func announcedENR*(node: Eth2Node): enr.Record =
doAssert node.discovery != nil, "The Eth2Node must be initialized" doAssert node.discovery != nil, "The Eth2Node must be initialized"
node.discovery.localNode.record node.discovery.localNode.record
proc shortForm*(id: NetKeyPair): string = func shortForm*(id: NetKeyPair): string =
$PeerId.init(id.pubkey) $PeerId.init(id.pubkey)
proc subscribe*( proc subscribe*(
@ -2431,18 +2418,8 @@ proc addAsyncValidator*[MsgType](node: Eth2Node,
proc unsubscribe*(node: Eth2Node, topic: string) = proc unsubscribe*(node: Eth2Node, topic: string) =
node.pubsub.unsubscribeAll(topic) node.pubsub.unsubscribeAll(topic)
proc traceMessage(fut: FutureBase, topic: string) = proc broadcast(node: Eth2Node, topic: string, msg: auto):
fut.addCallback do (arg: pointer): Future[Result[void, cstring]] {.async.} =
if not(fut.failed):
trace "Outgoing pubsub message sent"
elif fut.error != nil:
debug "Gossip message not sent",
topic, err = fut.error.msg
else:
debug "Unexpected future state for gossip",
topic, state = fut.state
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
try: try:
let uncompressed = SSZ.encode(msg) let uncompressed = SSZ.encode(msg)
@ -2455,10 +2432,15 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
except InputTooLarge: except InputTooLarge:
raiseAssert "More than 4gb? not likely.." raiseAssert "More than 4gb? not likely.."
inc nbc_gossip_messages_sent let peers = await node.pubsub.publish(topic, compressed)
var futSnappy = node.pubsub.publish(topic, compressed) # TODO remove workaround for sync committee BN/VC log spam
traceMessage(futSnappy, topic) if peers > 0 or find(topic, "sync_committee_") != -1:
inc nbc_gossip_messages_sent
return ok()
else:
# Increments libp2p_gossipsub_failed_publish metric
return err("No peers on libp2p topic")
except IOError as exc: except IOError as exc:
raiseAssert exc.msg # TODO in-memory compression shouldn't fail raiseAssert exc.msg # TODO in-memory compression shouldn't fail
@ -2541,75 +2523,85 @@ func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest =
proc getWallEpoch(node: Eth2Node): Epoch = proc getWallEpoch(node: Eth2Node): Epoch =
node.getBeaconTime().slotOrZero.epoch node.getBeaconTime().slotOrZero.epoch
proc broadcastAttestation*(node: Eth2Node, subnet_id: SubnetId, proc broadcastAttestation*(
attestation: Attestation) = node: Eth2Node, subnet_id: SubnetId, attestation: Attestation):
Future[SendResult] =
# Regardless of the contents of the attestation, # Regardless of the contents of the attestation,
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/altair/p2p-interface.md#transitioning-the-gossip # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/altair/p2p-interface.md#transitioning-the-gossip
# implies that pre-fork, messages using post-fork digests might be # implies that pre-fork, messages using post-fork digests might be
# ignored, whilst post-fork, there is effectively a seen_ttl-based # ignored, whilst post-fork, there is effectively a seen_ttl-based
# timer unsubscription point that means no new pre-fork-forkdigest # timer unsubscription point that means no new pre-fork-forkdigest
# should be sent. # should be sent.
let forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch) let
let topic = getAttestationTopic(forkPrefix, subnet_id) forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch)
topic = getAttestationTopic(forkPrefix, subnet_id)
node.broadcast(topic, attestation) node.broadcast(topic, attestation)
proc broadcastVoluntaryExit*(node: Eth2Node, exit: SignedVoluntaryExit) = proc broadcastVoluntaryExit*(
let topic = getVoluntaryExitsTopic( node: Eth2Node, exit: SignedVoluntaryExit): Future[SendResult] =
node.forkDigestAtEpoch(node.getWallEpoch)) let topic = getVoluntaryExitsTopic(node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, exit) node.broadcast(topic, exit)
proc broadcastAttesterSlashing*(node: Eth2Node, slashing: AttesterSlashing) = proc broadcastAttesterSlashing*(
node: Eth2Node, slashing: AttesterSlashing): Future[SendResult] =
let topic = getAttesterSlashingsTopic( let topic = getAttesterSlashingsTopic(
node.forkDigestAtEpoch(node.getWallEpoch)) node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, slashing) node.broadcast(topic, slashing)
proc broadcastProposerSlashing*(node: Eth2Node, slashing: ProposerSlashing) = proc broadcastProposerSlashing*(
node: Eth2Node, slashing: ProposerSlashing): Future[SendResult] =
let topic = getProposerSlashingsTopic( let topic = getProposerSlashingsTopic(
node.forkDigestAtEpoch(node.getWallEpoch)) node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, slashing) node.broadcast(topic, slashing)
proc broadcastAggregateAndProof*(node: Eth2Node, proc broadcastAggregateAndProof*(
proof: SignedAggregateAndProof) = node: Eth2Node, proof: SignedAggregateAndProof): Future[SendResult] =
let topic = getAggregateAndProofsTopic( let topic = getAggregateAndProofsTopic(
node.forkDigestAtEpoch(node.getWallEpoch)) node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, proof) node.broadcast(topic, proof)
proc broadcastBeaconBlock*(node: Eth2Node, blck: phase0.SignedBeaconBlock) = proc broadcastBeaconBlock*(
node: Eth2Node, blck: phase0.SignedBeaconBlock): Future[SendResult] =
let topic = getBeaconBlocksTopic(node.forkDigests.phase0) let topic = getBeaconBlocksTopic(node.forkDigests.phase0)
node.broadcast(topic, blck) node.broadcast(topic, blck)
proc broadcastBeaconBlock*(node: Eth2Node, blck: altair.SignedBeaconBlock) = proc broadcastBeaconBlock*(
node: Eth2Node, blck: altair.SignedBeaconBlock): Future[SendResult] =
let topic = getBeaconBlocksTopic(node.forkDigests.altair) let topic = getBeaconBlocksTopic(node.forkDigests.altair)
node.broadcast(topic, blck) node.broadcast(topic, blck)
proc broadcastBeaconBlock*(node: Eth2Node, blck: bellatrix.SignedBeaconBlock) = proc broadcastBeaconBlock*(
node: Eth2Node, blck: bellatrix.SignedBeaconBlock): Future[SendResult] =
let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix) let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix)
node.broadcast(topic, blck) node.broadcast(topic, blck)
proc broadcastBeaconBlock*(node: Eth2Node, forked: ForkedSignedBeaconBlock) = proc broadcastBeaconBlock*(
node: Eth2Node, forked: ForkedSignedBeaconBlock): Future[SendResult] =
withBlck(forked): node.broadcastBeaconBlock(blck) withBlck(forked): node.broadcastBeaconBlock(blck)
proc broadcastSyncCommitteeMessage*( proc broadcastSyncCommitteeMessage*(
node: Eth2Node, msg: SyncCommitteeMessage, node: Eth2Node, msg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex) = subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] =
let topic = getSyncCommitteeTopic( let topic = getSyncCommitteeTopic(
node.forkDigestAtEpoch(node.getWallEpoch), subcommitteeIdx) node.forkDigestAtEpoch(node.getWallEpoch), subcommitteeIdx)
node.broadcast(topic, msg) node.broadcast(topic, msg)
proc broadcastSignedContributionAndProof*( proc broadcastSignedContributionAndProof*(
node: Eth2Node, msg: SignedContributionAndProof) = node: Eth2Node, msg: SignedContributionAndProof): Future[SendResult] =
let topic = getSyncCommitteeContributionAndProofTopic( let topic = getSyncCommitteeContributionAndProofTopic(
node.forkDigestAtEpoch(node.getWallEpoch)) node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(topic, msg) node.broadcast(topic, msg)
proc broadcastLightClientFinalityUpdate*( proc broadcastLightClientFinalityUpdate*(
node: Eth2Node, msg: altair.LightClientFinalityUpdate) = node: Eth2Node, msg: altair.LightClientFinalityUpdate):
Future[SendResult] =
let topic = getLightClientFinalityUpdateTopic( let topic = getLightClientFinalityUpdateTopic(
node.forkDigestAtEpoch(msg.attested_header.slot.epoch)) node.forkDigestAtEpoch(msg.attested_header.slot.epoch))
node.broadcast(topic, msg) node.broadcast(topic, msg)
proc broadcastLightClientOptimisticUpdate*( proc broadcastLightClientOptimisticUpdate*(
node: Eth2Node, msg: altair.LightClientOptimisticUpdate) = node: Eth2Node, msg: altair.LightClientOptimisticUpdate):
Future[SendResult] =
let topic = getLightClientOptimisticUpdateTopic( let topic = getLightClientOptimisticUpdateTopic(
node.forkDigestAtEpoch(msg.attested_header.slot.epoch)) node.forkDigestAtEpoch(msg.attested_header.slot.epoch))
node.broadcast(topic, msg) node.broadcast(topic, msg)

View File

@ -950,7 +950,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidAttesterSlashingObjectError, InvalidAttesterSlashingObjectError,
$dres.error()) $dres.error())
dres.get() dres.get()
let res = node.sendAttesterSlashing(slashing) let res = await node.sendAttesterSlashing(slashing)
if res.isErr(): if res.isErr():
return RestApiResponse.jsonError(Http400, return RestApiResponse.jsonError(Http400,
AttesterSlashingValidationError, AttesterSlashingValidationError,
@ -982,7 +982,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidProposerSlashingObjectError, InvalidProposerSlashingObjectError,
$dres.error()) $dres.error())
dres.get() dres.get()
let res = node.sendProposerSlashing(slashing) let res = await node.sendProposerSlashing(slashing)
if res.isErr(): if res.isErr():
return RestApiResponse.jsonError(Http400, return RestApiResponse.jsonError(Http400,
ProposerSlashingValidationError, ProposerSlashingValidationError,
@ -1045,7 +1045,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidVoluntaryExitObjectError, InvalidVoluntaryExitObjectError,
$dres.error()) $dres.error())
dres.get() dres.get()
let res = node.sendVoluntaryExit(exit) let res = await node.sendVoluntaryExit(exit)
if res.isErr(): if res.isErr():
return RestApiResponse.jsonError(Http400, return RestApiResponse.jsonError(Http400,
VoluntaryExitValidationError, VoluntaryExitValidationError,

View File

@ -83,7 +83,6 @@ declarePublicGauge(attached_validator_balance_total,
logScope: topics = "beacval" logScope: topics = "beacval"
type type
SendResult* = Result[void, cstring]
SendBlockResult* = Result[bool, cstring] SendBlockResult* = Result[bool, cstring]
ForkedBlockResult* = Result[ForkedBeaconBlock, string] ForkedBlockResult* = Result[ForkedBeaconBlock, string]
@ -224,7 +223,7 @@ func isGoodForSending(validationResult: ValidationRes): bool =
# to ensure that the message will reach as many peers as possible. # to ensure that the message will reach as many peers as possible.
validationResult.isOk() or validationResult.error[0] == ValidationResult.Ignore validationResult.isOk() or validationResult.error[0] == ValidationResult.Ignore
proc sendAttestation*( proc sendAttestation(
node: BeaconNode, attestation: Attestation, node: BeaconNode, attestation: Attestation,
subnet_id: SubnetId, checkSignature: bool): Future[SendResult] {.async.} = subnet_id: SubnetId, checkSignature: bool): Future[SendResult] {.async.} =
# Validate attestation before sending it via gossip - validation will also # Validate attestation before sending it via gossip - validation will also
@ -236,9 +235,11 @@ proc sendAttestation*(
return return
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastAttestation(subnet_id, attestation) let sendResult =
beacon_attestations_sent.inc() await node.network.broadcastAttestation(subnet_id, attestation)
ok() if sendResult.isOk:
beacon_attestations_sent.inc()
sendResult
else: else:
notice "Produced attestation failed validation", notice "Produced attestation failed validation",
attestation = shortLog(attestation), attestation = shortLog(attestation),
@ -268,18 +269,33 @@ proc handleLightClientUpdates(node: BeaconNode, slot: Slot) {.async.} =
let finalized_slot = latest.finalized_header.slot let finalized_slot = latest.finalized_header.slot
if finalized_slot > node.lightClientPool[].latestForwardedFinalitySlot: if finalized_slot > node.lightClientPool[].latestForwardedFinalitySlot:
template msg(): auto = latest template msg(): auto = latest
node.network.broadcastLightClientFinalityUpdate(msg) let sendResult = await node.network.broadcastLightClientFinalityUpdate(msg)
# Optimization for message with ephemeral validity, whether sent or not
node.lightClientPool[].latestForwardedFinalitySlot = finalized_slot node.lightClientPool[].latestForwardedFinalitySlot = finalized_slot
beacon_light_client_finality_updates_sent.inc()
notice "LC finality update sent", message = shortLog(msg) if sendResult.isOk:
beacon_light_client_finality_updates_sent.inc()
notice "LC finality update sent", message = shortLog(msg)
else:
warn "LC finality update failed to send",
error = sendResult.error()
let attested_slot = latest.attested_header.slot let attested_slot = latest.attested_header.slot
if attested_slot > node.lightClientPool[].latestForwardedOptimisticSlot: if attested_slot > node.lightClientPool[].latestForwardedOptimisticSlot:
let msg = latest.toOptimistic let msg = latest.toOptimistic
node.network.broadcastLightClientOptimisticUpdate(msg) let sendResult =
await node.network.broadcastLightClientOptimisticUpdate(msg)
# Optimization for message with ephemeral validity, whether sent or not
node.lightClientPool[].latestForwardedOptimisticSlot = attested_slot node.lightClientPool[].latestForwardedOptimisticSlot = attested_slot
beacon_light_client_optimistic_updates_sent.inc()
notice "LC optimistic update sent", message = shortLog(msg) if sendResult.isOk:
beacon_light_client_optimistic_updates_sent.inc()
notice "LC optimistic update sent", message = shortLog(msg)
else:
warn "LC optimistic update failed to send",
error = sendResult.error()
proc scheduleSendingLightClientUpdates(node: BeaconNode, slot: Slot) = proc scheduleSendingLightClientUpdates(node: BeaconNode, slot: Slot) =
if not node.config.lightClientDataServe.get: if not node.config.lightClientDataServe.get:
@ -308,10 +324,12 @@ proc sendSyncCommitteeMessage(
return return
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx) let sendResult =
beacon_sync_committee_messages_sent.inc() await node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx)
node.scheduleSendingLightClientUpdates(msg.slot) if sendResult.isOk:
SendResult.ok() beacon_sync_committee_messages_sent.inc()
node.scheduleSendingLightClientUpdates(msg.slot)
sendResult
else: else:
notice "Sync committee message failed validation", notice "Sync committee message failed validation",
msg, error = res.error() msg, error = res.error()
@ -409,9 +427,11 @@ proc sendSyncCommitteeContribution*(
return return
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastSignedContributionAndProof(msg) let sendResult =
beacon_sync_committee_contributions_sent.inc() await node.network.broadcastSignedContributionAndProof(msg)
ok() if sendResult.isOk:
beacon_sync_committee_contributions_sent.inc()
sendResult
else: else:
notice "Sync committee contribution failed validation", notice "Sync committee contribution failed validation",
msg, error = res.error() msg, error = res.error()
@ -439,7 +459,11 @@ proc createAndSendAttestation(node: BeaconNode,
let res = await node.sendAttestation( let res = await node.sendAttestation(
attestation, subnet_id, checkSignature = false) attestation, subnet_id, checkSignature = false)
if not res.isOk(): # Logged in sendAttestation if not res.isOk():
warn "Attestation failed",
validator = shortLog(validator),
attestation = shortLog(attestation),
error = res.error()
return return
if node.config.dumpEnabled: if node.config.dumpEnabled:
@ -766,7 +790,15 @@ proc proposeBlock(node: BeaconNode,
# example a delay in signing. # example a delay in signing.
# We'll start broadcasting it before integrating fully in the chaindag # We'll start broadcasting it before integrating fully in the chaindag
# so that it can start propagating through the network ASAP. # so that it can start propagating through the network ASAP.
node.network.broadcastBeaconBlock(signedBlock) let sendResult = await node.network.broadcastBeaconBlock(signedBlock)
if sendResult.isErr:
warn "Block failed to send",
blockRoot = shortLog(blockRoot), blck = shortLog(blck),
signature = shortLog(signature), validator = shortLog(validator),
error = sendResult.error()
return head
let let
wallTime = node.beaconClock.now() wallTime = node.beaconClock.now()
@ -901,7 +933,8 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode,
let res = await node.sendSyncCommitteeMessage( let res = await node.sendSyncCommitteeMessage(
msg, subcommitteeIdx, checkSignature = false) msg, subcommitteeIdx, checkSignature = false)
if res.isErr(): if res.isErr():
# Logged in sendSyncCommitteeMessage warn "Sync committee message failed",
error = res.error()
return return
if node.config.dumpEnabled: if node.config.dumpEnabled:
@ -1151,7 +1184,12 @@ proc sendAggregatedAttestations(
signedAP = SignedAggregateAndProof( signedAP = SignedAggregateAndProof(
message: aggregateAndProof, message: aggregateAndProof,
signature: sig) signature: sig)
node.network.broadcastAggregateAndProof(signedAP) let sendResult = await node.network.broadcastAggregateAndProof(signedAP)
if sendResult.isErr:
warn "Aggregated attestation failed to send",
error = sendResult.error()
return
# The subnet on which the attestations (should have) arrived # The subnet on which the attestations (should have) arrived
let let
@ -1351,7 +1389,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
proc sendAttestation*(node: BeaconNode, proc sendAttestation*(node: BeaconNode,
attestation: Attestation): Future[SendResult] {.async.} = attestation: Attestation): Future[SendResult] {.async.} =
# REST/JSON-RPC API helper procedure. # REST helper procedure.
let let
target = node.dag.getBlockRef(attestation.data.target.root).valueOr: target = node.dag.getBlockRef(attestation.data.target.root).valueOr:
notice "Attempt to send attestation for unknown target", notice "Attempt to send attestation for unknown target",
@ -1392,67 +1430,67 @@ proc sendAttestation*(node: BeaconNode,
proc sendAggregateAndProof*(node: BeaconNode, proc sendAggregateAndProof*(node: BeaconNode,
proof: SignedAggregateAndProof): Future[SendResult] {. proof: SignedAggregateAndProof): Future[SendResult] {.
async.} = async.} =
# REST/JSON-RPC API helper procedure. # REST helper procedure.
let res = let res =
await node.processor.aggregateValidator(MsgSource.api, proof) await node.processor.aggregateValidator(MsgSource.api, proof)
return return
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastAggregateAndProof(proof) let sendResult = await node.network.broadcastAggregateAndProof(proof)
notice "Aggregated attestation sent", if sendResult.isOk:
attestation = shortLog(proof.message.aggregate), notice "Aggregated attestation sent",
aggregator_index = proof.message.aggregator_index, attestation = shortLog(proof.message.aggregate),
signature = shortLog(proof.signature) aggregator_index = proof.message.aggregator_index,
signature = shortLog(proof.signature)
ok() sendResult
else: else:
notice "Aggregate and proof failed validation", notice "Aggregated attestation failed validation",
proof = shortLog(proof.message.aggregate), error = res.error() proof = shortLog(proof.message.aggregate), error = res.error()
err(res.error()[1]) err(res.error()[1])
proc sendVoluntaryExit*(node: BeaconNode, proc sendVoluntaryExit*(
exit: SignedVoluntaryExit): SendResult = node: BeaconNode, exit: SignedVoluntaryExit):
# REST/JSON-RPC API helper procedure. Future[SendResult] {.async.} =
# REST helper procedure.
let res = let res =
node.processor[].voluntaryExitValidator(MsgSource.api, exit) node.processor[].voluntaryExitValidator(MsgSource.api, exit)
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastVoluntaryExit(exit) return await node.network.broadcastVoluntaryExit(exit)
ok()
else: else:
notice "Voluntary exit request failed validation", notice "Voluntary exit request failed validation",
exit = shortLog(exit.message), error = res.error() exit = shortLog(exit.message), error = res.error()
err(res.error()[1]) return err(res.error()[1])
proc sendAttesterSlashing*(node: BeaconNode, proc sendAttesterSlashing*(
slashing: AttesterSlashing): SendResult = node: BeaconNode, slashing: AttesterSlashing): Future[SendResult] {.async.} =
# REST/JSON-RPC API helper procedure. # REST helper procedure.
let res = let res =
node.processor[].attesterSlashingValidator(MsgSource.api, slashing) node.processor[].attesterSlashingValidator(MsgSource.api, slashing)
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastAttesterSlashing(slashing) return await node.network.broadcastAttesterSlashing(slashing)
ok()
else: else:
notice "Attester slashing request failed validation", notice "Attester slashing request failed validation",
slashing = shortLog(slashing), error = res.error() slashing = shortLog(slashing), error = res.error()
err(res.error()[1]) return err(res.error()[1])
proc sendProposerSlashing*(node: BeaconNode, proc sendProposerSlashing*(
slashing: ProposerSlashing): SendResult = node: BeaconNode, slashing: ProposerSlashing): Future[SendResult]
# REST/JSON-RPC API helper procedure. {.async.} =
# REST helper procedure.
let res = let res =
node.processor[].proposerSlashingValidator(MsgSource.api, slashing) node.processor[].proposerSlashingValidator(MsgSource.api, slashing)
if res.isGoodForSending: if res.isGoodForSending:
node.network.broadcastProposerSlashing(slashing) return await node.network.broadcastProposerSlashing(slashing)
ok()
else: else:
notice "Proposer slashing request failed validation", notice "Proposer slashing request failed validation",
slashing = shortLog(slashing), error = res.error() slashing = shortLog(slashing), error = res.error()
err(res.error()[1]) return err(res.error()[1])
proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock
): Future[SendBlockResult] {.async.} = ): Future[SendBlockResult] {.async.} =
# REST/JSON-RPC API helper procedure. # REST helper procedure.
block: block:
# Start with a quick gossip validation check such that broadcasting the # Start with a quick gossip validation check such that broadcasting the
# block doesn't get the node into trouble # block doesn't get the node into trouble
@ -1465,7 +1503,9 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock
# The block passed basic gossip validation - we can "safely" broadcast it now. # The block passed basic gossip validation - we can "safely" broadcast it now.
# In fact, per the spec, we should broadcast it even if it later fails to # In fact, per the spec, we should broadcast it even if it later fails to
# apply to our state. # apply to our state.
node.network.broadcastBeaconBlock(forked) let sendResult = await node.network.broadcastBeaconBlock(forked)
if sendResult.isErr:
return SendBlockResult.err(sendResult.error())
let let
wallTime = node.beaconClock.now() wallTime = node.beaconClock.now()