Alternative approach of Ethereum P2P networking.

This commit is contained in:
cheatfate 2018-08-06 12:53:03 +03:00
parent 7fac4cbf68
commit eb72c6042f
4 changed files with 1819 additions and 0 deletions

348
eth_p2p/eth.nim Normal file
View File

@ -0,0 +1,348 @@
#
# Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
import asyncdispatch2, stint, rlp, eth_common, chronicles
import protocols, peer
const
# Max number of items we can ask for in ETH requests. These are the values
# used in geth and if we ask for more than this the peers will disconnect
# from us.
MaxStateFetch* = 384
MaxBodiesFetch* = 128
MaxReceiptsFetch* = 256
MaxHeadersFetch* = 192
const
MsgStatus* = 0x00
MsgNewBlockHashes* = 0x01
MsgTransactions* = 0x02
# eth/61
MsgGetBlockHashes* = 0x03
MsgBlockHashes* = 0x04
MsgGetBlocks* = 0x05
MsgBlocks* = 0x06
MsgNewBlock* = 0x07
MsgBlockHashesFromNumber* = 0x08
# eth/62
MsgGetBlockHeaders* = 0x03
MsgBlockHeaders* = 0x04
MsgGetBlockBodies* = 0x05
MsgBlockBodies* = 0x06
# eth/63
MsgGetNodeData* = 0x0D
MsgNodeData* = 0x0E
MsgGetReceipts* = 0x0F
MsgReceipts* = 0x10
const
EthereumCap61* = initECap("eth", 61)
EthereumCap62* = initECap("eth", 62)
EthereumCap63* = initECap("eth", 63)
proc ethGetCmd*(epcap: EPeerCap, cmd: int): int =
## Checks if specific message is supported by capability/protocol ``epcap``
## and returns (zero based) specific to protocol message id.
## If `cmd` identifier is not supported by specific protocol ``epcap`` -1 will
## be returned.
result = -1
let cmdId = epcap.protoId(cmd)
if epcap.cap == EthereumCap61:
if cmdId in {MsgStatus, MsgNewBlockHashes, MsgTransactions,
MsgGetBlockHashes, MsgBlockHashes, MsgGetBlocks,
MsgBlocks, MsgNewBlock, MsgBlockHashesFromNumber}:
result = cmdId
elif epcap.cap == EthereumCap62:
if cmdId in {MsgStatus, MsgNewBlockHashes, MsgTransactions,
MsgGetBlockHeaders, MsgBlockHeaders, MsgGetBlockBodies,
MsgBlockBodies, MsgNewBlock}:
result = cmdId
elif epcap.cap == EthereumCap63:
if cmdId in {MsgStatus, MsgNewBlockHashes, MsgTransactions,
MsgGetBlockHeaders, MsgBlockHeaders, MsgGetBlockBodies,
MsgBlockBodies, MsgNewBlock, MsgGetNodeData, MsgNodeData,
MsgGetReceipts, MsgReceipts}:
result = cmdId
else:
discard
proc sendStatus*(peer: Peer, cap: EPeerCap, networkId: int,
tdifficulty: UInt256, bestHash: Hash256,
genesisHash: Hash256): Future[bool] {.async.} =
## Send `Status` message to remote peer
if peer.state notin {ConnectionState.None, Connected}: return false
let eindex = peer.supports([EthereumCap61, EthereumCap62, EthereumCap63])
doAssert(eindex >= 0, "Peer do not support eth status()")
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgStatus)
writer.append(int(cmdId))
writer.startList(5)
writer.append(int(cap.version))
writer.append(int(networkId))
writer.append(tdifficulty)
writer.append(bestHash)
writer.append(genesisHash)
debug "Sending Status message", peer = $peer, version = $cap.version
result = await peer.sendMessage(writer.finish())
proc sendNewBlockHashes*(peer: Peer,
hashes: seq[Hash256]): Future[bool] {.async.} =
if peer.state != Connected: return false
let eindex = peer.supports(EthereumCap61)
doAssert(eindex >= 0, "Peer do not support NewBlockHashes()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgNewBlockHashes)
writer.append(int(cmdId))
writer.startList(len(hashes))
for hash in hashes:
writer.append(hash)
debug "Sending NewBlockHashes message", peer = $peer, version = $cap.version
result = await peer.sendMessage(writer.finish())
proc sendNewBlockHashes*(peer: Peer,
bhashes: seq[tuple[hash: Hash256, num: UInt256]]): Future[bool] {.async.} =
if peer.state != Connected: return false
let eindex = peer.supports(EthereumCap62)
doAssert(eindex >= 0, "Peer do not support NewBlockHashes()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgNewBlockHashes)
writer.append(int(cmdId))
writer.startList(len(bhashes))
for item in bhashes:
writer.startList(2)
writer.append(item.hash)
writer.append(item.num)
debug "Sending NewBlockHashes message", peer = $peer, version = $cap.version
result = await peer.sendMessage(writer.finish())
proc sendBlockHashes*(peer: Peer,
hashes: seq[Hash256]): Future[bool] {.async.} =
if peer.state != Connected: return false
let eindex = peer.supports(EthereumCap61)
doAssert(eindex >= 0, "Peer do not support NewBlockHashes()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgBlockHashes)
writer.append(int(cmdId))
writer.startList(len(hashes))
for hash in hashes:
writer.append(hash)
debug "Sending BlockHashes message", peer = $peer, version = $cap.version
result = await peer.sendMessage(writer.finish())
# Missing 0x01:sendNewBlockHashes eth/61
# Missing 0x02:sendTransactions eth/61
# Missing 0x06:sendBlocks eth/61
# Missing 0x07:newBlock eth/61
# Missing 0x04:sendBlockHeaders eth/62
# Missing 0x06:sendBlockBodies eth/62
# Missing 0x0E:sendNodeDat eth/63
# Missing 0x10:sendReceipts eth/63
template sendReceive(peer: Peer, epcap: EPeerCap, sendMsg: BytesRange,
msgId: int, name: string): EthereumMessage =
let startTime = fastEpochTime()
var msg = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
var sendfut = peer.sendMessage(sendMsg)
yield sendfut
let res = sendfut.read()
if res:
var fut = newFuture[EthereumMessage](name)
peer.subscribe(epcap, msgId, fut)
try:
var msgfut = wait(fut, peer.responseTimeout)
yield msgfut
msg = msgfut.read()
msg.elapsed = int(fastEpochTime() - startTime)
peer.unsubscribe(epcap, msgId)
except AsyncTimeoutError:
msg = EthereumMessage(id: MsgTimeout, data: zeroBytesRlp)
msg
proc getBlockHashesFromNumber*(peer: Peer, number: UInt256,
maxBlocks: int): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap61)
doAssert(eindex >= 0, "Peer do not support getblockHashesFromNumber()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgBlockHashesFromNumber)
writer.append(int(cmdId))
writer.startList(2)
writer.append(number)
writer.append(maxBlocks)
debug "Sending BlockHashesFromNumber message", peer = $peer,
version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgBlockHashes,
"eth.getBlockHashesFromNumber")
proc getBlockHashes*(peer: Peer, hash: Hash256,
maxBlocks: int): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap61)
doAssert(eindex >= 0, "Peer do not support getBlockHashes()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgGetBlockHashes)
writer.append(int(cmdId))
writer.startList(2)
writer.append(hash)
writer.append(maxBlocks)
debug "Sending GetBlockHashes message", peer = $peer, version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgBlockHashes,
"eth.getBlockHashes")
proc getBlocks*(peer: Peer,
hashes: seq[Hash256]): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap61)
doAssert(eindex >= 0, "Peer do not support getBlocks()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgBlocks)
writer.append(int(cmdId))
writer.startList(len(hashes))
for item in hashes:
writer.append(item)
debug "Sending GetBlocks message", peer = $peer, version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgBlocks, "eth.getBlocks")
proc getBlockHeaders*(peer: Peer, blok: UInt256, maxHeaders: int, skip: int,
reverse: bool): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap62)
doAssert(eindex >= 0, "Peer do not support getBlockHeaders()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgGetBlockHeaders)
writer.append(int(cmdId))
writer.startList(4)
writer.append(blok)
writer.append(maxHeaders)
writer.append(skip)
if reverse:
writer.append(int(1))
else:
writer.append(int(0))
debug "Sending GetBlockHeaders message", peer = $peer, version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgBlockHeaders,
"eth.getBlockHeaders")
proc getBlockHeaders*(peer: Peer, blok: Hash256, maxHeaders: int, skip: int,
reverse: bool): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap62)
doAssert(eindex >= 0, "Peer do not support getBlockHeaders()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgGetBlockHeaders)
writer.append(int(cmdId))
writer.startList(4)
writer.append(blok)
writer.append(maxHeaders)
writer.append(skip)
if reverse:
writer.append(int(1))
else:
writer.append(int(0))
debug "Sending GetBlockHeaders message", peer = $peer, version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgBlockHeaders,
"eth.getBlockHeaders")
proc getBlockBodies*(peer: Peer,
hashes: seq[Hash256]): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap62)
doAssert(eindex >= 0, "Peer do not support getBlockBodies()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgGetBlockHeaders)
writer.append(int(cmdId))
writer.startList(len(hashes))
for item in hashes:
writer.append(item)
debug "Sending GetBlockBodies message", peer = $peer, version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgBlockBodies,
"eth.getBlockBodies")
proc getNodeData*(peer: Peer,
hashes: seq[Hash256]): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap63)
doAssert(eindex >= 0, "Peer do not support getNodeData()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgGetNodeData)
writer.append(int(cmdId))
writer.startList(len(hashes))
for item in hashes:
writer.append(item)
debug "Sending GetNodeData message", peer = $peer, version = $cap.version
result = peer.sendReceive(cap, writer.finish(), MsgNodeData,
"eth.getNodeData")
proc getReceipts*(peer: Peer,
hashes: seq[Hash256]): Future[EthereumMessage] {.async.} =
result = EthereumMessage(id: MsgBad, data: zeroBytesRlp)
if peer.state != Connected: return
let eindex = peer.supports(EthereumCap63)
doAssert(eindex >= 0, "Peer do not support getReceipts()")
let cap = peer.caps[eindex]
var writer = initRlpWriter()
let cmdId = cap.cmdId(MsgGetNodeData)
writer.append(int(cmdId))
writer.startList(len(hashes))
for item in hashes:
writer.append(item)
debug "Sending GetReceipts message", peer = $peer,
version = $cap.cap.version
result = peer.sendReceive(cap, writer.finish(), MsgReceipts,
"eth.getReceipts")

1008
eth_p2p/peer.nim Normal file

File diff suppressed because it is too large Load Diff

314
eth_p2p/protocols.nim Normal file
View File

@ -0,0 +1,314 @@
#
# Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
import algorithm, hashes
type
ECap* = distinct int
## Internal representation of Ethereum Capability
CapName* = array[3, char]
## Internal representation of Ethereum Capability protocol name
RlpCap* = object
## RLP presentation of Ethereum Capability
name*: CapName
version*: int
EPeerCap* = object
## Synchronized Ethereum Capability
cap*: ECap
offset*: int
index*: int
ECapList* = seq[ECap]
## Ethereum Capabilities list object
RlpCapList* = seq[RlpCap]
## RLP serialized list of Ethereum Capabilities
EPeerCapList* = seq[EPeerCap]
## Synchronized list of Ethereum Capabilities
proc newECapList*(): ECapList =
## Create new empty Ethereum Capabilities list.
result = newSeq[ECap]()
proc initECap*(name: string, version: int): ECap =
## Create new Ethereum Capability using protocol name ``name`` and protocol
## version ``version``.
assert(len(name) >= 3)
assert(version >= 0 and version <= 255)
result = ECap((int(name[0]) shl 24) or (int(name[1]) shl 16) or
(int(name[2]) shl 8) or (int(version) and 0xFF))
proc initECap*(rcap: RlpCap): ECap =
## Create new Ethereum Capability using RLP serialized capability.
assert(rcap.version >= 0 and rcap.version <= 255)
result = ECap((int(rcap.name[0]) shl 24) or (int(rcap.name[1]) shl 16) or
(int(rcap.name[2]) shl 8) or (int(rcap.version) and 0xFF))
proc initRCap*(ecap: ECap): RlpCap =
## Create RLP serialized Ethereum Capability from internal representation of
## Ethereum Capability ``ecap``.
result.name[0] = chr((int(ecap) shr 24) and 0xFF)
result.name[1] = chr((int(ecap) shr 16) and 0xFF)
result.name[2] = chr((int(ecap) shr 8) and 0xFF)
result.version = int(ecap) and 0xFF
proc hash*(ecap: ECap): Hash {.inline.} =
## Calculate ``Hash`` for Ethereum Capability ``ecap``.
result = Hash(ecap)
proc `$`*(ecap: ECap): string =
## Get string representation of Ethereum Capability ``ecap``.
result = newStringOfCap(8)
result.setLen(4)
result[3] = '/'
result[2] = chr((int(ecap) shr 8) and 0xFF)
result[1] = chr((int(ecap) shr 16) and 0xFF)
result[0] = chr((int(ecap) shr 24) and 0xFF)
result.add($(int(ecap) and 0xFF))
proc `$`*(rcap: RlpCap): string =
## Get string representation of RLP serialized Ethereum Capability ``rcap``.
result = newStringOfCap(8)
result.setLen(4)
result[3] = '/'
result[2] = rcap.name[2]
result[1] = rcap.name[1]
result[0] = rcap.name[0]
result.add($rcap.version)
proc cmpProto*(ecap1, ecap2: ECap): int =
## Compare protocols of Ethereum Capabilities ``ecap1`` and ``ecap2``.
result = ((int(ecap1) shr 8) and 0xFFFFFF) -
((int(ecap2) shr 8) and 0xFFFFFF)
proc cmpVersion*(ecap1, ecap2: ECap): int =
## Compare versions of Ethereum Capabilities ``ecap1`` and ``ecap2``.
result = (int(ecap1) and 0xFF) - (int(ecap2) and 0xFF)
proc version*(ecap: ECap): int =
## Get version of Ethereum Capability ``ecap`` as integer.
result = (int(ecap) and 0xFF)
proc protocol*(ecap: ECap): string =
## Get protocol of Ethereum Capability ``ecap`` as string.
result = newString(3)
result[0] = chr((int(ecap) shr 24) and 0xFF)
result[1] = chr((int(ecap) shr 16) and 0xFF)
result[2] = chr((int(ecap) shr 8) and 0xFF)
proc protocol*(epcap: EPeerCap): string {.inline.} =
## Get protocol of Ethereum Capability ``epcap`` as string.
result = epcap.cap.protocol()
proc version*(epcap: EPeerCap): int {.inline.} =
## Get version of Ethereum Capability ``ecap`` as integer.
result = epcap.cap.version()
proc `==`*(x: ECap, y: ECap): bool {.borrow.}
## Compare Ethereum Capabilities ``x`` and ``y``.
type
EProtocol* = object
cap*: ECap
length*: int
const
EthereumProtocols* = [
EProtocol(cap: initECap("eth", 61), length: 9),
EProtocol(cap: initECap("eth", 62), length: 8),
EProtocol(cap: initECap("eth", 63), length: 16),
EProtocol(cap: initECap("les", 1), length: 15),
EProtocol(cap: initECap("les", 2), length: 21)
]
proc protoLength*(cap: ECap): int =
## Get number of commands for Ethereum Capability ``cap``.
for item in EthereumProtocols:
if item.cap == cap:
result = item.length
break
proc cmp*(x, y: ECap): int =
## Comparison function for sorting Ethereum Capabilities.
if x == y: return 0
if int(x) < int(y): return -1
return 1
proc sync*(secap, ecap: ECapList): EPeerCapList =
## Synchronize local and remote lists of Ethereum Capabilities, and calculate
## protocol commands' offsets.
##
## Please note, that ``secap`` list must be sorted!
result = newSeq[EPeerCap]()
var curindex = 0
for cap1 in secap:
for cap2 in ecap:
if cap1 == cap2:
if len(result) == 0:
# first added protocol has 0x10 offset
result.add(EPeerCap(cap: cap1, offset: 16, index: curindex))
inc(curindex)
else:
let prev = result[^1]
if cmpProto(prev.cap, cap1) == 0:
if cmpVersion(prev.cap, cap1) < 0:
# replacing same protocol with most recent version
result[^1] = EPeerCap(cap: cap1, offset: prev.offset)
else:
# adding new protocol with offset
let offset = prev.offset + protoLength(prev.cap)
result.add(EPeerCap(cap: cap1, offset: offset, index: curindex))
inc(curindex)
proc register*(lcap: var ECapList, cap: ECap) =
## Registers Ethereum Capability ``cap`` in list ``lcap``.
##
## Procedure keeps list ``lcap`` sorted.
if len(lcap) == 0:
lcap.add(cap)
else:
for item in lcap:
if item == cap:
return
lcap.add(cap)
sort(lcap, cmp)
proc register*(lcap: var ECapList, caps: openarray[ECap]) =
## Registers array of Ethereum Capabilities ``caps`` in list ``lcap``.
##
## Procedure keeps ``lcap`` sorted.
for item in caps:
lcap.register(item)
proc unregister*(lcap: var ECapList, cap: ECap) =
## Unregister Ethereum Capability ``cap`` from list ``lcap``.
##
## Procedure keeps list ``lcap`` sorted.
var scap: seq[ECap]
for item in lcap:
if item != cap:
scap.add(item)
if len(scap) != len(lcap):
shallowCopy(lcap, scap)
proc unregister*(lcap: var ECapList, caps: openarray[ECap]) =
## Unregister array of Ethereum Capabilities from list ``lcap``.
##
## Procedure keeps list ``lcap`` sorted.
for item in caps:
lcap.unregister(item)
proc newECapList*(caps: openarray[ECap]): ECapList =
## Create new Ethereum Capabilities list and populate it with capabilities
## from ``caps``.
result = newSeq[ECap]()
result.register(caps)
proc newECapList*(cap: ECap): ECapList =
## Create new Ethereum Capabilities list and register capability ``cap``
## in it.
result = newSeq[ECap]()
result.register(cap)
proc `$`*(lcap: ECapList): string =
## Get string representation of Ethereum Capabilities list ``lcap``.
result = ""
for item in lcap:
if len(result) > 0:
result.add(", ")
result.add($item)
else:
result.add($item)
proc `$`*(pcap: EPeerCapList): string =
## Get string representation of synchronized remote peer capabilities
## ``pcap``.
result = ""
for item in pcap:
if len(result) > 0:
result.add(", ")
result.add($item.cap)
result.add(" (")
result.add($item.offset)
result.add("/")
result.add($item.index)
result.add(")")
else:
result.add($item.cap)
result.add(" (")
result.add($item.offset)
result.add(" / ")
result.add($item.index)
result.add(")")
proc `$`*(rcap: RlpCapList): string =
## Get string representation of RLP serialized Ethereum Capabilities list.
result = ""
for item in rcap:
if len(result) > 0:
result.add(", ")
result.add($item)
else:
result.add($item)
proc cmdId*(epcap: EPeerCap, cmdid: int): int {.inline.} =
## Get actual command id of command with ``cmdid`` using data from
## synchronized peer capability ``epcap``.
result = cmdid + epcap.offset
proc cmdId*(pcap: EPeerCapList, proto: string, cmdid: int): int =
## Get actual command id from protocol with name ``proto`` and list of
## synchronized peer capabilities ``pcap``.
result = cmdid
var cap = initECap(proto, 0)
for item in pcap:
if cmpProto(item.cap, cap) == 0:
result += item.offset
break
proc protoId*(epcap: EPeerCap, cmdid: int): int {.inline.} =
## Get sub-protocol specific ``message id`` (zero based) from peer's cmd id.
result = cmdid - epcap.offset
proc fromRlp*(lrlpcap: openarray[RlpCap]): ECapList =
## Convert list of RLP serialized Ethereum Capabilities to list of Ethereum
## Capabilities.
result = newECapList()
for item in lrlpcap:
result.register(initECap(item))
proc toRlp*(lcap: openarray[ECap]): RlpCapList =
## Convert list of Ethereum Capabilities ``lcap`` to list of RLP serialized
## Ethereum Capabilities.
result = newSeq[RlpCap]()
for item in lcap:
result.add(initRCap(item))
when isMainModule:
var lcaplist = newECapList()
var rcaplist = newECapList()
var a = initECap("eth", 61)
var b = initECap("eth", 62)
var c = initECap("eth", 63)
var d = initECap("les", 1)
var e = initECap("les", 2)
var f = initECap("par", 1)
var g = initECap("par", 2)
lcaplist.register([a, b, c, d])
rcaplist.register([e, d, d, c, b, a, f, g])
echo "local ", lcaplist
echo "remote ", rcaplist
echo sync(lcaplist, rcaplist)

149
tests/testpeer.nim Normal file
View File

@ -0,0 +1,149 @@
import asyncdispatch2, eth_keys, eth_common, rlp, chronicles, nimcrypto, stint
import peer, protocols, eth, enode
const
NodeKey = "1b5b2a9c891067139c2aac53f66a84e2888ce494407a21c662dc546150e7e170"
# enode://425f2261ef52010ed833bdbebbc67c36dfc208c0330e2c248fadef3feeb291c677265289bf64437055116b7d1dc3f78be5122d0041f021c01a501b876c664d4f@[::]:30303
ENodeAddress = "enode://410a034fbd91e872cbe52a5fb5bec1f030d4239dab35efbdf53b6a3f09f42a84965f0d5c76c44d42b791339ff249675ec7fc656d69d10cb95b315a1136f6634e@192.168.2.10:30303"
GenesisHash = "D4E56740F876AEF8C010B86A40D5F56745A118D0906A34E69AEC8C0DB1CB8FA3"
proc getInterface(en: EthereumNode, peer: Peer, epcap: EPeerCap): EInterface =
var
version: int
network: int
totalDifficulty: UInt256
bestHash: Hash256
genesisHash: Hash256
var currentTotalDifficulty = 0.u256
var currentBestHash: Hash256
var currentGenesisHash: Hash256
var hash = fromHex(GenesisHash)
copyMem(addr currentGenesisHash, addr hash[0], 32)
proc handshake(peer: Peer): Future[bool] {.async.} =
# Sending `Status` message to remote peer.
let res = await peer.sendStatus(epcap, en.network, currentTotalDifficulty,
currentBestHash, currentGenesisHash)
if not res: return false
# Waiting for `Status` message from remote peer.
var msg = await peer.getMessage(epcap)
# Converting synchronized command id back to protocol message id.
var ethid = epcap.ethGetCmd(msg.id)
if msg.id == MsgBad or msg.id == MsgDisconnect:
## Received message is rather incorrect or disconnect.
result = false
else:
if ethid == MsgStatus:
result = true
# Decoding `Status` message frame.
if (not msg.data.isList()) or (msg.data.listLen() != 5):
debug "Malformed status message received", peer = $peer,
isList = msg.data.isList(),
listLength = msg.data.listLen()
result = false
if not result: return
try:
msg.data.enterList()
version = msg.data.read(int)
network = msg.data.read(int)
totalDifficulty = msg.data.read(UInt256)
bestHash = msg.data.read(Hash256)
genesisHash = msg.data.read(Hash256)
except:
debug "Malformed status message received", peer = $peer
result = false
if not result: return
# Verification of received data from `Status` message frame.
if version != epcap.version():
debug "Sub-protocol version did not match", peer = $peer,
remoteVersion = $version,
localVersion = $epcap.version
await peer.disconnect(UselessPeer)
return false
# Remote network id must be equal to our network id
if network != en.network:
debug "Different network id specified", peer = $peer,
remoteNetwork = network,
localNetwork = en.network
await peer.disconnect(UselessPeer)
return false
# Remote Genesis must be equal to our Genesis
if genesisHash != currentGenesisHash:
debug "Genesis hash did not match", peer = $peer,
remoteGenesis = $genesisHash,
localGenesis = $currentGenesisHash
await peer.disconnect(UselessPeer)
return false
debug "Ethereum Protocol started", peer = $peer,
version = $epcap.version
result = true
else:
# There must be no other messages, except `Status` message.
debug "Incorrect message received", peer = $peer, msgId = $msg.id
result = false
proc run(peer: Peer) {.async.} =
while true:
var msg = await peer.getMessage(epcap)
if msg.id == MsgBad:
# Remote peer sent malformed message or get disconnected without reason.
# You don't need to close peer here, this is just notification so you
# can break your cycle.
debug "Sub-protocol received notification", peer = $peer
break
elif msg.id == MsgDisconnect:
# Remote peer send `Disconnect` message with a reason.
# You don't need to close peer here, this is just notification so you
# can break your cycle.
debug "Sub-protocol received disconnect notification", peer = $peer
break
else:
# Here we can get any message specific exactly to this protocol.
let ethid = epcap.ethGetCmd(msg.id)
if ethid == -1:
# Received message with id, which is not related to protocol
debug "Sub-protocol received incorrect message", peer = $peer,
msgid = $msg.id
# peer.disconnect() will do `peer.close()` for us
await peer.disconnect(BreachOfProtocol)
break
else:
debug "Sub-protocol received message", peer = $peer,
msgid = $msg.id,
ethid = $ethid
if ethid == MsgGetBlockHeaders:
debug "Received MsgGetBlockHeaders", peer = $peer
elif ethid == MsgGetBlockBodies:
debug "Received MsgGetBlockBodies", peer = $peer
elif ethid == MsgGetNodeData:
debug "Received MsgGetNodeData", peer = $peer
elif ethid == MsgGetReceipts:
debug "Received MsgGetReceipts", peer = $peer
new result
result.handshake = handshake
result.run = run
proc test() {.async.} =
var en = newEthereumNode(1, initPrivateKey(NodeKey))
en.registerProtocol(initECap("eth", 63), getInterface)
let peer = await connect(en, ENodeAddress)
await sleepAsync(1000)
var time = await peer.ping()
echo "PONG RECEIVED in ", time, "ms"
await sleepAsync(1000000)
when isMainModule:
waitFor test()