Initial implementation of the LES protocol and its flow control.

This also restores the old module structure of having separate
`peer_pool` and `rlpx` modules. This is made possible by the
new Nim package `package_visible_types` (please refer to its
README for an explanation).

Also introduces more error handling in the low-level RLPx routines.
All detected errors will result in immediate disconnection of the
corresponding peer, which may be detected in the protocols though
the `onPeerDisconnected` event handler.
This commit is contained in:
Zahary Karadjov 2018-10-15 17:36:43 +03:00
parent b38804f873
commit 7828ef0481
18 changed files with 1927 additions and 538 deletions

View File

@ -24,6 +24,8 @@ the `EthereumNode` type:
``` nim
proc newEthereumNode*(keys: KeyPair,
listeningAddress: Address,
networkId: uint,
chain: AbstractChainDB,
clientId = "nim-eth-p2p",
addAllCapabilities = true): EthereumNode =
@ -38,6 +40,14 @@ proc newEthereumNode*(keys: KeyPair,
library for utilities that will help you generate and manage
such keys.
`listeningAddress`:
The network interface and port where your client will be
accepting incoming connections.
`networkId`:
The Ethereum network ID. The client will disconnect immediately
from any peers who don't use the same network.
`chain`:
An abstract instance of the Ethereum blockchain associated
with the node. This library allows you to plug any instance
@ -69,11 +79,9 @@ the network. To start the connection process, call `node.connectToNetwork`:
``` nim
proc connectToNetwork*(node: var EthereumNode,
address: Address,
listeningPort = Port(30303),
bootstrapNodes: openarray[ENode],
networkId: int,
startListening = true)
startListening = true,
enableDiscovery = true)
```
The `EthereumNode` will automatically find and maintan a pool of peers
@ -106,7 +114,7 @@ a 3-letter identifier for the protocol and the current protocol version:
Here is how the [DevP2P wire protocol](https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol) might look like:
``` nim
rlpxProtocol p2p, 0:
rlpxProtocol p2p(version = 0):
proc hello(peer: Peer,
version: uint,
clientId: string,
@ -130,25 +138,28 @@ and the asynchronous code responsible for handling the incoming messages.
### Protocol state
The protocol implementations are expected to maintain a state and to act like
a state machine handling the incoming messages. To achieve this, each protocol
may define a `State` object that can be accessed as a `state` field of the `Peer`
object:
The protocol implementations are expected to maintain a state and to act
like a state machine handling the incoming messages. You are allowed to
define an arbitrary state type that can be specified in the `peerState`
protocol option. Later, instances of the state object can be obtained
though the `state` pseudo-field of the `Peer` object:
``` nim
rlpxProtocol abc, 1:
type State = object
type AbcPeerState = object
receivedMsgsCount: int
rlpxProtocol abc(version = 1,
peerState = AbcPeerState):
proc incomingMessage(p: Peer) =
p.state.receivedMsgsCount += 1
```
Besides the per-peer state demonstrated above, there is also support for
maintaining a network-wide state. In the example above, we'll just have
to change the name of the state type to `NetworkState` and the accessor
expression to `p.network.state`.
Besides the per-peer state demonstrated above, there is also support
for maintaining a network-wide state. It's enabled by specifying the
`networkState` option of the protocol and the state object can be obtained
through accessor of the same name.
The state objects are initialized to zero by default, but you can modify
this behaviour by overriding the following procs for your state types:
@ -158,11 +169,8 @@ proc initProtocolState*(state: var MyPeerState, p: Peer)
proc initProtocolState*(state: var MyNetworkState, n: EthereumNode)
```
Please note that the state type will have to be placed outside of the
protocol definition in order to achieve this.
Sometimes, you'll need to access the state of another protocol. To do this,
specify the protocol identifier to the `state` accessors:
Sometimes, you'll need to access the state of another protocol.
To do this, specify the protocol identifier to the `state` accessors:
``` nim
echo "ABC protocol messages: ", peer.state(abc).receivedMsgCount
@ -234,10 +242,9 @@ be specified for each individual call and the default value can be
overridden on the level of individual message, or the entire protocol:
``` nim
rlpxProtocol abc, 1:
timeout = 5000 # value in milliseconds
useRequestIds = false
rlpxProtocol abc(version = 1,
useRequestIds = false,
timeout = 5000): # value in milliseconds
requestResponse:
proc myReq(dataId: int, timeout = 3000)
proc myRes(data: string)
@ -255,7 +262,7 @@ also include handlers for certain important events such as newly connected
peers or misbehaving or disconnecting peers:
``` nim
rlpxProtocol les, 2:
rlpxProtocol les(version = 2):
onPeerConnected do (peer: Peer):
asyncCheck peer.status [
"networkId": rlp.encode(1),

View File

@ -9,30 +9,31 @@
#
import
tables, deques, macros, sets, algorithm, hashes, times,
random, options, sequtils, typetraits, os,
asyncdispatch2, asyncdispatch2/timer,
rlp, ranges/[stackarrays, ptr_arith], nimcrypto, chronicles,
eth_keys, eth_common,
eth_p2p/[kademlia, discovery, auth, rlpxcrypt, enode]
tables, algorithm, random,
asyncdispatch2, asyncdispatch2/timer, chronicles,
eth_keys, eth_common/eth_types,
eth_p2p/[kademlia, discovery, enode, peer_pool, rlpx],
eth_p2p/private/types
types.forwardPublicTypes
export
enode, kademlia, options
rlpx, enode, kademlia
proc addProtocol(n: var EthereumNode, p: ProtocolInfo) =
proc addCapability*(n: var EthereumNode, p: ProtocolInfo) =
assert n.connectionState == ConnectionState.None
let pos = lowerBound(n.rlpxProtocols, p)
let pos = lowerBound(n.rlpxProtocols, p, rlpx.cmp)
n.rlpxProtocols.insert(p, pos)
n.rlpxCapabilities.insert(Capability(name: p.name, version: p.version), pos)
n.rlpxCapabilities.insert(p.asCapability, pos)
template addCapability*(n: var EthereumNode, Protocol: type) =
addProtocol(n, Protocol.protocolInfo)
addCapability(n, Protocol.protocolInfo)
proc newEthereumNode*(keys: KeyPair,
address: Address,
networkId: uint,
chain: AbstractChainDB,
clientId = clientId,
clientId = "nim-eth-p2p/0.2.0", # TODO: read this value from nimble somehow
addAllCapabilities = true): EthereumNode =
new result
result.keys = keys
@ -45,7 +46,7 @@ proc newEthereumNode*(keys: KeyPair,
if addAllCapabilities:
for p in rlpxProtocols:
result.addProtocol(p)
result.addCapability(p)
proc processIncoming(server: StreamServer,
remote: StreamTransport): Future[void] {.async, gcsafe.} =
@ -69,6 +70,13 @@ proc startListening*(node: EthereumNode) =
udata = cast[pointer](node))
node.listeningServer.start()
proc initProtocolStates*(node: EthereumNode) =
# TODO: This should be moved to a private module
node.protocolStates.newSeq(rlpxProtocols.len)
for p in node.rlpxProtocols:
if p.networkStateInitializer != nil:
node.protocolStates[p.index] = ((p.networkStateInitializer)(node))
proc connectToNetwork*(node: EthereumNode,
bootstrapNodes: seq[ENode],
startListening = true,
@ -80,17 +88,14 @@ proc connectToNetwork*(node: EthereumNode,
node.address,
bootstrapNodes)
node.peerPool = newPeerPool(node, node.chain, node.networkId,
node.peerPool = newPeerPool(node, node.networkId,
node.keys, node.discovery,
node.clientId, node.address.tcpPort)
if startListening:
eth_p2p.startListening(node)
node.protocolStates.newSeq(rlpxProtocols.len)
for p in node.rlpxProtocols:
if p.networkStateInitializer != nil:
node.protocolStates[p.index] = p.networkStateInitializer(node)
node.initProtocolStates()
if startListening:
node.listeningServer.start()

View File

@ -17,14 +17,15 @@ requires "nim > 0.18.0",
"byteutils",
"chronicles",
"asyncdispatch2",
"eth_common"
"eth_common",
"package_visible_types"
proc runTest(name: string, lang = "c") = exec "nim " & lang & " --experimental:ForLoopMacros -r tests/" & name
proc runTest(name: string, lang = "c") =
exec "nim " & lang & " -d:testing --experimental:ForLoopMacros -r tests/" & name
task test, "Runs the test suite":
runTest "testecies"
runTest "testauth"
runTest "testcrypt"
runTest "testenode"
runTest "tdiscovery"
runTest "tserver"
runTest "all_tests"

View File

@ -1,3 +1,13 @@
import
sets, options, random, hashes,
asyncdispatch2, chronicles, eth_common/eth_types,
private/types, rlpx, peer_pool, rlpx_protocols/eth_protocol,
../eth_p2p.nim
const
minPeersToStartSync* = 2 # Wait for consensus of at least this
# number of peers before syncing
type
SyncStatus* = enum
syncSuccess
@ -26,6 +36,8 @@ type
trustedPeers: HashSet[Peer]
hasOutOfOrderBlocks: bool
proc hash*(p: Peer): Hash {.inline.} = hash(cast[pointer](p))
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
result += (b.numBlocks - 1).u256
@ -228,6 +240,7 @@ proc randomTrustedPeer(ctx: SyncContext): Peer =
inc i
proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
debug "start sync ", peer, trustedPeers = ctx.trustedPeers.len
if ctx.trustedPeers.len >= minPeersToStartSync:
# We have enough trusted peers. Validate new peer against trusted
if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
@ -280,7 +293,7 @@ proc onPeerConnected(ctx: SyncContext, peer: Peer) =
error "startSyncWithPeer failed", msg = f.readError.msg, peer
proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
echo "onPeerDisconnected"
debug "peer disconnected ", peer = p
ctx.trustedPeers.excl(p)
proc startSync(ctx: SyncContext) =

View File

@ -0,0 +1,41 @@
import
eth_common/[eth_types, state_accessors]
# TODO: Perhaps we can move this to eth-common
proc getBlockHeaders*(db: AbstractChainDb,
req: BlocksRequest): seq[BlockHeader] =
result = newSeqOfCap[BlockHeader](req.maxResults)
var foundBlock: BlockHeader
if db.getBlockHeader(req.startBlock, foundBlock):
result.add foundBlock
while uint64(result.len) < req.maxResults:
if not db.getSuccessorHeader(foundBlock, foundBlock):
break
result.add foundBlock
template fetcher*(fetcherName, fetchingFunc, InputType, ResultType: untyped) =
proc fetcherName*(db: AbstractChainDb,
lookups: openarray[InputType]): seq[ResultType] =
for lookup in lookups:
let fetched = fetchingFunc(db, lookup)
if fetched.hasData:
# TODO: should there be an else clause here.
# Is the peer responsible of figuring out that
# some of the requested items were not found?
result.add deref(fetched)
fetcher getContractCodes, getContractCode, ContractCodeRequest, Blob
fetcher getBlockBodies, getBlockBody, KeccakHash, BlockBody
fetcher getStorageNodes, getStorageNode, KeccakHash, Blob
fetcher getReceipts, getReceipt, KeccakHash, Receipt
fetcher getProofs, getProof, ProofRequest, Blob
fetcher getHeaderProofs, getHeaderProof, ProofRequest, Blob
proc getHelperTrieProofs*(db: AbstractChainDb,
reqs: openarray[HelperTrieProofRequest],
outNodes: var seq[Blob], outAuxData: var seq[Blob]) =
discard

View File

@ -46,7 +46,7 @@ const
FIND_CONCURRENCY = 3 # parallel find node lookups
ID_SIZE = 256
proc toNodeId(pk: PublicKey): NodeId =
proc toNodeId*(pk: PublicKey): NodeId =
readUintBE[256](keccak256.digest(pk.getRaw()).data)
proc newNode*(pk: PublicKey, address: Address): Node =
@ -67,7 +67,9 @@ proc newNode*(enode: ENode): Node =
proc distanceTo(n: Node, id: NodeId): UInt256 = n.id xor id
proc `$`*(n: Node): string =
# "Node[" & $n.node & "]"
if n == nil:
"Node[local]"
else:
"Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]"
proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.data)

210
eth_p2p/mock_peers.nim Normal file
View File

@ -0,0 +1,210 @@
import
macros, deques, algorithm,
asyncdispatch2, eth_keys, rlp, eth_common/eth_types,
private/types, rlpx, ../eth_p2p
type
Action = proc (p: Peer, data: Rlp): Future[void]
ProtocolMessagePair = object
protocol: ProtocolInfo
id: int
ExpectedMsg = object
msg: ProtocolMessagePair
response: Action
MockConf* = ref object
keys*: KeyPair
address*: Address
networkId*: uint
chain*: AbstractChainDb
clientId*: string
waitForHello*: bool
devp2pHandshake: ExpectedMsg
handshakes: seq[ExpectedMsg]
protocols: seq[ProtocolInfo]
expectedMsgs: Deque[ExpectedMsg]
receivedMsgsCount: int
var
nextUnusedMockPort = 40304
proc toAction(a: Action): Action = a
proc toAction[N](actions: array[N, Action]): Action =
mixin await
result = proc (peer: Peer, data: Rlp) {.async.} =
for a in actions:
await a(peer, data)
proc toAction(a: proc (): Future[void]): Action =
result = proc (peer: Peer, data: Rlp) {.async.} =
await a()
proc toAction(a: proc (peer: Peer): Future[void]): Action =
result = proc (peer: Peer, data: Rlp) {.async.} =
await a(peer)
proc delay*(duration: int): Action =
result = proc (p: Peer, data: Rlp) {.async.} =
await sleepAsync(duration)
proc reply(bytes: Bytes): Action =
result = proc (p: Peer, data: Rlp) {.async.} =
await p.sendMsg(bytes)
proc reply*[Msg](msg: Msg): Action =
mixin await
result = proc (p: Peer, data: Rlp) {.async.} =
await p.send(msg)
proc localhostAddress*(port: int): Address =
let port = Port(port)
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
proc makeProtoMsgPair(MsgType: type): ProtocolMessagePair =
mixin msgProtocol, protocolInfo
result.protocol = MsgType.msgProtocol.protocolInfo
result.id = MsgType.msgId
proc readReqId*(rlp: Rlp): int =
var r = rlp
return r.read(int)
proc expectationViolationMsg(mock: MockConf,
reason: string,
receivedMsg: ptr MessageInfo): string =
result = "[Mock expectation violated] " & reason & ": " & receivedMsg.name
for i in 0 ..< mock.expectedMsgs.len:
let expected = mock.expectedMsgs[i].msg
result.add "\n " & expected.protocol.messages[expected.id].name
if i == mock.receivedMsgsCount: result.add " <- we are here"
result.add "\n"
proc addProtocol(mock: MockConf, p: ProtocolInfo): ProtocolInfo =
new result
deepCopy(result[], p[])
proc incomingMsgHandler(p: Peer, receivedMsgId: int, rlp: Rlp): Future[void] =
let (receivedMsgProto, receivedMsgInfo) = p.getMsgMetadata(receivedMsgId)
let expectedMsgIdx = mock.receivedMsgsCount
template fail(reason: string) =
stdout.write mock.expectationViolationMsg(reason, receivedMsgInfo)
quit 1
if expectedMsgIdx > mock.expectedMsgs.len:
fail "Mock peer received more messages than expected"
let expectedMsg = mock.expectedMsgs[expectedMsgIdx]
if receivedMsgInfo.id != expectedMsg.msg.id or
receivedMsgProto.name != expectedMsg.msg.protocol.name:
fail "Mock peer received an unexpected message"
inc mock.receivedMsgsCount
if expectedMsg.response != nil:
return expectedMsg.response(p, rlp)
else:
result = newFuture[void]()
result.complete()
for m in mitems(result.messages):
m.thunk = incomingMsgHandler
result.handshake = nil
# TODO This mock conf can override this
result.disconnectHandler = nil
mock.protocols.add result
proc addHandshake*(mock: MockConf, msg: auto) =
var msgInfo = makeProtoMsgPair(msg.type)
msgInfo.protocol = mock.addProtocol(msgInfo.protocol)
let expectedMsg = ExpectedMsg(msg: msgInfo, response: reply(msg))
when msg is p2p.hello:
devp2pHandshake = expectedMsg
else:
mock.handshakes.add expectedMsg
proc addCapability*(mock: MockConf, Protocol: type) =
mixin defaultTestingHandshake, protocolInfo
when compiles(defaultTestingHandshake(Protocol)):
mock.addHandshake(defaultTestingHandshake(Protocol))
else:
discard mock.addProtocol(Protocol.protocolInfo)
proc expectImpl(mock: MockConf, msg: ProtocolMessagePair, action: Action) =
mock.expectedMsgs.addLast ExpectedMsg(msg: msg, response: action)
macro expect*(mock: MockConf, MsgType: type, handler: untyped = nil): untyped =
if handler.kind in {nnkLambda, nnkDo}:
handler.addPragma ident("async")
result = newCall(
bindSym("expectImpl"),
mock,
newCall(bindSym"makeProtoMsgPair", MsgType.getType),
newCall(bindSym"toAction", handler))
proc newMockPeer*(userConfigurator: proc (m: MockConf)): EthereumNode =
var mockConf = new MockConf
mockConf.keys = newKeyPair()
mockConf.address = localhostAddress(nextUnusedMockPort)
inc nextUnusedMockPort
mockConf.networkId = 1'u
mockConf.clientId = "Mock Peer"
mockConf.waitForHello = true
mockConf.expectedMsgs = initDeque[ExpectedMsg]()
userConfigurator(mockConf)
var node = newEthereumNode(mockConf.keys,
mockConf.address,
mockConf.networkId,
mockConf.chain,
mockConf.clientId,
addAllCapabilities = false)
mockConf.handshakes.sort do (lhs, rhs: ExpectedMsg) -> int:
# this is intentially sorted in reverse order, so we
# can add them in the correct order below.
return -cmp(lhs.msg.protocol.index, rhs.msg.protocol.index)
for h in mockConf.handshakes:
mockConf.expectedMsgs.addFirst h
for p in mockConf.protocols:
node.addCapability p
when false:
# TODO: This part doesn't work correctly yet.
# rlpx{Connect,Accept} control the handshake.
if mockConf.devp2pHandshake.response != nil:
mockConf.expectedMsgs.addFirst mockConf.devp2pHandshake
else:
proc sendHello(p: Peer, data: Rlp) {.async.} =
await p.hello(devp2pVersion,
mockConf.clientId,
node.rlpxCapabilities,
uint(node.address.tcpPort),
node.keys.pubkey.getRaw())
mockConf.expectedMsgs.addFirst ExpectedMsg(
msg: makeProtoMsgPair(p2p.hello),
response: sendHello)
node.initProtocolStates()
node.startListening()
return node
proc rlpxConnect*(node, otherNode: EthereumNode): Future[Peer] =
let otherAsRemote = newNode(initENode(otherNode.keys.pubKey,
otherNode.address))
return rlpx.rlpxConnect(node, otherAsRemote)

View File

@ -1,12 +1,17 @@
# PeerPool attempts to keep connections to at least min_peers
# on the given network.
import
os, tables, times, random,
asyncdispatch2, chronicles, rlp, eth_keys,
private/types, discovery, kademlia, rlpx
const
lookupInterval = 5
connectLoopSleepMs = 2000
proc newPeerPool*(network: EthereumNode,
chainDb: AbstractChainDB, networkId: uint, keyPair: KeyPair,
networkId: uint, keyPair: KeyPair,
discovery: DiscoveryProtocol, clientId: string,
listenPort = Port(30303), minPeers = 10): PeerPool =
new result
@ -72,7 +77,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
# try:
# self.logger.debug("Connecting to %s...", remote)
# peer = await wait_with_token(
# handshake(remote, self.privkey, self.peer_class, self.chaindb, self.network_id),
# handshake(remote, self.privkey, self.peer_class, self.network_id),
# token=self.cancel_token,
# timeout=HANDSHAKE_TIMEOUT)
# return peer
@ -97,40 +102,10 @@ proc lookupRandomNode(p: PeerPool) {.async.} =
proc getRandomBootnode(p: PeerPool): seq[Node] =
@[p.discovery.bootstrapNodes.rand()]
proc peerFinished(p: PeerPool, peer: Peer) =
## Remove the given peer from our list of connected nodes.
## This is passed as a callback to be called when a peer finishes.
p.connectedNodes.del(peer.remote)
for o in p.observers.values:
if not o.onPeerDisconnected.isNil:
o.onPeerDisconnected(peer)
proc run(peer: Peer, peerPool: PeerPool) {.async.} =
# TODO: This is a stub that should be implemented in rlpx.nim
try:
while true:
var (nextMsgId, nextMsgData) = await peer.recvMsg()
if nextMsgId == 1:
debug "Run got disconnect msg", reason = nextMsgData.listElem(0).toInt(uint32).DisconnectionReason, peer
break
else:
# debug "Got msg: ", msg = nextMsgId
await peer.dispatchMsg(nextMsgId, nextMsgData)
except:
error "Failed to read from peer",
err = getCurrentExceptionMsg(),
stackTrace = getCurrentException().getStackTrace()
peerPool.peerFinished(peer)
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
let peer = await p.connect(n)
if not peer.isNil:
info "Connection established", peer
ensureFuture peer.run(p)
p.connectedNodes[peer.remote] = peer
for o in p.observers.values:
if not o.onPeerConnected.isNil:

View File

@ -1,4 +1,10 @@
block:
import
deques, tables,
package_visible_types,
rlp, asyncdispatch2, eth_common/eth_types, eth_keys,
../enode, ../kademlia, ../discovery, ../options, ../rlpxcrypt
packageTypes:
type
EthereumNode* = ref object
networkId*: uint
@ -15,9 +21,9 @@ block:
peerPool*: PeerPool
Peer* = ref object
transp: StreamTransport
transport: StreamTransport
dispatcher: Dispatcher
nextReqId: int
lastReqId*: int
network*: EthereumNode
secretsState: SecretState
connectionState: ConnectionState
@ -27,7 +33,7 @@ block:
awaitedMessages: seq[FutureBase]
OutstandingRequest = object
reqId: int
id: int
future: FutureBase
timeoutAt: uint64
@ -85,12 +91,14 @@ block:
#
protocolOffsets: seq[int]
messages: seq[ptr MessageInfo]
activeProtocols: seq[ProtocolInfo]
PeerObserver* = object
onPeerConnected*: proc(p: Peer)
onPeerDisconnected*: proc(p: Peer)
MessageHandler = proc(x: Peer, data: Rlp): Future[void]
MessageHandlerDecorator = proc(msgId: int, n: NimNode): NimNode
MessageHandler = proc(x: Peer, msgId: int, data: Rlp): Future[void]
MessageContentPrinter = proc(msg: pointer): string
RequestResolver = proc(msg: pointer, future: FutureBase)
NextMsgResolver = proc(msgData: Rlp, future: FutureBase)
@ -98,7 +106,7 @@ block:
NetworkStateInitializer = proc(network: EthereumNode): RootRef
HandshakeStep = proc(peer: Peer): Future[void]
DisconnectionHandler = proc(peer: Peer,
reason: DisconnectionReason): Future[void]
reason: DisconnectionReason): Future[void] {.gcsafe.}
RlpxMessageKind* = enum
rlpxNotification,
@ -133,9 +141,8 @@ block:
MalformedMessageError* = object of Exception
UnexpectedDisconnectError* = object of Exception
PeerDisconnected* = object of Exception
reason*: DisconnectionReason
UselessPeerError* = object of Exception

File diff suppressed because it is too large Load Diff

View File

@ -12,9 +12,8 @@
## https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
import
random, algorithm, hashes,
asyncdispatch2, rlp, stint, eth_common, chronicles,
../../eth_p2p
asyncdispatch2, stint, chronicles, rlp, eth_common/eth_types,
../rlpx, ../private/types, ../blockchain_utils, ../../eth_p2p
type
NewBlockHashesAnnounce* = object
@ -25,26 +24,21 @@ type
header: BlockHeader
body {.rlpInline.}: BlockBody
NetworkState = object
syncing: bool
PeerState = object
initialized: bool
bestBlockHash: KeccakHash
bestDifficulty: DifficultyInt
PeerState = ref object
initialized*: bool
bestBlockHash*: KeccakHash
bestDifficulty*: DifficultyInt
const
maxStateFetch = 384
maxBodiesFetch = 128
maxReceiptsFetch = 256
maxHeadersFetch = 192
protocolVersion = 63
minPeersToStartSync = 2 # Wait for consensus of at least this number of peers before syncing
maxStateFetch* = 384
maxBodiesFetch* = 128
maxReceiptsFetch* = 256
maxHeadersFetch* = 192
protocolVersion* = 63
rlpxProtocol eth, protocolVersion:
useRequestIds = false
type State = PeerState
rlpxProtocol eth(version = protocolVersion,
peerState = PeerState,
useRequestIds = false):
onPeerConnected do (peer: Peer):
let
@ -58,9 +52,9 @@ rlpxProtocol eth, protocolVersion:
bestBlock.blockHash,
chain.genesisHash)
let m = await peer.waitSingleMsg(eth.status)
let m = await peer.nextMsg(eth.status)
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
debug "Suitable peer", peer
debug "suitable peer", peer
else:
raise newException(UselessPeerError, "Eth handshake params mismatch")
peer.state.initialized = true
@ -72,16 +66,7 @@ rlpxProtocol eth, protocolVersion:
networkId: uint,
totalDifficulty: DifficultyInt,
bestHash: KeccakHash,
genesisHash: KeccakHash) =
# verify that the peer is on the same chain:
if peer.network.networkId != networkId or
peer.network.chain.genesisHash != genesisHash:
# TODO: Is there a more specific reason here?
await peer.disconnect(SubprotocolReason)
return
peer.state.bestBlockHash = bestHash
peer.state.bestDifficulty = totalDifficulty
genesisHash: KeccakHash)
proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) =
discard
@ -95,19 +80,7 @@ rlpxProtocol eth, protocolVersion:
await peer.disconnect(BreachOfProtocol)
return
var headers = newSeqOfCap[BlockHeader](request.maxResults)
let chain = peer.network.chain
var foundBlock: BlockHeader
if chain.getBlockHeader(request.startBlock, foundBlock):
headers.add foundBlock
while uint64(headers.len) < request.maxResults:
if not chain.getSuccessorHeader(foundBlock, foundBlock):
break
headers.add foundBlock
await peer.blockHeaders(headers)
await peer.blockHeaders(peer.network.chain.getBlockHeaders(request))
proc blockHeaders(p: Peer, headers: openarray[BlockHeader])
@ -117,18 +90,7 @@ rlpxProtocol eth, protocolVersion:
await peer.disconnect(BreachOfProtocol)
return
var chain = peer.network.chain
var blockBodies = newSeqOfCap[BlockBody](hashes.len)
for hash in hashes:
let blockBody = chain.getBlockBody(hash)
if not blockBody.isNil:
# TODO: should there be an else clause here.
# Is the peer responsible of figuring out that
# some blocks were not found?
blockBodies.add deref(blockBody)
await peer.blockBodies(blockBodies)
await peer.blockBodies(peer.network.chain.getBlockBodies(hashes))
proc blockBodies(peer: Peer, blocks: openarray[BlockBody])
@ -139,18 +101,13 @@ rlpxProtocol eth, protocolVersion:
requestResponse:
proc getNodeData(peer: Peer, hashes: openarray[KeccakHash]) =
await peer.nodeData([])
await peer.nodeData(peer.network.chain.getStorageNodes(hashes))
proc nodeData(peer: Peer, data: openarray[Blob]) =
discard
proc nodeData(peer: Peer, data: openarray[Blob])
requestResponse:
proc getReceipts(peer: Peer, hashes: openarray[KeccakHash]) =
await peer.receipts([])
proc receipts(peer: Peer, receipts: openarray[Receipt]) =
discard
proc hash*(p: Peer): Hash {.inline.} = hash(cast[pointer](p))
await peer.receipts(peer.network.chain.getReceipts(hashes))
proc receipts(peer: Peer, receipts: openarray[Receipt])

