Clean up some obsolete code

This commit is contained in:
Zahary Karadjov 2020-03-23 01:23:21 +02:00 committed by zah
parent d5e4e640b4
commit 4623aa81ec
6 changed files with 82 additions and 103 deletions

View File

@ -190,17 +190,15 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
var bootNodes: seq[ENode]
var bootEnrs: seq[enr.Record]
for node in conf.bootstrapNodes: addBootstrapNode(node, bootNodes, bootEnrs, ourPubKey)
for node in conf.bootstrapNodes:
addBootstrapNode(node, bootNodes, bootEnrs, ourPubKey)
loadBootstrapFile(string conf.bootstrapNodesFile, bootNodes, bootEnrs, ourPubKey)
let persistentBootstrapFile = conf.dataDir / "bootstrap_nodes.txt"
if fileExists(persistentBootstrapFile):
loadBootstrapFile(persistentBootstrapFile, bootNodes, bootEnrs, ourPubKey)
let
network = await createEth2Node(conf, bootNodes)
addressFile = string(conf.dataDir) / "beacon_node.address"
network.saveConnectionAddressFile(addressFile)
let network = await createEth2Node(conf)
let rpcServer = if conf.rpcEnabled:
RpcServer.init(conf.rpcAddress, conf.rpcPort)
@ -252,13 +250,12 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
return res
proc connectToNetwork(node: BeaconNode) {.async.} =
if node.bootstrapNodes.len > 0:
info "Connecting to bootstrap nodes", bootstrapNodes = node.bootstrapNodes
if node.bootstrapEnrs.len > 0:
info "Connecting to bootstrap nodes", bootstrapEnrs = node.bootstrapEnrs
else:
info "Waiting for connections"
await node.network.connectToNetwork(node.bootstrapNodes,
node.bootstrapEnrs)
await node.network.connectToNetwork(node.bootstrapEnrs)
template findIt(s: openarray, predicate: untyped): int =
var res = -1

View File

