Mechanically remove all mentions of the daemon from the code

This commit is contained in:
Zahary Karadjov 2020-03-22 22:54:47 +02:00 committed by zah
parent 8300cee131
commit 0c018cb68a
19 changed files with 167 additions and 990 deletions

View File

@ -37,7 +37,7 @@ build_script:
test_script:
# the "go-checks" target fails in AppVeyor, for some reason; easier to disable than to debug
- mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_GO_CHECKS=1 P2PD_CACHE=p2pdCache LOG_LEVEL=TRACE
- mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_GO_CHECKS=1 P2PD_CACHE=p2pdCache LOG_LEVEL=TRACE NIMFLAGS="-d:NETWORK_TYPE=libp2p -d:testnet_servers_image"
- mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_GO_CHECKS=1 P2PD_CACHE=p2pdCache LOG_LEVEL=TRACE NIMFLAGS="-d:testnet_servers_image"
- mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_TEST_FIXTURES_SCRIPT=1 DISABLE_GO_CHECKS=1 test
deploy: off

View File

@ -50,6 +50,6 @@ script:
# Building Nim-1.0.4 takes up to 10 minutes on Travis - the time limit after which jobs are cancelled for having no output
- make -j${NPROC} NIMFLAGS="--parallelBuild:${NPROC}" V=1 update # to allow a newer Nim version to be detected
- make -j${NPROC} NIMFLAGS="--parallelBuild:${NPROC}" LOG_LEVEL=TRACE
- make -j${NPROC} NIMFLAGS="--parallelBuild:${NPROC} -d:NETWORK_TYPE=libp2p -d:testnet_servers_image" LOG_LEVEL=TRACE
- make -j${NPROC} NIMFLAGS="--parallelBuild:${NPROC} -d:testnet_servers_image" LOG_LEVEL=TRACE
- make -j${NPROC} NIMFLAGS="--parallelBuild:${NPROC}" DISABLE_TEST_FIXTURES_SCRIPT=1 test

2
Jenkinsfile vendored
View File

@ -22,7 +22,7 @@ def runStages() {
"tools": {
stage("Tools") {
sh "make -j${env.NPROC}"
sh "make -j${env.NPROC} LOG_LEVEL=TRACE NIMFLAGS='-d:NETWORK_TYPE=libp2p -d:testnet_servers_image'"
sh "make -j${env.NPROC} LOG_LEVEL=TRACE NIMFLAGS='-d:testnet_servers_image'"
}
},
"test suite": {

View File

@ -72,7 +72,7 @@ jobs:
mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} CI_CACHE=NimBinaries update
mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} fetch-dlls
mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} P2PD_CACHE=p2pdCache LOG_LEVEL=TRACE
mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} P2PD_CACHE=p2pdCache LOG_LEVEL=TRACE NIMFLAGS="-d:NETWORK_TYPE=libp2p -d:testnet_servers_image"
mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} P2PD_CACHE=p2pdCache LOG_LEVEL=TRACE NIMFLAGS="-d:testnet_servers_image"
file build/beacon_node
mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} DISABLE_TEST_FIXTURES_SCRIPT=1 test
displayName: 'build and test'

View File

@ -193,12 +193,6 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
for node in conf.bootstrapNodes: addBootstrapNode(node, bootNodes, bootEnrs, ourPubKey)
loadBootstrapFile(string conf.bootstrapNodesFile, bootNodes, bootEnrs, ourPubKey)
when networkBackend == libp2pDaemon:
for enr in bootEnrs:
let enode = toENode(enr)
if enode.isOk:
bootNodes.add enode.value
let persistentBootstrapFile = conf.dataDir / "bootstrap_nodes.txt"
if fileExists(persistentBootstrapFile):
loadBootstrapFile(persistentBootstrapFile, bootNodes, bootEnrs, ourPubKey)
@ -924,16 +918,9 @@ proc installBeaconApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
return StringOfJson("null")
rpcServer.rpc("getNetworkPeerId") do () -> string:
when networkBackend != libp2p:
raise newException(CatchableError, "Unsupported operation")
else:
return $publicKey(node.network)
return $publicKey(node.network)
rpcServer.rpc("getNetworkPeers") do () -> seq[string]:
when networkBackend != libp2p:
if true:
raise newException(CatchableError, "Unsupported operation")
for peerId, peer in node.network.peerPool:
result.add $peerId

View File

