A LibP2P-based networking stack;
To enable it, comment out the 'withLibp2p' line in nim.cfg The history was squashed in order to remove an accidentally commited binary file. Other changes: * SSZ was adapted to use the common serialization framework * gossibsup.subscribe is not using async handlers at the moment and this allowed me to simplify it
This commit is contained in:
parent
434ba5727f
commit
903cb8a8b5
|
@ -1,5 +1,8 @@
|
||||||
|
import
|
||||||
|
beacon_chain/version
|
||||||
|
|
||||||
packageName = "beacon_chain"
|
packageName = "beacon_chain"
|
||||||
version = "0.0.1"
|
version = versionAsStr
|
||||||
author = "Status Research & Development GmbH"
|
author = "Status Research & Development GmbH"
|
||||||
description = "Eth2.0 research implementation of the beacon chain"
|
description = "Eth2.0 research implementation of the beacon chain"
|
||||||
license = "MIT or Apache License 2.0"
|
license = "MIT or Apache License 2.0"
|
||||||
|
@ -23,7 +26,8 @@ requires "nim >= 0.19.0",
|
||||||
"json_serialization",
|
"json_serialization",
|
||||||
"json_rpc",
|
"json_rpc",
|
||||||
"chronos",
|
"chronos",
|
||||||
"yaml"
|
"yaml",
|
||||||
|
"libp2p"
|
||||||
|
|
||||||
### Helper functions
|
### Helper functions
|
||||||
proc test(name: string, defaultLang = "c") =
|
proc test(name: string, defaultLang = "c") =
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import
|
import
|
||||||
os, json, tables, options,
|
os, json, tables, options,
|
||||||
chronicles, json_serialization, eth/common/eth_types_json_serialization,
|
chronicles, serialization, json_serialization, eth/common/eth_types_json_serialization,
|
||||||
spec/[datatypes, digest, crypto],
|
spec/[datatypes, digest, crypto],
|
||||||
eth/trie/db, ssz
|
eth/trie/db, ssz
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB =
|
||||||
result.backend = backend
|
result.backend = backend
|
||||||
|
|
||||||
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
|
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
|
||||||
db.backend.put(subkey(type value, key), ssz.serialize(value))
|
db.backend.put(subkey(type value, key), SSZ.encode(value))
|
||||||
|
|
||||||
proc putHead*(db: BeaconChainDB, key: Eth2Digest) =
|
proc putHead*(db: BeaconChainDB, key: Eth2Digest) =
|
||||||
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block?
|
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block?
|
||||||
|
@ -55,7 +55,7 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
|
||||||
# the previous finalized state, if we have that stored. To consider
|
# the previous finalized state, if we have that stored. To consider
|
||||||
# here is that the gap between finalized and present state might be
|
# here is that the gap between finalized and present state might be
|
||||||
# significant (days), meaning replay might be expensive.
|
# significant (days), meaning replay might be expensive.
|
||||||
db.backend.put(subkey(type value, key), ssz.serialize(value))
|
db.backend.put(subkey(type value, key), SSZ.encode(value))
|
||||||
|
|
||||||
proc putState*(db: BeaconChainDB, value: BeaconState) =
|
proc putState*(db: BeaconChainDB, value: BeaconState) =
|
||||||
db.putState(hash_tree_root_final(value), value)
|
db.putState(hash_tree_root_final(value), value)
|
||||||
|
@ -72,7 +72,10 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
|
||||||
proc get(db: BeaconChainDB, key: auto, T: typedesc): Option[T] =
|
proc get(db: BeaconChainDB, key: auto, T: typedesc): Option[T] =
|
||||||
let res = db.backend.get(key)
|
let res = db.backend.get(key)
|
||||||
if res.len != 0:
|
if res.len != 0:
|
||||||
ssz.deserialize(res, T)
|
try:
|
||||||
|
some(SSZ.decode(res, T))
|
||||||
|
except SerializationError:
|
||||||
|
none(T)
|
||||||
else:
|
else:
|
||||||
none(T)
|
none(T)
|
||||||
|
|
||||||
|
|
|
@ -1,18 +1,17 @@
|
||||||
import
|
import
|
||||||
std_shims/[os_shims, objects], net, sequtils, options, tables,
|
std_shims/[os_shims, objects], net, sequtils, options, tables,
|
||||||
chronos, chronicles, confutils, eth/[p2p, keys],
|
chronos, chronicles, confutils,
|
||||||
spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time,
|
spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time,
|
||||||
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
|
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
|
||||||
attestation_pool, block_pool,
|
attestation_pool, block_pool, eth2_network,
|
||||||
mainchain_monitor, gossipsub_protocol, trusted_state_snapshots,
|
mainchain_monitor, trusted_state_snapshots,
|
||||||
eth/trie/db, eth/trie/backends/rocksdb_backend
|
eth/trie/db, eth/trie/backends/rocksdb_backend
|
||||||
|
|
||||||
type
|
type
|
||||||
BeaconNode* = ref object
|
BeaconNode* = ref object
|
||||||
network*: EthereumNode
|
network*: Eth2Node
|
||||||
db*: BeaconChainDB
|
db*: BeaconChainDB
|
||||||
config*: BeaconNodeConf
|
config*: BeaconNodeConf
|
||||||
keys*: KeyPair
|
|
||||||
attachedValidators: ValidatorPool
|
attachedValidators: ValidatorPool
|
||||||
blockPool*: BlockPool
|
blockPool*: BlockPool
|
||||||
state*: StateData
|
state*: StateData
|
||||||
|
@ -21,33 +20,21 @@ type
|
||||||
potentialHeads: seq[Eth2Digest]
|
potentialHeads: seq[Eth2Digest]
|
||||||
|
|
||||||
const
|
const
|
||||||
version = "v0.1" # TODO: read this from the nimble file
|
|
||||||
clientId = "nimbus beacon node " & version
|
|
||||||
|
|
||||||
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
|
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
|
||||||
topicAttestations = "ethereum/2.1/beacon_chain/attestations"
|
topicAttestations = "ethereum/2.1/beacon_chain/attestations"
|
||||||
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch"
|
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch"
|
||||||
|
|
||||||
|
|
||||||
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
|
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
|
||||||
|
|
||||||
import sync_protocol
|
import sync_protocol
|
||||||
|
|
||||||
|
|
||||||
func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
|
func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
|
||||||
($node.state.data.validator_registry[validatorIdx].pubkey)[0..7]
|
($node.state.data.validator_registry[validatorIdx].pubkey)[0..7]
|
||||||
|
|
||||||
func slotStart(node: BeaconNode, slot: Slot): Timestamp =
|
func slotStart(node: BeaconNode, slot: Slot): Timestamp =
|
||||||
node.state.data.slotStart(slot)
|
node.state.data.slotStart(slot)
|
||||||
|
|
||||||
proc ensureNetworkKeys*(dataDir: string): KeyPair =
|
proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} =
|
||||||
# TODO:
|
|
||||||
# 1. Check if keys already exist in the data dir
|
|
||||||
# 2. Generate new ones and save them in the directory
|
|
||||||
# if necessary
|
|
||||||
return newKeyPair()
|
|
||||||
|
|
||||||
proc init*(T: type BeaconNode, conf: BeaconNodeConf): T =
|
|
||||||
new result
|
new result
|
||||||
result.config = conf
|
result.config = conf
|
||||||
|
|
||||||
|
@ -83,43 +70,36 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): T =
|
||||||
result.blockPool = BlockPool.init(result.db)
|
result.blockPool = BlockPool.init(result.db)
|
||||||
result.attestationPool = AttestationPool.init(result.blockPool)
|
result.attestationPool = AttestationPool.init(result.blockPool)
|
||||||
|
|
||||||
result.keys = ensureNetworkKeys(string conf.dataDir)
|
result.network = await createEth2Node(Port conf.tcpPort, Port conf.udpPort)
|
||||||
|
|
||||||
var address: Address
|
|
||||||
address.ip = parseIpAddress("127.0.0.1")
|
|
||||||
address.tcpPort = Port(conf.tcpPort)
|
|
||||||
address.udpPort = Port(conf.udpPort)
|
|
||||||
|
|
||||||
result.network =
|
|
||||||
newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1)
|
|
||||||
let state = result.network.protocolState(BeaconSync)
|
let state = result.network.protocolState(BeaconSync)
|
||||||
state.node = result
|
state.node = result
|
||||||
state.db = result.db
|
state.db = result.db
|
||||||
|
|
||||||
let
|
let head = result.blockPool.get(result.db.getHeadBlock().get())
|
||||||
head = result.blockPool.get(result.db.getHeadBlock().get())
|
|
||||||
|
|
||||||
result.state = result.blockPool.loadTailState()
|
result.state = result.blockPool.loadTailState()
|
||||||
result.blockPool.updateState(result.state, head.get().refs)
|
result.blockPool.updateState(result.state, head.get().refs)
|
||||||
|
|
||||||
writeFile(string(conf.dataDir) / "beacon_node.address",
|
let addressFile = string(conf.dataDir) / "beacon_node.address"
|
||||||
$result.network.listeningAddress)
|
result.network.saveConnectionAddressFile(addressFile)
|
||||||
|
|
||||||
proc connectToNetwork(node: BeaconNode) {.async.} =
|
proc connectToNetwork(node: BeaconNode) {.async.} =
|
||||||
var bootstrapNodes = newSeq[ENode]()
|
var bootstrapNodes = newSeq[BootstrapAddr]()
|
||||||
|
|
||||||
for node in node.config.bootstrapNodes:
|
for node in node.config.bootstrapNodes:
|
||||||
bootstrapNodes.add initENode(node)
|
bootstrapNodes.add BootstrapAddr.init(node)
|
||||||
|
|
||||||
let bootstrapFile = string node.config.bootstrapNodesFile
|
let bootstrapFile = string node.config.bootstrapNodesFile
|
||||||
if bootstrapFile.len > 0:
|
if bootstrapFile.len > 0:
|
||||||
for ln in lines(bootstrapFile):
|
for ln in lines(bootstrapFile):
|
||||||
bootstrapNodes.add initENode(string ln)
|
bootstrapNodes.add BootstrapAddr.init(string ln)
|
||||||
|
|
||||||
if bootstrapNodes.len > 0:
|
if bootstrapNodes.len > 0:
|
||||||
info "Connecting to bootstrap nodes", bootstrapNodes
|
info "Connecting to bootstrap nodes", bootstrapNodes
|
||||||
else:
|
else:
|
||||||
info "Waiting for connections"
|
info "Waiting for connections"
|
||||||
|
|
||||||
await node.network.connectToNetwork(bootstrapNodes)
|
await node.network.connectToNetwork(bootstrapNodes)
|
||||||
|
|
||||||
proc sync*(node: BeaconNode): Future[bool] {.async.} =
|
proc sync*(node: BeaconNode): Future[bool] {.async.} =
|
||||||
|
@ -598,13 +578,13 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
|
||||||
discard # node.onAttestation(attestation)
|
discard # node.onAttestation(attestation)
|
||||||
|
|
||||||
proc run*(node: BeaconNode) =
|
proc run*(node: BeaconNode) =
|
||||||
node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
|
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
|
||||||
node.onBeaconBlock(blck)
|
node.onBeaconBlock(blck)
|
||||||
|
|
||||||
node.network.subscribe(topicAttestations) do (attestation: Attestation):
|
waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation):
|
||||||
node.onAttestation(attestation)
|
node.onAttestation(attestation)
|
||||||
|
|
||||||
node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]):
|
waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]):
|
||||||
node.onFetchBlocks(roots)
|
node.onFetchBlocks(roots)
|
||||||
|
|
||||||
let nowSlot = node.state.data.getSlotFromTime()
|
let nowSlot = node.state.data.getSlotFromTime()
|
||||||
|
@ -637,7 +617,7 @@ when isMainModule:
|
||||||
waitFor synchronizeClock()
|
waitFor synchronizeClock()
|
||||||
createPidFile(config.dataDir.string / "beacon_node.pid")
|
createPidFile(config.dataDir.string / "beacon_node.pid")
|
||||||
|
|
||||||
var node = BeaconNode.init config
|
var node = waitFor BeaconNode.init(config)
|
||||||
|
|
||||||
dynamicLogScope(node = node.config.tcpPort - 50000):
|
dynamicLogScope(node = node.config.tcpPort - 50000):
|
||||||
# TODO: while it's nice to cheat by waiting for connections here, we
|
# TODO: while it's nice to cheat by waiting for connections here, we
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
const
|
||||||
|
useDEVP2P = not defined(withLibP2P)
|
||||||
|
|
||||||
|
import
|
||||||
|
options, chronos, version
|
||||||
|
|
||||||
|
const
|
||||||
|
clientId = "Nimbus beacon node v" & versionAsStr
|
||||||
|
|
||||||
|
when useDEVP2P:
|
||||||
|
import
|
||||||
|
eth/[rlp, p2p, keys], gossipsub_protocol
|
||||||
|
|
||||||
|
export
|
||||||
|
p2p, rlp, gossipsub_protocol
|
||||||
|
|
||||||
|
type
|
||||||
|
Eth2Node* = EthereumNode
|
||||||
|
BootstrapAddr* = ENode
|
||||||
|
|
||||||
|
template libp2pProtocol*(name, version: string) {.pragma.}
|
||||||
|
|
||||||
|
proc createEth2Node*(tcpPort, udpPort: Port): Future[Eth2Node] {.async.} =
|
||||||
|
let
|
||||||
|
keys = newKeyPair()
|
||||||
|
address = Address(ip: parseIpAddress("127.0.0.1"),
|
||||||
|
tcpPort: tcpPort,
|
||||||
|
udpPort: udpPort)
|
||||||
|
|
||||||
|
newEthereumNode(keys, address, 0,
|
||||||
|
nil, clientId, minPeers = 1)
|
||||||
|
|
||||||
|
proc saveConnectionAddressFile*(node: Eth2Node, filename: string) =
|
||||||
|
writeFile(filename, $node.listeningAddress)
|
||||||
|
|
||||||
|
proc init*(T: type BootstrapAddr, str: string): T =
|
||||||
|
initENode(str)
|
||||||
|
|
||||||
|
else:
|
||||||
|
import
|
||||||
|
libp2p/daemon/daemonapi, json_serialization, chronicles,
|
||||||
|
libp2p_backend
|
||||||
|
|
||||||
|
export
|
||||||
|
libp2p_backend
|
||||||
|
|
||||||
|
type
|
||||||
|
BootstrapAddr* = PeerInfo
|
||||||
|
|
||||||
|
proc writeValue*(writer: var JsonWriter, value: PeerID) {.inline.} =
|
||||||
|
writer.writeValue value.pretty
|
||||||
|
|
||||||
|
proc readValue*(reader: var JsonReader, value: var PeerID) {.inline.} =
|
||||||
|
value = PeerID.init reader.readValue(string)
|
||||||
|
|
||||||
|
proc writeValue*(writer: var JsonWriter, value: MultiAddress) {.inline.} =
|
||||||
|
writer.writeValue $value
|
||||||
|
|
||||||
|
proc readValue*(reader: var JsonReader, value: var MultiAddress) {.inline.} =
|
||||||
|
value = MultiAddress.init reader.readValue(string)
|
||||||
|
|
||||||
|
proc init*(T: type BootstrapAddr, str: string): T =
|
||||||
|
Json.decode(str, PeerInfo)
|
||||||
|
|
||||||
|
proc createEth2Node*(tcpPort, udpPort: Port): Future[Eth2Node] {.async.} =
|
||||||
|
var node = new Eth2Node
|
||||||
|
await node.init()
|
||||||
|
return node
|
||||||
|
|
||||||
|
proc connectToNetwork*(node: Eth2Node, bootstrapNodes: seq[PeerInfo]) {.async.} =
|
||||||
|
# TODO: perhaps we should do these in parallel
|
||||||
|
for bootstrapNode in bootstrapNodes:
|
||||||
|
try:
|
||||||
|
await node.daemon.connect(bootstrapNode.peer, bootstrapNode.addresses)
|
||||||
|
await node.getPeer(bootstrapNode.peer).performProtocolHandshakes()
|
||||||
|
except PeerDisconnected:
|
||||||
|
error "Failed to connect to bootstrap node", node = bootstrapNode
|
||||||
|
|
||||||
|
proc saveConnectionAddressFile*(node: Eth2Node, filename: string) =
|
||||||
|
let id = waitFor node.daemon.identity()
|
||||||
|
Json.saveFile(filename, id, pretty = false)
|
||||||
|
|
||||||
|
proc loadConnectionAddressFile*(filename: string): PeerInfo =
|
||||||
|
Json.loadFile(filename, PeerInfo)
|
||||||
|
|
|
@ -5,7 +5,7 @@ import
|
||||||
spec/[datatypes, crypto]
|
spec/[datatypes, crypto]
|
||||||
|
|
||||||
type
|
type
|
||||||
TopicMsgHandler = proc(msg: string): Future[void]
|
TopicMsgHandler = proc (msg: string)
|
||||||
|
|
||||||
GossipSubPeer* = ref object
|
GossipSubPeer* = ref object
|
||||||
sentMessages: HashSet[string]
|
sentMessages: HashSet[string]
|
||||||
|
@ -58,16 +58,7 @@ p2pProtocol GossipSub(version = 1,
|
||||||
{.gcsafe.}:
|
{.gcsafe.}:
|
||||||
let handler = peer.networkState.topicSubscribers.getOrDefault(topic)
|
let handler = peer.networkState.topicSubscribers.getOrDefault(topic)
|
||||||
if handler != nil:
|
if handler != nil:
|
||||||
await handler(msg)
|
handler(msg)
|
||||||
|
|
||||||
proc subscribeImpl(node: EthereumNode,
|
|
||||||
topic: string,
|
|
||||||
subscriber: TopicMsgHandler) =
|
|
||||||
var gossipNet = node.protocolState(GossipSub)
|
|
||||||
gossipNet.topicSubscribers[topic] = subscriber
|
|
||||||
|
|
||||||
for peer in node.peers(GossipSub):
|
|
||||||
discard peer.subscribeFor(topic)
|
|
||||||
|
|
||||||
proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} =
|
proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[void]] {.gcsafe.} =
|
||||||
var randBytes: array[10, byte];
|
var randBytes: array[10, byte];
|
||||||
|
@ -81,14 +72,15 @@ proc broadcastImpl(node: EthereumNode, topic: string, msg: string): seq[Future[v
|
||||||
if topic in peer.state(GossipSub).subscribedFor:
|
if topic in peer.state(GossipSub).subscribedFor:
|
||||||
result.add peer.emit(topic, msgId, msg)
|
result.add peer.emit(topic, msgId, msg)
|
||||||
|
|
||||||
proc makeMessageHandler[MsgType](userHandler: proc(msg: MsgType): Future[void]): TopicMsgHandler =
|
proc subscribe*[MsgType](node: EthereumNode,
|
||||||
result = proc (msg: string): Future[void] =
|
topic: string,
|
||||||
|
userHandler: proc(msg: MsgType)) {.async.}=
|
||||||
|
var gossipNet = node.protocolState(GossipSub)
|
||||||
|
gossipNet.topicSubscribers[topic] = proc (msg: string) =
|
||||||
userHandler Json.decode(msg, MsgType)
|
userHandler Json.decode(msg, MsgType)
|
||||||
|
|
||||||
macro subscribe*(node: EthereumNode, topic: string, handler: untyped): untyped =
|
for peer in node.peers(GossipSub):
|
||||||
handler.addPragma ident"async"
|
discard peer.subscribeFor(topic)
|
||||||
result = newCall(bindSym"subscribeImpl",
|
|
||||||
node, topic, newCall(bindSym"makeMessageHandler", handler))
|
|
||||||
|
|
||||||
proc broadcast*(node: EthereumNode, topic: string, msg: auto) {.async.} =
|
proc broadcast*(node: EthereumNode, topic: string, msg: auto) {.async.} =
|
||||||
await all(node.broadcastImpl(topic, Json.encode(msg)))
|
await all(node.broadcastImpl(topic, Json.encode(msg)))
|
||||||
|
|
|
@ -0,0 +1,768 @@
|
||||||
|
import
|
||||||
|
options, macros, algorithm,
|
||||||
|
std_shims/[macros_shim, tables_shims], chronos, chronicles,
|
||||||
|
libp2p/daemon/daemonapi, faststreams/output_stream, serialization,
|
||||||
|
ssz
|
||||||
|
|
||||||
|
export
|
||||||
|
daemonapi
|
||||||
|
|
||||||
|
type
|
||||||
|
Eth2Node* = ref object of RootObj
|
||||||
|
daemon*: DaemonAPI
|
||||||
|
peers*: Table[PeerID, Peer]
|
||||||
|
protocolStates*: seq[RootRef]
|
||||||
|
|
||||||
|
Peer* = ref object
|
||||||
|
network: Eth2Node
|
||||||
|
id: PeerID
|
||||||
|
connectionState: ConnectionState
|
||||||
|
awaitedMessages: Table[CompressedMsgId, FutureBase]
|
||||||
|
protocolStates*: seq[RootRef]
|
||||||
|
|
||||||
|
EthereumNode = Eth2Node # This alias is needed for state_helpers below
|
||||||
|
|
||||||
|
ProtocolInfoObj* = object
|
||||||
|
name*: string
|
||||||
|
messages*: seq[MessageInfo]
|
||||||
|
index*: int # the position of the protocol in the
|
||||||
|
# ordered list of supported protocols
|
||||||
|
|
||||||
|
# Private fields:
|
||||||
|
peerStateInitializer*: PeerStateInitializer
|
||||||
|
networkStateInitializer*: NetworkStateInitializer
|
||||||
|
handshake*: HandshakeStep
|
||||||
|
disconnectHandler*: DisconnectionHandler
|
||||||
|
|
||||||
|
ProtocolInfo* = ptr ProtocolInfoObj
|
||||||
|
|
||||||
|
MessageInfo* = object
|
||||||
|
name*: string
|
||||||
|
|
||||||
|
# Private fields:
|
||||||
|
thunk*: MessageHandler
|
||||||
|
libp2pProtocol: string
|
||||||
|
printer*: MessageContentPrinter
|
||||||
|
nextMsgResolver*: NextMsgResolver
|
||||||
|
|
||||||
|
CompressedMsgId = tuple
|
||||||
|
protocolIndex, msgId: int
|
||||||
|
|
||||||
|
MessageKind* = enum
|
||||||
|
msgNotification,
|
||||||
|
msgRequest,
|
||||||
|
msgResponse
|
||||||
|
|
||||||
|
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
|
||||||
|
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
|
||||||
|
HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.}
|
||||||
|
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
|
||||||
|
MessageHandler* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.}
|
||||||
|
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
|
||||||
|
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
|
||||||
|
|
||||||
|
ConnectionState* = enum
|
||||||
|
None,
|
||||||
|
Connecting,
|
||||||
|
Connected,
|
||||||
|
Disconnecting,
|
||||||
|
Disconnected
|
||||||
|
|
||||||
|
UntypedResponse = object
|
||||||
|
peer*: Peer
|
||||||
|
stream*: P2PStream
|
||||||
|
|
||||||
|
Response*[MsgType] = distinct UntypedResponse
|
||||||
|
|
||||||
|
Bytes = seq[byte]
|
||||||
|
|
||||||
|
DisconnectionReason* = enum
|
||||||
|
BreachOfProtocol
|
||||||
|
|
||||||
|
PeerDisconnected* = object of CatchableError
|
||||||
|
reason*: DisconnectionReason
|
||||||
|
|
||||||
|
const
|
||||||
|
defaultIncomingReqTimeout = 5000
|
||||||
|
defaultOutgoingReqTimeout = 10000
|
||||||
|
|
||||||
|
var
|
||||||
|
gProtocols: seq[ProtocolInfo]
|
||||||
|
|
||||||
|
# The variables above are immutable RTTI information. We need to tell
|
||||||
|
# Nim to not consider them GcSafe violations:
|
||||||
|
template allProtocols: auto = {.gcsafe.}: gProtocols
|
||||||
|
|
||||||
|
proc disconnect*(peer: Peer) {.async.} =
|
||||||
|
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||||
|
peer.connectionState = Disconnecting
|
||||||
|
await peer.network.daemon.disconnect(peer.id)
|
||||||
|
peer.connectionState = Disconnected
|
||||||
|
peer.network.peers.del(peer.id)
|
||||||
|
|
||||||
|
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
|
||||||
|
var e = newException(PeerDisconnected, msg)
|
||||||
|
e.reason = r
|
||||||
|
raise e
|
||||||
|
|
||||||
|
proc disconnectAndRaise(peer: Peer,
|
||||||
|
reason: DisconnectionReason,
|
||||||
|
msg: string) {.async.} =
|
||||||
|
let r = reason
|
||||||
|
await peer.disconnect()
|
||||||
|
raisePeerDisconnected(msg, reason)
|
||||||
|
|
||||||
|
proc init*(node: Eth2Node) {.async.} =
|
||||||
|
node.daemon = await newDaemonApi({PSGossipSub})
|
||||||
|
node.daemon.userData = node
|
||||||
|
init node.peers
|
||||||
|
|
||||||
|
newSeq node.protocolStates, allProtocols.len
|
||||||
|
for proto in allProtocols:
|
||||||
|
if proto.networkStateInitializer != nil:
|
||||||
|
node.protocolStates[proto.index] = proto.networkStateInitializer(node)
|
||||||
|
|
||||||
|
for msg in proto.messages:
|
||||||
|
if msg.libp2pProtocol.len > 0:
|
||||||
|
await node.daemon.addHandler(@[msg.libp2pProtocol], msg.thunk)
|
||||||
|
|
||||||
|
include eth/p2p/p2p_backends_helpers
|
||||||
|
include eth/p2p/p2p_tracing
|
||||||
|
|
||||||
|
import typetraits
|
||||||
|
|
||||||
|
proc readMsg(stream: P2PStream, MsgType: type,
|
||||||
|
timeout = 10000): Future[Option[MsgType]] {.async.} =
|
||||||
|
var timeout = sleepAsync timeout
|
||||||
|
var sizePrefix: uint32
|
||||||
|
var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix))
|
||||||
|
await readSizePrefix or timeout
|
||||||
|
if not readSizePrefix.finished: return
|
||||||
|
|
||||||
|
debug "EXPECTING MSG", msg = MsgType.name, size = sizePrefix.int
|
||||||
|
|
||||||
|
var msgBytes = newSeq[byte](sizePrefix.int + sizeof(sizePrefix))
|
||||||
|
copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix))
|
||||||
|
var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)], sizePrefix.int)
|
||||||
|
await readBody or timeout
|
||||||
|
if not readBody.finished: return
|
||||||
|
|
||||||
|
let decoded = SSZ.decode(msgBytes, MsgType)
|
||||||
|
try:
|
||||||
|
return some(decoded)
|
||||||
|
except SerializationError:
|
||||||
|
return
|
||||||
|
|
||||||
|
proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
|
||||||
|
var stream = await peer.network.daemon.openStream(peer.id, @[protocolId])
|
||||||
|
# TODO how does openStream fail? Set a timeout here and handle it
|
||||||
|
let sent = await stream.transp.write(requestBytes)
|
||||||
|
# TODO: Should I check that `sent` is equal to the desired number of bytes
|
||||||
|
|
||||||
|
proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} =
|
||||||
|
let sent = await stream.transp.write(bytes)
|
||||||
|
# TODO: Should I check that `sent` is equal to the desired number of bytes
|
||||||
|
|
||||||
|
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||||
|
ResponseMsg: type,
|
||||||
|
timeout = 10000): Future[Option[ResponseMsg]] {.async.} =
|
||||||
|
var stream = await peer.network.daemon.openStream(peer.id, @[protocolId])
|
||||||
|
# TODO how does openStream fail? Set a timeout here and handle it
|
||||||
|
let sent = await stream.transp.write(requestBytes)
|
||||||
|
# TODO: Should I check that `sent` is equal to the desired number of bytes
|
||||||
|
return await stream.readMsg(ResponseMsg, timeout)
|
||||||
|
|
||||||
|
proc handshakeImpl(peer: Peer,
|
||||||
|
handshakeSendFut: Future[void],
|
||||||
|
handshakeStream: P2PStream,
|
||||||
|
timeout: int,
|
||||||
|
HandshakeType: type): Future[HandshakeType] {.async.} =
|
||||||
|
await handshakeSendFut
|
||||||
|
let response = await handshakeStream.readMsg(HandshakeType, timeout)
|
||||||
|
if response.isSome:
|
||||||
|
return response.get
|
||||||
|
else:
|
||||||
|
await peer.disconnectAndRaise(BreachOfProtocol, "Handshake not completed in time")
|
||||||
|
|
||||||
|
proc p2pStreamName(MsgType: type): string =
|
||||||
|
mixin msgProtocol, protocolInfo, msgId
|
||||||
|
MsgType.msgProtocol.protocolInfo.messages[MsgType.msgId].libp2pProtocol
|
||||||
|
|
||||||
|
macro handshake*(peer: Peer, timeout = 10000, sendCall: untyped): untyped =
|
||||||
|
let
|
||||||
|
msgName = $sendCall[0]
|
||||||
|
msgType = newDotExpr(ident"CurrentProtocol", ident(msgName))
|
||||||
|
handshakeStream = ident "handshakeStream"
|
||||||
|
handshakeImpl = bindSym "handshakeImpl"
|
||||||
|
await = ident "await"
|
||||||
|
|
||||||
|
sendCall.insert(1, handshakeStream)
|
||||||
|
|
||||||
|
result = quote do:
|
||||||
|
proc payload(peer: Peer, `handshakeStream`: P2PStream): Future[`msgType`] {.async.} =
|
||||||
|
var `handshakeStream` = `handshakeStream`
|
||||||
|
if `handshakeStream` == nil:
|
||||||
|
`handshakeStream` = `await` openStream(peer.network.daemon,
|
||||||
|
peer.id,
|
||||||
|
@[p2pStreamName(`msgType`)],
|
||||||
|
`timeout`)
|
||||||
|
return `await` `handshakeImpl`(peer, `sendCall`, `handshakeStream`, `timeout`, `msgType`)
|
||||||
|
|
||||||
|
payload(`peer`, `handshakeStream`)
|
||||||
|
|
||||||
|
proc getCompressedMsgId(MsgType: type): CompressedMsgId =
|
||||||
|
mixin msgProtocol, protocolInfo, msgId
|
||||||
|
(protocolIndex: MsgType.msgProtocol.protocolInfo.index, msgId: MsgType.msgId)
|
||||||
|
|
||||||
|
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
|
||||||
|
## This procs awaits a specific P2P message.
|
||||||
|
## Any messages received while waiting will be dispatched to their
|
||||||
|
## respective handlers. The designated message handler will also run
|
||||||
|
## to completion before the future returned by `nextMsg` is resolved.
|
||||||
|
mixin msgProtocol, protocolInfo, msgId
|
||||||
|
let awaitedMsgId = getCompressedMsgId(MsgType)
|
||||||
|
let f = getOrDefault(peer.awaitedMessages, awaitedMsgId)
|
||||||
|
if not f.isNil:
|
||||||
|
return Future[MsgType](f)
|
||||||
|
|
||||||
|
newFuture result
|
||||||
|
peer.awaitedMessages[awaitedMsgId] = result
|
||||||
|
|
||||||
|
proc resolveNextMsgFutures(peer: Peer, msg: auto) =
|
||||||
|
type MsgType = type(msg)
|
||||||
|
let msgId = getCompressedMsgId(MsgType)
|
||||||
|
let future = peer.awaitedMessages.getOrDefault(msgId)
|
||||||
|
if future != nil:
|
||||||
|
Future[MsgType](future).complete msg
|
||||||
|
|
||||||
|
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
|
||||||
|
new result
|
||||||
|
result.id = id
|
||||||
|
result.network = network
|
||||||
|
result.awaitedMessages = initTable[CompressedMsgId, FutureBase]()
|
||||||
|
result.connectionState = Connected
|
||||||
|
newSeq result.protocolStates, allProtocols.len
|
||||||
|
for i in 0 ..< allProtocols.len:
|
||||||
|
let proto = allProtocols[i]
|
||||||
|
if proto.peerStateInitializer != nil:
|
||||||
|
result.protocolStates[i] = proto.peerStateInitializer(result)
|
||||||
|
|
||||||
|
proc performProtocolHandshakes*(peer: Peer) {.async.} =
|
||||||
|
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
|
||||||
|
for protocol in allProtocols:
|
||||||
|
if protocol.handshake != nil:
|
||||||
|
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
|
||||||
|
|
||||||
|
await all(subProtocolsHandshakes)
|
||||||
|
|
||||||
|
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer =
|
||||||
|
result = node.peers.getOrDefault(peerId)
|
||||||
|
if result == nil:
|
||||||
|
result = Peer.init(node, peerId)
|
||||||
|
node.peers[peerId] = result
|
||||||
|
|
||||||
|
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer =
|
||||||
|
Eth2Node(daemon.userData).getPeer(stream.peer)
|
||||||
|
|
||||||
|
template getRecipient(peer: Peer): Peer =
|
||||||
|
peer
|
||||||
|
|
||||||
|
# TODO: this should be removed eventually
|
||||||
|
template getRecipient(stream: P2PStream): P2PStream =
|
||||||
|
stream
|
||||||
|
|
||||||
|
template getRecipient(response: Response): Peer =
|
||||||
|
UntypedResponse(response).peer
|
||||||
|
|
||||||
|
proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
|
||||||
|
result = ""
|
||||||
|
# TODO: uncommenting the line below increases the compile-time
|
||||||
|
# tremendously (for reasons not yet known)
|
||||||
|
# result = $(cast[ptr MsgType](msg)[])
|
||||||
|
|
||||||
|
proc initProtocol(name: string,
|
||||||
|
peerInit: PeerStateInitializer,
|
||||||
|
networkInit: NetworkStateInitializer): ProtocolInfoObj =
|
||||||
|
result.name = name
|
||||||
|
result.messages = @[]
|
||||||
|
result.peerStateInitializer = peerInit
|
||||||
|
result.networkStateInitializer = networkInit
|
||||||
|
|
||||||
|
proc setEventHandlers(p: ProtocolInfo,
|
||||||
|
handshake: HandshakeStep,
|
||||||
|
disconnectHandler: DisconnectionHandler) =
|
||||||
|
p.handshake = handshake
|
||||||
|
p.disconnectHandler = disconnectHandler
|
||||||
|
|
||||||
|
proc registerMsg(protocol: ProtocolInfo,
|
||||||
|
name: string,
|
||||||
|
thunk: MessageHandler,
|
||||||
|
libp2pProtocol: string,
|
||||||
|
printer: MessageContentPrinter) =
|
||||||
|
protocol.messages.add MessageInfo(name: name,
|
||||||
|
thunk: thunk,
|
||||||
|
libp2pProtocol: libp2pProtocol,
|
||||||
|
printer: printer)
|
||||||
|
|
||||||
|
proc registerProtocol(protocol: ProtocolInfo) =
|
||||||
|
# TODO: This can be done at compile-time in the future
|
||||||
|
let pos = lowerBound(gProtocols, protocol)
|
||||||
|
gProtocols.insert(protocol, pos)
|
||||||
|
for i in 0 ..< gProtocols.len:
|
||||||
|
gProtocols[i].index = i
|
||||||
|
|
||||||
|
template libp2pProtocol*(name, version: string) {.pragma.}
|
||||||
|
|
||||||
|
proc getRequestProtoName(fn: NimNode): NimNode =
|
||||||
|
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
|
||||||
|
# (TODO: file as an issue)
|
||||||
|
|
||||||
|
let pragmas = fn.pragma
|
||||||
|
if pragmas.kind == nnkPragma and pragmas.len > 0:
|
||||||
|
for pragma in pragmas:
|
||||||
|
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
|
||||||
|
return pragma[1]
|
||||||
|
|
||||||
|
error "All stream opening procs must have the 'libp2pProtocol' pragma specified.", fn
|
||||||
|
|
||||||
|
macro p2pProtocolImpl(name: static[string],
|
||||||
|
version: static[uint],
|
||||||
|
body: untyped,
|
||||||
|
timeout: static[int] = defaultOutgoingReqTimeout,
|
||||||
|
shortName: static[string] = "",
|
||||||
|
peerState = type(nil),
|
||||||
|
networkState = type(nil)): untyped =
|
||||||
|
## The macro used to defined P2P sub-protocols. See README.
|
||||||
|
var
|
||||||
|
# XXX: deal with a Nim bug causing the macro params to be
|
||||||
|
# zero when they are captured by a closure:
|
||||||
|
defaultTimeout = timeout
|
||||||
|
protoName = name
|
||||||
|
nextId = -1
|
||||||
|
protoNameIdent = ident(protoName)
|
||||||
|
outTypes = newNimNode(nnkStmtList)
|
||||||
|
outSendProcs = newNimNode(nnkStmtList)
|
||||||
|
outRecvProcs = newNimNode(nnkStmtList)
|
||||||
|
outProcRegistrations = newNimNode(nnkStmtList)
|
||||||
|
response = ident"response"
|
||||||
|
name_openStream = newTree(nnkPostfix, ident("*"), ident"openStream")
|
||||||
|
outputStream = ident"outputStream"
|
||||||
|
currentProtocolSym = ident"CurrentProtocol"
|
||||||
|
protocol = ident(protoName & "Protocol")
|
||||||
|
peerState = verifyStateType peerState.getType
|
||||||
|
networkState = verifyStateType networkState.getType
|
||||||
|
handshake = newNilLit()
|
||||||
|
disconnectHandler = newNilLit()
|
||||||
|
Format = ident"SSZ"
|
||||||
|
Option = bindSym "Option"
|
||||||
|
UntypedResponse = bindSym "UntypedResponse"
|
||||||
|
Response = bindSym "Response"
|
||||||
|
DaemonAPI = bindSym "DaemonAPI"
|
||||||
|
P2PStream = ident "P2PStream"
|
||||||
|
# XXX: Binding the int type causes instantiation failure for some reason
|
||||||
|
# Int = bindSym "int"
|
||||||
|
Int = ident "int"
|
||||||
|
Void = ident "void"
|
||||||
|
Peer = bindSym "Peer"
|
||||||
|
writeField = bindSym "writeField"
|
||||||
|
createNetworkState = bindSym "createNetworkState"
|
||||||
|
createPeerState = bindSym "createPeerState"
|
||||||
|
getOutput = bindSym "getOutput"
|
||||||
|
messagePrinter = bindSym "messagePrinter"
|
||||||
|
initProtocol = bindSym "initProtocol"
|
||||||
|
getRecipient = bindSym "getRecipient"
|
||||||
|
peerFromStream = bindSym "peerFromStream"
|
||||||
|
makeEth2Request = bindSym "makeEth2Request"
|
||||||
|
sendMsg = bindSym "sendMsg"
|
||||||
|
sendBytes = bindSym "sendBytes"
|
||||||
|
getState = bindSym "getState"
|
||||||
|
getNetworkState = bindSym "getNetworkState"
|
||||||
|
resolveNextMsgFutures = bindSym "resolveNextMsgFutures"
|
||||||
|
|
||||||
|
proc augmentUserHandler(userHandlerProc: NimNode,
|
||||||
|
msgKind = msgNotification,
|
||||||
|
extraDefinitions: NimNode = nil) =
|
||||||
|
## Turns a regular proc definition into an async proc and adds
|
||||||
|
## the helpers for accessing the peer and network protocol states.
|
||||||
|
|
||||||
|
userHandlerProc.addPragma ident"gcsafe"
|
||||||
|
userHandlerProc.addPragma ident"async"
|
||||||
|
|
||||||
|
# We allow the user handler to use `openarray` params, but we turn
|
||||||
|
# those into sequences to make the `async` pragma happy.
|
||||||
|
for i in 1 ..< userHandlerProc.params.len:
|
||||||
|
var param = userHandlerProc.params[i]
|
||||||
|
param[^2] = chooseFieldType(param[^2])
|
||||||
|
|
||||||
|
var userHandlerDefinitions = newStmtList()
|
||||||
|
|
||||||
|
userHandlerDefinitions.add quote do:
|
||||||
|
type `currentProtocolSym` = `protoNameIdent`
|
||||||
|
|
||||||
|
if extraDefinitions != nil:
|
||||||
|
userHandlerDefinitions.add extraDefinitions
|
||||||
|
|
||||||
|
# Define local accessors for the peer and the network protocol states
|
||||||
|
# inside each user message handler proc (e.g. peer.state.foo = bar)
|
||||||
|
if peerState != nil:
|
||||||
|
userHandlerDefinitions.add quote do:
|
||||||
|
template state(p: `Peer`): `peerState` =
|
||||||
|
cast[`peerState`](`getState`(p, `protocol`))
|
||||||
|
|
||||||
|
if networkState != nil:
|
||||||
|
userHandlerDefinitions.add quote do:
|
||||||
|
template networkState(p: `Peer`): `networkState` =
|
||||||
|
cast[`networkState`](`getNetworkState`(p.network, `protocol`))
|
||||||
|
|
||||||
|
userHandlerProc.body.insert 0, userHandlerDefinitions
|
||||||
|
|
||||||
|
proc liftEventHandler(doBlock: NimNode, handlerName: string): NimNode =
|
||||||
|
## Turns a "named" do block to a regular async proc
|
||||||
|
## (e.g. onPeerConnected do ...)
|
||||||
|
result = newTree(nnkProcDef)
|
||||||
|
doBlock.copyChildrenTo(result)
|
||||||
|
result.name = genSym(nskProc, protoName & handlerName)
|
||||||
|
augmentUserHandler result
|
||||||
|
outRecvProcs.add result
|
||||||
|
|
||||||
|
proc addMsgHandler(n: NimNode, msgKind = msgNotification,
|
||||||
|
responseRecord: NimNode = nil): NimNode =
|
||||||
|
if n[0].kind == nnkPostfix:
|
||||||
|
macros.error("p2pProcotol procs are public by default. " &
|
||||||
|
"Please remove the postfix `*`.", n)
|
||||||
|
|
||||||
|
inc nextId
|
||||||
|
|
||||||
|
let
|
||||||
|
msgIdent = n.name
|
||||||
|
msgName = $n.name
|
||||||
|
|
||||||
|
var
|
||||||
|
userPragmas = n.pragma
|
||||||
|
|
||||||
|
# variables used in the sending procs
|
||||||
|
msgRecipient = ident"msgRecipient"
|
||||||
|
sendTo = ident"sendTo"
|
||||||
|
writer = ident"writer"
|
||||||
|
recordStartMemo = ident"recordStartMemo"
|
||||||
|
reqTimeout: NimNode
|
||||||
|
appendParams = newNimNode(nnkStmtList)
|
||||||
|
paramsToWrite = newSeq[NimNode](0)
|
||||||
|
msgId = newLit(nextId)
|
||||||
|
|
||||||
|
# variables used in the receiving procs
|
||||||
|
receivedMsg = ident"msg"
|
||||||
|
daemon = ident "daemon"
|
||||||
|
stream = ident "stream"
|
||||||
|
await = ident "await"
|
||||||
|
peerIdent = ident "peer"
|
||||||
|
tracing = newNimNode(nnkStmtList)
|
||||||
|
|
||||||
|
# nodes to store the user-supplied message handling proc if present
|
||||||
|
userHandlerProc: NimNode = nil
|
||||||
|
userHandlerCall: NimNode = nil
|
||||||
|
awaitUserHandler = newStmtList()
|
||||||
|
|
||||||
|
# a record type associated with the message
|
||||||
|
msgRecord = newIdentNode(msgName & "Obj")
|
||||||
|
msgRecordFields = newTree(nnkRecList)
|
||||||
|
msgRecordBody = newTree(nnkObjectTy,
|
||||||
|
newEmptyNode(),
|
||||||
|
newEmptyNode(),
|
||||||
|
msgRecordFields)
|
||||||
|
|
||||||
|
result = msgRecord
|
||||||
|
|
||||||
|
if msgKind == msgRequest:
|
||||||
|
# If the request proc has a default timeout specified, remove it from
|
||||||
|
# the signature for now so we can generate the `thunk` proc without it.
|
||||||
|
# The parameter will be added back later only for to the sender proc.
|
||||||
|
# When the timeout is not specified, we use a default one.
|
||||||
|
reqTimeout = popTimeoutParam(n)
|
||||||
|
if reqTimeout == nil:
|
||||||
|
reqTimeout = newTree(nnkIdentDefs,
|
||||||
|
ident"timeout",
|
||||||
|
Int, newLit(defaultTimeout))
|
||||||
|
|
||||||
|
if n.body.kind != nnkEmpty:
|
||||||
|
# Implement the receiving thunk proc that deserialzed the
|
||||||
|
# message parameters and calls the user proc:
|
||||||
|
userHandlerProc = n.copyNimTree
|
||||||
|
userHandlerProc.name = genSym(nskProc, msgName)
|
||||||
|
|
||||||
|
# This is the call to the user supplied handler.
|
||||||
|
# Here we add only the initial params, the rest will be added later.
|
||||||
|
userHandlerCall = newCall(userHandlerProc.name)
|
||||||
|
# When there is a user handler, it must be awaited in the thunk proc.
|
||||||
|
# Above, by default `awaitUserHandler` is set to a no-op statement list.
|
||||||
|
awaitUserHandler = newCall(await, userHandlerCall)
|
||||||
|
|
||||||
|
var extraDefs: NimNode
|
||||||
|
if msgKind == msgRequest:
|
||||||
|
# Request procs need an extra param - the stream where the response
|
||||||
|
# should be written:
|
||||||
|
userHandlerProc.params.insert(1, newIdentDefs(stream, P2PStream))
|
||||||
|
userHandlerCall.add stream
|
||||||
|
let peer = userHandlerProc.params[2][0]
|
||||||
|
extraDefs = quote do:
|
||||||
|
# Jump through some hoops to work aroung
|
||||||
|
# https://github.com/nim-lang/Nim/issues/6248
|
||||||
|
let `response` = `Response`[`responseRecord`](
|
||||||
|
`UntypedResponse`(peer: `peer`, stream: `stream`))
|
||||||
|
|
||||||
|
# Resolve the Eth2Peer from the LibP2P data received in the thunk
|
||||||
|
userHandlerCall.add peerIdent
|
||||||
|
|
||||||
|
augmentUserHandler userHandlerProc, msgKind, extraDefs
|
||||||
|
outRecvProcs.add userHandlerProc
|
||||||
|
|
||||||
|
elif msgName == "status":
|
||||||
|
awaitUserHandler = quote do:
|
||||||
|
`await` `handshake`(`peerIdent`, `stream`)
|
||||||
|
|
||||||
|
for param, paramType in n.typedParams(skip = 1):
|
||||||
|
paramsToWrite.add param
|
||||||
|
|
||||||
|
# Each message has a corresponding record type.
|
||||||
|
# Here, we create its fields one by one:
|
||||||
|
msgRecordFields.add newTree(nnkIdentDefs,
|
||||||
|
newTree(nnkPostfix, ident("*"), param), # The fields are public
|
||||||
|
chooseFieldType(paramType), # some types such as openarray
|
||||||
|
# are automatically remapped
|
||||||
|
newEmptyNode())
|
||||||
|
|
||||||
|
# If there is user message handler, we'll place a call to it by
|
||||||
|
# unpacking the fields of the received message:
|
||||||
|
if userHandlerCall != nil:
|
||||||
|
userHandlerCall.add quote do: get(`receivedMsg`).`param` # newDotExpr(newCall("get", receivedMsg), param)
|
||||||
|
|
||||||
|
when tracingEnabled:
|
||||||
|
tracing = quote do:
|
||||||
|
logReceivedMsg(`stream`.peer, `receivedMsg`.get)
|
||||||
|
|
||||||
|
let requestDataTimeout = newLit(defaultIncomingReqTimeout)
|
||||||
|
|
||||||
|
let thunkName = ident(msgName & "_thunk")
|
||||||
|
var thunkProc = quote do:
|
||||||
|
proc `thunkName`(`daemon`: `DaemonAPI`, `stream`: `P2PStream`) {.async, gcsafe.} =
|
||||||
|
var `receivedMsg` = `await` readMsg(`stream`, `msgRecord`, `requestDataTimeout`)
|
||||||
|
if `receivedMsg`.isNone:
|
||||||
|
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
|
||||||
|
return
|
||||||
|
let `peerIdent` = `peerFromStream`(`daemon`, `stream`)
|
||||||
|
`tracing`
|
||||||
|
`awaitUserHandler`
|
||||||
|
`resolveNextMsgFutures`(`peerIdent`, get(`receivedMsg`))
|
||||||
|
|
||||||
|
for p in userPragmas:
|
||||||
|
thunkProc.addPragma p
|
||||||
|
|
||||||
|
outRecvProcs.add thunkProc
|
||||||
|
|
||||||
|
outTypes.add quote do:
|
||||||
|
# This is a type featuring a single field for each message param:
|
||||||
|
type `msgRecord`* = `msgRecordBody`
|
||||||
|
|
||||||
|
# Add a helper template for accessing the message type:
|
||||||
|
# e.g. p2p.hello:
|
||||||
|
template `msgIdent`*(T: type `protoNameIdent`): type = `msgRecord`
|
||||||
|
template msgId*(T: type `msgRecord`): int = `msgId`
|
||||||
|
template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent`
|
||||||
|
|
||||||
|
var msgSendProc = n
|
||||||
|
let msgSendProcName = n.name
|
||||||
|
outSendProcs.add msgSendProc
|
||||||
|
|
||||||
|
# TODO: check that the first param has the correct type
|
||||||
|
msgSendProc.params[1][0] = sendTo
|
||||||
|
if nextId == 0: msgSendProc.params[1][1] = P2PStream
|
||||||
|
msgSendProc.addPragma ident"gcsafe"
|
||||||
|
|
||||||
|
# Add a timeout parameter for all request procs
|
||||||
|
case msgKind
|
||||||
|
of msgRequest:
|
||||||
|
msgSendProc.params.add reqTimeout
|
||||||
|
of msgResponse:
|
||||||
|
# A response proc must be called with a response object that originates
|
||||||
|
# from a certain request. Here we change the Peer parameter at position
|
||||||
|
# 1 to the correct strongly-typed ResponseType. The incoming procs still
|
||||||
|
# gets the normal Peer paramter.
|
||||||
|
let ResponseType = newTree(nnkBracketExpr, Response, msgRecord)
|
||||||
|
msgSendProc.params[1][1] = ResponseType
|
||||||
|
outSendProcs.add quote do:
|
||||||
|
template send*(r: `ResponseType`, args: varargs[untyped]): auto =
|
||||||
|
`msgSendProcName`(r, args)
|
||||||
|
else: discard
|
||||||
|
|
||||||
|
# We change the return type of the sending proc to a Future.
|
||||||
|
# If this is a request proc, the future will return the response record.
|
||||||
|
let rt = case msgKind
|
||||||
|
of msgRequest: newTree(nnkBracketExpr, Option, responseRecord)
|
||||||
|
of msgResponse, msgNotification: Void
|
||||||
|
msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt)
|
||||||
|
|
||||||
|
let msgBytes = ident"msgBytes"
|
||||||
|
|
||||||
|
# Make the send proc public
|
||||||
|
msgSendProc.name = newTree(nnkPostfix, ident("*"), msgSendProc.name)
|
||||||
|
|
||||||
|
let initWriter = quote do:
|
||||||
|
var `outputStream` = init OutputStream
|
||||||
|
var `writer` = init(WriterType(`Format`), `outputStream`)
|
||||||
|
var `recordStartMemo` = beginRecord(`writer`, `msgRecord`)
|
||||||
|
|
||||||
|
for param in paramsToWrite:
|
||||||
|
appendParams.add newCall(writeField, writer, newLit($param), param)
|
||||||
|
|
||||||
|
when tracingEnabled:
|
||||||
|
appendParams.add logSentMsgFields(msgRecipient, protocol, msgName, paramsToWrite)
|
||||||
|
|
||||||
|
let finalizeRequest = quote do:
|
||||||
|
endRecord(`writer`, `recordStartMemo`)
|
||||||
|
let `msgBytes` = `getOutput`(`outputStream`)
|
||||||
|
|
||||||
|
var msgProto = newLit("")
|
||||||
|
let sendCall =
|
||||||
|
if msgKind != msgResponse:
|
||||||
|
msgProto = getRequestProtoName(n)
|
||||||
|
|
||||||
|
when false:
|
||||||
|
var openStreamProc = n.copyNimTree
|
||||||
|
var openStreamProc.name = name_openStream
|
||||||
|
openStreamProc.params.insert 1, newIdentDefs(ident"T", msgRecord)
|
||||||
|
|
||||||
|
if msgKind == msgRequest:
|
||||||
|
let timeout = reqTimeout[0]
|
||||||
|
quote: `makeEth2Request`(`msgRecipient`, `msgProto`, `msgBytes`,
|
||||||
|
`responseRecord`, `timeout`)
|
||||||
|
elif nextId == 0:
|
||||||
|
quote: `sendBytes`(`sendTo`, `msgBytes`)
|
||||||
|
else:
|
||||||
|
quote: `sendMsg`(`msgRecipient`, `msgProto`, `msgBytes`)
|
||||||
|
else:
|
||||||
|
quote: `sendBytes`(`UntypedResponse`(`sendTo`).stream, `msgBytes`)
|
||||||
|
|
||||||
|
msgSendProc.body = quote do:
|
||||||
|
let `msgRecipient` = `getRecipient`(`sendTo`)
|
||||||
|
`initWriter`
|
||||||
|
`appendParams`
|
||||||
|
`finalizeRequest`
|
||||||
|
return `sendCall`
|
||||||
|
|
||||||
|
outProcRegistrations.add(
|
||||||
|
newCall(bindSym("registerMsg"),
|
||||||
|
protocol,
|
||||||
|
newLit(msgName),
|
||||||
|
thunkName,
|
||||||
|
msgProto,
|
||||||
|
newTree(nnkBracketExpr, messagePrinter, msgRecord)))
|
||||||
|
|
||||||
|
outTypes.add quote do:
|
||||||
|
# Create a type acting as a pseudo-object representing the protocol
|
||||||
|
# (e.g. p2p)
|
||||||
|
type `protoNameIdent`* = object
|
||||||
|
|
||||||
|
if peerState != nil:
|
||||||
|
outTypes.add quote do:
|
||||||
|
template State*(P: type `protoNameIdent`): type = `peerState`
|
||||||
|
|
||||||
|
if networkState != nil:
|
||||||
|
outTypes.add quote do:
|
||||||
|
template NetworkState*(P: type `protoNameIdent`): type = `networkState`
|
||||||
|
|
||||||
|
for n in body:
|
||||||
|
case n.kind
|
||||||
|
of {nnkCall, nnkCommand}:
|
||||||
|
if eqIdent(n[0], "nextID"):
|
||||||
|
discard
|
||||||
|
elif eqIdent(n[0], "requestResponse"):
|
||||||
|
# `requestResponse` can be given a block of 2 or more procs.
|
||||||
|
# The last one is considered to be a response message, while
|
||||||
|
# all preceeding ones are requests triggering the response.
|
||||||
|
# The system makes sure to automatically insert a hidden `reqId`
|
||||||
|
# parameter used to discriminate the individual messages.
|
||||||
|
block processReqResp:
|
||||||
|
if n.len == 2 and n[1].kind == nnkStmtList:
|
||||||
|
var procs = newSeq[NimNode](0)
|
||||||
|
for def in n[1]:
|
||||||
|
if def.kind == nnkProcDef:
|
||||||
|
procs.add(def)
|
||||||
|
if procs.len > 1:
|
||||||
|
let responseRecord = addMsgHandler(procs[^1],
|
||||||
|
msgKind = msgResponse)
|
||||||
|
for i in 0 .. procs.len - 2:
|
||||||
|
discard addMsgHandler(procs[i],
|
||||||
|
msgKind = msgRequest,
|
||||||
|
responseRecord = responseRecord)
|
||||||
|
|
||||||
|
# we got all the way to here, so everything is fine.
|
||||||
|
# break the block so it doesn't reach the error call below
|
||||||
|
break processReqResp
|
||||||
|
macros.error("requestResponse expects a block with at least two proc definitions")
|
||||||
|
elif eqIdent(n[0], "onPeerConnected"):
|
||||||
|
var handshakeProc = liftEventHandler(n[1], "Handshake")
|
||||||
|
handshakeProc.params.add newIdentDefs(ident"handshakeStream", P2PStream)
|
||||||
|
handshake = handshakeProc.name
|
||||||
|
elif eqIdent(n[0], "onPeerDisconnected"):
|
||||||
|
disconnectHandler = liftEventHandler(n[1], "PeerDisconnect").name
|
||||||
|
else:
|
||||||
|
macros.error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
|
||||||
|
of nnkProcDef:
|
||||||
|
discard addMsgHandler(n)
|
||||||
|
|
||||||
|
of nnkCommentStmt:
|
||||||
|
discard
|
||||||
|
|
||||||
|
else:
|
||||||
|
macros.error("illegal syntax in a P2P protocol definition", n)
|
||||||
|
|
||||||
|
let peerInit = if peerState == nil: newNilLit()
|
||||||
|
else: newTree(nnkBracketExpr, createPeerState, peerState)
|
||||||
|
|
||||||
|
let netInit = if networkState == nil: newNilLit()
|
||||||
|
else: newTree(nnkBracketExpr, createNetworkState, networkState)
|
||||||
|
|
||||||
|
result = newNimNode(nnkStmtList)
|
||||||
|
result.add outTypes
|
||||||
|
result.add quote do:
|
||||||
|
# One global variable per protocol holds the protocol run-time data
|
||||||
|
var p = `initProtocol`(`protoName`, `peerInit`, `netInit`)
|
||||||
|
var `protocol` = addr p
|
||||||
|
|
||||||
|
# The protocol run-time data is available as a pseudo-field
|
||||||
|
# (e.g. `p2p.protocolInfo`)
|
||||||
|
template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
|
||||||
|
|
||||||
|
result.add outSendProcs, outRecvProcs, outProcRegistrations
|
||||||
|
result.add quote do:
|
||||||
|
setEventHandlers(`protocol`, `handshake`, `disconnectHandler`)
|
||||||
|
|
||||||
|
result.add newCall(bindSym("registerProtocol"), protocol)
|
||||||
|
|
||||||
|
when defined(debugP2pProtocol) or defined(debugMacros):
|
||||||
|
echo repr(result)
|
||||||
|
|
||||||
|
macro p2pProtocol*(protocolOptions: untyped, body: untyped): untyped =
|
||||||
|
let protoName = $(protocolOptions[0])
|
||||||
|
result = protocolOptions
|
||||||
|
result[0] = bindSym"p2pProtocolImpl"
|
||||||
|
result.add(newTree(nnkExprEqExpr,
|
||||||
|
ident("name"),
|
||||||
|
newLit(protoName)))
|
||||||
|
result.add(newTree(nnkExprEqExpr,
|
||||||
|
ident("body"),
|
||||||
|
body))
|
||||||
|
|
||||||
|
proc makeMessageHandler[MsgType](msgHandler: proc(msg: MsgType)): P2PPubSubCallback =
|
||||||
|
result = proc(api: DaemonAPI, ticket: PubsubTicket, msg: PubSubMessage): Future[bool] {.async.} =
|
||||||
|
msgHandler SSZ.decode(msg.data, MsgType)
|
||||||
|
return true
|
||||||
|
|
||||||
|
proc subscribe*[MsgType](node: EthereumNode,
|
||||||
|
topic: string,
|
||||||
|
msgHandler: proc(msg: MsgType)) {.async.} =
|
||||||
|
discard await node.daemon.pubsubSubscribe(topic, makeMessageHandler(msgHandler))
|
||||||
|
|
||||||
|
proc broadcast*(node: Eth2Node, topic: string, msg: auto) {.async.} =
|
||||||
|
await node.daemon.pubsubPublish(topic, SSZ.encode(msg))
|
||||||
|
|
|
@ -10,11 +10,36 @@
|
||||||
|
|
||||||
import
|
import
|
||||||
endians, typetraits, options, algorithm,
|
endians, typetraits, options, algorithm,
|
||||||
eth/common, nimcrypto/keccak,
|
faststreams/input_stream, serialization, eth/common, nimcrypto/keccak,
|
||||||
./spec/[crypto, datatypes, digest]
|
./spec/[crypto, datatypes, digest]
|
||||||
|
|
||||||
# ################### Helper functions ###################################
|
# ################### Helper functions ###################################
|
||||||
|
|
||||||
|
export
|
||||||
|
serialization
|
||||||
|
|
||||||
|
type
|
||||||
|
SszReader* = object
|
||||||
|
stream: ByteStreamVar
|
||||||
|
|
||||||
|
SszWriter* = object
|
||||||
|
stream: OutputStreamVar
|
||||||
|
|
||||||
|
SszError* = object of SerializationError
|
||||||
|
CorruptedDataError* = object of SszError
|
||||||
|
|
||||||
|
RecordWritingMemo = object
|
||||||
|
initialStreamPos: int
|
||||||
|
sizePrefixCursor: DelayedWriteCursor
|
||||||
|
|
||||||
|
serializationFormat SSZ,
|
||||||
|
Reader = SszReader,
|
||||||
|
Writer = SszWriter,
|
||||||
|
PreferedOutput = seq[byte]
|
||||||
|
|
||||||
|
proc init*(T: type SszReader, stream: ByteStreamVar): T =
|
||||||
|
result.stream = stream
|
||||||
|
|
||||||
# toBytesSSZ convert simple fixed-length types to their SSZ wire representation
|
# toBytesSSZ convert simple fixed-length types to their SSZ wire representation
|
||||||
func toBytesSSZ(x: SomeInteger): array[sizeof(x), byte] =
|
func toBytesSSZ(x: SomeInteger): array[sizeof(x), byte] =
|
||||||
## Convert directly to bytes the size of the int. (e.g. ``uint16 = 2 bytes``)
|
## Convert directly to bytes the size of the int. (e.g. ``uint16 = 2 bytes``)
|
||||||
|
@ -44,7 +69,7 @@ func toBytesSSZ(x: Eth2Digest): array[32, byte] = x.data
|
||||||
func toBytesSSZ(x: ValidatorPubKey|ValidatorSig): auto = x.getBytes()
|
func toBytesSSZ(x: ValidatorPubKey|ValidatorSig): auto = x.getBytes()
|
||||||
|
|
||||||
type
|
type
|
||||||
TrivialTypes =
|
TrivialType =
|
||||||
# Types that serialize down to a fixed-length array - most importantly,
|
# Types that serialize down to a fixed-length array - most importantly,
|
||||||
# these values don't carry a length prefix in the final encoding. toBytesSSZ
|
# these values don't carry a length prefix in the final encoding. toBytesSSZ
|
||||||
# provides the actual nim-type-to-bytes conversion.
|
# provides the actual nim-type-to-bytes conversion.
|
||||||
|
@ -55,7 +80,7 @@ type
|
||||||
SomeInteger | EthAddress | Eth2Digest | ValidatorPubKey | ValidatorSig |
|
SomeInteger | EthAddress | Eth2Digest | ValidatorPubKey | ValidatorSig |
|
||||||
bool
|
bool
|
||||||
|
|
||||||
func sszLen(v: TrivialTypes): int = toBytesSSZ(v).len
|
func sszLen(v: TrivialType): int = toBytesSSZ(v).len
|
||||||
func sszLen(v: ValidatorIndex): int = toBytesSSZ(v).len
|
func sszLen(v: ValidatorIndex): int = toBytesSSZ(v).len
|
||||||
|
|
||||||
func sszLen(v: object | tuple): int =
|
func sszLen(v: object | tuple): int =
|
||||||
|
@ -68,18 +93,18 @@ func sszLen(v: seq | array): int =
|
||||||
for i in v:
|
for i in v:
|
||||||
result += sszLen(i)
|
result += sszLen(i)
|
||||||
|
|
||||||
# fromBytesSSZUnsafe copy wire representation to a Nim variable, assuming
|
# fromBytesSSZ copies the wire representation to a Nim variable,
|
||||||
# there's enough data in the buffer
|
# assuming there's enough data in the buffer
|
||||||
func fromBytesSSZUnsafe(T: typedesc[SomeInteger], data: pointer): T =
|
func fromBytesSSZ(T: type SomeInteger, data: openarray[byte]): T =
|
||||||
## Convert directly to bytes the size of the int. (e.g. ``uint16 = 2 bytes``)
|
## Convert directly to bytes the size of the int. (e.g. ``uint16 = 2 bytes``)
|
||||||
## All integers are serialized as **little endian**.
|
## All integers are serialized as **little endian**.
|
||||||
## TODO: Assumes data points to a sufficiently large buffer
|
## TODO: Assumes data points to a sufficiently large buffer
|
||||||
|
doAssert data.len == sizeof(result)
|
||||||
# TODO: any better way to get a suitably aligned buffer in nim???
|
# TODO: any better way to get a suitably aligned buffer in nim???
|
||||||
# see also: https://github.com/nim-lang/Nim/issues/9206
|
# see also: https://github.com/nim-lang/Nim/issues/9206
|
||||||
var tmp: uint64
|
var tmp: uint64
|
||||||
var alignedBuf = cast[ptr byte](tmp.addr)
|
var alignedBuf = cast[ptr byte](tmp.addr)
|
||||||
copyMem(alignedBuf, data, result.sizeof)
|
copyMem(alignedBuf, unsafeAddr data[0], result.sizeof)
|
||||||
|
|
||||||
when result.sizeof == 8: littleEndian64(result.addr, alignedBuf)
|
when result.sizeof == 8: littleEndian64(result.addr, alignedBuf)
|
||||||
elif result.sizeof == 4: littleEndian32(result.addr, alignedBuf)
|
elif result.sizeof == 4: littleEndian32(result.addr, alignedBuf)
|
||||||
|
@ -87,143 +112,133 @@ func fromBytesSSZUnsafe(T: typedesc[SomeInteger], data: pointer): T =
|
||||||
elif result.sizeof == 1: copyMem(result.addr, alignedBuf, sizeof(result))
|
elif result.sizeof == 1: copyMem(result.addr, alignedBuf, sizeof(result))
|
||||||
else: {.fatal: "Unsupported type deserialization: " & $(type(result)).name.}
|
else: {.fatal: "Unsupported type deserialization: " & $(type(result)).name.}
|
||||||
|
|
||||||
func fromBytesSSZUnsafe(T: typedesc[bool], data: pointer): T =
|
func fromBytesSSZ(T: type bool, data: openarray[byte]): T =
|
||||||
# TODO: spec doesn't say what to do if the value is >1 - we'll use the C
|
# TODO: spec doesn't say what to do if the value is >1 - we'll use the C
|
||||||
# definition for now, but maybe this should be a parse error instead?
|
# definition for now, but maybe this should be a parse error instead?
|
||||||
fromBytesSSZUnsafe(uint8, data) != 0
|
fromBytesSSZ(uint8, data) != 0
|
||||||
|
|
||||||
func fromBytesSSZUnsafe(T: typedesc[ValidatorIndex], data: pointer): T =
|
func fromBytesSSZ(T: type ValidatorIndex, data: openarray[byte]): T =
|
||||||
## Integers are all encoded as littleendian and not padded
|
## Integers are all encoded as littleendian and not padded
|
||||||
|
doAssert data.len == 3
|
||||||
var tmp: uint32
|
var tmp: uint32
|
||||||
let p = cast[ptr UncheckedArray[byte]](data)
|
tmp = tmp or uint32(data[0])
|
||||||
tmp = tmp or uint32(p[0])
|
tmp = tmp or uint32(data[1]) shl 8
|
||||||
tmp = tmp or uint32(p[1]) shl 8
|
tmp = tmp or uint32(data[2]) shl 16
|
||||||
tmp = tmp or uint32(p[2]) shl 16
|
|
||||||
result = tmp.ValidatorIndex
|
result = tmp.ValidatorIndex
|
||||||
|
|
||||||
func fromBytesSSZUnsafe(T: typedesc[EthAddress], data: pointer): T =
|
func fromBytesSSZ(T: type EthAddress, data: openarray[byte]): T =
|
||||||
copyMem(result.addr, data, sizeof(result))
|
doAssert data.len == sizeof(result)
|
||||||
|
copyMem(result.addr, unsafeAddr data[0], sizeof(result))
|
||||||
|
|
||||||
func fromBytesSSZUnsafe(T: typedesc[Eth2Digest], data: pointer): T =
|
func fromBytesSSZ(T: type Eth2Digest, data: openarray[byte]): T =
|
||||||
copyMem(result.data.addr, data, sizeof(result.data))
|
doAssert data.len == sizeof(result.data)
|
||||||
|
copyMem(result.data.addr, unsafeAddr data[0], sizeof(result.data))
|
||||||
|
|
||||||
proc deserialize[T: TrivialTypes](
|
proc init*(T: type SszWriter, stream: OutputStreamVar): T =
|
||||||
dest: var T, offset: var int, data: openArray[byte]): bool =
|
result.stream = stream
|
||||||
# TODO proc because milagro is problematic
|
|
||||||
if offset + sszLen(dest) > data.len():
|
proc writeValue*(w: var SszWriter, obj: auto)
|
||||||
false
|
|
||||||
|
# This is an alternative lower-level API useful for RPC
|
||||||
|
# frameworks that can simulate the serialization of an
|
||||||
|
# object without constructing an actual instance:
|
||||||
|
proc beginRecord*(w: var SszWriter, T: type): RecordWritingMemo =
|
||||||
|
result.initialStreamPos = w.stream.pos
|
||||||
|
result.sizePrefixCursor = w.stream.delayFixedSizeWrite sizeof(uint32)
|
||||||
|
|
||||||
|
template writeField*(w: var SszWriter, name: string, value: auto) =
|
||||||
|
w.writeValue(value)
|
||||||
|
|
||||||
|
proc endRecord*(w: var SszWriter, memo: RecordWritingMemo) =
|
||||||
|
let finalSize = uint32(w.stream.pos - memo.initialStreamPos - 4)
|
||||||
|
memo.sizePrefixCursor.endWrite(finalSize.toBytesSSZ)
|
||||||
|
|
||||||
|
proc writeValue*(w: var SszWriter, obj: auto) =
|
||||||
|
# We are not using overloads here, because this leads to
|
||||||
|
# slightly better error messages when the user provides
|
||||||
|
# additional overloads for `writeValue`.
|
||||||
|
mixin writeValue
|
||||||
|
|
||||||
|
when obj is ValidatorIndex|TrivialType:
|
||||||
|
w.stream.append obj.toBytesSSZ
|
||||||
|
elif obj is enum:
|
||||||
|
w.stream.append uint64(obj).toBytesSSZ
|
||||||
else:
|
else:
|
||||||
when T is (ValidatorPubKey|ValidatorSig):
|
let memo = w.beginRecord(obj.type)
|
||||||
if dest.init(data[offset..data.len-1]):
|
when obj is seq|array|openarray:
|
||||||
offset += sszLen(dest)
|
# If you get an error here that looks like:
|
||||||
true
|
# type mismatch: got <type range 0..8191(uint64)>
|
||||||
else:
|
# you just used an unsigned int for an array index thinking you'd get
|
||||||
false
|
# away with it (surprise, surprise: you can't, uints are crippled!)
|
||||||
|
# https://github.com/nim-lang/Nim/issues/9984
|
||||||
|
for elem in obj:
|
||||||
|
w.writeValue elem
|
||||||
else:
|
else:
|
||||||
dest = fromBytesSSZUnsafe(T, data[offset].unsafeAddr)
|
obj.serializeFields(fieldName, field):
|
||||||
offset += sszLen(dest)
|
# for research/serialized_sizes, remove when appropriate
|
||||||
true
|
when defined(debugFieldSizes) and obj is (BeaconState|BeaconBlock):
|
||||||
|
let start = w.stream.pos
|
||||||
|
w.writeValue field
|
||||||
|
debugEcho k, ": ", w.stream.pos - start
|
||||||
|
else:
|
||||||
|
w.writeValue field
|
||||||
|
w.endRecord(memo)
|
||||||
|
|
||||||
func deserialize(
|
proc readValue*(r: var SszReader, result: var auto) =
|
||||||
dest: var ValidatorIndex, offset: var int, data: openArray[byte]): bool =
|
# We are not using overloads here, because this leads to
|
||||||
if offset + sszLen(dest) > data.len():
|
# slightly better error messages when the user provides
|
||||||
false
|
# additional overloads for `readValue`.
|
||||||
else:
|
type T = result.type
|
||||||
dest = fromBytesSSZUnsafe(ValidatorIndex, data[offset].unsafeAddr)
|
mixin readValue
|
||||||
offset += sszLen(dest)
|
|
||||||
true
|
|
||||||
|
|
||||||
func deserialize[T: enum](dest: var T, offset: var int, data: openArray[byte]): bool =
|
template checkEof(n: int) =
|
||||||
# TODO er, verify the size here, probably an uint64 but...
|
if not r.stream[].ensureBytes(n):
|
||||||
var tmp: uint64
|
raise newException(UnexpectedEofError, "SSZ has insufficient number of bytes")
|
||||||
if not deserialize(tmp, offset, data):
|
|
||||||
false
|
when result is ValidatorIndex|TrivialType:
|
||||||
else:
|
let bytesToRead = result.sszLen;
|
||||||
|
checkEof bytesToRead
|
||||||
|
|
||||||
|
when result is ValidatorPubKey|ValidatorSig:
|
||||||
|
if not result.init(r.stream.readBytes(bytesToRead)):
|
||||||
|
raise newException(CorruptedDataError, "Failed to load a BLS key or signature")
|
||||||
|
else:
|
||||||
|
result = T.fromBytesSSZ(r.stream.readBytes(bytesToRead))
|
||||||
|
|
||||||
|
elif result is enum:
|
||||||
# TODO what to do with out-of-range values?? rejecting means breaking
|
# TODO what to do with out-of-range values?? rejecting means breaking
|
||||||
# forwards compatibility..
|
# forwards compatibility..
|
||||||
dest = cast[T](tmp)
|
result = cast[T](r.readValue(uint64))
|
||||||
true
|
|
||||||
|
|
||||||
proc deserialize[T: not (enum|TrivialTypes|ValidatorIndex)](
|
elif result is string:
|
||||||
dest: var T, offset: var int, data: openArray[byte]): bool =
|
{.error: "The SSZ format doesn't support the string type yet".}
|
||||||
# Length in bytes, followed by each item
|
|
||||||
var totalLen: uint32
|
|
||||||
if not deserialize(totalLen, offset, data): return false
|
|
||||||
|
|
||||||
if offset + totalLen.int > data.len(): return false
|
|
||||||
|
|
||||||
let itemEnd = offset + totalLen.int
|
|
||||||
when T is seq:
|
|
||||||
# Items are of homogenous type, but not necessarily homogenous length,
|
|
||||||
# cannot pre-allocate item list generically
|
|
||||||
while offset < itemEnd:
|
|
||||||
dest.setLen dest.len + 1
|
|
||||||
if not deserialize(dest[^1], offset, data): return false
|
|
||||||
elif T is array:
|
|
||||||
var i = 0
|
|
||||||
while offset < itemEnd:
|
|
||||||
if not deserialize(dest[i], offset, data): return false
|
|
||||||
i += 1
|
|
||||||
if i > dest.len: return false
|
|
||||||
else:
|
else:
|
||||||
for field in dest.fields:
|
let totalLen = int r.readValue(uint32)
|
||||||
if not deserialize(field, offset, data): return false
|
checkEof totalLen
|
||||||
if offset != itemEnd: return false
|
|
||||||
|
|
||||||
true
|
let endPos = r.stream[].pos + totalLen
|
||||||
|
when T is seq:
|
||||||
|
type ElemType = type(result[0])
|
||||||
|
# Items are of homogenous type, but not necessarily homogenous length,
|
||||||
|
# cannot pre-allocate item list generically
|
||||||
|
while r.stream[].pos < endPos:
|
||||||
|
result.add r.readValue(ElemType)
|
||||||
|
|
||||||
func serialize(dest: var seq[byte], src: TrivialTypes) =
|
elif T is array:
|
||||||
dest.add src.toBytesSSZ()
|
type ElemType = type(result[0])
|
||||||
func serialize(dest: var seq[byte], src: ValidatorIndex) =
|
var i = 0
|
||||||
dest.add src.toBytesSSZ()
|
while r.stream[].pos < endPos:
|
||||||
|
if i > result.len:
|
||||||
|
raise newException(CorruptedDataError, "SSZ includes unexpected bytes past the end of an array")
|
||||||
|
result[i] = r.readValue(ElemType)
|
||||||
|
i += 1
|
||||||
|
|
||||||
func serialize(dest: var seq[byte], x: enum) =
|
|
||||||
# TODO er, verify the size here, probably an uint64 but...
|
|
||||||
serialize dest, uint64(x)
|
|
||||||
|
|
||||||
func serialize[T: not enum](dest: var seq[byte], src: T) =
|
|
||||||
let lenPos = dest.len()
|
|
||||||
|
|
||||||
# Length is a prefix, so we'll put a dummy 0 here and fill it after
|
|
||||||
# serializing
|
|
||||||
dest.add toBytesSSZ(0'u32)
|
|
||||||
|
|
||||||
when T is seq|array:
|
|
||||||
# If you get an error here that looks like:
|
|
||||||
# type mismatch: got <type range 0..8191(uint64)>
|
|
||||||
# you just used an unsigned int for an array index thinking you'd get
|
|
||||||
# away with it (surprise, surprise: you can't, uints are crippled!)
|
|
||||||
# https://github.com/nim-lang/Nim/issues/9984
|
|
||||||
for val in src:
|
|
||||||
serialize dest, val
|
|
||||||
else:
|
|
||||||
when defined(debugFieldSizes) and T is (BeaconState | BeaconBlock):
|
|
||||||
# for research/serialized_sizes, remove when appropriate
|
|
||||||
for name, field in src.fieldPairs:
|
|
||||||
let start = dest.len()
|
|
||||||
serialize dest, field
|
|
||||||
let sz = dest.len() - start
|
|
||||||
debugEcho(name, ": ", sz)
|
|
||||||
else:
|
else:
|
||||||
for field in src.fields:
|
result.deserializeFields(fieldName, field):
|
||||||
serialize dest, field
|
field = r.readValue(field.type)
|
||||||
|
|
||||||
# Write size (we only know it once we've serialized the object!)
|
if r.stream[].pos != endPos:
|
||||||
var objLen = dest.len() - lenPos - 4
|
raise newException(CorruptedDataError, "SSZ includes unexpected bytes past the end of the deserialized object")
|
||||||
littleEndian32(dest[lenPos].addr, objLen.addr)
|
|
||||||
|
|
||||||
# ################### Core functions ###################################
|
|
||||||
|
|
||||||
proc deserialize*(data: openArray[byte],
|
|
||||||
typ: typedesc): auto {.inline.} =
|
|
||||||
# TODO: returns Option[typ]: https://github.com/nim-lang/Nim/issues/9195
|
|
||||||
var ret: typ
|
|
||||||
var offset: int
|
|
||||||
if not deserialize(ret, offset, data): none(typ)
|
|
||||||
else: some(ret)
|
|
||||||
|
|
||||||
func serialize*(value: auto): seq[byte] =
|
|
||||||
serialize(result, value)
|
|
||||||
|
|
||||||
# ################### Hashing ###################################
|
# ################### Hashing ###################################
|
||||||
|
|
||||||
|
@ -251,7 +266,7 @@ func hash(a, b: openArray[byte]): array[32, byte] =
|
||||||
|
|
||||||
# TODO: er, how is this _actually_ done?
|
# TODO: er, how is this _actually_ done?
|
||||||
# Mandatory bug: https://github.com/nim-lang/Nim/issues/9825
|
# Mandatory bug: https://github.com/nim-lang/Nim/issues/9825
|
||||||
func empty(T: typedesc): T = discard
|
func empty(T: type): T = discard
|
||||||
const emptyChunk = empty(array[CHUNK_SIZE, byte])
|
const emptyChunk = empty(array[CHUNK_SIZE, byte])
|
||||||
|
|
||||||
func merkleHash[T](lst: openArray[T]): array[32, byte]
|
func merkleHash[T](lst: openArray[T]): array[32, byte]
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
import
|
import
|
||||||
options, tables,
|
options, tables,
|
||||||
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx,
|
chronicles, chronos, ranges/bitranges,
|
||||||
spec/[datatypes, crypto, digest],
|
spec/[datatypes, crypto, digest],
|
||||||
beacon_node, beacon_chain_db, block_pool, time, ssz
|
beacon_node, eth2_network, beacon_chain_db, block_pool, time, ssz
|
||||||
|
|
||||||
type
|
type
|
||||||
ValidatorChangeLogEntry* = object
|
ValidatorChangeLogEntry* = object
|
||||||
|
@ -58,7 +58,6 @@ proc importBlocks(node: BeaconNode, roots: openarray[(Eth2Digest, uint64)], head
|
||||||
|
|
||||||
info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len
|
info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len
|
||||||
|
|
||||||
|
|
||||||
p2pProtocol BeaconSync(version = 1,
|
p2pProtocol BeaconSync(version = 1,
|
||||||
shortName = "bcs",
|
shortName = "bcs",
|
||||||
networkState = BeaconSyncState):
|
networkState = BeaconSyncState):
|
||||||
|
@ -75,10 +74,9 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
bestRoot: Eth2Digest # TODO
|
bestRoot: Eth2Digest # TODO
|
||||||
bestSlot: uint64 = node.state.data.slot
|
bestSlot: uint64 = node.state.data.slot
|
||||||
|
|
||||||
await peer.status(protocolVersion, networkId, latestFinalizedRoot, latestFinalizedEpoch,
|
let m = await handshake(peer, timeout = 500,
|
||||||
bestRoot, bestSlot)
|
status(networkId, latestFinalizedRoot,
|
||||||
|
latestFinalizedEpoch, bestRoot, bestSlot))
|
||||||
let m = await peer.nextMsg(BeaconSync.status)
|
|
||||||
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
|
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
|
||||||
if bestDiff == 0:
|
if bestDiff == 0:
|
||||||
# Nothing to do?
|
# Nothing to do?
|
||||||
|
@ -109,13 +107,25 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
|
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
|
||||||
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies)
|
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies)
|
||||||
|
|
||||||
proc status(peer: Peer, protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest,
|
proc status(
|
||||||
latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64)
|
peer: Peer,
|
||||||
|
networkId: int,
|
||||||
|
latestFinalizedRoot: Eth2Digest,
|
||||||
|
latestFinalizedEpoch: uint64,
|
||||||
|
bestRoot: Eth2Digest,
|
||||||
|
bestSlot: uint64) {.libp2pProtocol("hello", "1.0.0").}
|
||||||
|
|
||||||
proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, uint64)])
|
proc beaconBlockRoots(
|
||||||
|
peer: Peer,
|
||||||
|
roots: openarray[(Eth2Digest, uint64)]) {.libp2pProtocol("rpc/beacon_block_roots", "1.0.0").}
|
||||||
|
|
||||||
requestResponse:
|
requestResponse:
|
||||||
proc getBeaconBlockHeaders(peer: Peer, blockRoot: Eth2Digest, slot: uint64, maxHeaders: int, skipSlots: int) =
|
proc getBeaconBlockHeaders(
|
||||||
|
peer: Peer,
|
||||||
|
blockRoot: Eth2Digest,
|
||||||
|
slot: uint64,
|
||||||
|
maxHeaders: int,
|
||||||
|
skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} =
|
||||||
# TODO: validate maxHeaders and implement slipSlots
|
# TODO: validate maxHeaders and implement slipSlots
|
||||||
var s = slot
|
var s = slot
|
||||||
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
|
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
|
||||||
|
@ -126,95 +136,25 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
headers.add(db.getBlock(r).get().toHeader)
|
headers.add(db.getBlock(r).get().toHeader)
|
||||||
if headers.len == maxHeaders: break
|
if headers.len == maxHeaders: break
|
||||||
inc s
|
inc s
|
||||||
await peer.beaconBlockHeaders(reqId, headers)
|
await response.send(headers)
|
||||||
|
|
||||||
proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader])
|
proc beaconBlockHeaders(
|
||||||
|
peer: Peer,
|
||||||
|
blockHeaders: openarray[BeaconBlockHeader])
|
||||||
|
|
||||||
requestResponse:
|
requestResponse:
|
||||||
proc getBeaconBlockBodies(peer: Peer, blockRoots: openarray[Eth2Digest]) =
|
proc getBeaconBlockBodies(
|
||||||
|
peer: Peer,
|
||||||
|
blockRoots: openarray[Eth2Digest]) {.libp2pProtocol("rpc/beacon_block_bodies", "1.0.0").} =
|
||||||
# TODO: Validate blockRoots.len
|
# TODO: Validate blockRoots.len
|
||||||
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
|
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
|
||||||
let db = peer.networkState.db
|
let db = peer.networkState.db
|
||||||
for r in blockRoots:
|
for r in blockRoots:
|
||||||
if (let blk = db.getBlock(r); blk.isSome):
|
if (let blk = db.getBlock(r); blk.isSome):
|
||||||
bodies.add(blk.get().body)
|
bodies.add(blk.get().body)
|
||||||
await peer.beaconBlockBodies(reqId, bodies)
|
await response.send(bodies)
|
||||||
|
|
||||||
proc beaconBlockBodies(peer: Peer, blockBodies: openarray[BeaconBlockBody])
|
proc beaconBlockBodies(
|
||||||
|
peer: Peer,
|
||||||
|
blockBodies: openarray[BeaconBlockBody])
|
||||||
requestResponse:
|
|
||||||
proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) =
|
|
||||||
var bb: BeaconBlock
|
|
||||||
var bs: BeaconState
|
|
||||||
# TODO: get the changelog from the DB.
|
|
||||||
await peer.validatorChangeLog(reqId, bb, bs, [], [], @[])
|
|
||||||
|
|
||||||
proc validatorChangeLog(peer: Peer,
|
|
||||||
signedBlock: BeaconBlock,
|
|
||||||
beaconState: BeaconState,
|
|
||||||
added: openarray[ValidatorPubKey],
|
|
||||||
removed: openarray[uint32],
|
|
||||||
order: seq[byte])
|
|
||||||
|
|
||||||
type
|
|
||||||
# A bit shorter names for convenience
|
|
||||||
ChangeLog = BeaconSync.validatorChangeLog
|
|
||||||
ChangeLogEntry = ValidatorChangeLogEntry
|
|
||||||
|
|
||||||
func validate*(log: ChangeLog): bool =
|
|
||||||
# TODO:
|
|
||||||
# Assert that the number of raised bits in log.order (a.k.a population count)
|
|
||||||
# matches the number of elements in log.added
|
|
||||||
# https://en.wikichip.org/wiki/population_count
|
|
||||||
return true
|
|
||||||
|
|
||||||
iterator changes*(log: ChangeLog): ChangeLogEntry =
|
|
||||||
var
|
|
||||||
bits = log.added.len + log.removed.len
|
|
||||||
addedIdx = 0
|
|
||||||
removedIdx = 0
|
|
||||||
|
|
||||||
template nextItem(collection): auto =
|
|
||||||
let idx = `collection Idx`
|
|
||||||
inc `collection Idx`
|
|
||||||
log.collection[idx]
|
|
||||||
|
|
||||||
for i in 0 ..< bits:
|
|
||||||
yield if log.order.getBit(i):
|
|
||||||
ChangeLogEntry(kind: Activation, pubkey: nextItem(added))
|
|
||||||
else:
|
|
||||||
ChangeLogEntry(kind: ValidatorSetDeltaFlags.Exit, index: nextItem(removed))
|
|
||||||
|
|
||||||
proc getValidatorChangeLog*(node: EthereumNode, changeLogHead: Eth2Digest):
|
|
||||||
Future[(Peer, ChangeLog)] {.async.} =
|
|
||||||
while true:
|
|
||||||
let peer = node.randomPeerWith(BeaconSync)
|
|
||||||
if peer == nil: return
|
|
||||||
|
|
||||||
let res = await peer.getValidatorChangeLog(changeLogHead, timeout = 1)
|
|
||||||
if res.isSome:
|
|
||||||
return (peer, res.get)
|
|
||||||
|
|
||||||
proc applyValidatorChangeLog*(log: ChangeLog,
|
|
||||||
outBeaconState: var BeaconState): bool =
|
|
||||||
# TODO:
|
|
||||||
#
|
|
||||||
# 1. Validate that the signedBlock state root hash matches the
|
|
||||||
# provided beaconState
|
|
||||||
#
|
|
||||||
# 2. Validate that the applied changelog produces the correct
|
|
||||||
# new change log head
|
|
||||||
#
|
|
||||||
# 3. Check that enough signatures from the known validator set
|
|
||||||
# are present
|
|
||||||
#
|
|
||||||
# 4. Apply all changes to the validator set
|
|
||||||
#
|
|
||||||
|
|
||||||
outBeaconState.finalized_epoch =
|
|
||||||
log.signedBlock.slot div SLOTS_PER_EPOCH
|
|
||||||
|
|
||||||
outBeaconState.validator_registry_delta_chain_tip =
|
|
||||||
log.beaconState.validator_registry_delta_chain_tip
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
const
|
||||||
|
versionMajor* = 0
|
||||||
|
versionMinor* = 1
|
||||||
|
versionBuild* = 10
|
||||||
|
|
||||||
|
template versionAsStr*: string =
|
||||||
|
$versionMajor & "." & $versionMinor & "." & $versionBuild
|
||||||
|
|
2
nim.cfg
2
nim.cfg
|
@ -1,5 +1,7 @@
|
||||||
# https://github.com/nim-lang/Nim/issues/8294#issuecomment-454556051
|
# https://github.com/nim-lang/Nim/issues/8294#issuecomment-454556051
|
||||||
@if windows:
|
@if windows:
|
||||||
-d:"chronicles_colors=NoColors"
|
-d:"chronicles_colors=NoColors"
|
||||||
|
@else
|
||||||
|
# -d:withLibP2P
|
||||||
@end
|
@end
|
||||||
|
|
||||||
|
|
|
@ -6,8 +6,9 @@
|
||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
unittest, nimcrypto, eth/common, sequtils, options, blscurve,
|
unittest, sequtils, options,
|
||||||
../beacon_chain/ssz, ../beacon_chain/spec/datatypes
|
nimcrypto, eth/common, blscurve, serialization/testing/generic_suite,
|
||||||
|
../beacon_chain/ssz, ../beacon_chain/spec/[datatypes, digest]
|
||||||
|
|
||||||
func filled[N: static[int], T](typ: type array[N, T], value: T): array[N, T] =
|
func filled[N: static[int], T](typ: type array[N, T], value: T): array[N, T] =
|
||||||
for val in result.mitems:
|
for val in result.mitems:
|
||||||
|
@ -35,8 +36,7 @@ suite "Simple serialization":
|
||||||
f2: EthAddress.filled(byte 35),
|
f2: EthAddress.filled(byte 35),
|
||||||
f3: MDigest[256].filled(byte 35),
|
f3: MDigest[256].filled(byte 35),
|
||||||
f4: @[byte 'c'.ord, 'o'.ord, 'w'.ord],
|
f4: @[byte 'c'.ord, 'o'.ord, 'w'.ord],
|
||||||
f5: ValidatorIndex(79)
|
f5: ValidatorIndex(79))
|
||||||
)
|
|
||||||
|
|
||||||
var expected_ser = @[
|
var expected_ser = @[
|
||||||
byte 67, 0, 0, 0, # length
|
byte 67, 0, 0, 0, # length
|
||||||
|
@ -49,58 +49,33 @@ suite "Simple serialization":
|
||||||
expected_ser &= [byte 79, 0, 0]
|
expected_ser &= [byte 79, 0, 0]
|
||||||
|
|
||||||
test "Object deserialization":
|
test "Object deserialization":
|
||||||
let deser = expected_ser.deserialize(Foo).get()
|
let deser = SSZ.decode(expected_ser, Foo)
|
||||||
check: expected_deser == deser
|
check: expected_deser == deser
|
||||||
|
|
||||||
test "Object serialization":
|
test "Object serialization":
|
||||||
let ser = expected_deser.serialize()
|
let ser = SSZ.encode(expected_deser)
|
||||||
check: expected_ser == ser
|
check: expected_ser == ser
|
||||||
|
|
||||||
test "Not enough data":
|
test "Not enough data":
|
||||||
check:
|
expect SerializationError:
|
||||||
expected_ser[0..^2].deserialize(Foo).isNone()
|
let x = SSZ.decode(expected_ser[0..^2], Foo)
|
||||||
expected_ser[1..^1].deserialize(Foo).isNone()
|
|
||||||
|
expect SerializationError:
|
||||||
|
let x = SSZ.decode(expected_ser[1..^1], Foo)
|
||||||
|
|
||||||
test "ValidatorIndex roundtrip":
|
test "ValidatorIndex roundtrip":
|
||||||
# https://github.com/nim-lang/Nim/issues/10027
|
# https://github.com/nim-lang/Nim/issues/10027
|
||||||
let v = 79.ValidatorIndex
|
let v = 79.ValidatorIndex
|
||||||
let ser = v.serialize()
|
let ser = SSZ.encode(v)
|
||||||
check:
|
check:
|
||||||
ser.len() == 3
|
ser.len() == 3
|
||||||
deserialize(ser, type(v)).get() == v
|
SSZ.decode(ser, v.type) == v
|
||||||
|
|
||||||
test "Array roundtrip":
|
SSZ.roundripTest [1, 2, 3]
|
||||||
let v = [1, 2, 3]
|
SSZ.roundripTest @[1, 2, 3]
|
||||||
let ser = v.serialize()
|
SSZ.roundripTest SigKey.random().getKey()
|
||||||
check:
|
SSZ.roundripTest BeaconBlock(slot: 42, signature: sign(SigKey.random(), 0'u64, ""))
|
||||||
deserialize(ser, type(v)).get() == v
|
SSZ.roundripTest BeaconState(slot: 42)
|
||||||
|
|
||||||
test "Seq roundtrip":
|
|
||||||
let v = @[1, 2, 3]
|
|
||||||
let ser = v.serialize()
|
|
||||||
|
|
||||||
check:
|
|
||||||
deserialize(ser, type(v)).get() == v
|
|
||||||
|
|
||||||
test "Key roundtrip":
|
|
||||||
let v = SigKey.random().getKey()
|
|
||||||
let ser = v.serialize()
|
|
||||||
|
|
||||||
check:
|
|
||||||
deserialize(ser, type(v)).get() == v
|
|
||||||
|
|
||||||
# Just to see that we can serialize stuff at all
|
|
||||||
test "Roundtrip main eth2 types":
|
|
||||||
let
|
|
||||||
bb = BeaconBlock(
|
|
||||||
slot: 42,
|
|
||||||
signature: sign(SigKey.random(), 0'u64, "")
|
|
||||||
)
|
|
||||||
bs = BeaconState(slot: 42)
|
|
||||||
|
|
||||||
check:
|
|
||||||
bb.serialize().deserialize(BeaconBlock).get() == bb
|
|
||||||
bs.serialize().deserialize(BeaconState).get() == bs
|
|
||||||
|
|
||||||
suite "Tree hashing":
|
suite "Tree hashing":
|
||||||
# TODO Nothing but smoke tests for now..
|
# TODO Nothing but smoke tests for now..
|
||||||
|
@ -117,3 +92,4 @@ suite "Tree hashing":
|
||||||
test "Hash integer":
|
test "Hash integer":
|
||||||
check: hash_tree_root(0x01'u32) == [1'u8, 0, 0, 0] # little endian!
|
check: hash_tree_root(0x01'u32) == [1'u8, 0, 0, 0] # little endian!
|
||||||
check: hash_tree_root(ValidatorIndex(0x01)) == [1'u8, 0, 0] # little endian!
|
check: hash_tree_root(ValidatorIndex(0x01)) == [1'u8, 0, 0] # little endian!
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue