mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-09 05:52:45 +00:00
Implement the even/odd request ID scheme; Handle more edge cases; Break the cyclic imports
This commit is contained in:
parent
96e2a02faf
commit
e228c2dbcb
@ -4,10 +4,10 @@ import
|
|||||||
chronos, chronicles, confutils, serialization/errors,
|
chronos, chronicles, confutils, serialization/errors,
|
||||||
eth/trie/db, eth/trie/backends/rocksdb_backend, eth/async_utils,
|
eth/trie/db, eth/trie/backends/rocksdb_backend, eth/async_utils,
|
||||||
spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator],
|
spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator],
|
||||||
conf, time,
|
conf, time, state_transition, fork_choice, ssz, beacon_chain_db,
|
||||||
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
|
validator_pool, extras, attestation_pool, block_pool, eth2_network,
|
||||||
attestation_pool, block_pool, eth2_network, beacon_node_types,
|
beacon_node_types, mainchain_monitor, trusted_state_snapshots, version,
|
||||||
mainchain_monitor, trusted_state_snapshots, version
|
sync_protocol, request_manager
|
||||||
|
|
||||||
const
|
const
|
||||||
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
|
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
|
||||||
@ -18,13 +18,7 @@ const
|
|||||||
genesisFile = "genesis.json"
|
genesisFile = "genesis.json"
|
||||||
testnetsBaseUrl = "https://serenity-testnets.status.im"
|
testnetsBaseUrl = "https://serenity-testnets.status.im"
|
||||||
|
|
||||||
# #################################################
|
|
||||||
# Careful handling of beacon_node <-> sync_protocol
|
|
||||||
# to avoid recursive dependencies
|
|
||||||
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
|
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
|
||||||
# Forward decl for sync_protocol
|
|
||||||
import sync_protocol, request_manager
|
|
||||||
# #################################################
|
|
||||||
|
|
||||||
func localValidatorsDir(conf: BeaconNodeConf): string =
|
func localValidatorsDir(conf: BeaconNodeConf): string =
|
||||||
conf.dataDir / "validators"
|
conf.dataDir / "validators"
|
||||||
@ -88,6 +82,7 @@ proc saveValidatorKey(keyName, key: string, conf: BeaconNodeConf) =
|
|||||||
|
|
||||||
proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} =
|
proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} =
|
||||||
new result
|
new result
|
||||||
|
result.onBeaconBlock = onBeaconBlock
|
||||||
result.config = conf
|
result.config = conf
|
||||||
result.networkIdentity = getPersistentNetIdentity(conf)
|
result.networkIdentity = getPersistentNetIdentity(conf)
|
||||||
result.nickname = if conf.nodename == "auto": shortForm(result.networkIdentity)
|
result.nickname = if conf.nodename == "auto": shortForm(result.networkIdentity)
|
||||||
@ -654,7 +649,7 @@ proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
|
|||||||
if missingBlocks.len > 0:
|
if missingBlocks.len > 0:
|
||||||
info "Requesting detected missing blocks", missingBlocks
|
info "Requesting detected missing blocks", missingBlocks
|
||||||
node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: BeaconBlock):
|
node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: BeaconBlock):
|
||||||
node.onBeaconBlock(b)
|
onBeaconBlock(node ,b)
|
||||||
|
|
||||||
let nextSecond = max(Moment.now(), moment + chronos.seconds(1))
|
let nextSecond = max(Moment.now(), moment + chronos.seconds(1))
|
||||||
addTimer(nextSecond) do (p: pointer):
|
addTimer(nextSecond) do (p: pointer):
|
||||||
@ -662,7 +657,7 @@ proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
|
|||||||
|
|
||||||
proc run*(node: BeaconNode) =
|
proc run*(node: BeaconNode) =
|
||||||
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
|
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
|
||||||
node.onBeaconBlock(blck)
|
onBeaconBlock(node, blck)
|
||||||
|
|
||||||
waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation):
|
waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation):
|
||||||
node.onAttestation(attestation)
|
node.onAttestation(attestation)
|
||||||
|
@ -25,6 +25,7 @@ type
|
|||||||
attestationPool*: AttestationPool
|
attestationPool*: AttestationPool
|
||||||
mainchainMonitor*: MainchainMonitor
|
mainchainMonitor*: MainchainMonitor
|
||||||
beaconClock*: BeaconClock
|
beaconClock*: BeaconClock
|
||||||
|
onBeaconBlock*: proc (node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
|
||||||
|
|
||||||
stateCache*: StateData ##\
|
stateCache*: StateData ##\
|
||||||
## State cache object that's used as a scratch pad
|
## State cache object that's used as a scratch pad
|
||||||
|
@ -175,17 +175,22 @@ else:
|
|||||||
result = waitFor daemon.identity()
|
result = waitFor daemon.identity()
|
||||||
waitFor daemon.close()
|
waitFor daemon.close()
|
||||||
|
|
||||||
|
template tcpEndPoint(address, port): auto =
|
||||||
|
MultiAddress.init(address, IPPROTO_TCP, port)
|
||||||
|
|
||||||
proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} =
|
proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} =
|
||||||
var
|
var
|
||||||
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
|
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
|
||||||
hostAddress = tcpEndPoint(globalListeningAddr, Port conf.tcpPort)
|
hostAddress = tcpEndPoint(globalListeningAddr, Port conf.tcpPort)
|
||||||
announcedAddresses = if extIp != globalListeningAddr: @[]
|
announcedAddresses = if extIp != globalListeningAddr: @[]
|
||||||
else: @[tcpEndPoint(extIp, extTcpPort)]
|
else: @[tcpEndPoint(extIp, extTcpPort)]
|
||||||
|
keyFile = conf.ensureNetworkIdFile
|
||||||
|
|
||||||
daemon = await newDaemonApi({PSGossipSub},
|
info "Starting LibP2P deamon", hostAddress, announcedAddresses, keyFile
|
||||||
id = conf.ensureNetworkIdFile,
|
let daemon = await newDaemonApi({PSGossipSub},
|
||||||
hostAddresses = @[hostAddress],
|
id = keyFile,
|
||||||
announcedAddresses = announcedAddresses)
|
hostAddresses = @[hostAddress],
|
||||||
|
announcedAddresses = announcedAddresses)
|
||||||
|
|
||||||
return await Eth2Node.init(daemon)
|
return await Eth2Node.init(daemon)
|
||||||
|
|
||||||
@ -195,7 +200,7 @@ else:
|
|||||||
result.addresses = @[tcpEndPoint(ip, port)]
|
result.addresses = @[tcpEndPoint(ip, port)]
|
||||||
|
|
||||||
proc isSameNode*(bootstrapNode: BootstrapAddr, id: Eth2NodeIdentity): bool =
|
proc isSameNode*(bootstrapNode: BootstrapAddr, id: Eth2NodeIdentity): bool =
|
||||||
bootstrapNode == id
|
bootstrapNode.peer == id.peer
|
||||||
|
|
||||||
proc shortForm*(id: Eth2NodeIdentity): string =
|
proc shortForm*(id: Eth2NodeIdentity): string =
|
||||||
# TODO: Make this shorter
|
# TODO: Make this shorter
|
||||||
|
@ -22,6 +22,7 @@ type
|
|||||||
connectionState*: ConnectionState
|
connectionState*: ConnectionState
|
||||||
awaitedMessages: Table[CompressedMsgId, FutureBase]
|
awaitedMessages: Table[CompressedMsgId, FutureBase]
|
||||||
protocolStates*: seq[RootRef]
|
protocolStates*: seq[RootRef]
|
||||||
|
maxInactivityAllowed*: Duration
|
||||||
|
|
||||||
ConnectionState* = enum
|
ConnectionState* = enum
|
||||||
None,
|
None,
|
||||||
@ -95,6 +96,7 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
|
|||||||
new result
|
new result
|
||||||
result.daemon = daemon
|
result.daemon = daemon
|
||||||
result.daemon.userData = result
|
result.daemon.userData = result
|
||||||
|
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
|
||||||
init result.peers
|
init result.peers
|
||||||
|
|
||||||
newSeq result.protocolStates, allProtocols.len
|
newSeq result.protocolStates, allProtocols.len
|
||||||
@ -319,7 +321,7 @@ proc implementSendProcBody(sendProc: SendProc) =
|
|||||||
else:
|
else:
|
||||||
quote: `sendBytes`(`UntypedResponder`(`peer`).stream, `bytes`)
|
quote: `sendBytes`(`UntypedResponder`(`peer`).stream, `bytes`)
|
||||||
|
|
||||||
sendProc.useStandardBody(nil, sendCallGenerator)
|
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
||||||
|
|
||||||
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
var
|
var
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import
|
import
|
||||||
tables, deques, options, algorithm, std_shims/[macros_shim, tables_shims],
|
tables, deques, options, algorithm, std_shims/[macros_shim, tables_shims],
|
||||||
ranges/ptr_arith, chronos, chronicles, serialization, faststreams/input_stream,
|
ranges/ptr_arith, chronos, chronicles, serialization, faststreams/input_stream,
|
||||||
eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi,
|
eth/async_utils, eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi,
|
||||||
ssz
|
ssz
|
||||||
|
|
||||||
export
|
export
|
||||||
@ -9,10 +9,10 @@ export
|
|||||||
|
|
||||||
const
|
const
|
||||||
# Compression nibble
|
# Compression nibble
|
||||||
NoCompression* = uint 0
|
NoCompression* = byte 0
|
||||||
|
|
||||||
# Encoding nibble
|
# Encoding nibble
|
||||||
SszEncoding* = uint 1
|
SszEncoding* = byte 1
|
||||||
|
|
||||||
beaconChainProtocol = "/eth/serenity/beacon/rpc/1"
|
beaconChainProtocol = "/eth/serenity/beacon/rpc/1"
|
||||||
|
|
||||||
@ -27,13 +27,13 @@ type
|
|||||||
Peer* = ref object
|
Peer* = ref object
|
||||||
network*: Eth2Node
|
network*: Eth2Node
|
||||||
id*: PeerID
|
id*: PeerID
|
||||||
lastSentMsgId*: uint64
|
lastReqId*: uint64
|
||||||
rpcStream*: P2PStream
|
rpcStream*: P2PStream
|
||||||
connectionState*: ConnectionState
|
connectionState*: ConnectionState
|
||||||
awaitedMessages: Table[CompressedMsgId, FutureBase]
|
awaitedMessages: Table[CompressedMsgId, FutureBase]
|
||||||
outstandingRequests*: seq[Deque[OutstandingRequest]]
|
outstandingRequests*: Table[uint64, OutstandingRequest]
|
||||||
protocolStates*: seq[RootRef]
|
protocolStates*: seq[RootRef]
|
||||||
maxInactivityAllowed: Duration
|
maxInactivityAllowed*: Duration
|
||||||
|
|
||||||
ConnectionState* = enum
|
ConnectionState* = enum
|
||||||
None,
|
None,
|
||||||
@ -69,6 +69,7 @@ type
|
|||||||
id*: uint64
|
id*: uint64
|
||||||
future*: FutureBase
|
future*: FutureBase
|
||||||
timeoutAt*: Moment
|
timeoutAt*: Moment
|
||||||
|
responseThunk*: ThunkProc
|
||||||
|
|
||||||
ProtocolConnection* = object
|
ProtocolConnection* = object
|
||||||
stream*: P2PStream
|
stream*: P2PStream
|
||||||
@ -96,13 +97,9 @@ type
|
|||||||
networkStateInitializer*: NetworkStateInitializer
|
networkStateInitializer*: NetworkStateInitializer
|
||||||
handshake*: HandshakeStep
|
handshake*: HandshakeStep
|
||||||
disconnectHandler*: DisconnectionHandler
|
disconnectHandler*: DisconnectionHandler
|
||||||
dispatcher: Dispatcher
|
|
||||||
|
|
||||||
ProtocolInfo* = ptr ProtocolInfoObj
|
ProtocolInfo* = ptr ProtocolInfoObj
|
||||||
|
|
||||||
Dispatcher* = object
|
|
||||||
messages*: seq[MessageInfo]
|
|
||||||
|
|
||||||
SpecOuterMsgHeader {.packed.} = object
|
SpecOuterMsgHeader {.packed.} = object
|
||||||
compression {.bitsize: 4.}: uint
|
compression {.bitsize: 4.}: uint
|
||||||
encoding {.bitsize: 4.}: uint
|
encoding {.bitsize: 4.}: uint
|
||||||
@ -112,6 +109,10 @@ type
|
|||||||
reqId: uint64
|
reqId: uint64
|
||||||
methodId: uint16
|
methodId: uint16
|
||||||
|
|
||||||
|
ErrorResponse {.packed.} = object
|
||||||
|
outerHeader: SpecOuterMsgHeader
|
||||||
|
innerHeader: SpecInnerMsgHeader
|
||||||
|
|
||||||
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
|
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
|
||||||
NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.}
|
NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.}
|
||||||
|
|
||||||
@ -121,6 +122,7 @@ type
|
|||||||
ThunkProc* = proc(peer: Peer,
|
ThunkProc* = proc(peer: Peer,
|
||||||
stream: P2PStream,
|
stream: P2PStream,
|
||||||
reqId: uint64,
|
reqId: uint64,
|
||||||
|
reqFuture: FutureBase,
|
||||||
msgData: ByteStreamVar): Future[void] {.gcsafe.}
|
msgData: ByteStreamVar): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
|
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
|
||||||
@ -147,6 +149,14 @@ const
|
|||||||
BreachOfProtocol* = FaultOrError
|
BreachOfProtocol* = FaultOrError
|
||||||
# TODO: We should lobby for more disconnection reasons.
|
# TODO: We should lobby for more disconnection reasons.
|
||||||
|
|
||||||
|
template isOdd(val: SomeInteger): bool =
|
||||||
|
type T = type(val)
|
||||||
|
(val and T(1)) != 0
|
||||||
|
|
||||||
|
proc init(T: type SpecOuterMsgHeader,
|
||||||
|
compression, encoding: byte, msgLen: uint64): T =
|
||||||
|
T(compression: compression, encoding: encoding, msgLen: msgLen)
|
||||||
|
|
||||||
proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} =
|
proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} =
|
||||||
await stream.transp.readExactly(addr result, sizeof result)
|
await stream.transp.readExactly(addr result, sizeof result)
|
||||||
|
|
||||||
@ -154,6 +164,10 @@ proc appendPackedObject(stream: OutputStreamVar, value: auto) =
|
|||||||
let valueAsBytes = cast[ptr byte](unsafeAddr(value))
|
let valueAsBytes = cast[ptr byte](unsafeAddr(value))
|
||||||
stream.append makeOpenArray(valueAsBytes, sizeof(value))
|
stream.append makeOpenArray(valueAsBytes, sizeof(value))
|
||||||
|
|
||||||
|
proc getThunk(protocol: ProtocolInfo, methodId: uint16): ThunkProc =
|
||||||
|
if methodId.int >= protocol.messages.len: return nil
|
||||||
|
protocol.messages[methodId.int].thunk
|
||||||
|
|
||||||
include libp2p_backends_common
|
include libp2p_backends_common
|
||||||
include eth/p2p/p2p_backends_helpers
|
include eth/p2p/p2p_backends_helpers
|
||||||
include eth/p2p/p2p_tracing
|
include eth/p2p/p2p_tracing
|
||||||
@ -178,7 +192,8 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
|
|||||||
result.id = id
|
result.id = id
|
||||||
result.network = network
|
result.network = network
|
||||||
result.awaitedMessages = initTable[CompressedMsgId, FutureBase]()
|
result.awaitedMessages = initTable[CompressedMsgId, FutureBase]()
|
||||||
result.connectionState = Connected
|
result.maxInactivityAllowed = 15.minutes # TODO: read this from the config
|
||||||
|
result.connectionState = None
|
||||||
newSeq result.protocolStates, allProtocols.len
|
newSeq result.protocolStates, allProtocols.len
|
||||||
for i in 0 ..< allProtocols.len:
|
for i in 0 ..< allProtocols.len:
|
||||||
let proto = allProtocols[i]
|
let proto = allProtocols[i]
|
||||||
@ -189,52 +204,51 @@ proc init*[MsgName](T: type ResponderWithId[MsgName],
|
|||||||
peer: Peer, reqId: uint64): T =
|
peer: Peer, reqId: uint64): T =
|
||||||
T(peer: peer, reqId: reqId)
|
T(peer: peer, reqId: reqId)
|
||||||
|
|
||||||
proc performProtocolHandshakes*(peer: Peer) {.async.} =
|
proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} =
|
||||||
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
|
|
||||||
for protocol in allProtocols:
|
|
||||||
if protocol.handshake != nil:
|
|
||||||
subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream))
|
|
||||||
|
|
||||||
await all(subProtocolsHandshakes)
|
|
||||||
debug "All protocols initialized", peer
|
|
||||||
|
|
||||||
proc initializeConnection*(peer: Peer) {.async.} =
|
|
||||||
let daemon = peer.network.daemon
|
|
||||||
try:
|
try:
|
||||||
peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol])
|
var unsentBytes = data.len
|
||||||
await performProtocolHandshakes(peer)
|
while true:
|
||||||
|
# TODO: this looks wrong.
|
||||||
|
# We are always trying to write the same data.
|
||||||
|
# Find all other places where such code is used.
|
||||||
|
unsentBytes -= await peer.rpcStream.transp.write(data)
|
||||||
|
if unsentBytes <= 0: return
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
await reraiseAsPeerDisconnected(peer, "Failed to perform handshake")
|
await peer.disconnect(FaultOrError)
|
||||||
|
# this is usually a "(32) Broken pipe":
|
||||||
|
# FIXME: this exception should be caught somewhere in addMsgHandler() and
|
||||||
|
# sending should be retried a few times
|
||||||
|
raise
|
||||||
|
|
||||||
proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} =
|
proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] =
|
||||||
let peer = daemon.peerFromStream(stream)
|
return sendMsg(responder.peer, data)
|
||||||
peer.rpcStream = stream
|
|
||||||
await performProtocolHandshakes(peer)
|
|
||||||
|
|
||||||
proc accepts(d: Dispatcher, methodId: uint16): bool =
|
proc sendErrorResponse(peer: Peer, reqId: uint64,
|
||||||
methodId.int < d.messages.len
|
responseCode: ResponseCode): Future[void] =
|
||||||
|
var resp = ErrorResponse(
|
||||||
|
outerHeader: SpecOuterMsgHeader.init(
|
||||||
|
compression = NoCompression,
|
||||||
|
encoding = SszEncoding,
|
||||||
|
msgLen = uint64 sizeof(SpecInnerMsgHeader)),
|
||||||
|
innerHeader: SpecInnerMsgHeader(
|
||||||
|
reqId: reqId,
|
||||||
|
methodId: uint16(responseCode)))
|
||||||
|
|
||||||
proc invokeThunk(peer: Peer,
|
# TODO: don't allocate the Bytes sequence here
|
||||||
protocol: ProtocolInfo,
|
return peer.sendMsg @(makeOpenArray(cast[ptr byte](addr resp), sizeof resp))
|
||||||
stream: P2PStream,
|
|
||||||
methodId: int,
|
|
||||||
reqId: uint64,
|
|
||||||
msgContents: ByteStreamVar): Future[void] =
|
|
||||||
template raiseInvalidMsgId =
|
|
||||||
raise newException(InvalidMsgIdError,
|
|
||||||
"ETH2 message with an invalid id " & $methodId)
|
|
||||||
|
|
||||||
if methodId >= protocol.dispatcher.messages.len: raiseInvalidMsgId()
|
proc recvAndDispatchMsg*(peer: Peer): Future[PeerLoopExitReason] {.async.} =
|
||||||
var thunk = protocol.dispatcher.messages[methodId].thunk
|
|
||||||
if thunk == nil: raiseInvalidMsgId()
|
|
||||||
|
|
||||||
return thunk(peer, stream, reqId, msgContents)
|
|
||||||
|
|
||||||
proc recvAndDispatchMsg*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream):
|
|
||||||
Future[PeerLoopExitReason] {.async.} =
|
|
||||||
template fail(reason) =
|
template fail(reason) =
|
||||||
return reason
|
return reason
|
||||||
|
|
||||||
|
# For now, we won't try to handle the presence of multiple sub-protocols
|
||||||
|
# since the spec is not defining how they will be mapped to P2P streams.
|
||||||
|
doAssert allProtocols.len == 1
|
||||||
|
|
||||||
|
var
|
||||||
|
stream = peer.rpcStream
|
||||||
|
protocol = allProtocols[0]
|
||||||
|
|
||||||
var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader)
|
var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader)
|
||||||
|
|
||||||
if outerHeader.compression != NoCompression:
|
if outerHeader.compression != NoCompression:
|
||||||
@ -246,44 +260,51 @@ proc recvAndDispatchMsg*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream):
|
|||||||
if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64:
|
if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64:
|
||||||
fail ProtocolViolation
|
fail ProtocolViolation
|
||||||
|
|
||||||
var innerHeader = await stream.readPackedObject(SpecInnerMsgHeader)
|
let
|
||||||
|
innerHeader = await stream.readPackedObject(SpecInnerMsgHeader)
|
||||||
|
reqId = innerHeader.reqId
|
||||||
|
|
||||||
var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64)
|
var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64)
|
||||||
await stream.transp.readExactly(addr msgContent[0], msgContent.len)
|
await stream.transp.readExactly(addr msgContent[0], msgContent.len)
|
||||||
|
|
||||||
var msgContentStream = memoryStream(msgContent)
|
var msgContentStream = memoryStream(msgContent)
|
||||||
|
|
||||||
if protocol.dispatcher.accepts(innerHeader.methodId):
|
if reqId.isOdd:
|
||||||
try:
|
peer.outstandingRequests.withValue(reqId, req):
|
||||||
await invokeThunk(peer, protocol, stream,
|
let thunk = req.responseThunk
|
||||||
innerHeader.methodId.int,
|
let reqFuture = req.future
|
||||||
innerHeader.reqId,
|
peer.outstandingRequests.del(reqId)
|
||||||
msgContentStream)
|
|
||||||
except SerializationError:
|
|
||||||
fail ProtocolViolation
|
|
||||||
except CatchableError:
|
|
||||||
warn ""
|
|
||||||
|
|
||||||
proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} =
|
try:
|
||||||
try:
|
await thunk(peer, stream, reqId, reqFuture, msgContentStream)
|
||||||
var unsentBytes = data.len
|
except SerializationError:
|
||||||
while true:
|
debug "Error during deserialization", err = getCurrentExceptionMsg()
|
||||||
unsentBytes -= await peer.rpcStream.transp.write(data)
|
fail ProtocolViolation
|
||||||
if unsentBytes <= 0: return
|
except CatchableError:
|
||||||
except:
|
# TODO
|
||||||
await peer.disconnect(FaultOrError)
|
warn ""
|
||||||
# this is usually a "(32) Broken pipe":
|
do:
|
||||||
# FIXME: this exception should be caught somewhere in addMsgHandler() and
|
debug "Ignoring late or invalid response ID", peer, id = reqId
|
||||||
# sending should be retried a few times
|
# TODO: skip the message
|
||||||
raise
|
else:
|
||||||
|
let thunk = protocol.getThunk(innerHeader.methodId)
|
||||||
|
if thunk != nil:
|
||||||
|
try:
|
||||||
|
await thunk(peer, stream, reqId, nil, msgContentStream)
|
||||||
|
except SerializationError:
|
||||||
|
debug "Error during deserialization", err = getCurrentExceptionMsg()
|
||||||
|
fail ProtocolViolation
|
||||||
|
except CatchableError:
|
||||||
|
# TODO
|
||||||
|
warn ""
|
||||||
|
else:
|
||||||
|
debug "P2P request method not found", methodId = innerHeader.methodId
|
||||||
|
await peer.sendErrorResponse(reqId, MethodNotFound)
|
||||||
|
|
||||||
proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] =
|
proc dispatchMessages*(peer: Peer): Future[PeerLoopExitReason] {.async.} =
|
||||||
return sendMsg(responder.peer, data)
|
|
||||||
|
|
||||||
proc dispatchMessages*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream):
|
|
||||||
Future[PeerLoopExitReason] {.async.} =
|
|
||||||
while true:
|
while true:
|
||||||
let dispatchedMsgFut = recvAndDispatchMsg(peer, protocol, stream)
|
let dispatchedMsgFut = recvAndDispatchMsg(peer)
|
||||||
|
doAssert peer.maxInactivityAllowed.milliseconds > 0
|
||||||
yield dispatchedMsgFut or sleepAsync(peer.maxInactivityAllowed)
|
yield dispatchedMsgFut or sleepAsync(peer.maxInactivityAllowed)
|
||||||
if not dispatchedMsgFut.finished:
|
if not dispatchedMsgFut.finished:
|
||||||
return InactivePeer
|
return InactivePeer
|
||||||
@ -295,87 +316,70 @@ proc dispatchMessages*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream):
|
|||||||
if status == Success: continue
|
if status == Success: continue
|
||||||
return status
|
return status
|
||||||
|
|
||||||
proc registerRequest(peer: Peer,
|
proc performProtocolHandshakes*(peer: Peer) {.async.} =
|
||||||
protocol: ProtocolInfo,
|
peer.initProtocolStates allProtocols
|
||||||
timeout: Duration,
|
|
||||||
responseFuture: FutureBase,
|
|
||||||
responseMethodId: uint16): uint64 =
|
|
||||||
inc peer.lastSentMsgId
|
|
||||||
result = peer.lastSentMsgId
|
|
||||||
|
|
||||||
let timeoutAt = Moment.fromNow(timeout)
|
# Please note that the ordering of operations here is important!
|
||||||
let req = OutstandingRequest(id: result,
|
#
|
||||||
future: responseFuture,
|
# We must first start all handshake procedures and give them a
|
||||||
timeoutAt: timeoutAt)
|
# chance to send any initial packages they might require over
|
||||||
peer.outstandingRequests[responseMethodId.int].addLast req
|
# the network and to yield on their `nextMsg` waits.
|
||||||
|
#
|
||||||
|
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
|
||||||
|
for protocol in allProtocols:
|
||||||
|
if protocol.handshake != nil:
|
||||||
|
subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream))
|
||||||
|
|
||||||
let requestResolver = protocol.dispatcher.messages[responseMethodId.int].requestResolver
|
# The `dispatchMesssages` loop must be started after this.
|
||||||
proc timeoutExpired(udata: pointer) = requestResolver(nil, responseFuture)
|
# Otherwise, we risk that some of the handshake packets sent by
|
||||||
|
# the other peer may arrrive too early and be processed before
|
||||||
|
# the handshake code got a change to wait for them.
|
||||||
|
#
|
||||||
|
var messageProcessingLoop = peer.dispatchMessages()
|
||||||
|
messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} =
|
||||||
|
if messageProcessingLoop.failed:
|
||||||
|
debug "Ending dispatchMessages loop", peer,
|
||||||
|
err = messageProcessingLoop.error.msg
|
||||||
|
else:
|
||||||
|
debug "Ending dispatchMessages", peer,
|
||||||
|
exitCode = messageProcessingLoop.read
|
||||||
|
traceAsyncErrors peer.disconnect(ClientShutdown)
|
||||||
|
|
||||||
addTimer(timeoutAt, timeoutExpired, nil)
|
# The handshake may involve multiple async steps, so we wait
|
||||||
|
# here for all of them to finish.
|
||||||
|
#
|
||||||
|
await all(subProtocolsHandshakes)
|
||||||
|
|
||||||
|
peer.connectionState = Connected
|
||||||
|
debug "Peer connection initialized", peer
|
||||||
|
|
||||||
|
proc initializeConnection*(peer: Peer) {.async.} =
|
||||||
|
let daemon = peer.network.daemon
|
||||||
|
try:
|
||||||
|
peer.connectionState = Connecting
|
||||||
|
peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol])
|
||||||
|
await performProtocolHandshakes(peer)
|
||||||
|
except CatchableError:
|
||||||
|
await reraiseAsPeerDisconnected(peer, "Failed to perform handshake")
|
||||||
|
|
||||||
|
proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} =
|
||||||
|
let peer = daemon.peerFromStream(stream)
|
||||||
|
peer.rpcStream = stream
|
||||||
|
peer.connectionState = Connecting
|
||||||
|
await performProtocolHandshakes(peer)
|
||||||
|
|
||||||
proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo,
|
proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo,
|
||||||
methodId: int, msg: pointer, reqId: uint64) =
|
methodId: int, msg: pointer, reqFuture: FutureBase) =
|
||||||
logScope:
|
|
||||||
msg = protocol.dispatcher.messages[methodId].name
|
|
||||||
msgContents = protocol.dispatcher.messages[methodId].printer(msg)
|
|
||||||
receivedReqId = reqId
|
|
||||||
remotePeer = peer.id
|
|
||||||
|
|
||||||
template resolve(future) =
|
|
||||||
(protocol.dispatcher.messages[methodId].requestResolver)(msg, future)
|
|
||||||
|
|
||||||
template outstandingReqs: auto =
|
|
||||||
peer.outstandingRequests[methodId]
|
|
||||||
|
|
||||||
let msgId = (protocolIdx: protocol.index, methodId: methodId)
|
let msgId = (protocolIdx: protocol.index, methodId: methodId)
|
||||||
|
|
||||||
if peer.awaitedMessages[msgId] != nil:
|
if peer.awaitedMessages[msgId] != nil:
|
||||||
let msgInfo = protocol.dispatcher.messages[methodId]
|
let msgInfo = protocol.messages[methodId]
|
||||||
msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId])
|
msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId])
|
||||||
peer.awaitedMessages[msgId] = nil
|
peer.awaitedMessages[msgId] = nil
|
||||||
|
|
||||||
# TODO: This is not completely sound because we are still using a global
|
if reqFuture != nil and not reqFuture.finished:
|
||||||
# `reqId` sequence (the problem is that we might get a response ID that
|
protocol.messages[methodId].requestResolver(msg, reqFuture)
|
||||||
# matches a request ID for a different type of request). To make the code
|
|
||||||
# correct, we can use a separate sequence per response type, but we have
|
|
||||||
# to first verify that the other Ethereum clients are supporting this
|
|
||||||
# correctly (because then, we'll be reusing the same reqIds for different
|
|
||||||
# types of requests). Alternatively, we can assign a separate interval in
|
|
||||||
# the `reqId` space for each type of response.
|
|
||||||
if reqId > peer.lastSentMsgId:
|
|
||||||
warn "RLPx response without a matching request"
|
|
||||||
return
|
|
||||||
|
|
||||||
var idx = 0
|
|
||||||
while idx < outstandingReqs.len:
|
|
||||||
template req: auto = outstandingReqs()[idx]
|
|
||||||
|
|
||||||
if req.future.finished:
|
|
||||||
doAssert req.timeoutAt <= Moment.now()
|
|
||||||
# Here we'll remove the expired request by swapping
|
|
||||||
# it with the last one in the deque (if necessary):
|
|
||||||
if idx != outstandingReqs.len - 1:
|
|
||||||
req = outstandingReqs.popLast
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
outstandingReqs.shrink(fromLast = 1)
|
|
||||||
# This was the last item, so we don't have any
|
|
||||||
# more work to do:
|
|
||||||
return
|
|
||||||
|
|
||||||
if req.id == reqId:
|
|
||||||
resolve req.future
|
|
||||||
# Here we'll remove the found request by swapping
|
|
||||||
# it with the last one in the deque (if necessary):
|
|
||||||
if idx != outstandingReqs.len - 1:
|
|
||||||
req = outstandingReqs.popLast
|
|
||||||
else:
|
|
||||||
outstandingReqs.shrink(fromLast = 1)
|
|
||||||
return
|
|
||||||
|
|
||||||
inc idx
|
|
||||||
|
|
||||||
debug "late or duplicate reply for a network request"
|
|
||||||
|
|
||||||
proc initProtocol(name: string, version: int,
|
proc initProtocol(name: string, version: int,
|
||||||
peerInit: PeerStateInitializer,
|
peerInit: PeerStateInitializer,
|
||||||
@ -423,28 +427,72 @@ proc prepareRequest(peer: Peer,
|
|||||||
stream: OutputStreamVar,
|
stream: OutputStreamVar,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
responseFuture: FutureBase): DelayedWriteCursor =
|
responseFuture: FutureBase): DelayedWriteCursor =
|
||||||
|
assert peer != nil and
|
||||||
|
protocol != nil and
|
||||||
|
responseFuture != nil and
|
||||||
|
responseMethodId.int < protocol.messages.len
|
||||||
|
|
||||||
let reqId = registerRequest(peer, protocol, timeout,
|
doAssert timeout.milliseconds > 0
|
||||||
responseFuture, responseMethodId)
|
|
||||||
|
|
||||||
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
|
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
|
||||||
|
|
||||||
|
inc peer.lastReqId, 2
|
||||||
|
let reqId = peer.lastReqId
|
||||||
|
|
||||||
stream.appendPackedObject SpecInnerMsgHeader(
|
stream.appendPackedObject SpecInnerMsgHeader(
|
||||||
reqId: reqId,
|
reqId: reqId, methodId: requestMethodId)
|
||||||
methodId: requestMethodId)
|
|
||||||
|
template responseMsgInfo: auto =
|
||||||
|
protocol.messages[responseMethodId.int]
|
||||||
|
|
||||||
|
let
|
||||||
|
requestResolver = responseMsgInfo.requestResolver
|
||||||
|
timeoutAt = Moment.fromNow(timeout)
|
||||||
|
|
||||||
|
peer.outstandingRequests[reqId + 1] = OutstandingRequest(
|
||||||
|
id: reqId,
|
||||||
|
future: responseFuture,
|
||||||
|
timeoutAt: timeoutAt,
|
||||||
|
responseThunk: responseMsgInfo.thunk)
|
||||||
|
|
||||||
|
proc timeoutExpired(udata: pointer) =
|
||||||
|
requestResolver(nil, responseFuture)
|
||||||
|
peer.outstandingRequests.del(reqId + 1)
|
||||||
|
|
||||||
|
addTimer(timeoutAt, timeoutExpired, nil)
|
||||||
|
|
||||||
proc prepareResponse(responder: ResponderWithId,
|
proc prepareResponse(responder: ResponderWithId,
|
||||||
stream: OutputStreamVar): DelayedWriteCursor =
|
stream: OutputStreamVar): DelayedWriteCursor =
|
||||||
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
|
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
|
||||||
|
|
||||||
|
stream.appendPackedObject SpecInnerMsgHeader(
|
||||||
|
reqId: responder.reqId + 1,
|
||||||
|
methodId: uint16(Success))
|
||||||
|
|
||||||
|
proc prepareMsg(peer: Peer, methodId: uint16,
|
||||||
|
stream: OutputStreamVar): DelayedWriteCursor =
|
||||||
|
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
|
||||||
|
|
||||||
|
inc peer.lastReqId, 2
|
||||||
|
stream.appendPackedObject SpecInnerMsgHeader(
|
||||||
|
reqId: peer.lastReqId, methodId: methodId)
|
||||||
|
|
||||||
|
proc finishOuterHeader(headerCursor: DelayedWriteCursor) =
|
||||||
|
var outerHeader = SpecOuterMsgHeader.init(
|
||||||
|
compression = NoCompression,
|
||||||
|
encoding = SszEncoding,
|
||||||
|
msgLen = uint64(headerCursor.totalBytesWrittenAfterCursor))
|
||||||
|
|
||||||
|
headerCursor.endWrite makeOpenArray(cast[ptr byte](addr outerHeader),
|
||||||
|
sizeof outerHeader)
|
||||||
|
|
||||||
proc implementSendProcBody(sendProc: SendProc) =
|
proc implementSendProcBody(sendProc: SendProc) =
|
||||||
let
|
let
|
||||||
msg = sendProc.msg
|
msg = sendProc.msg
|
||||||
delayedWriteCursor = ident "delayedWriteCursor"
|
delayedWriteCursor = ident "delayedWriteCursor"
|
||||||
peer = sendProc.peerParam
|
peer = sendProc.peerParam
|
||||||
|
|
||||||
proc preludeGenerator(stream: NimNode): NimNode =
|
proc preSerializationStep(stream: NimNode): NimNode =
|
||||||
result = newStmtList()
|
|
||||||
case msg.kind
|
case msg.kind
|
||||||
of msgRequest:
|
of msgRequest:
|
||||||
let
|
let
|
||||||
@ -453,15 +501,22 @@ proc implementSendProcBody(sendProc: SendProc) =
|
|||||||
protocol = sendProc.msg.protocol.protocolInfoVar
|
protocol = sendProc.msg.protocol.protocolInfoVar
|
||||||
timeout = sendProc.timeoutParam
|
timeout = sendProc.timeoutParam
|
||||||
|
|
||||||
result.add quote do:
|
quote do:
|
||||||
let `delayedWriteCursor` = `prepareRequest`(
|
var `delayedWriteCursor` = prepareRequest(
|
||||||
`peer`, `protocol`, `requestMethodId`, `responseMethodId`,
|
`peer`, `protocol`, `requestMethodId`, `responseMethodId`,
|
||||||
`stream`, `timeout`, `resultIdent`)
|
`stream`, `timeout`, `resultIdent`)
|
||||||
|
|
||||||
of msgResponse:
|
of msgResponse:
|
||||||
result.add quote do:
|
quote do:
|
||||||
let `delayedWriteCursor` = `prepareResponse`(`peer`, `stream`)
|
var `delayedWriteCursor` = prepareResponse(`peer`, `stream`)
|
||||||
else:
|
|
||||||
discard
|
of msgHandshake, msgNotification:
|
||||||
|
let methodId = newLit(msg.id)
|
||||||
|
quote do:
|
||||||
|
var `delayedWriteCursor` = prepareMsg(`peer`, `methodId`, `stream`)
|
||||||
|
|
||||||
|
proc postSerializationStep(stream: NimNode): NimNode =
|
||||||
|
newCall(bindSym "finishOuterHeader", delayedWriteCursor)
|
||||||
|
|
||||||
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
|
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
|
||||||
let
|
let
|
||||||
@ -471,7 +526,7 @@ proc implementSendProcBody(sendProc: SendProc) =
|
|||||||
|
|
||||||
if msg.kind == msgRequest:
|
if msg.kind == msgRequest:
|
||||||
# In RLPx requests, the returned future was allocated here and passed
|
# In RLPx requests, the returned future was allocated here and passed
|
||||||
# to `registerRequest`. It's already assigned to the result variable
|
# to `prepareRequest`. It's already assigned to the result variable
|
||||||
# of the proc, so we just wait for the sending operation to complete
|
# of the proc, so we just wait for the sending operation to complete
|
||||||
# and we return in a normal way. (the waiting is done, so we can catch
|
# and we return in a normal way. (the waiting is done, so we can catch
|
||||||
# any possible errors).
|
# any possible errors).
|
||||||
@ -481,7 +536,10 @@ proc implementSendProcBody(sendProc: SendProc) =
|
|||||||
# `sendMsg` call.
|
# `sendMsg` call.
|
||||||
quote: return `sendCall`
|
quote: return `sendCall`
|
||||||
|
|
||||||
sendProc.useStandardBody(preludeGenerator, sendCallGenerator)
|
sendProc.useStandardBody(
|
||||||
|
preSerializationStep,
|
||||||
|
postSerializationStep,
|
||||||
|
sendCallGenerator)
|
||||||
|
|
||||||
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
let
|
let
|
||||||
@ -492,7 +550,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||||||
Format = ident "SSZ"
|
Format = ident "SSZ"
|
||||||
Response = bindSym "Response"
|
Response = bindSym "Response"
|
||||||
ResponderWithId = bindSym "ResponderWithId"
|
ResponderWithId = bindSym "ResponderWithId"
|
||||||
perProtocolMsgId = ident"perProtocolMsgId"
|
perProtocolMsgId = ident "perProtocolMsgId"
|
||||||
|
|
||||||
mount = bindSym "mount"
|
mount = bindSym "mount"
|
||||||
|
|
||||||
@ -508,6 +566,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||||||
stream = ident "stream"
|
stream = ident "stream"
|
||||||
protocol = ident "protocol"
|
protocol = ident "protocol"
|
||||||
response = ident "response"
|
response = ident "response"
|
||||||
|
reqFutureVar = ident "reqFuture"
|
||||||
msgContents = ident "msgContents"
|
msgContents = ident "msgContents"
|
||||||
receivedMsg = ident "receivedMsg"
|
receivedMsg = ident "receivedMsg"
|
||||||
|
|
||||||
@ -546,12 +605,12 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||||||
else:
|
else:
|
||||||
newStmtList()
|
newStmtList()
|
||||||
|
|
||||||
let callResolvePendingFutures = if msg.kind == msgResponse:
|
let callResolvePendingFutures = newCall(
|
||||||
newCall(resolvePendingFutures,
|
resolvePendingFutures, peerVar,
|
||||||
peerVar, protocol.protocolInfoVar,
|
protocol.protocolInfoVar,
|
||||||
msgIdLit, newCall("addr", receivedMsg), reqIdVar)
|
msgIdLit,
|
||||||
else:
|
newCall("addr", receivedMsg),
|
||||||
newStmtList()
|
reqFutureVar)
|
||||||
|
|
||||||
var userHandlerParams = @[peerVar]
|
var userHandlerParams = @[peerVar]
|
||||||
if msg.kind == msgRequest:
|
if msg.kind == msgRequest:
|
||||||
@ -565,6 +624,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||||||
proc `thunkName`(`peerVar`: `Peer`,
|
proc `thunkName`(`peerVar`: `Peer`,
|
||||||
`stream`: `P2PStream`,
|
`stream`: `P2PStream`,
|
||||||
`reqIdVar`: uint64,
|
`reqIdVar`: uint64,
|
||||||
|
`reqFutureVar`: FutureBase,
|
||||||
`msgContents`: `ByteStreamVar`) {.async, gcsafe.} =
|
`msgContents`: `ByteStreamVar`) {.async, gcsafe.} =
|
||||||
var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`)
|
var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`)
|
||||||
`traceMsg`
|
`traceMsg`
|
||||||
|
@ -4,10 +4,6 @@ import
|
|||||||
spec/[datatypes, crypto, digest, helpers], eth/rlp,
|
spec/[datatypes, crypto, digest, helpers], eth/rlp,
|
||||||
beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz
|
beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz
|
||||||
|
|
||||||
from beacon_node import onBeaconBlock
|
|
||||||
# Careful handling of beacon_node <-> sync_protocol
|
|
||||||
# to avoid recursive dependencies
|
|
||||||
|
|
||||||
type
|
type
|
||||||
ValidatorChangeLogEntry* = object
|
ValidatorChangeLogEntry* = object
|
||||||
case kind*: ValidatorSetDeltaFlags
|
case kind*: ValidatorSetDeltaFlags
|
||||||
@ -49,7 +45,7 @@ proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeader, body: BeaconBlo
|
|||||||
proc importBlocks(node: BeaconNode,
|
proc importBlocks(node: BeaconNode,
|
||||||
blocks: openarray[BeaconBlock]) =
|
blocks: openarray[BeaconBlock]) =
|
||||||
for blk in blocks:
|
for blk in blocks:
|
||||||
node.onBeaconBlock(blk)
|
node.onBeaconBlock(node, blk)
|
||||||
info "Forward sync imported blocks", len = blocks.len
|
info "Forward sync imported blocks", len = blocks.len
|
||||||
|
|
||||||
proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] =
|
proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] =
|
||||||
@ -105,7 +101,7 @@ p2pProtocol BeaconSync(version = 1,
|
|||||||
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
|
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
|
||||||
if bestDiff >= 0:
|
if bestDiff >= 0:
|
||||||
# Nothing to do?
|
# Nothing to do?
|
||||||
trace "Nothing to sync", peer = peer.remote
|
debug "Nothing to sync", peer
|
||||||
else:
|
else:
|
||||||
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
|
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
|
||||||
# connection if it's too big.
|
# connection if it's too big.
|
||||||
@ -124,7 +120,7 @@ p2pProtocol BeaconSync(version = 1,
|
|||||||
if lastSlot <= s:
|
if lastSlot <= s:
|
||||||
info "Slot did not advance during sync", peer
|
info "Slot did not advance during sync", peer
|
||||||
break
|
break
|
||||||
|
|
||||||
s = lastSlot + 1
|
s = lastSlot + 1
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
Loading…
x
Reference in New Issue
Block a user