View File

@ -0,0 +1,501 @@
import
tables, sets,
chronicles, asyncdispatch2, rlp, eth_common/eth_types,
../../rlpx, ../../private/types, private/les_types
const
maxSamples = 100000
rechargingScale = 1000000
lesStatsKey = "les.flow_control.stats"
lesStatsVer = 0
logScope:
topics = "les flow_control"
# TODO: move this somewhere
proc pop[A, B](t: var Table[A, B], key: A): B =
result = t[key]
t.del(key)
when LesTime is SomeInteger:
template `/`(lhs, rhs: LesTime): LesTime =
lhs div rhs
when defined(testing):
var lesTime* = LesTime(0)
template now(): LesTime = lesTime
template advanceTime(t) = lesTime += LesTime(t)
else:
import times
let startTime = epochTime()
proc now(): LesTime =
return LesTime((times.epochTime() - startTime) * 1000.0)
proc addSample(ra: var StatsRunningAverage; x, y: float64) =
if ra.count >= maxSamples:
let decay = float64(ra.count + 1 - maxSamples) / maxSamples
template applyDecay(x) = x -= x * decay
applyDecay ra.sumX
applyDecay ra.sumY
applyDecay ra.sumXX
applyDecay ra.sumXY
ra.count = maxSamples - 1
inc ra.count
ra.sumX += x
ra.sumY += y
ra.sumXX += x * x
ra.sumXY += x * y
proc calc(ra: StatsRunningAverage): tuple[m, b: float] =
if ra.count == 0:
return
let count = float64(ra.count)
let d = count * ra.sumXX - ra.sumX * ra.sumX
if d < 0.001:
return (m: ra.sumY / count, b: 0.0)
result.m = (count * ra.sumXY - ra.sumX * ra.sumY) / d
result.b = (ra.sumY / count) - (result.m * ra.sumX / count)
proc currentRequestsCosts*(network: LesNetwork,
les: ProtocolInfo): seq[ReqCostInfo] =
# Make sure the message costs are already initialized
doAssert network.messageStats.len > les.messages[^1].id,
"Have you called `initFlowControl`"
for msg in les.messages:
var (m, b) = network.messageStats[msg.id].calc()
if m < 0:
b += m
m = 0
if b < 0:
b = 0
result.add ReqCostInfo.init(msgId = msg.id,
baseCost = ReqCostInt(b * 2),
reqCost = ReqCostInt(m * 2))
proc persistMessageStats*(db: AbstractChainDB,
network: LesNetwork) =
doAssert db != nil
# XXX: Because of the package_visible_types template magic, Nim complains
# when we pass the messageStats expression directly to `encodeList`
let stats = network.messageStats
db.setSetting(lesStatsKey, rlp.encodeList(lesStatsVer, stats))
proc loadMessageStats*(network: LesNetwork,
les: ProtocolInfo,
db: AbstractChainDb): bool =
block readFromDB:
if db == nil:
break readFromDB
var stats = db.getSetting(lesStatsKey)
if stats.len == 0:
notice "LES stats not present in the database"
break readFromDB
try:
var statsRlp = rlpFromBytes(stats.toRange)
statsRlp.enterList
let version = statsRlp.read(int)
if version != lesStatsVer:
notice "Found outdated LES stats record"
break readFromDB
statsRlp >> network.messageStats
if network.messageStats.len <= les.messages[^1].id:
notice "Found incomplete LES stats record"
break readFromDB
return true
except RlpError:
error "Error while loading LES message stats",
err = getCurrentExceptionMsg()
newSeq(network.messageStats, les.messages[^1].id + 1)
return false
proc update(s: var FlowControlState, t: LesTime) =
let dt = max(t - s.lastUpdate, LesTime(0))
s.bufValue = min(
s.bufValue + s.minRecharge * dt,
s.bufLimit)
s.lastUpdate = t
proc init(s: var FlowControlState,
bufLimit: BufValueInt, minRecharge: int, t: LesTime) =
s.bufValue = bufLimit
s.bufLimit = bufLimit
s.minRecharge = minRecharge
s.lastUpdate = t
func canMakeRequest(s: FlowControlState,
maxCost: ReqCostInt): (LesTime, float64) =
## Returns the required waiting time before sending a request and
## the estimated buffer level afterwards (as a fraction of the limit)
const safetyMargin = 50
var maxCost = min(
maxCost + safetyMargin * s.minRecharge,
s.bufLimit)
if s.bufValue >= maxCost:
result[1] = float64(s.bufValue - maxCost) / float64(s.bufLimit)
else:
result[0] = (maxCost - s.bufValue) / s.minRecharge
func canServeRequest(srv: LesNetwork): bool =
result = srv.reqCount < srv.maxReqCount and
srv.reqCostSum < srv.maxReqCostSum
proc rechargeReqCost(peer: LesPeer, t: LesTime) =
let dt = t - peer.lastRechargeTime
peer.reqCostVal += peer.reqCostGradient * dt / rechargingScale
peer.lastRechargeTime = t
if peer.isRecharging and t >= peer.rechargingEndsAt:
peer.isRecharging = false
peer.reqCostGradient = 0
peer.reqCostVal = 0
proc updateRechargingParams(peer: LesPeer, network: LesNetwork) =
peer.reqCostGradient = 0
if peer.reqCount > 0:
peer.reqCostGradient = rechargingScale / network.reqCount
if peer.isRecharging:
peer.reqCostGradient = (network.rechargingRate * peer.rechargingPower /
network.totalRechargingPower )
peer.rechargingEndsAt = peer.lastRechargeTime +
LesTime(peer.reqCostVal * rechargingScale /
-peer.reqCostGradient )
proc trackRequests(network: LesNetwork, peer: LesPeer, reqCountChange: int) =
peer.reqCount += reqCountChange
network.reqCount += reqCountChange
doAssert peer.reqCount >= 0 and network.reqCount >= 0
if peer.reqCount == 0:
# All requests have been finished. Start recharging.
peer.isRecharging = true
network.totalRechargingPower += peer.rechargingPower
elif peer.reqCount == reqCountChange and peer.isRecharging:
# `peer.reqCount` must have been 0 for the condition above to hold.
# This is a transition from recharging to serving state.
peer.isRecharging = false
network.totalRechargingPower -= peer.rechargingPower
peer.startReqCostVal = peer.reqCostVal
updateRechargingParams peer, network
proc updateFlowControl(network: LesNetwork, t: LesTime) =
while true:
var firstTime = t
for peer in network.peers:
# TODO: perhaps use a bin heap here
if peer.isRecharging and peer.rechargingEndsAt < firstTime:
firstTime = peer.rechargingEndsAt
let rechargingEndedForSomePeer = firstTime < t
network.reqCostSum = 0
for peer in network.peers:
peer.rechargeReqCost firstTime
network.reqCostSum += peer.reqCostVal
if rechargingEndedForSomePeer:
for peer in network.peers:
if peer.isRecharging:
updateRechargingParams peer, network
else:
network.lastUpdate = t
return
proc endPendingRequest*(network: LesNetwork, peer: LesPeer, t: LesTime) =
if peer.reqCount > 0:
network.updateFlowControl t
network.trackRequests peer, -1
network.updateFlowControl t
proc enlistInFlowControl*(network: LesNetwork,
peer: LesPeer,
peerRechargingPower = 100) =
let t = now()
assert peer.isServer or peer.isClient
# Each Peer must be potential communication partner for us.
# There will be useless peers on the network, but the logic
# should make sure to disconnect them earlier in `onPeerConnected`.
if peer.isServer:
peer.localFlowState.init network.bufferLimit, network.minRechargingRate, t
peer.pendingReqs = initTable[int, ReqCostInt]()
if peer.isClient:
peer.remoteFlowState.init network.bufferLimit, network.minRechargingRate, t
peer.lastRechargeTime = t
peer.rechargingEndsAt = t
peer.rechargingPower = peerRechargingPower
network.updateFlowControl t
proc delistFromFlowControl*(network: LesNetwork, peer: LesPeer) =
let t = now()
# XXX: perhaps this is not safe with our reqCount logic.
# The original code may depend on the binarity of the `serving` flag.
network.endPendingRequest peer, t
network.updateFlowControl t
proc initFlowControl*(network: LesNetwork, les: ProtocolInfo,
maxReqCount, maxReqCostSum, reqCostTarget: int,
db: AbstractChainDb = nil) =
network.rechargingRate = (rechargingScale * rechargingScale) /
(100 * rechargingScale / reqCostTarget - rechargingScale)
network.maxReqCount = maxReqCount
network.maxReqCostSum = maxReqCostSum
if not network.loadMessageStats(les, db):
warn "Failed to load persisted LES message stats. " &
"Flow control will be re-initilized."
proc canMakeRequest(peer: var LesPeer, maxCost: int): (LesTime, float64) =
peer.localFlowState.update now()
return peer.localFlowState.canMakeRequest(maxCost)
template getRequestCost(peer: LesPeer, localOrRemote: untyped,
msgId, costQuantity: int): ReqCostInt =
template msgCostInfo: untyped = peer.`localOrRemote ReqCosts`[msgId]
min(msgCostInfo.baseCost + msgCostInfo.reqCost * costQuantity,
peer.`localOrRemote FlowState`.bufLimit)
proc trackOutgoingRequest*(network: LesNetwork, peer: LesPeer,
msgId, reqId, costQuantity: int) =
let maxCost = peer.getRequestCost(local, msgId, costQuantity)
peer.localFlowState.bufValue -= maxCost
peer.pendingReqsCost += maxCost
peer.pendingReqs[reqId] = peer.pendingReqsCost
proc trackIncomingResponse*(peer: LesPeer, reqId: int, bv: BufValueInt) =
let bv = min(bv, peer.localFlowState.bufLimit)
if not peer.pendingReqs.hasKey(reqId):
return
let costsSumAtSending = peer.pendingReqs.pop(reqId)
let costsSumChange = peer.pendingReqsCost - costsSumAtSending
peer.localFlowState.bufValue = if bv > costsSumChange: bv - costsSumChange
else: 0
peer.localFlowState.lastUpdate = now()
proc acceptRequest*(network: LesNetwork, peer: LesPeer,
msgId, costQuantity: int): Future[bool] {.async.} =
let t = now()
let reqCost = peer.getRequestCost(remote, msgId, costQuantity)
peer.remoteFlowState.update t
network.updateFlowControl t
while not network.canServeRequest:
await sleepAsync(10)
if peer notin network.peers:
# The peer was disconnected or the network
# was shut down while we waited
return false
network.trackRequests peer, +1
network.updateFlowControl network.lastUpdate
if reqCost > peer.remoteFlowState.bufValue:
error "LES peer sent request too early",
recharge = (reqCost - peer.remoteFlowState.bufValue) * rechargingScale /
peer.remoteFlowState.minRecharge
return false
return true
proc bufValueAfterRequest*(network: LesNetwork, peer: LesPeer,
msgId: int, quantity: int): BufValueInt =
let t = now()
let costs = peer.remoteReqCosts[msgId]
var reqCost = costs.baseCost + quantity * costs.reqCost
peer.remoteFlowState.update t
peer.remoteFlowState.bufValue -= reqCost
network.endPendingRequest peer, t
let curReqCost = peer.reqCostVal
if curReqCost < peer.remoteFlowState.bufLimit:
let bv = peer.remoteFlowState.bufLimit - curReqCost
if bv > peer.remoteFlowState.bufValue:
peer.remoteFlowState.bufValue = bv
network.messageStats[msgId].addSample(float64(quantity),
float64(curReqCost - peer.startReqCostVal))
return peer.remoteFlowState.bufValue
when defined(testing):
import unittest, random, ../../rlpx
proc isMax(s: FlowControlState): bool =
s.bufValue == s.bufLimit
rlpxProtocol dummyLes(version = 1, shortName = "abc"):
proc a(p: Peer)
proc b(p: Peer)
proc c(p: Peer)
proc d(p: Peer)
proc e(p: Peer)
template fequals(lhs, rhs: float64, epsilon = 0.0001): bool =
abs(lhs-rhs) < epsilon
proc tests* =
randomize(3913631)
suite "les flow control":
suite "running averages":
test "consistent costs":
var s: StatsRunningAverage
for i in 0..100:
s.addSample(5.0, 100.0)
let (cost, base) = s.calc
check:
fequals(cost, 100.0)
fequals(base, 0.0)
test "randomized averages":
proc performTest(qBase, qRandom: int, cBase, cRandom: float64) =
var
s: StatsRunningAverage
expectedFinalCost = cBase + cRandom / 2
error = expectedFinalCost
for samples in [100, 1000, 10000]:
for i in 0..samples:
let q = float64(qBase + rand(10))
s.addSample(q, q * (cBase + rand(cRandom)))
let (newCost, newBase) = s.calc
# With more samples, our error should decrease, getting
# closer and closer to the average (unless we are already close enough)
let newError = abs(newCost - expectedFinalCost)
check newError < error
error = newError
# After enough samples we should be very close the the final result
check error < (expectedFinalCost * 0.02)
performTest(1, 10, 5.0, 100.0)
performTest(1, 4, 200.0, 1000.0)
suite "buffer value calculations":
type TestReq = object
peer: LesPeer
msgId, quantity: int
accepted: bool
setup:
var lesNetwork = new LesNetwork
lesNetwork.peers = initSet[LesPeer]()
lesNetwork.initFlowControl(dummyLes.protocolInfo,
reqCostTarget = 300,
maxReqCount = 5,
maxReqCostSum = 1000)
for i in 0 ..< lesNetwork.messageStats.len:
lesNetwork.messageStats[i].addSample(1.0, float(i) * 100.0)
var client = new LesPeer
client.isClient = true
var server = new LesPeer
server.isServer = true
var clientServer = new LesPeer
clientServer.isClient = true
clientServer.isServer = true
var client2 = new LesPeer
client2.isClient = true
var client3 = new LesPeer
client3.isClient = true
var bv: BufValueInt
template enlist(peer: LesPeer) {.dirty.} =
let reqCosts = currentRequestsCosts(lesNetwork, dummyLes.protocolInfo)
peer.remoteReqCosts = reqCosts
peer.localReqCosts = reqCosts
lesNetwork.peers.incl peer
lesNetwork.enlistInFlowControl peer
template startReq(p: LesPeer, msg, q: int): TestReq =
var req: TestReq
req.peer = p
req.msgId = msg
req.quantity = q
req.accepted = waitFor lesNetwork.acceptRequest(p, msg, q)
req
template endReq(req: TestReq): BufValueInt =
bufValueAfterRequest(lesNetwork, req.peer, req.msgId, req.quantity)
test "single peer recharging":
lesNetwork.bufferLimit = 1000
lesNetwork.minRechargingRate = 100
enlist client
check:
client.remoteFlowState.isMax
client.rechargingPower > 0
advanceTime 100
let r1 = client.startReq(0, 100)
check r1.accepted
check client.isRecharging == false
advanceTime 50
let r2 = client.startReq(1, 1)
check r2.accepted
check client.isRecharging == false
advanceTime 25
bv = endReq r2
check client.isRecharging == false
advanceTime 130
bv = endReq r1
check client.isRecharging == true
advanceTime 300
lesNetwork.updateFlowControl now()
check:
client.isRecharging == false
client.remoteFlowState.isMax