@ -38,7 +38,6 @@ type
PrivateKey* = crypto.PrivateKey
Bytes = seq[byte]
P2PStream = Connection
# TODO Is this really needed?
Eth2Node* = ref object of RootObj
@ -70,7 +69,7 @@ type
UntypedResponder = object
peer*: Peer
stream*: P2PStream
stream*: Connection
Responder*[MsgType] = distinct UntypedResponder
@ -80,8 +79,6 @@ type
# Private fields:
libp2pCodecName: string
protocolMounter*: MounterProc
printer*: MessageContentPrinter
nextMsgResolver*: NextMsgResolver
ProtocolInfoObj* = object
name*: string
@ -104,12 +101,11 @@ type
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
HandshakeStep* = proc(peer: Peer, conn: Connection): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
DisconnectionReason* = enum
ClientShutDown
@ -148,10 +144,10 @@ declareCounter gossip_messages_received,
"Number of gossip messages received by this peer"
declarePublicGauge libp2p_successful_dials,
"Number of successfully dialed peers"
"Number of successfully dialed peers"
declarePublicGauge libp2p_peers,
"Number of active libp2p peers"
"Number of active libp2p peers"
template libp2pProtocol*(name: string, version: int) {.pragma.}
@ -161,21 +157,12 @@ chronicles.formatIt(Peer): $it
template remote*(peer: Peer): untyped =
peer.info.peerId
# TODO: This exists only as a compatibility layer between the daemon
# APIs and the native LibP2P ones. It won't be necessary once the
# daemon is removed.
#
template writeAllBytes(stream: P2PStream, bytes: seq[byte]): untyped =
write(stream, bytes)
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
dial(node.switch, peer.info, protocolId)
proc peer(stream: P2PStream): PeerID =
func peerId(conn: Connection): PeerID =
# TODO: Can this be `nil`?
stream.peerInfo.peerId
#
# End of compatibility layer
conn.peerInfo.peerId
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
@ -183,11 +170,12 @@ proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
let peerId = peerInfo.peerId
result = node.peerPool.getOrDefault(peerId)
if result == nil:
# TODO: We should register this peer in the pool!
result = Peer.init(node, peerInfo)
proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} =
# TODO: Can this be `nil`?
return network.getPeer(stream.peerInfo)
return network.getPeer(conn.peerInfo)
proc getKey*(peer: Peer): PeerID {.inline.} =
result = peer.info.peerId
@ -208,9 +196,9 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
peer.network.peerPool.release(peer)
peer.info.close()
proc safeClose(stream: P2PStream) {.async.} =
if not stream.closed:
await close(stream)
proc safeClose(conn: Connection) {.async.} =
if not conn.closed:
await close(conn)
proc handleIncomingPeer*(peer: Peer)
@ -243,18 +231,18 @@ proc disconnectAndRaise(peer: Peer,
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
proc readChunk(stream: P2PStream,
proc readChunk(conn: Connection,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(stream: P2PStream,
proc readSizePrefix(conn: Connection,
deadline: Future[void]): Future[int] {.async.} =
trace "about to read msg size prefix"
var parser: VarintParser[uint64, ProtoBuf]
while true:
var nextByte: byte
var readNextByte = stream.readExactly(addr nextByte, 1)
var readNextByte = conn.readExactly(addr nextByte, 1)
await readNextByte or deadline
if not readNextByte.finished:
trace "size prefix byte not received in time"
@ -274,7 +262,7 @@ proc readSizePrefix(stream: P2PStream,
of Incomplete:
continue
proc readMsgBytes(stream: P2PStream,
proc readMsgBytes(conn: Connection,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
trace "about to read message bytes", withResponseCode
@ -283,7 +271,7 @@ proc readMsgBytes(stream: P2PStream,
if withResponseCode:
var responseCode: byte
trace "about to read response code"
var readResponseCode = stream.readExactly(addr responseCode, 1)
var readResponseCode = conn.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished:
@ -299,7 +287,7 @@ proc readMsgBytes(stream: P2PStream,
case ResponseCode(responseCode)
of InvalidRequest, ServerError:
let responseErrMsg = await readChunk(stream, string, false, deadline)
let responseErrMsg = await conn.readChunk(string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
return
@ -307,20 +295,20 @@ proc readMsgBytes(stream: P2PStream,
# The response is OK, the execution continues below
discard
var sizePrefix = await readSizePrefix(stream, deadline)
var sizePrefix = await conn.readSizePrefix(deadline)
trace "got msg size prefix", sizePrefix
if sizePrefix == -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer
debug "Failed to read an incoming message size prefix", peer = conn.peerId
return
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
debug "Received SSZ with zero size", peer = conn.peerId
return
trace "about to read msg bytes", len = sizePrefix
var msgBytes = newSeq[byte](sizePrefix)
var readBody = stream.readExactly(addr msgBytes[0], sizePrefix)
var readBody = conn.readExactly(addr msgBytes[0], sizePrefix)
await readBody or deadline
if not readBody.finished:
trace "msg bytes not received in time"
@ -332,11 +320,11 @@ proc readMsgBytes(stream: P2PStream,
except TransportIncompleteError:
return @[]
proc readChunk(stream: P2PStream,
proc readChunk(conn: Connection,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
var msgBytes = await conn.readMsgBytes(withResponseCode, deadline)
try:
if msgBytes.len > 0:
return some SSZ.decode(msgBytes, MsgType)
@ -346,7 +334,7 @@ proc readChunk(stream: P2PStream,
return
proc readResponse(
stream: P2PStream,
conn: Connection,
MsgType: type,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
@ -354,13 +342,13 @@ proc readResponse(
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await readChunk(stream, E, true, deadline)
let nextRes = await conn.readChunk(E, true, deadline)
if nextRes.isNone: break
results.add nextRes.get
if results.len > 0:
return some(results)
else:
return await readChunk(stream, MsgType, true, deadline)
return await conn.readChunk(MsgType, true, deadline)
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = init OutputStream
@ -370,7 +358,7 @@ proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
s.getOutput
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
conn: Connection,
err: ref SerializationError,
msgName: string,
msgBytes: Bytes) {.async.} =
@ -378,18 +366,18 @@ proc sendErrorResponse(peer: Peer,
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
await stream.writeAllBytes(responseBytes)
await stream.close()
await conn.write(responseBytes)
await conn.close()
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
conn: Connection,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request", peer, responseCode, errMsg
let responseBytes = encodeErrorMsg(ServerError, errMsg)
await stream.writeAllBytes(responseBytes)
await stream.close()
await conn.write(responseBytes)
await conn.close()
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT
@ -409,7 +397,7 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
await stream.write(bytes)
# TODO There is too much duplication in the responder functions, but
# I hope to reduce this when I increse the reliance on output streams.
@ -419,14 +407,14 @@ proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async
s.appendVarint payload.len.uint64
s.append payload
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
await responder.stream.write(bytes)
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendValue SSZ, sizePrefixed(val)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
await responder.stream.write(bytes)
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
var s = init OutputStream
@ -435,7 +423,7 @@ proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.
s.appendValue SSZ, sizePrefixed(chunk)
let bytes = s.getOutput
await responder.stream.writeAllBytes(bytes)
await responder.stream.write(bytes)
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
@ -460,14 +448,14 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
await stream.writeAllBytes(bytes)
await stream.write(bytes)
# Read the response
return await stream.readResponse(ResponseMsg, deadline)
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
T(UntypedResponder(peer: peer, stream: stream))
peer: Peer, conn: Connection): T =
T(UntypedResponder(peer: peer, stream: conn))
template write*[M](r: var Responder[M], val: auto): auto =
mixin send
@ -539,7 +527,7 @@ proc implementSendProcBody(sendProc: SendProc) =
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
proc handleIncomingStream(network: Eth2Node, conn: Connection,
MsgType, Format: distinct type) {.async, gcsafe.} =
mixin callUserHandler, RecType
const msgName = typetraits.name(MsgType)
@ -550,21 +538,21 @@ proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
# when chronicles.runtimeFilteringEnabled:
# setLogLevel(LogLevel.TRACE)
# defer: setLogLevel(LogLevel.DEBUG)
# trace "incoming " & `msgNameLit` & " stream"
# trace "incoming " & `msgNameLit` & " conn"
let peer = peerFromStream(network, stream)
let peer = peerFromStream(network, conn)
handleIncomingPeer(peer)
defer:
await safeClose(stream)
await safeClose(conn)
let
deadline = sleepAsync RESP_TIMEOUT
msgBytes = await readMsgBytes(stream, false, deadline)
msgBytes = await readMsgBytes(conn, false, deadline)
if msgBytes.len == 0:
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)
await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg)
return
type MsgRec = RecType(MsgType)
@ -572,21 +560,21 @@ proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
try:
msg = decode(Format, msgBytes, MsgRec)
except SerializationError as err:
await sendErrorResponse(peer, stream, err, msgName, msgBytes)
await sendErrorResponse(peer, conn, err, msgName, msgBytes)
return
except Exception as err:
# TODO. This is temporary code that should be removed after interop.
# It can be enabled only in certain diagnostic builds where it should
# re-raise the exception.
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
await sendErrorResponse(peer, stream, ServerError, err.msg)
await sendErrorResponse(peer, conn, ServerError, err.msg)
raise err
try:
logReceivedMsg(peer, MsgType(msg))
await callUserHandler(peer, stream, msg)
await callUserHandler(peer, conn, msg)
except CatchableError as err:
await sendErrorResponse(peer, stream, ServerError, err.msg)
await sendErrorResponse(peer, conn, ServerError, err.msg)
proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
let network = peer.network
@ -727,21 +715,18 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
proc registerMsg(protocol: ProtocolInfo,
name: string,
mounter: MounterProc,
libp2pCodecName: string,
printer: MessageContentPrinter) =
libp2pCodecName: string) =
protocol.messages.add MessageInfo(name: name,
protocolMounter: mounter,
libp2pCodecName: libp2pCodecName,
printer: printer)
libp2pCodecName: libp2pCodecName)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
Format = ident "SSZ"
Responder = bindSym "Responder"
P2PStream = bindSym "P2PStream"
Connection = bindSym "Connection"
Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node"
messagePrinter = bindSym "messagePrinter"
registerMsg = bindSym "registerMsg"
initProtocol = bindSym "initProtocol"
msgVar = ident "msg"
@ -761,7 +746,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
result.ResponderType = Responder
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)
p.onPeerConnected.params.add newIdentDefs(streamVar, Connection)
result.implementMsg = proc (msg: Message) =
let
@ -775,13 +760,13 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
# should be written:
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
msg.userHandler.params.insert(2, newIdentDefs(streamVar, Connection))
msg.initResponderCall.add streamVar
##
## Implement the Thunk:
##
## The protocol handlers in nim-libp2p receive only a `P2PStream`
## The protocol handlers in nim-libp2p receive only a `Connection`
## parameter and there is no way to access the wider context (such
## as the current `Switch`). In our handlers, we may need to list all
## peers in the current network, so we must keep a reference to the
@ -799,12 +784,12 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
if msg.userHandler != nil:
protocol.outRecvProcs.add quote do:
template `callUserHandler`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`,
`streamVar`: `Connection`,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc thunk(`streamVar`: `P2PStream`,
proc thunk(`streamVar`: `Connection`,
proto: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`,
`MsgStrongRecName`, `Format`)
@ -830,8 +815,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
protocol.protocolInfoVar,
msgNameLit,
mounter,
codecNameLit,
newTree(nnkBracketExpr, messagePrinter, MsgRecName)))
codecNameLit))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
@ -903,8 +887,7 @@ proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
KeyPair(seckey: privKey, pubkey: privKey.getKey())
proc createEth2Node*(conf: BeaconNodeConf,
bootstrapNodes: seq[ENode]): Future[Eth2Node] {.async.} =
proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} =
var
(extIp, extTcpPort, _) = setupNat(conf)
hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort)
@ -912,8 +895,7 @@ proc createEth2Node*(conf: BeaconNodeConf,
else: @[tcpEndPoint(extIp, extTcpPort)]
info "Initializing networking", hostAddress,
announcedAddresses,
bootstrapNodes
announcedAddresses
let keys = conf.getPersistentNetKeys
# TODO nim-libp2p still doesn't have support for announcing addresses
@ -924,9 +906,14 @@ proc createEth2Node*(conf: BeaconNodeConf,
result = Eth2Node.init(conf, switch, extIp, keys.seckey.asEthKey)
proc getPersistenBootstrapAddr*(conf: BeaconNodeConf,
ip: IpAddress, port: Port): ENode =
let pair = getPersistentNetKeys(conf)
initENode(pair.pubkey.skkey, Address(ip: ip, udpPort: port))
ip: IpAddress, port: Port): enr.Record =
let
pair = getPersistentNetKeys(conf)
enode = initENode(pair.pubkey.skkey, Address(ip: ip, udpPort: port))
return enr.Record.init(1'u64, # sequence number
pair.seckey.asEthKey,
enode.address)
proc shortForm*(id: KeyPair): string =
$PeerID.init(id.pubkey)
@ -938,7 +925,6 @@ proc toPeerInfo(enode: ENode): PeerInfo =
return PeerInfo.init(peerId, addresses)
proc connectToNetwork*(node: Eth2Node,
bootstrapNodes: seq[ENode],
bootstrapEnrs: seq[enr.Record]) {.async.} =
for bootstrapNode in bootstrapEnrs:
debug "Adding known peer", peer = bootstrapNode
@ -954,10 +940,6 @@ proc connectToNetwork*(node: Eth2Node,
traceAsyncErrors checkIfConnectedToBootstrapNode()
proc saveConnectionAddressFile*(node: Eth2Node, filename: string) =
writeFile(filename, $node.switch.peerInfo.addrs[0] & "/p2p/" &
node.switch.peerInfo.id)
func peersCount*(node: Eth2Node): int =
len(node.peerPool)

View File

@ -76,8 +76,8 @@ fi
rm -f beacon_node.log
# Delete any leftover address files from a previous session
if [ -f "${MASTER_NODE_ADDRESS_FILE}" ]; then
rm "${MASTER_NODE_ADDRESS_FILE}"
if [ -f "${MASTER_NODE_PID_FILE}" ]; then
rm "${MASTER_NODE_PID_FILE}"
fi
# to allow overriding the program names
@ -137,7 +137,7 @@ fi
for i in $(seq $MASTER_NODE -1 $TOTAL_USER_NODES); do
if [[ "$i" != "$MASTER_NODE" && "$USE_MULTITAIL" == "no" ]]; then
# Wait for the master node to write out its address file
while [ ! -f "${MASTER_NODE_ADDRESS_FILE}" ]; do
while [ ! -f "${MASTER_NODE_PID_FILE}" ]; do
sleep 0.1
done
fi

View File

@ -33,7 +33,7 @@ NETWORK_BOOTSTRAP_FILE="${SIMULATION_DIR}/bootstrap_nodes.txt"
BEACON_NODE_BIN="${SIMULATION_DIR}/beacon_node"
BOOTSTRAP_NODE_BIN="${SIMULATION_DIR}/bootstrap_node"
DEPLOY_DEPOSIT_CONTRACT_BIN="${SIMULATION_DIR}/deploy_deposit_contract"
MASTER_NODE_ADDRESS_FILE="${SIMULATION_DIR}/node-${MASTER_NODE}/beacon_node.address"
MASTER_NODE_PID_FILE="${SIMULATION_DIR}/node-${MASTER_NODE}/beacon_node.pid"
BASE_P2P_PORT=30000
BASE_RPC_PORT=7000

View File

@ -1,8 +1,8 @@
#!/bin/bash
if [ ! -f "${MASTER_NODE_ADDRESS_FILE}" ]; then
if [ ! -f "${MASTER_NODE_PID_FILE}" ]; then
echo Waiting for master node...
while [ ! -f "${MASTER_NODE_ADDRESS_FILE}" ]; do
while [ ! -f "${MASTER_NODE_PID_FILE}" ]; do
sleep 0.1
done
fi

View File

@ -35,5 +35,5 @@ asyncTest "connect two nodes":
c2.nat = "none"
var n2 = await createEth2Node(c2)
await n2.connectToNetwork(bootstrapNodes = @[n1PersistentAddress])
await n2.connectToNetwork(@[n1PersistentAddress])