@ -63,244 +63,155 @@ proc setupNat(conf: BeaconNodeConf): tuple[ip: IpAddress,
if extPorts.isSome:
(result.tcpPort, result.udpPort) = extPorts.get()
when networkBackend in [libp2p, libp2pDaemon]:
import
os, random,
stew/io, eth/async_utils,
libp2p/[multiaddress, multicodec],
ssz
import
os, random,
stew/io, eth/async_utils,
libp2p/[multiaddress, multicodec],
ssz
export
multiaddress
export
multiaddress
when networkBackend == libp2p:
import
libp2p/standard_setup, libp2p_backend, libp2p/peerinfo, peer_pool
import
libp2p/standard_setup, libp2p_backend, libp2p/peerinfo, peer_pool
export
libp2p_backend, peer_pool, peerinfo
export
libp2p_backend, peer_pool, peerinfo
const
netBackendName* = "libp2p"
networkKeyFilename = "privkey.protobuf"
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: key)
func asEthKey*(key: PrivateKey): keys.PrivateKey =
keys.PrivateKey(data: key.skkey.data)
proc initAddress*(T: type MultiAddress, str: string): T =
let address = MultiAddress.init(str)
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
result = address
else:
import
libp2p/daemon/daemonapi, libp2p_daemon_backend
raise newException(MultiAddressError,
"Invalid bootstrap node multi-address")
export
libp2p_daemon_backend
template tcpEndPoint(address, port): auto =
MultiAddress.init(address, Protocol.IPPROTO_TCP, port)
var mainDaemon: DaemonAPI
proc ensureNetworkIdFile(conf: BeaconNodeConf): string =
result = conf.dataDir / networkKeyFilename
if not fileExists(result):
createDir conf.dataDir.string
let pk = PrivateKey.random(Secp256k1)
writeFile(result, pk.getBytes)
proc closeDaemon() {.noconv.} =
if mainDaemon != nil:
info "Shutting down the LibP2P daemon"
waitFor mainDaemon.close()
proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
let privKeyPath = conf.dataDir / networkKeyFilename
var privKey: PrivateKey
if not fileExists(privKeyPath):
createDir conf.dataDir.string
privKey = PrivateKey.random(Secp256k1)
writeFile(privKeyPath, privKey.getBytes())
else:
let keyBytes = readFile(privKeyPath)
privKey = PrivateKey.init(keyBytes.toOpenArrayByte(0, keyBytes.high))
addQuitProc(closeDaemon)
KeyPair(seckey: privKey, pubkey: privKey.getKey())
const
netBackendName* = "libp2p"
networkKeyFilename = "privkey.protobuf"
proc createEth2Node*(conf: BeaconNodeConf,
bootstrapNodes: seq[ENode]): Future[Eth2Node] {.async.} =
var
(extIp, extTcpPort, _) = setupNat(conf)
hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort)
announcedAddresses = if extIp == globalListeningAddr: @[]
else: @[tcpEndPoint(extIp, extTcpPort)]
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: key)
info "Initializing networking", hostAddress,
announcedAddresses,
bootstrapNodes
func asEthKey*(key: PrivateKey): keys.PrivateKey =
keys.PrivateKey(data: key.skkey.data)
let keys = conf.getPersistentNetKeys
# TODO nim-libp2p still doesn't have support for announcing addresses
# that are different from the host address (this is relevant when we
# are running behind a NAT).
var switch = newStandardSwitch(some keys.seckey, hostAddress,
triggerSelf = true, gossip = false)
result = Eth2Node.init(conf, switch, extIp, keys.seckey.asEthKey)
proc initAddress*(T: type MultiAddress, str: string): T =
let address = MultiAddress.init(str)
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
result = address
else:
raise newException(MultiAddressError,
"Invalid bootstrap node multi-address")
proc getPersistenBootstrapAddr*(conf: BeaconNodeConf,
ip: IpAddress, port: Port): ENode =
let pair = getPersistentNetKeys(conf)
initENode(pair.pubkey.skkey, Address(ip: ip, udpPort: port))
template tcpEndPoint(address, port): auto =
MultiAddress.init(address, Protocol.IPPROTO_TCP, port)
proc shortForm*(id: KeyPair): string =
$PeerID.init(id.pubkey)
proc ensureNetworkIdFile(conf: BeaconNodeConf): string =
result = conf.dataDir / networkKeyFilename
if not fileExists(result):
createDir conf.dataDir.string
let pk = PrivateKey.random(Secp256k1)
writeFile(result, pk.getBytes)
proc toPeerInfo(enode: ENode): PeerInfo =
let
peerId = PeerID.init enode.pubkey.asLibp2pKey
addresses = @[MultiAddress.init enode.toMultiAddressStr]
return PeerInfo.init(peerId, addresses)
proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
let privKeyPath = conf.dataDir / networkKeyFilename
var privKey: PrivateKey
if not fileExists(privKeyPath):
createDir conf.dataDir.string
privKey = PrivateKey.random(Secp256k1)
writeFile(privKeyPath, privKey.getBytes())
else:
let keyBytes = readFile(privKeyPath)
privKey = PrivateKey.init(keyBytes.toOpenArrayByte(0, keyBytes.high))
proc connectToNetwork*(node: Eth2Node,
bootstrapNodes: seq[ENode],
bootstrapEnrs: seq[enr.Record]) {.async.} =
for bootstrapNode in bootstrapEnrs:
debug "Adding known peer", peer = bootstrapNode
node.addKnownPeer bootstrapNode
KeyPair(seckey: privKey, pubkey: privKey.getKey())
await node.start()
proc createEth2Node*(conf: BeaconNodeConf,
bootstrapNodes: seq[ENode]): Future[Eth2Node] {.async.} =
var
(extIp, extTcpPort, _) = setupNat(conf)
hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort)
announcedAddresses = if extIp == globalListeningAddr: @[]
else: @[tcpEndPoint(extIp, extTcpPort)]
proc checkIfConnectedToBootstrapNode {.async.} =
await sleepAsync(30.seconds)
if bootstrapEnrs.len > 0 and libp2p_successful_dials.value == 0:
fatal "Failed to connect to any bootstrap node. Quitting", bootstrapEnrs
quit 1
info "Initializing networking", hostAddress,
announcedAddresses,
bootstrapNodes
traceAsyncErrors checkIfConnectedToBootstrapNode()
when networkBackend == libp2p:
let keys = conf.getPersistentNetKeys
# TODO nim-libp2p still doesn't have support for announcing addresses
# that are different from the host address (this is relevant when we
# are running behind a NAT).
var switch = newStandardSwitch(some keys.seckey, hostAddress,
triggerSelf = true, gossip = false)
result = Eth2Node.init(conf, switch, extIp, keys.seckey.asEthKey)
else:
let keyFile = conf.ensureNetworkIdFile
proc saveConnectionAddressFile*(node: Eth2Node, filename: string) =
writeFile(filename, $node.switch.peerInfo.addrs[0] & "/p2p/" &
node.switch.peerInfo.id)
var daemonFut = if bootstrapNodes.len == 0:
newDaemonApi({PSNoSign, DHTFull, PSFloodSub},
id = keyFile,
hostAddresses = @[hostAddress],
announcedAddresses = announcedAddresses)
else:
newDaemonApi({PSNoSign, DHTFull, PSFloodSub, WaitBootstrap},
id = keyFile,
hostAddresses = @[hostAddress],
announcedAddresses = announcedAddresses,
bootstrapNodes = mapIt(bootstrapNodes, it.toMultiAddressStr),
peersRequired = 1)
func peersCount*(node: Eth2Node): int =
len(node.peerPool)
mainDaemon = await daemonFut
proc subscribe*[MsgType](node: Eth2Node,
topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async, gcsafe.} =
template execMsgHandler(peerExpr, gossipBytes, gossipTopic) =
inc gossip_messages_received
trace "Incoming pubsub message received",
peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
message_id = `$`(sha256.digest(gossipBytes))
msgHandler SSZ.decode(gossipBytes, MsgType)
var identity = await mainDaemon.identity()
info "LibP2P daemon started", peer = identity.peer.pretty(),
addresses = identity.addresses
let incomingMsgHandler = proc(topic: string,
data: seq[byte]) {.async, gcsafe.} =
execMsgHandler "unknown", data, topic
result = await Eth2Node.init(mainDaemon)
await node.switch.subscribe(topic, incomingMsgHandler)
proc getPersistenBootstrapAddr*(conf: BeaconNodeConf,
ip: IpAddress, port: Port): ENode =
let pair = getPersistentNetKeys(conf)
initENode(pair.pubkey.skkey, Address(ip: ip, udpPort: port))
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
fut.addCallback do (arg: pointer):
if not(fut.failed):
trace "Outgoing pubsub message has been sent", message_id = `$`(digest)
proc shortForm*(id: KeyPair): string =
$PeerID.init(id.pubkey)
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
inc gossip_messages_sent
let broadcastBytes = SSZ.encode(msg)
var fut = node.switch.publish(topic, broadcastBytes)
traceMessage(fut, sha256.digest(broadcastBytes))
traceAsyncErrors(fut)
proc toPeerInfo(enode: ENode): PeerInfo =
let
peerId = PeerID.init enode.pubkey.asLibp2pKey
addresses = @[MultiAddress.init enode.toMultiAddressStr]
when networkBackend == libp2p:
return PeerInfo.init(peerId, addresses)
else:
return PeerInfo(peer: peerId, addresses: addresses)
# TODO:
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
# The filtering is not implemented properly yet.
iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer =
var peers = newSeq[Peer]()
for _, peer in pairs(node.peers): peers.add peer
shuffle peers
if peers.len > maxPeers: peers.setLen(maxPeers)
for p in peers: yield p
proc connectToNetwork*(node: Eth2Node,
bootstrapNodes: seq[ENode],
bootstrapEnrs: seq[enr.Record]) {.async.} =
when networkBackend == libp2pDaemon:
var connected = false
for bootstrapNode in bootstrapNodes:
try:
let peerInfo = toPeerInfo(bootstrapNode)
when networkBackend == libp2p:
discard await node.switch.dial(peerInfo)
else:
await node.daemon.connect(peerInfo.peer, peerInfo.addresses)
var peer = node.getPeer(peerInfo)
peer.wasDialed = true
await initializeConnection(peer)
connected = true
except CatchableError as err:
error "Failed to connect to bootstrap node",
node = bootstrapNode, err = err.msg
if bootstrapNodes.len > 0 and connected == false:
fatal "Failed to connect to any bootstrap node. Quitting."
quit 1
elif networkBackend == libp2p:
for bootstrapNode in bootstrapEnrs:
debug "Adding known peer", peer = bootstrapNode
node.addKnownPeer bootstrapNode
await node.start()
proc checkIfConnectedToBootstrapNode {.async.} =
await sleepAsync(30.seconds)
if bootstrapEnrs.len > 0 and libp2p_successful_dials.value == 0:
fatal "Failed to connect to any bootstrap node. Quitting", bootstrapEnrs
quit 1
traceAsyncErrors checkIfConnectedToBootstrapNode()
proc saveConnectionAddressFile*(node: Eth2Node, filename: string) =
when networkBackend == libp2p:
writeFile(filename, $node.switch.peerInfo.addrs[0] & "/p2p/" &
node.switch.peerInfo.id)
else:
let id = waitFor node.daemon.identity()
writeFile(filename, $id.addresses[0] & "/p2p/" & id.peer.pretty)
func peersCount*(node: Eth2Node): int =
when networkBackend == libp2p:
len(node.peerPool)
else:
node.peers.len
proc subscribe*[MsgType](node: Eth2Node,
topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async, gcsafe.} =
template execMsgHandler(peerExpr, gossipBytes, gossipTopic) =
inc gossip_messages_received
trace "Incoming pubsub message received",
peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
message_id = `$`(sha256.digest(gossipBytes))
msgHandler SSZ.decode(gossipBytes, MsgType)
when networkBackend == libp2p:
let incomingMsgHandler = proc(topic: string,
data: seq[byte]) {.async, gcsafe.} =
execMsgHandler "unknown", data, topic
await node.switch.subscribe(topic, incomingMsgHandler)
else:
let incomingMsgHandler = proc(api: DaemonAPI,
ticket: PubsubTicket,
msg: PubSubMessage): Future[bool] {.async, gcsafe.} =
execMsgHandler msg.peer, msg.data, msg.topics[0]
return true
discard await node.daemon.pubsubSubscribe(topic, incomingMsgHandler)
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
fut.addCallback do (arg: pointer):
if not(fut.failed):
trace "Outgoing pubsub message has been sent", message_id = `$`(digest)
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
inc gossip_messages_sent
let broadcastBytes = SSZ.encode(msg)
when networkBackend == libp2p:
var fut = node.switch.publish(topic, broadcastBytes)
traceMessage(fut, sha256.digest(broadcastBytes))
traceAsyncErrors(fut)
else:
var fut = node.daemon.pubsubPublish(topic, broadcastBytes)
traceMessage(fut, sha256.digest(broadcastBytes))
traceAsyncErrors(fut)
# TODO:
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
# The filtering is not implemented properly yet.
iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer =
var peers = newSeq[Peer]()
for _, peer in pairs(node.peers): peers.add peer
shuffle peers
if peers.len > maxPeers: peers.setLen(maxPeers)
for p in peers: yield p
else:
{.fatal: "Unsupported network backend".}

