Added various smaller features

* Perform per-protocol handshakes after connecting

* Initialize the per-protocol Peer and Network states properly

* Add an `EthereumNode` object that can be configured with specific
  capabilities and connected to a specific network.
This commit is contained in:
Zahary Karadjov 2018-07-09 01:08:58 +03:00
parent f1001c45d2
commit 4d17ab1ee5
1 changed files with 307 additions and 164 deletions

View File

@ -16,33 +16,26 @@ import
kademlia, discovery, auth, rlpxcrypt, enode
type
ConnectionState* = enum
None,
Connected,
Disconnecting,
Disconnected
NetworkConnection* = ref object
id: int
EthereumNode* = ref object
networkId*: int
chain*: AbstractChainDB
clientId*: string
connectionState: ConnectionState
keys: KeyPair
listeningPort: Port
address: Address
rlpxCapabilities: seq[Capability]
rlpxProtocols: seq[ProtocolInfo]
listeningServer: StreamServer
protocolStates: seq[RootRef]
chainDb: AbstractChainDB
keyPair: KeyPair
address: Address
clientId: string
discovery: DiscoveryProtocol
peerPool: PeerPool
OutstandingRequest = object
reqId: int
future: FutureBase
timeoutAt: uint64
Peer* = ref object
transp: StreamTransport
dispatcher: Dispatcher
nextReqId: int
network: NetworkConnection
network: EthereumNode
secretsState: SecretState
connectionState: ConnectionState
remote*: Node
@ -50,7 +43,13 @@ type
outstandingRequests: seq[Deque[OutstandingRequest]]
awaitedMessages: seq[FutureBase]
OutstandingRequest = object
reqId: int
future: FutureBase
timeoutAt: uint64
PeerPool* = ref object
network: EthereumNode
keyPair: KeyPair
networkId: int
minPeers: int
@ -61,11 +60,6 @@ type
running: bool
listenPort*: Port
MessageHandler* = proc(x: Peer, data: Rlp): Future[void]
MessageContentPrinter* = proc(msg: pointer): string
RequestResolver* = proc(msg: pointer, future: FutureBase)
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase)
MessageInfo* = object
id*: int
name*: string
@ -85,7 +79,11 @@ type
version*: int
messages*: seq[MessageInfo]
index: int # the position of the protocol in the
# ordered list of supported protocols
# ordered list of supported protocols
peerStateInitializer: PeerStateInitializer
networkStateInitializer: NetworkStateInitializer
handshake: HandshakeStep
disconnectHandler: DisconnectionHandler
Dispatcher = ref object
# The dispatcher stores the mapping of negotiated message IDs between
@ -103,11 +101,43 @@ type
protocolOffsets: seq[int]
messages: seq[ptr MessageInfo]
MessageHandler = proc(x: Peer, data: Rlp): Future[void]
MessageContentPrinter = proc(msg: pointer): string
RequestResolver = proc(msg: pointer, future: FutureBase)
NextMsgResolver = proc(msgData: Rlp, future: FutureBase)
PeerStateInitializer = proc(peer: Peer): RootRef
NetworkStateInitializer = proc(network: EthereumNode): RootRef
HandshakeStep = proc(peer: Peer): Future[void]
DisconnectionHandler = proc(peer: Peer,
reason: DisconnectionReason): Future[void]
RlpxMessageKind* = enum
rlpxNotification,
rlpxRequest,
rlpxResponse
ConnectionState* = enum
None,
Connecting,
Connected,
Disconnecting,
Disconnected
DisconnectionReason* = enum
DisconnectRequested,
TcpError,
BreachOfProtocol,
UselessPeer,
TooManyPeers,
AlreadyConnected,
IncompatibleProtocolVersion,
NullNodeIdentityReceived,
ClientQuitting,
UnexpectedIdentity,
SelfConnection,
MessageTimeout,
SubprotocolReason = 0x10
UnsupportedProtocol* = object of Exception
# This is raised when you attempt to send a message from a particular
# protocol to a peer that doesn't support the protocol.
@ -119,18 +149,18 @@ logScope:
const
baseProtocolVersion = 4
clientId = "nim-eth-p2p/0.2.0"
defaultReqTimeout = 10000
var
gProtocols: seq[ProtocolInfo]
gCapabilities: seq[Capability]
gDispatchers = initSet[Dispatcher]()
devp2p: ProtocolInfo
# The variables above are immutable RTTI information. We need to tell
# Nim to not consider them GcSafe violations:
template rlpxProtocols: auto = {.gcsafe.}: gProtocols
template rlpxCapabilities: auto = {.gcsafe.}: gCapabilities
template devp2pProtocolInfo: auto = {.gcsafe.}: devp2p
# Dispatcher
@ -145,14 +175,19 @@ proc hash(d: Dispatcher): int =
proc `==`(lhs, rhs: Dispatcher): bool =
lhs.protocolOffsets == rhs.protocolOffsets
proc describeProtocols(d: Dispatcher): string =
result = ""
iterator activeProtocols(d: Dispatcher): ProtocolInfo =
for i in 0 ..< rlpxProtocols.len:
if d.protocolOffsets[i] != -1:
if result.len != 0: result.add(',')
for c in rlpxProtocols[i].name: result.add(c)
yield rlpxProtocols[i]
proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
proc describeProtocols(d: Dispatcher): string =
result = ""
for protocol in d.activeProtocols:
if result.len != 0: result.add(',')
for c in protocol.name: result.add(c)
proc getDispatcher(node: EthereumNode,
otherPeerCapabilities: openarray[Capability]): Dispatcher =
# TODO: sub-optimal solution until progress is made here:
# https://github.com/nim-lang/Nim/issues/7457
# We should be able to find an existing dispatcher without allocating a new one
@ -164,6 +199,9 @@ proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
for i in 0 ..< rlpxProtocols.len:
let localProtocol = rlpxProtocols[i]
if not node.rlpxProtocols.contains(localProtocol):
result.protocolOffsets[i] = -1
continue
block findMatchingProtocol:
for remoteCapability in otherPeerCapabilities:
@ -188,20 +226,31 @@ proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
for i in 0 ..< rlpxProtocols.len:
if result.protocolOffsets[i] != -1:
rlpxProtocols[i].messages.copyTo(result.messages, result.protocolOffsets[i])
rlpxProtocols[i].messages.copyTo(result.messages,
result.protocolOffsets[i])
gDispatchers.incl result
# Protocol info objects
#
proc newProtocol(name: string, version: int): ProtocolInfo =
proc newProtocol(name: string, version: int,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfo =
new result
result.name[0] = name[0]
result.name[1] = name[1]
result.name[2] = name[2]
result.version = version
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
proc nameStr*(p: ProtocolInfo): string =
result = newStringOfCap(3)
@ -257,10 +306,8 @@ proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
if protocol.version > 0:
if gProtocols.isNil: gProtocols = @[]
if gCapabilities.isNil: gCapabilities = @[]
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
gCapabilities.insert(Capability(name: protocol.name, version: protocol.version), pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
else:
@ -499,6 +546,20 @@ template networkState*(connection: Peer, Protocol: typedesc): untyped =
## particular connection.
cast[ref Protocol.NetworkState](connection.getNetworkState(Protocol.protocolInfo))
proc initProtocolState*[T](state: var T, x: Peer|EthereumNode) = discard
proc createPeerState[ProtocolState](peer: Peer): RootRef =
var res = new ProtocolState
mixin initProtocolState
initProtocolState(res[], peer)
return cast[RootRef](res)
proc createNetworkState[NetworkState](network: EthereumNode): RootRef =
var res = new NetworkState
mixin initProtocolState
initProtocolState(res[], network)
return cast[RootRef](res)
proc popTimeoutParam(n: NimNode): NimNode =
var lastParam = n.params[^1]
if eqIdent(lastParam[0], "timeout"):
@ -512,40 +573,77 @@ macro rlpxProtocol*(protoIdentifier: untyped,
body: untyped): untyped =
## The macro used to defined RLPx sub-protocols. See README.
var
nextId = 0
outTypes = newNimNode(nnkStmtList)
outSendProcs = newNimNode(nnkStmtList)
outRecvProcs = newNimNode(nnkStmtList)
outProcRegistrations = newNimNode(nnkStmtList)
protoName = $protoIdentifier
protoNameIdent = newIdentNode(protoName)
resultIdent = newIdentNode "result"
protocol = genSym(nskVar, protoName & "Proto")
newProtocol = bindSym "newProtocol"
rlpFromBytes = bindSym "rlpFromBytes"
read = bindSym "read"
initRlpWriter = bindSym "initRlpWriter"
startList = bindSym "startList"
finish = bindSym "finish"
append = bindSym "append"
sendMsg = bindSym "sendMsg"
Peer = bindSym "Peer"
isSubprotocol = version > 0
stateType: NimNode = nil
networkStateType: NimNode = nil
handshake = newNilLit()
disconnectHandler = newNilLit()
useRequestIds = true
Option = bindSym "Option"
# XXX: Binding the int type causes instantiation failure for some reason
# Int = bindSym "int"
Int = newIdentNode "int"
writeMsgId = bindSym "writeMsgId"
resolveResponseFuture = bindSym "resolveResponseFuture"
registerRequest = bindSym "registerRequest"
isSubprotocol = version > 0
msgThunksAndRegistrations = newNimNode(nnkStmtList)
nextId = 0
finalOutput = newNimNode(nnkStmtList)
stateType: NimNode = nil
networkStateType: NimNode = nil
useRequestIds = true
Peer = bindSym "Peer"
append = bindSym "append"
createNetworkState = bindSym "createNetworkState"
createPeerState = bindSym "createPeerState"
finish = bindSym "finish"
initRlpWriter = bindSym "initRlpWriter"
messagePrinter = bindSym "messagePrinter"
requestResolver = bindSym "requestResolver"
newProtocol = bindSym "newProtocol"
nextMsgResolver = bindSym "nextMsgResolver"
read = bindSym "read"
registerRequest = bindSym "registerRequest"
requestResolver = bindSym "requestResolver"
resolveResponseFuture = bindSym "resolveResponseFuture"
rlpFromBytes = bindSym "rlpFromBytes"
sendMsg = bindSym "sendMsg"
startList = bindSym "startList"
writeMsgId = bindSym "writeMsgId"
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert protoName.len == 3
proc augmentUserHandler(userHandlerProc: NimNode) =
## Turns a regular proc definition into an async proc and adds
## the helpers for accessing the peer and network protocol states.
userHandlerProc.addPragma newIdentNode"async"
# Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar)
if stateType != nil:
var localStateAccessor = quote:
template state(p: `Peer`): ref `stateType` =
cast[ref `stateType`](p.getState(`protocol`))
userHandlerProc.body.insert 0, localStateAccessor
if networkStateType != nil:
var networkStateAccessor = quote:
template networkState(p: `Peer`): ref `networkStateType` =
cast[ref `networkStateType`](p.getNetworkState(`protocol`))
userHandlerProc.body.insert 0, networkStateAccessor
proc liftEventHandler(doBlock: NimNode, handlerName: string): NimNode =
## Turns a "named" do block to a regular async proc
## (e.g. onPeerConnected do ...)
var fn = newTree(nnkProcDef)
doBlock.copyChildrenTo(fn)
result = genSym(nskProc, protoName & handlerName)
fn.name = result
augmentUserHandler fn
outRecvProcs.add fn
proc addMsgHandler(msgId: int, n: NimNode,
msgKind = rlpxNotification,
responseMsgId = -1,
@ -636,7 +734,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# message parameters and calls the user proc:
userHandlerProc = n.copyNimTree
userHandlerProc.name = genSym(nskProc, msgName)
userHandlerProc.addPragma newIdentNode"async"
augmentUserHandler userHandlerProc
# This is the call to the user supplied handled. Here we add only the
# initial peer param, while the rest of the params will be added later.
@ -646,23 +744,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# Above, by default `awaitUserHandler` is set to a no-op statement list.
awaitUserHandler = newCall("await", userHandlerCall)
msgThunksAndRegistrations.add(userHandlerProc)
# Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar)
if stateType != nil:
var localStateAccessor = quote:
template state(p: `Peer`): ref `stateType` =
cast[ref `stateType`](p.getState(`protocol`))
userHandlerProc.body.insert 0, localStateAccessor
if networkStateType != nil:
var networkStateAccessor = quote:
template networkState(p: `Peer`): ref `networkStateType` =
cast[ref `networkStateType`](p.getNetworkState(`protocol`))
userHandlerProc.body.insert 0, networkStateAccessor
outRecvProcs.add(userHandlerProc)
for param, paramType in n.typedParams(skip = 1):
inc paramCount
@ -689,7 +771,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
let thunkName = newIdentNode(msgName & "_thunk")
msgThunksAndRegistrations.add quote do:
outRecvProcs.add quote do:
proc `thunkName`(`msgSender`: `Peer`, data: Rlp) {.async.} =
var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecord`
@ -697,7 +779,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
`awaitUserHandler`
`callResolvedResponseFuture`
finalOutput.add quote do:
outTypes.add quote do:
# This is a type featuring a single field for each message param:
type `msgRecord`* = `msgRecordBody`
@ -748,9 +830,9 @@ macro rlpxProtocol*(protoIdentifier: untyped,
`appendParams`
`senderEpilogue`
finalOutput.add msgSendProc
outSendProcs.add msgSendProc
msgThunksAndRegistrations.add(
outProcRegistrations.add(
newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(msgId),
@ -760,17 +842,11 @@ macro rlpxProtocol*(protoIdentifier: untyped,
newTree(nnkBracketExpr, requestResolver, msgRecord),
newTree(nnkBracketExpr, nextMsgResolver, msgRecord)))
result = finalOutput
result.add quote do:
# One global variable per protocol holds the protocol run-time data
var `protocol` = `newProtocol`(`protoName`, `version`)
# Create a type actining as a pseudo-object representing the protocol (e.g. p2p)
outTypes.add quote do:
# Create a type acting as a pseudo-object representing the protocol
# (e.g. p2p)
type `protoNameIdent`* = object
# The protocol run-time data is available as a pseudo-field (e.g. `p2p.protocolInfo`)
template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
for n in body:
case n.kind
of {nnkCall, nnkCommand}:
@ -810,6 +886,11 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# break the block so it doesn't reach the error call below
break processReqResp
macros.error("requestResponse expects a block with at least two proc definitions")
elif eqIdent(n[0], "onPeerConnected"):
handshake = liftEventHandler(n[1], "Handshake")
elif eqIdent(n[0], "onPeerDisconnected"):
discard
disconnectHandler = liftEventHandler(n[1], "PeerDisconnect")
else:
macros.error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
@ -820,26 +901,25 @@ macro rlpxProtocol*(protoIdentifier: untyped,
macros.error(repr(n[0]) & " is not a recognized protocol option")
of nnkTypeSection:
result.add n
outTypes.add n
for typ in n:
if eqIdent(typ[0], "State"):
stateType = genSym(nskType, protoName & "State")
typ[0] = stateType
result.add quote do:
outTypes.add quote do:
template State*(P: type `protoNameIdent`): typedesc =
`stateType`
elif eqIdent(typ[0], "NetworkState"):
networkStateType = genSym(nskType, protoName & "NetworkState")
typ[0] = networkStateType
result.add quote do:
outTypes.add quote do:
template NetworkState*(P: type `protoNameIdent`): typedesc =
`networkStateType`
else:
macros.error("The only type names allowed within a RLPx protocol definition are 'State' and 'NetworkState'")
of nnkProcDef:
discard addMsgHandler(nextId, n)
inc nextId
@ -847,37 +927,38 @@ macro rlpxProtocol*(protoIdentifier: untyped,
else:
macros.error("illegal syntax in a RLPx protocol definition", n)
result.add(msgThunksAndRegistrations)
let peerInit = if stateType == nil: newNilLit()
else: newTree(nnkBracketExpr, createPeerState, stateType)
let netInit = if networkStateType == nil: newNilLit()
else: newTree(nnkBracketExpr, createNetworkState, stateType)
result = newNimNode(nnkStmtList)
result.add outTypes
result.add quote do:
# One global variable per protocol holds the protocol run-time data
var `protocol` = `newProtocol`(`protoName`, `version`, `peerInit`, `netInit`)
# The protocol run-time data is available as a pseudo-field
# (e.g. `p2p.protocolInfo`)
template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
result.add outSendProcs, outRecvProcs, outProcRegistrations
result.add quote do:
setEventHandlers(`protocol`, `handshake`, `disconnectHandler`)
result.add newCall(bindSym("registerProtocol"), protocol)
when isMainModule: echo repr(result)
type
DisconnectionReason* = enum
DisconnectRequested,
TcpError,
BreachOfProtocol,
UselessPeer,
TooManyPeers,
AlreadyConnected,
IncompatibleProtocolVersion,
NullNodeIdentityReceived,
ClientQuitting,
UnexpectedIdentity,
SelfConnection,
MessageTimeout,
SubprotocolReason = 0x10
rlpxProtocol p2p, 0:
proc hello(peer: Peer,
version: uint,
clientId: string,
capabilities: seq[Capability],
listenPort: uint,
nodeId: array[RawPublicKeySize, byte]) =
# peer.id = nodeId
peer.dispatcher = getDispatcher(capabilities)
nodeId: array[RawPublicKeySize, byte])
proc disconnect(peer: Peer, reason: DisconnectionReason)
proc sendDisconnectMsg(peer: Peer, reason: DisconnectionReason)
proc ping(peer: Peer) =
discard peer.pong()
@ -885,6 +966,10 @@ rlpxProtocol p2p, 0:
proc pong(peer: Peer) =
discard
proc disconnect(peer: Peer, reason: DisconnectionReason) {.async.} =
discard await peer.sendDisconnectMsg(reason)
# TODO: Any other clean up required?
template `^`(arr): auto =
# passes a stack array with a matching `arrLen`
# variable as an open array
@ -898,27 +983,40 @@ proc check(status: AuthStatus) =
if status != AuthStatus.Success:
raise newException(Exception, "Error: " & $status)
proc connectionEstablished(p: Peer, h: p2p.hello) =
p.dispatcher = getDispatcher(h.capabilities)
proc performSubProtocolHandshakes(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](rlpxProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add protocol.handshake(peer)
await all(subProtocolsHandshakes)
peer.connectionState = Connected
proc postHelloSteps(peer: Peer, h: p2p.hello): Future[void] =
peer.dispatcher = getDispatcher(peer.network, h.capabilities)
# The dispatcher has determined our message ID sequence.
# For each message ID, we allocate a potential slot for
# tracking responses to requests.
# (yes, some of the slots won't be used).
p.outstandingRequests.newSeq(p.dispatcher.messages.len)
for d in mitems(p.outstandingRequests): d = initDeque[OutstandingRequest](0)
peer.outstandingRequests.newSeq(peer.dispatcher.messages.len)
for d in mitems(peer.outstandingRequests):
d = initDeque[OutstandingRequest](0)
# similarly, we need a bit of book-keeping data to keep track of the
# potentially concurrent calls to `nextMsg`.
p.awaitedMessages.newSeq(p.dispatcher.messages.len)
# Similarly, we need a bit of book-keeping data to keep track
# of the potentially concurrent calls to `nextMsg`.
peer.awaitedMessages.newSeq(peer.dispatcher.messages.len)
p.nextReqId = 1
peer.nextReqId = 1
# p.id = h.nodeId
newSeq(p.protocolStates, rlpxProtocols.len)
# Initialize all the active protocol states
newSeq(peer.protocolStates, rlpxProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
let peerStateInit = protocol.peerStateInitializer
if peerStateInit != nil:
peer.protocolStates[protocol.index] = peerStateInit(peer)
p.connectionState = Connected
# TODO: initialize the sub-protocol states
return performSubProtocolHandshakes(peer)
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
p: Peer) =
@ -927,16 +1025,17 @@ proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
initSecretState(secrets, p.secretsState)
burnMem(secrets)
proc rlpxConnect*(remote: Node, myKeys: KeyPair, listenPort: Port,
clientId: string): Future[Peer] {.async.} =
proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
new result
result.network = node
result.remote = remote
let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort)
try:
result.transp = await connect(ta)
var handshake = newHandshake({Initiator})
handshake.host = myKeys
handshake.host = node.keys
var authMsg: array[AuthMessageMaxEIP8, byte]
var authMsgLen = 0
@ -962,25 +1061,30 @@ proc rlpxConnect*(remote: Node, myKeys: KeyPair, listenPort: Port,
# if handshake.remoteHPubkey != remote.node.pubKey:
# raise newException(Exception, "Remote pubkey is wrong")
asyncCheck result.hello(baseProtocolVersion, clientId, rlpxCapabilities,
uint(listenPort), myKeys.pubkey.getRaw())
asyncCheck result.hello(baseProtocolVersion,
node.clientId,
node.rlpxCapabilities,
uint(node.listeningPort),
node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(p2p.hello)
if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
connectionEstablished(result, response)
await postHelloSteps(result, response)
except:
if not isNil(result.transp):
result.transp.close()
proc rlpxAccept*(transp: StreamTransport, myKeys: KeyPair,
clientId: string): Future[Peer] {.async.} =
proc rlpxAccept*(node: EthereumNode,
transp: StreamTransport): Future[Peer] {.async.} =
new result
result.transp = transp
result.network = node
var handshake = newHandshake({Responder})
handshake.host = myKeys
handshake.host = node.keys
try:
let initialSize = handshake.expectedLength
@ -1004,9 +1108,9 @@ proc rlpxAccept*(transp: StreamTransport, myKeys: KeyPair,
var response = await result.waitSingleMsg(p2p.hello)
let listenPort = transp.localAddress().port
discard result.hello(baseProtocolVersion, clientId,
rlpxCapabilities, listenPort.uint,
myKeys.pubkey.getRaw())
discard result.hello(baseProtocolVersion, node.clientId,
node.rlpxCapabilities, listenPort.uint,
node.keys.pubkey.getRaw())
if validatePubKeyInHello(response, handshake.remoteHPubkey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
@ -1017,7 +1121,7 @@ proc rlpxAccept*(transp: StreamTransport, myKeys: KeyPair,
udpPort: remote.port)
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
connectionEstablished(result, response)
await postHelloSteps(result, response)
except:
transp.close()
@ -1028,10 +1132,12 @@ const
lookupInterval = 5
connectLoopSleepMs = 2000
proc newPeerPool*(chainDb: AbstractChainDB, networkId: int, keyPair: KeyPair,
proc newPeerPool*(network: EthereumNode,
chainDb: AbstractChainDB, networkId: int, keyPair: KeyPair,
discovery: DiscoveryProtocol, clientId: string,
listenPort = Port(30303), minPeers = 10): PeerPool =
result.new()
new result
result.network = network
result.keyPair = keyPair
result.minPeers = minPeers
result.networkId = networkId
@ -1070,7 +1176,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
debug "skipping_connection_to_already_connected_peer", remote
return nil
result = await remote.rlpxConnect(p.keyPair, p.listenPort, p.clientId)
result = await p.network.rlpxConnect(remote)
# expected_exceptions = (
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
@ -1183,10 +1289,36 @@ proc start*(p: PeerPool) =
# await asyncio.sleep(0.5)
# return random.choice(self.peers)
# Ethereum Node
#
proc addProtocol(n: var EthereumNode, p: ProtocolInfo) =
assert n.connectionState == ConnectionState.None
let pos = lowerBound(n.rlpxProtocols, p)
n.rlpxProtocols.insert(p, pos)
n.rlpxCapabilities.insert(Capability(name: p.name, version: p.version), pos)
template addCapability*(n: var EthereumNode, Protocol: typedesc) =
addProtocol(n, Protocol.protocolInfo)
proc newEthereumNode*(keys: KeyPair,
chain: AbstractChainDB,
clientId = clientId,
addAllCapabilities = true): EthereumNode =
result.keys = keys
result.clientId = clientId
result.rlpxProtocols.newSeq 0
result.rlpxCapabilities.newSeq 0
result.connectionState = ConnectionState.None
if addAllCapabilities:
for p in rlpxProtocols:
result.addProtocol(p)
proc processIncoming(server: StreamServer,
remote: StreamTransport): Future[void] {.async, gcsafe.} =
var p2p = getUserData[NetworkConnection](server)
let peerfut = remote.rlpxAccept(p2p.keyPair, p2p.clientId)
var node = getUserData[EthereumNode](server)
let peerfut = node.rlpxAccept(remote)
yield peerfut
if not peerfut.failed:
let peer = peerfut.read()
@ -1196,51 +1328,62 @@ proc processIncoming(server: StreamServer,
$remote.remoteAddress()
remote.close()
proc connectToNetwork*(keyPair: KeyPair,
proc connectToNetwork*(node: var EthereumNode,
address: Address,
chainDb: AbstractChainDB,
listeningPort = Port(30303),
bootstrapNodes: openarray[ENode],
clientId: string,
networkId: int,
startListening = true): NetworkConnection =
new result
result.id = networkId
result.chainDb = chainDb
result.keyPair = keyPair
result.address = address
result.clientId = clientId
result.discovery = newDiscoveryProtocol(keyPair.seckey, address,
bootstrapNodes)
result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery,
clientId, address.tcpPort)
startListening = true) =
assert node.connectionState == ConnectionState.None
node.connectionState = Connecting
node.networkId = networkId
node.listeningPort = listeningPort
node.address = address
node.discovery = newDiscoveryProtocol(node.keys.seckey, address, bootstrapNodes)
node.peerPool = newPeerPool(node, node.chain, networkId,
node.keys, node.discovery,
node.clientId, address.tcpPort)
let ta = initTAddress(address.ip, address.tcpPort)
result.listeningServer = createStreamServer(ta, processIncoming,
{ReuseAddr},
udata = result)
node.listeningServer = createStreamServer(ta, processIncoming,
{ReuseAddr},
udata = addr(node))
node.protocolStates.newSeq(rlpxProtocols.len)
for p in node.rlpxProtocols:
if p.networkStateInitializer != nil:
node.protocolStates[p.index] = p.networkStateInitializer(node)
if startListening:
result.listeningServer.start()
node.listeningServer.start()
proc startListening*(s: NetworkConnection) =
proc startListening*(s: EthereumNode) =
s.listeningServer.start()
proc stopListening*(s: NetworkConnection) =
proc stopListening*(s: EthereumNode) =
s.listeningServer.stop()
when isMainModule:
import rlp
import rlp, strformat
rlpxProtocol aaa, 1:
type State = object
peerName: string
onPeerConnected do (peer: Peer):
discard await peer.hi "Bob"
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason):
debug "peer disconnected", peer
requestResponse:
proc aaaReq(p: Peer, n: int) =
echo "got req ", n
debug "got req ", n
discard await p.aaaRes &"response to {n}"
proc aaaRes(p: Peer, data: string) =
echo "got response ", data
debug "got response ", data
proc hi(p: Peer, name: string) =
p.state.peerName = name