View File

@ -0,0 +1,113 @@
import
hashes, tables, sets,
package_visible_types,
eth_common/eth_types
packageTypes:
type
AnnounceType* = enum
None,
Simple,
Signed,
Unspecified
ReqCostInfo = object
msgId: int
baseCost, reqCost: ReqCostInt
FlowControlState = object
bufValue, bufLimit: int
minRecharge: int
lastUpdate: LesTime
StatsRunningAverage = object
sumX, sumY, sumXX, sumXY: float64
count: int
LesPeer* = ref object
isServer*: bool
isClient*: bool
announceType*: AnnounceType
bestDifficulty*: DifficultyInt
bestBlockHash*: KeccakHash
bestBlockNumber*: BlockNumber
hasChainSince: HashOrNum
hasStateSince: HashOrNum
relaysTransactions: bool
# The variables below are used to implement the flow control
# mechanisms of LES from our point of view as a server.
# They describe how much load has been generated by this
# particular peer.
reqCount: int # How many outstanding requests are there?
#
rechargingPower: int # Do we give this peer any extra priority
# (implemented as a faster recharning rate)
# 100 is the default. You can go higher and lower.
#
isRecharging: bool # This is true while the peer is not making
# any requests
#
reqCostGradient: int # Measures the speed of recharging or accumulating
# "requests cost" at any given moment.
#
reqCostVal: int # The accumulated "requests cost"
#
rechargingEndsAt: int # When will recharging end?
# (the buffer of the Peer will be fully restored)
#
lastRechargeTime: LesTime # When did we last update the recharging parameters
#
startReqCostVal: int # TODO
remoteFlowState: FlowControlState
remoteReqCosts: seq[ReqCostInfo]
# The next variables are used to limit ourselves as a client in order to
# not violate the control-flow requirements of the remote LES server.
pendingReqs: Table[int, ReqCostInt]
pendingReqsCost: int
localFlowState: FlowControlState
localReqCosts: seq[ReqCostInfo]
LesNetwork* = ref object
peers: HashSet[LesPeer]
messageStats: seq[StatsRunningAverage]
ourAnnounceType*: AnnounceType
# The fields below are relevant when serving data.
bufferLimit: int
minRechargingRate: int
reqCostSum, maxReqCostSum: ReqCostInt
reqCount, maxReqCount: int
sumWeigth: int
rechargingRate: int
totalRechargedUnits: int
totalRechargingPower: int
lastUpdate: LesTime
KeyValuePair = object
key: string
value: Blob
HandshakeError = object of Exception
LesTime = int # this is in milliseconds
BufValueInt = int
ReqCostInt = int
template hash*(peer: LesPeer): Hash = hash(cast[pointer](peer))
template areWeServingData*(network: LesNetwork): bool =
network.maxReqCount != 0
template areWeRequestingData*(network: LesNetwork): bool =
network.ourAnnounceType != AnnounceType.Unspecified