View File

@ -1,407 +0,0 @@
import
metrics
type
ResponseCode* = enum
Success
InvalidRequest
ServerError
Bytes = seq[byte]
const
defaultIncomingReqTimeout = 5000
HandshakeTimeout = FaultOrError
# Spec constants
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains
REQ_RESP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
TTFB_TIMEOUT* = 5.seconds
RESP_TIMEOUT* = 10.seconds
readTimeoutErrorMsg = "Exceeded read timeout for a request"
logScope:
topics = "libp2p"
declarePublicGauge libp2p_successful_dials,
"Number of successfully dialed peers"
declarePublicGauge libp2p_peers,
"Number of active libp2p peers"
template libp2pProtocol*(name: string, version: int) {.pragma.}
proc getRequestProtoName(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
let protoName = $(pragma[1])
let protoVer = $(pragma[2].intVal)
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/ssz")
return newLit("")
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(stream: P2PStream,
deadline: Future[void]): Future[int] {.async.} =
trace "about to read msg size prefix"
var parser: VarintParser[uint64, ProtoBuf]
while true:
var nextByte: byte
var readNextByte = stream.readExactly(addr nextByte, 1)
await readNextByte or deadline
if not readNextByte.finished:
trace "size prefix byte not received in time"
return -1
case parser.feedByte(nextByte)
of Done:
let res = parser.getResult
if res > uint64(REQ_RESP_MAX_SIZE):
trace "size prefix outside of range", res
return -1
else:
trace "got size prefix", res
return int(res)
of Overflow:
trace "size prefix overflow"
return -1
of Incomplete:
continue
proc readMsgBytes(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
trace "about to read message bytes", withResponseCode
try:
if withResponseCode:
var responseCode: byte
trace "about to read response code"
var readResponseCode = stream.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished:
trace "response code not received in time"
return
if responseCode > ResponseCode.high.byte:
trace "invalid response code", responseCode
return
logScope: responseCode = ResponseCode(responseCode)
trace "got response code"
case ResponseCode(responseCode)
of InvalidRequest, ServerError:
let responseErrMsg = await readChunk(stream, string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
return
of Success:
# The response is OK, the execution continues below
discard
var sizePrefix = await readSizePrefix(stream, deadline)
trace "got msg size prefix", sizePrefix
if sizePrefix == -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer
return
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
return
trace "about to read msg bytes", len = sizePrefix
var msgBytes = newSeq[byte](sizePrefix)
var readBody = stream.readExactly(addr msgBytes[0], sizePrefix)
await readBody or deadline
if not readBody.finished:
trace "msg bytes not received in time"
return
trace "got message bytes", len = sizePrefix
return msgBytes
except TransportIncompleteError:
return @[]
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
try:
if msgBytes.len > 0:
return some SSZ.decode(msgBytes, MsgType)
except SerializationError as err:
debug "Failed to decode a network message",
msgBytes, errMsg = err.formatMsg("<msg>")
return
proc readResponse(
stream: P2PStream,
MsgType: type,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await readChunk(stream, E, true, deadline)
if nextRes.isNone: break
results.add nextRes.get
if results.len > 0:
return some(results)
else:
return await readChunk(stream, MsgType, true, deadline)
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = init OutputStream
s.append byte(responseCode)
s.appendVarint errMsg.len
s.appendValue SSZ, errMsg
s.getOutput
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
err: ref SerializationError,
msgName: string,
msgBytes: Bytes) {.async.} =
debug "Received an invalid request",
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
await stream.writeAllBytes(responseBytes)
await stream.close()
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request", peer, responseCode, errMsg
let responseBytes = encodeErrorMsg(ServerError, errMsg)
await stream.writeAllBytes(responseBytes)
await stream.close()
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT
var streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
defer:
await safeClose(stream)
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
# TODO There is too much duplication in the responder functions, but
# I hope to reduce this when I increse the reliance on output streams.
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendVarint payload.len.uint64
s.append payload
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(val)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
var s = init OutputStream
for chunk in chunks:
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(chunk)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
return none(ResponseMsg)
let stream = streamFut.read
defer:
await safeClose(stream)
# Send the request
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
# Read the response
return await stream.readResponse(ResponseMsg, deadline)
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
T(UntypedResponder(peer: peer, stream: stream))
template write*[M](r: var Responder[M], val: auto): auto =
mixin send
type Msg = M
type MsgRec = RecType(Msg)
when MsgRec is seq|openarray:
type E = ElemType(MsgRec)
when val is E:
sendResponseChunkObj(UntypedResponder(r), val)
elif val is MsgRec:
sendResponseChunks(UntypedResponder(r), val)
else:
{.fatal: "Unepected message type".}
else:
send(r, val)
proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
await all(subProtocolsHandshakes)
template initializeConnection*(peer: Peer): auto =
performProtocolHandshakes(peer)
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
result.name = name
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
proc implementSendProcBody(sendProc: SendProc) =
let
msg = sendProc.msg
UntypedResponder = bindSym "UntypedResponder"
await = ident "await"
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
if msg.kind != msgResponse:
let msgProto = getRequestProtoName(msg.procDef)
case msg.kind
of msgRequest:
let
timeout = msg.timeoutParam[0]
ResponseRecord = msg.response.recName
quote:
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
else:
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
else:
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
MsgType, Format: distinct type) {.async, gcsafe.} =
mixin callUserHandler, RecType
const msgName = typetraits.name(MsgType)
## Uncomment this to enable tracing on all incoming requests
## You can include `msgNameLit` in the condition to select
## more specific requests:
# when chronicles.runtimeFilteringEnabled:
# setLogLevel(LogLevel.TRACE)
# defer: setLogLevel(LogLevel.DEBUG)
# trace "incoming " & `msgNameLit` & " stream"
let peer = peerFromStream(network, stream)
handleIncomingPeer(peer)
defer:
await safeClose(stream)
let
deadline = sleepAsync RESP_TIMEOUT
msgBytes = await readMsgBytes(stream, false, deadline)
if msgBytes.len == 0:
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)
return
type MsgRec = RecType(MsgType)
var msg: MsgRec
try:
msg = decode(Format, msgBytes, MsgRec)
except SerializationError as err:
await sendErrorResponse(peer, stream, err, msgName, msgBytes)
return
except Exception as err:
# TODO. This is temporary code that should be removed after interop.
# It can be enabled only in certain diagnostic builds where it should
# re-raise the exception.
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
await sendErrorResponse(peer, stream, ServerError, err.msg)
raise err
try:
logReceivedMsg(peer, MsgType(msg))
await callUserHandler(peer, stream, msg)
except CatchableError as err:
await sendErrorResponse(peer, stream, ServerError, err.msg)

View File

@ -1,266 +0,0 @@
import
algorithm, typetraits,
stew/varints, stew/shims/[macros, tables], chronos, chronicles,
libp2p/daemon/daemonapi, faststreams/output_stream, serialization,
json_serialization/std/options, eth/p2p/p2p_protocol_dsl,
libp2p_json_serialization, ssz
export
daemonapi, p2pProtocol, libp2p_json_serialization, ssz
type
Eth2Node* = ref object of RootObj
daemon*: DaemonAPI
peers*: Table[PeerID, Peer]
protocolStates*: seq[RootRef]
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
Peer* = ref object
network*: Eth2Node
id*: PeerID
wasDialed*: bool
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
ConnectionState* = enum
None,
Connecting,
Connected,
Disconnecting,
Disconnected
UntypedResponder = object
peer*: Peer
stream*: P2PStream
Responder*[MsgType] = distinct UntypedResponder
MessageInfo* = object
name*: string
# Private fields:
thunk*: ThunkProc
libp2pProtocol: string
printer*: MessageContentPrinter
nextMsgResolver*: NextMsgResolver
ProtocolInfoObj* = object
name*: string
messages*: seq[MessageInfo]
index*: int # the position of the protocol in the
# ordered list of supported protocols
# Private fields:
peerStateInitializer*: PeerStateInitializer
networkStateInitializer*: NetworkStateInitializer
handshake*: HandshakeStep
disconnectHandler*: DisconnectionHandler
ProtocolInfo* = ptr ProtocolInfoObj
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
DisconnectionReason* = enum
ClientShutDown
IrrelevantNetwork
FaultOrError
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
TransmissionError* = object of CatchableError
template `$`*(peer: Peer): string = $peer.id
chronicles.formatIt(Peer): $it
# TODO: These exists only as a compatibility layer between the daemon
# APIs and the native LibP2P ones. It won't be necessary once the
# daemon is removed.
#
proc writeAllBytes(stream: P2PStream, bytes: seq[byte]) {.async.} =
let sent = await stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver msg bytes")
template readExactly(stream: P2PStream, dst: pointer, dstLen: int): untyped =
readExactly(stream.transp, dst, dstLen)
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
openStream(node.daemon, peer.id, @[protocolId])
#
# End of compatibility layer
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.}
template remote*(peer: Peer): untyped =
# TODO: Can we get a proper address here?
peer.id
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peers.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerId)
node.peers[peerId] = result
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer =
node.getPeer(peerInfo.peer)
proc peerFromStream(node: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
node.getPeer(stream.peer)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
await peer.network.daemon.disconnect(peer.id)
peer.connectionState = Disconnected
peer.network.peers.del(peer.id)
proc safeClose(stream: P2PStream) {.async.} =
if P2PStreamFlags.Closed notin stream.flags:
await close(stream)
proc handleIncomingPeer*(peer: Peer) {.inline.} =
discard
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
new result
result.daemon = daemon
result.daemon.userData = result
result.peers = initTable[PeerID, Peer]()
newSeq result.protocolStates, allProtocols.len
for proto in allProtocols:
if proto.networkStateInitializer != nil:
result.protocolStates[proto.index] = proto.networkStateInitializer(result)
for msg in proto.messages:
if msg.libp2pProtocol.len > 0 and msg.thunk != nil:
await daemon.addHandler(@[msg.libp2pProtocol], msg.thunk)
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
new result
result.id = id
result.network = network
result.connectionState = Connected
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
newSeq result.protocolStates, allProtocols.len
for i in 0 ..< allProtocols.len:
let proto = allProtocols[i]
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
proc registerMsg(protocol: ProtocolInfo,
name: string,
thunk: ThunkProc,
libp2pProtocol: string,
printer: MessageContentPrinter) =
protocol.messages.add MessageInfo(name: name,
thunk: thunk,
libp2pProtocol: libp2pProtocol,
printer: printer)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
Format = ident "SSZ"
Responder = bindSym "Responder"
DaemonAPI = bindSym "DaemonAPI"
P2PStream = ident "P2PStream"
OutputStream = bindSym "OutputStream"
Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node"
messagePrinter = bindSym "messagePrinter"
milliseconds = bindSym "milliseconds"
registerMsg = bindSym "registerMsg"
initProtocol = bindSym "initProtocol"
bindSymOp = bindSym "bindSym"
errVar = ident "err"
msgVar = ident "msg"
msgBytesVar = ident "msgBytes"
daemonVar = ident "daemon"
await = ident "await"
callUserHandler = ident "callUserHandler"
p.useRequestIds = false
p.useSingleRecordInlining = true
new result
result.PeerType = Peer
result.NetworkType = Eth2Node
result.registerProtocol = bindSym "registerProtocol"
result.setEventHandlers = bindSym "setEventHandlers"
result.SerializationFormat = Format
result.ResponderType = Responder
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)
result.implementMsg = proc (msg: Message) =
let
protocol = msg.protocol
msgName = $msg.ident
msgNameLit = newLit msgName
MsgRecName = msg.recName
MsgStrongRecName = msg.strongRecName
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
# should be written:
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
msg.initResponderCall.add streamVar
##
## Implemenmt Thunk
##
var thunkName: NimNode
if msg.userHandler != nil:
thunkName = ident(msgName & "_thunk")
let userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
msg.defineThunk quote do:
template `callUserHandler`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`): Future[void] {.gcsafe.} =
return handleIncomingStream(`Eth2Node`(`daemonVar`.userData), `streamVar`,
`MsgStrongRecName`, `Format`)
else:
thunkName = newNilLit()
##
## Implement Senders and Handshake
##
if msg.kind == msgHandshake:
macros.error "Handshake messages are not supported in LibP2P protocols"
else:
var sendProc = msg.createSendProc()
implementSendProcBody sendProc
protocol.outProcRegistrations.add(
newCall(registerMsg,
protocol.protocolInfoVar,
msgNameLit,
thunkName,
getRequestProtoName(msg.procDef),
newTree(nnkBracketExpr, messagePrinter, MsgRecName)))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)

View File

@ -33,56 +33,39 @@ proc fetchAncestorBlocksFromPeer(
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer = peer
when networkBackend == libp2p:
proc fetchAncestorBlocksFromNetwork(
network: Eth2Node,
rec: FetchRecord,
responseHandler: FetchAncestorsResponseHandler) {.async.} =
var peer: Peer
try:
peer = await network.peerPool.acquire()
let blocks = await peer.beaconBlocksByRoot([rec.root])
if blocks.isSome:
for b in blocks.get:
responseHandler(b)
except CatchableError as err:
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer = peer
finally:
if not(isNil(peer)):
network.peerPool.release(peer)
proc fetchAncestorBlocksFromNetwork(
network: Eth2Node,
rec: FetchRecord,
responseHandler: FetchAncestorsResponseHandler) {.async.} =
var peer: Peer
try:
peer = await network.peerPool.acquire()
let blocks = await peer.beaconBlocksByRoot([rec.root])
if blocks.isSome:
for b in blocks.get:
responseHandler(b)
except CatchableError as err:
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer = peer
finally:
if not(isNil(peer)):
network.peerPool.release(peer)
proc fetchAncestorBlocks*(requestManager: RequestManager,
roots: seq[FetchRecord],
responseHandler: FetchAncestorsResponseHandler) =
# TODO: we could have some fancier logic here:
#
# * Keeps track of what was requested
# (this would give a little bit of time for the asked peer to respond)
#
# * Keep track of the average latency of each peer
# (we can give priority to peers with better latency)
#
const ParallelRequests = 2
for i in 0 ..< ParallelRequests:
traceAsyncErrors fetchAncestorBlocksFromNetwork(requestManager.network,
roots.sample(),
responseHandler)
elif networkBackend == libp2pDaemon:
proc fetchAncestorBlocks*(requestManager: RequestManager,
proc fetchAncestorBlocks*(requestManager: RequestManager,
roots: seq[FetchRecord],
responseHandler: FetchAncestorsResponseHandler) =
# TODO: we could have some fancier logic here:
#
# * Keeps track of what was requested
# (this would give a little bit of time for the asked peer to respond)
#
# * Keep track of the average latency of each peer
# (we can give priority to peers with better latency)
#
const ParallelRequests = 2
# TODO: we could have some fancier logic here:
#
# * Keeps track of what was requested
# (this would give a little bit of time for the asked peer to respond)
#
# * Keep track of the average latency of each peer
# (we can give priority to peers with better latency)
#
const ParallelRequests = 2
for i in 0 ..< ParallelRequests:
traceAsyncErrors fetchAncestorBlocksFromNetwork(requestManager.network,
roots.sample(),
responseHandler)
for peer in requestManager.network.randomPeers(ParallelRequests, BeaconSync):
traceAsyncErrors peer.fetchAncestorBlocksFromPeer(roots.sample(), responseHandler)

View File

@ -1,12 +1,9 @@
import
options, tables, sets, macros,
chronicles, chronos, stew/ranges/bitranges,
chronicles, chronos, stew/ranges/bitranges, libp2p/switch,
spec/[datatypes, crypto, digest, helpers],
beacon_node_types, eth2_network, block_pool, ssz
when networkBackend == libp2p:
import libp2p/switch
logScope:
topics = "sync"

View File

@ -1,15 +1,3 @@
type
NetworkBackendType* = enum
libp2p
libp2pDaemon
const
NETWORK_TYPE {.strdefine.} = "libp2p"
networkBackend* = when NETWORK_TYPE == "libp2p": libp2p
elif NETWORK_TYPE == "libp2p_daemon": libp2pDaemon
else: {.fatal: "The 'NETWORK_TYPE' should be either 'libp2p', 'libp2p_daemon'" .}
const
copyrights* = "Copyright (c) 2019 Status Research & Development GmbH"
@ -30,5 +18,5 @@ const
$versionMajor & "." & $versionMinor & "." & $versionBuild
fullVersionStr* =
versionAsStr & " (" & gitRevision & ", " & NETWORK_TYPE & ")"
versionAsStr & " (" & gitRevision & ")"

View File

@ -100,8 +100,6 @@ cli do (skipGoerliKey {.
rmDir dataDir
cd rootDir
if testnet == "testnet1":
nimFlags &= " -d:NETWORK_TYPE=libp2p"
exec &"""nim c {nimFlags} -d:"const_preset={preset}" -o:"{beaconNodeBinary}" beacon_chain/beacon_node.nim"""
mkDir dumpDir

View File

@ -16,7 +16,6 @@ add_var () {
}
add_var CONST_PRESET
add_var NETWORK_TYPE
add_var SLOTS_PER_EPOCH
add_var MAX_COMMITTEES_PER_SLOT

View File

@ -1,5 +1,4 @@
CONST_PRESET=minimal
NETWORK_TYPE=libp2p_daemon
QUICKSTART_VALIDATORS=8
RANDOM_VALIDATORS=120
BOOTSTRAP_PORT=9000

View File

@ -1,5 +1,4 @@
CONST_PRESET=minimal
NETWORK_TYPE=libp2p
QUICKSTART_VALIDATORS=8
RANDOM_VALIDATORS=120
BOOTSTRAP_PORT=9100

View File

@ -39,15 +39,10 @@ build_beacon_node () {
$MAKE NIMFLAGS="-o:$OUTPUT_BIN $PARAMS" LOG_LEVEL="${LOG_LEVEL:-DEBUG}" beacon_node
}
build_beacon_node $BEACON_NODE_BIN -d:"NETWORK_TYPE=$NETWORK_TYPE"
build_beacon_node $BEACON_NODE_BIN
if [[ "$BOOTSTRAP_NODE_NETWORK_TYPE" != "$NETWORK_TYPE" ]]; then
build_beacon_node $BOOTSTRAP_NODE_BIN \
--nimcache:nimcache/bootstrap_node \
-d:"NETWORK_TYPE=$BOOTSTRAP_NODE_NETWORK_TYPE"
else
cp $BEACON_NODE_BIN $BOOTSTRAP_NODE_BIN
fi
# DAEMON TODO: This copy is now unnecessary
cp $BEACON_NODE_BIN $BOOTSTRAP_NODE_BIN
if [ ! -f "${LAST_VALIDATOR}" ]; then
echo Building $DEPLOY_DEPOSIT_CONTRACT_BIN

View File

@ -25,11 +25,6 @@ TOTAL_USER_NODES=${USER_NODES:-0}
TOTAL_SYSTEM_NODES=$(( TOTAL_NODES - TOTAL_USER_NODES ))
MASTER_NODE=$(( TOTAL_NODES - 1 ))
# You can run a mixed simulation of daemon and native libp2p nodes
# by changing the variables below:
NETWORK_TYPE=${NETWORK_TYPE:-"libp2p"}
BOOTSTRAP_NODE_NETWORK_TYPE=${BOOTSTRAP_NODE_NETWORK_TYPE:-"libp2p"}
SIMULATION_DIR="${SIM_ROOT}/data"
METRICS_DIR="${SIM_ROOT}/prometheus"
VALIDATORS_DIR="${SIM_ROOT}/validators"

View File

@ -19,7 +19,7 @@ rm -rf ncli/nimcache
../env.sh nim c \
--cpu:i386 --os:linux --gc:none --threads:off \
-d:release -d:clang -d:emscripten -d:noSignalHandler -d:usemalloc \
--nimcache:ncli/nimcache -d:"network_type=none" \
--nimcache:ncli/nimcache \
-u:metrics \
-c ncli

View File

@ -1,2 +1 @@
-d:"network_type=none"
-u:metrics