View File

@ -9,64 +9,26 @@
#
import
times,
chronicles, asyncdispatch2, rlp, eth_common/eth_types,
../../eth_p2p
times, tables, options, sets, hashes, strutils, macros,
chronicles, asyncdispatch2, nimcrypto/[keccak, hash],
rlp, eth_common/eth_types, eth_keys,
../rlpx, ../kademlia, ../private/types, ../blockchain_utils,
les/private/les_types, les/flow_control
type
ProofRequest* = object
blockHash*: KeccakHash
accountKey*: Blob
key*: Blob
fromLevel*: uint
HeaderProofRequest* = object
chtNumber*: uint
blockNumber*: uint
fromLevel*: uint
ContractCodeRequest* = object
blockHash*: KeccakHash
key*: EthAddress
HelperTrieProofRequest* = object
subType*: uint
sectionIdx*: uint
key*: Blob
fromLevel*: uint
auxReq*: uint
TransactionStatus* = enum
Unknown,
Queued,
Pending,
Included,
Error
TransactionStatusMsg* = object
status*: TransactionStatus
data*: Blob
PeerState = object
buffer: int
lastRequestTime: float
reportedTotalDifficulty: DifficultyInt
KeyValuePair = object
key: string
value: Blob
les_types.forwardPublicTypes
const
lesVersion = 2'u
maxHeadersFetch = 192
maxBodiesFetch = 32
maxReceiptsFetch = 128
maxCodeFetch = 64
maxProofsFetch = 64
maxHeaderProofsFetch = 64
maxTransactionsFetch = 64
# Handshake properties:
# https://github.com/zsfelfoldi/go-ethereum/wiki/Light-Ethereum-Subprotocol-(LES)
const
keyProtocolVersion = "protocolVersion"
## P: is 1 for the LPV1 protocol version.
@ -110,98 +72,393 @@ const
## see Client Side Flow Control:
## https://github.com/zsfelfoldi/go-ethereum/wiki/Client-Side-Flow-Control-model-for-the-LES-protocol
const
rechargeRate = 0.3
keyAnnounceType = "announceType"
keyAnnounceSignature = "sign"
proc getPeerWithNewestChain(pool: PeerPool): Peer =
discard
proc initProtocolState(network: LesNetwork, node: EthereumNode) =
network.peers = initSet[LesPeer]()
rlpxProtocol les, 2:
proc addPeer(network: LesNetwork, peer: LesPeer) =
network.enlistInFlowControl peer
network.peers.incl peer
type State = PeerState
proc removePeer(network: LesNetwork, peer: LesPeer) =
network.delistFromFlowControl peer
network.peers.excl peer
template costQuantity(quantityExpr: int, max: int) {.pragma.}
proc getCostQuantity(fn: NimNode): tuple[quantityExpr, maxQuantity: NimNode] =
# XXX: `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let p = fn.pragma
assert p.kind == nnkPragma and p.len > 0 and $p[0][0] == "costQuantity"
result.quantityExpr = p[0][1]
result.maxQuantity= p[0][2]
if result.maxQuantity.kind == nnkExprEqExpr:
result.maxQuantity = result.maxQuantity[1]
macro outgoingRequestDecorator(n: untyped): untyped =
result = n
let (costQuantity, maxQuantity) = n.getCostQuantity
result.body.add quote do:
trackOutgoingRequest(msgRecipient.networkState(les),
msgRecipient.state(les),
perProtocolMsgId, reqId, `costQuantity`)
# echo result.repr
macro incomingResponseDecorator(n: untyped): untyped =
result = n
let trackingCall = quote do:
trackIncomingResponse(msgSender.state(les), reqId, msg.bufValue)
result.body.insert(n.body.len - 1, trackingCall)
# echo result.repr
macro incomingRequestDecorator(n: untyped): untyped =
result = n
let (costQuantity, maxQuantity) = n.getCostQuantity
template acceptStep(quantityExpr, maxQuantity) {.dirty.} =
let requestCostQuantity = quantityExpr
if requestCostQuantity > maxQuantity:
await peer.disconnect(BreachOfProtocol)
return
let lesPeer = peer.state
let lesNetwork = peer.networkState
if not await acceptRequest(lesNetwork, lesPeer,
perProtocolMsgId,
requestCostQuantity): return
result.body.insert(1, getAst(acceptStep(costQuantity, maxQuantity)))
# echo result.repr
template updateBV: BufValueInt =
bufValueAfterRequest(lesNetwork, lesPeer,
perProtocolMsgId, requestCostQuantity)
func getValue(values: openarray[KeyValuePair],
key: string, T: typedesc): Option[T] =
for v in values:
if v.key == key:
return some(rlp.decode(v.value, T))
func getRequiredValue(values: openarray[KeyValuePair],
key: string, T: typedesc): T =
for v in values:
if v.key == key:
return rlp.decode(v.value, T)
raise newException(HandshakeError,
"Required handshake field " & key & " missing")
rlpxProtocol les(version = lesVersion,
peerState = LesPeer,
networkState = LesNetwork,
outgoingRequestDecorator = outgoingRequestDecorator,
incomingRequestDecorator = incomingRequestDecorator,
incomingResponseThunkDecorator = incomingResponseDecorator):
## Handshake
##
proc status(p: Peer, values: openarray[KeyValuePair]) =
discard
proc status(p: Peer, values: openarray[KeyValuePair])
onPeerConnected do (peer: Peer):
let
network = peer.network
chain = network.chain
bestBlock = chain.getBestBlockHeader
lesPeer = peer.state
lesNetwork = peer.networkState
template `=>`(k, v: untyped): untyped =
KeyValuePair.init(key = k, value = rlp.encode(v))
var lesProperties = @[
keyProtocolVersion => lesVersion,
keyNetworkId => network.networkId,
keyHeadTotalDifficulty => bestBlock.difficulty,
keyHeadHash => bestBlock.blockHash,
keyHeadNumber => bestBlock.blockNumber,
keyGenesisHash => chain.genesisHash
]
lesPeer.remoteReqCosts = currentRequestsCosts(lesNetwork, les.protocolInfo)
if lesNetwork.areWeServingData:
lesProperties.add [
# keyServeHeaders => nil,
keyServeChainSince => 0,
keyServeStateSince => 0,
# keyRelaysTransactions => nil,
keyFlowControlBL => lesNetwork.bufferLimit,
keyFlowControlMRR => lesNetwork.minRechargingRate,
keyFlowControlMRC => lesPeer.remoteReqCosts
]
if lesNetwork.areWeRequestingData:
lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType)
let
s = await peer.nextMsg(les.status)
peerNetworkId = s.values.getRequiredValue(keyNetworkId, uint)
peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash)
peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint)
template requireCompatibility(peerVar, localVar, varName: untyped) =
if localVar != peerVar:
raise newException(HandshakeError,
"Incompatibility detected! $1 mismatch ($2 != $3)" %
[varName, $localVar, $peerVar])
requireCompatibility(peerLesVersion, lesVersion, "les version")
requireCompatibility(peerNetworkId, network.networkId, "network id")
requireCompatibility(peerGenesisHash, chain.genesisHash, "genesis hash")
template `:=`(lhs, key) =
lhs = s.values.getRequiredValue(key, type(lhs))
lesPeer.bestBlockHash := keyHeadHash
lesPeer.bestBlockNumber := keyHeadNumber
lesPeer.bestDifficulty := keyHeadTotalDifficulty
let peerAnnounceType = s.values.getValue(keyAnnounceType, AnnounceType)
if peerAnnounceType.isSome:
lesPeer.isClient = true
lesPeer.announceType = peerAnnounceType.get
else:
lesPeer.announceType = AnnounceType.Simple
lesPeer.hasChainSince := keyServeChainSince
lesPeer.hasStateSince := keyServeStateSince
lesPeer.relaysTransactions := keyRelaysTransactions
lesPeer.localFlowState.bufLimit := keyFlowControlBL
lesPeer.localFlowState.minRecharge := keyFlowControlMRR
lesPeer.localReqCosts := keyFlowControlMRC
lesNetwork.addPeer lesPeer
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.removePeer peer.state
## Header synchronisation
##
proc announce(p: Peer,
proc announce(
peer: Peer,
headHash: KeccakHash,
headNumber: BlockNumber,
headTotalDifficulty: DifficultyInt,
reorgDepth: BlockNumber,
values: openarray[KeyValuePair],
announceType: uint) =
discard
announceType: AnnounceType) =
if peer.state.announceType == AnnounceType.None:
error "unexpected announce message", peer
return
if announceType == AnnounceType.Signed:
let signature = values.getValue(keyAnnounceSignature, Blob)
if signature.isNone:
error "missing announce signature"
return
let sigHash = keccak256.digest rlp.encodeList(headHash,
headNumber,
headTotalDifficulty)
let signerKey = recoverKeyFromSignature(signature.get.initSignature,
sigHash)
if signerKey.toNodeId != peer.remote.id:
error "invalid announce signature"
# TODO: should we disconnect this peer?
return
# TODO: handle new block
requestResponse:
proc getBlockHeaders(p: Peer, BV: uint, req: BlocksRequest) =
discard
proc getBlockHeaders(
peer: Peer,
req: BlocksRequest) {.
costQuantity(req.maxResults.int, max = maxHeadersFetch).} =
proc blockHeaders(p: Peer, BV: uint, blocks: openarray[BlockHeader]) =
discard
let headers = peer.network.chain.getBlockHeaders(req)
await peer.blockHeaders(reqId, updateBV(), headers)
proc blockHeaders(
peer: Peer,
bufValue: BufValueInt,
blocks: openarray[BlockHeader])
## On-damand data retrieval
##
requestResponse:
proc getBlockBodies(p: Peer, blocks: openarray[KeccakHash]) =
discard
proc getBlockBodies(
peer: Peer,
blocks: openarray[KeccakHash]) {.
costQuantity(blocks.len, max = maxBodiesFetch).} =
proc blockBodies(p: Peer, BV: uint, bodies: openarray[BlockBody]) =
discard
let blocks = peer.network.chain.getBlockBodies(blocks)
await peer.blockBodies(reqId, updateBV(), blocks)
proc blockBodies(
peer: Peer,
bufValue: BufValueInt,
bodies: openarray[BlockBody])
requestResponse:
proc getReceipts(p: Peer, hashes: openarray[KeccakHash]) =
discard
proc getReceipts(
peer: Peer,
hashes: openarray[KeccakHash])
{.costQuantity(hashes.len, max = maxReceiptsFetch).} =
proc receipts(p: Peer, BV: uint, receipts: openarray[Receipt]) =
discard
let receipts = peer.network.chain.getReceipts(hashes)
await peer.receipts(reqId, updateBV(), receipts)
proc receipts(
peer: Peer,
bufValue: BufValueInt,
receipts: openarray[Receipt])
requestResponse:
proc getProofs(p: Peer, proofs: openarray[ProofRequest]) =
discard
proc getProofs(
peer: Peer,
proofs: openarray[ProofRequest]) {.
costQuantity(proofs.len, max = maxProofsFetch).} =
proc proofs(p: Peer, BV: uint, proofs: openarray[Blob]) =
discard
let proofs = peer.network.chain.getProofs(proofs)
await peer.proofs(reqId, updateBV(), proofs)
proc proofs(
peer: Peer,
bufValue: BufValueInt,
proofs: openarray[Blob])
requestResponse:
proc getContractCodes(p: Peer, requests: seq[ContractCodeRequest]) =
discard
proc getContractCodes(
peer: Peer,
reqs: seq[ContractCodeRequest]) {.
costQuantity(reqs.len, max = maxCodeFetch).} =
proc contractCodes(p: Peer, BV: uint, results: seq[Blob]) =
discard
let results = peer.network.chain.getContractCodes(reqs)
await peer.contractCodes(reqId, updateBV(), results)
proc contractCodes(
peer: Peer,
bufValue: BufValueInt,
results: seq[Blob])
nextID 15
requestResponse:
proc getHeaderProofs(p: Peer, requests: openarray[ProofRequest]) =
discard
proc getHeaderProofs(
peer: Peer,
reqs: openarray[ProofRequest]) {.
costQuantity(reqs.len, max = maxHeaderProofsFetch).} =
proc headerProof(p: Peer, BV: uint, proofs: openarray[Blob]) =
discard
let proofs = peer.network.chain.getHeaderProofs(reqs)
await peer.headerProofs(reqId, updateBV(), proofs)
proc headerProofs(
peer: Peer,
bufValue: BufValueInt,
proofs: openarray[Blob])
requestResponse:
proc getHelperTrieProofs(p: Peer, requests: openarray[HelperTrieProofRequest]) =
discard
proc getHelperTrieProofs(
peer: Peer,
reqs: openarray[HelperTrieProofRequest]) {.
costQuantity(reqs.len, max = maxProofsFetch).} =
proc helperTrieProof(p: Peer, BV: uint, nodes: seq[Blob], auxData: seq[Blob]) =
discard
var nodes, auxData: seq[Blob]
peer.network.chain.getHelperTrieProofs(reqs, nodes, auxData)
await peer.helperTrieProofs(reqId, updateBV(), nodes, auxData)
proc helperTrieProofs(
peer: Peer,
bufValue: BufValueInt,
nodes: seq[Blob],
auxData: seq[Blob])
## Transaction relaying and status retrieval
##
requestResponse:
proc sendTxV2(p: Peer, transactions: openarray[Transaction]) =
discard
proc sendTxV2(
peer: Peer,
transactions: openarray[Transaction]) {.
costQuantity(transactions.len, max = maxTransactionsFetch).} =
proc getTxStatus(p: Peer, transactions: openarray[Transaction]) =
discard
let chain = peer.network.chain
proc txStatus(p: Peer, BV: uint, transactions: openarray[TransactionStatusMsg]) =
discard
var results: seq[TransactionStatusMsg]
for t in transactions:
let hash = t.rlpHash # TODO: this is not optimal, we can compute
# the hash from the request bytes.
# The RLP module can offer a helper Hashed[T]
# to make this easy.
var s = chain.getTransactionStatus(hash)
if s.status == TransactionStatus.Unknown:
chain.addTransactions([t])
s = chain.getTransactionStatus(hash)
results.add s
await peer.txStatus(reqId, updateBV(), results)
proc getTxStatus(
peer: Peer,
transactions: openarray[Transaction]) {.
costQuantity(transactions.len, max = maxTransactionsFetch).} =
let chain = peer.network.chain
var results: seq[TransactionStatusMsg]
for t in transactions:
results.add chain.getTransactionStatus(t.rlpHash)
await peer.txStatus(reqId, updateBV(), results)
proc txStatus(
peer: Peer,
bufValue: BufValueInt,
transactions: openarray[TransactionStatusMsg])
proc configureLes*(node: EthereumNode,
# Client options:
announceType = AnnounceType.Simple,
# Server options.
# The zero default values indicate that the
# LES server will be deactivated.
maxReqCount = 0,
maxReqCostSum = 0,
reqCostTarget = 0) =
doAssert announceType != AnnounceType.Unspecified or maxReqCount > 0
var lesNetwork = node.protocolState(les)
lesNetwork.ourAnnounceType = announceType
initFlowControl(lesNetwork, les.protocolInfo,
maxReqCount, maxReqCostSum, reqCostTarget,
node.chain)
proc configureLesServer*(node: EthereumNode,
# Client options:
announceType = AnnounceType.Unspecified,
# Server options.
# The zero default values indicate that the
# LES server will be deactivated.
maxReqCount = 0,
maxReqCostSum = 0,
reqCostTarget = 0) =
## This is similar to `configureLes`, but with default parameter
## values appropriate for a server.
node.configureLes(announceType, maxReqCount, maxReqCostSum, reqCostTarget)
proc persistLesMessageStats*(node: EthereumNode) =
persistMessageStats(node.chain, node.protocolState(les))

4
tests/all_tests.nim Normal file
View File

@ -0,0 +1,4 @@
import
testecies, testauth, testcrypt,
les/test_flow_control

View File

@ -0,0 +1,5 @@
import
eth_p2p/rlpx_protocols/les/flow_control
flow_control.tests()

1
tests/nim.cfg Normal file
View File

@ -0,0 +1 @@
d:testing

View File

@ -7,31 +7,127 @@
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import sequtils
import eth_keys, asyncdispatch2
import eth_p2p
import
sequtils, strformat, options, unittest,
chronicles, asyncdispatch2, rlp, eth_keys,
eth_p2p, eth_p2p/mock_peers
const clientId = "nim-eth-p2p/0.0.1"
const
clientId = "nim-eth-p2p/0.0.1"
rlpxProtocol dmy, 1: # Rlpx would be useless with no subprotocols. So we define a dummy proto
proc foo(peer: Peer)
type
AbcPeer = ref object
peerName: string
lastResponse: string
XyzPeer = ref object
messages: int
AbcNetwork = ref object
peers: seq[string]
rlpxProtocol abc(version = 1,
peerState = AbcPeer,
networkState = AbcNetwork,
timeout = 100):
onPeerConnected do (peer: Peer):
await peer.hi "Bob"
let response = await peer.nextMsg(abc.hi)
peer.networkState.peers.add response.name
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason):
echo "peer disconnected", peer
requestResponse:
proc abcReq(p: Peer, n: int) =
echo "got req ", n
await p.abcRes(reqId, &"response to #{n}")
proc abcRes(p: Peer, data: string) =
echo "got response ", data
proc hi(p: Peer, name: string) =
echo "got hi from ", name
p.state.peerName = name
let query = 123
echo "sending req #", query
var r = await p.abcReq(query)
if r.isSome:
p.state.lastResponse = r.get.data
else:
p.state.lastResponse = "timeout"
rlpxProtocol xyz(version = 1,
peerState = XyzPeer,
useRequestIds = false,
timeout = 100):
proc foo(p: Peer, s: string, a, z: int) =
p.state.messages += 1
if p.supports(abc):
echo p.state(abc).peerName
proc bar(p: Peer, i: int, s: string)
requestResponse:
proc xyzReq(p: Peer, n: int, timeout = 3000) =
echo "got req ", n
proc xyzRes(p: Peer, data: string) =
echo "got response ", data
proc defaultTestingHandshake(_: type abc): abc.hi =
result.name = "John Doe"
proc localAddress(port: int): Address =
let port = Port(port)
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
proc test() {.async.} =
let node1Keys = newKeyPair()
let node1Address = localAddress(30303)
var node1 = newEthereumNode(node1Keys, node1Address, 1, nil)
node1.startListening()
template asyncTest(name, body: untyped) =
test name:
proc scenario {.async.} = body
waitFor scenario()
let node2Keys = newKeyPair()
var node2 = newEthereumNode(node2Keys, localAddress(30304), 1, nil)
asyncTest "network with 3 peers using custom protocols":
let localKeys = newKeyPair()
let localAddress = localAddress(30303)
var localNode = newEthereumNode(localKeys, localAddress, 1, nil)
localNode.initProtocolStates()
localNode.startListening()
let node1AsRemote = newNode(initENode(node1Keys.pubKey, node1Address))
let peer = await node2.rlpxConnect(node1AsRemote)
var mock1 = newMockPeer do (m: MockConf):
m.addHandshake abc.hi(name: "Alice")
doAssert(not peer.isNil)
m.expect(abc.abcReq) do (peer: Peer, data: Rlp):
let reqId = data.readReqId()
await peer.abcRes(reqId, "mock response")
await sleepAsync(100)
let r = await peer.abcReq(1)
assert r.get.data == "response to #1"
m.expect(abc.abcRes)
var mock2 = newMockPeer do (m: MockConf):
m.addCapability xyz
m.addCapability abc
m.expect(abc.abcReq) # we'll let this one time out
m.expect(xyz.xyzReq) do (peer: Peer):
echo "got xyz req"
await peer.xyzRes("mock peer data")
discard await mock1.rlpxConnect(localNode)
let mock2Connection = await localNode.rlpxConnect(mock2)
let r = await mock2Connection.xyzReq(10)
check r.get.data == "mock peer data"
let abcNetState = localNode.protocolState(abc)
check:
abcNetState.peers.len == 2
"Alice" in abcNetState.peers
"John Doe" in abcNetState.peers
waitFor test()