Whisper post-review changes (PR #52)

This commit is contained in:
Zahary Karadjov 2018-11-28 01:52:22 +02:00
parent 1e5eeec6bb
commit ccea6dc7b6
16 changed files with 270 additions and 231 deletions

View File

@ -72,7 +72,7 @@ proc newEthereumNode*(keys: KeyPair,
```
Each supplied protocol identifier is a name of a protocol introduced
by the `rlpxProtocol` macro discussed later in this document.
by the `p2pProtocol` macro discussed later in this document.
Instantiating an `EthereumNode` does not immediately connect you to
the network. To start the connection process, call `node.connectToNetwork`:
@ -108,13 +108,13 @@ the definition of a RLPx protocol:
### RLPx sub-protocols
The sub-protocols are defined with the `rlpxProtocol` macro. It will accept
The sub-protocols are defined with the `p2pProtocol` macro. It will accept
a 3-letter identifier for the protocol and the current protocol version:
Here is how the [DevP2P wire protocol](https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol) might look like:
``` nim
rlpxProtocol p2p(version = 0):
p2pProtocol p2p(version = 0):
proc hello(peer: Peer,
version: uint,
clientId: string,
@ -148,8 +148,8 @@ though the `state` pseudo-field of the `Peer` object:
type AbcPeerState = object
receivedMsgsCount: int
rlpxProtocol abc(version = 1,
peerState = AbcPeerState):
p2pProtocol abc(version = 1,
peerState = AbcPeerState):
proc incomingMessage(p: Peer) =
p.state.receivedMsgsCount += 1
@ -202,7 +202,7 @@ proc handshakeExample(peer: Peer) {.async.} =
There are few things to note in the above example:
1. The `rlpxProtocol` definition created a pseudo-variable named after the
1. The `p2pProtocol` definition created a pseudo-variable named after the
protocol holding various properties of the protocol.
2. Each message defined in the protocol received a corresponding type name,
@ -220,7 +220,7 @@ be dispatched to their respective handlers.
### `requestResponse` pairs
``` nim
rlpxProtocol les, 2:
p2pProtocol les(version = 2):
...
requestResponse:
@ -242,9 +242,9 @@ be specified for each individual call and the default value can be
overridden on the level of individual message, or the entire protocol:
``` nim
rlpxProtocol abc(version = 1,
useRequestIds = false,
timeout = 5000): # value in milliseconds
p2pProtocol abc(version = 1,
useRequestIds = false,
timeout = 5000): # value in milliseconds
requestResponse:
proc myReq(dataId: int, timeout = 3000)
proc myRes(data: string)
@ -262,7 +262,7 @@ also include handlers for certain important events such as newly connected
peers or misbehaving or disconnecting peers:
``` nim
rlpxProtocol les(version = 2):
p2pProtocol les(version = 2):
onPeerConnected do (peer: Peer):
asyncCheck peer.status [
"networkId": rlp.encode(1),

View File

@ -18,14 +18,18 @@ import
export
types, rlpx, enode, kademlia
proc addCapability*(n: var EthereumNode, p: ProtocolInfo) =
assert n.connectionState == ConnectionState.None
let pos = lowerBound(n.rlpxProtocols, p, rlpx.cmp)
n.rlpxProtocols.insert(p, pos)
n.rlpxCapabilities.insert(p.asCapability, pos)
proc addCapability*(node: var EthereumNode, p: ProtocolInfo) =
assert node.connectionState == ConnectionState.None
template addCapability*(n: var EthereumNode, Protocol: type) =
addCapability(n, Protocol.protocolInfo)
let pos = lowerBound(node.protocols, p, rlpx.cmp)
node.protocols.insert(p, pos)
node.capabilities.insert(p.asCapability, pos)
if p.networkStateInitializer != nil:
node.protocolStates[p.index] = p.networkStateInitializer(node)
template addCapability*(node: var EthereumNode, Protocol: type) =
addCapability(node, Protocol.protocolInfo)
proc newEthereumNode*(keys: KeyPair,
address: Address,
@ -38,8 +42,8 @@ proc newEthereumNode*(keys: KeyPair,
result.keys = keys
result.networkId = networkId
result.clientId = clientId
result.rlpxProtocols.newSeq 0
result.rlpxCapabilities.newSeq 0
result.protocols.newSeq 0
result.capabilities.newSeq 0
result.address = address
result.connectionState = ConnectionState.None
@ -47,8 +51,10 @@ proc newEthereumNode*(keys: KeyPair,
result.protocolVersion = if useCompression: devp2pSnappyVersion
else: devp2pVersion
result.protocolStates.newSeq allProtocols.len
if addAllCapabilities:
for p in rlpxProtocols:
for p in allProtocols:
result.addCapability(p)
proc processIncoming(server: StreamServer,
@ -76,13 +82,6 @@ proc startListening*(node: EthereumNode) =
udata = cast[pointer](node))
node.listeningServer.start()
proc initProtocolStates*(node: EthereumNode) =
# TODO: This should be moved to a private module
node.protocolStates.newSeq(rlpxProtocols.len)
for p in node.rlpxProtocols:
if p.networkStateInitializer != nil:
node.protocolStates[p.index] = ((p.networkStateInitializer)(node))
proc connectToNetwork*(node: EthereumNode,
bootstrapNodes: seq[ENode],
startListening = true,
@ -101,8 +100,6 @@ proc connectToNetwork*(node: EthereumNode,
if startListening:
eth_p2p.startListening(node)
node.initProtocolStates()
if startListening:
node.listeningServer.start()
@ -129,6 +126,11 @@ iterator peers*(node: EthereumNode, Protocol: type): Peer =
for peer in node.peerPool.peers(Protocol):
yield peer
iterator protocolPeers*(node: EthereumNode, Protocol: type): auto =
mixin state
for peer in node.peerPool.peers(Protocol):
yield peer.state(Protocol)
iterator randomPeers*(node: EthereumNode, maxPeers: int): Peer =
# TODO: this can be implemented more efficiently

View File

@ -27,8 +27,10 @@ proc runTest(name: string, defs = "", lang = "c") =
exec "nim " & lang & " " & defs & " -d:testing --experimental:ForLoopMacros -r tests/" & name
task test, "Runs the test suite":
runTest "all_tests"
runTest "testenode"
runTest "tdiscovery"
runTest "tserver"
runTest "tserver", "-d:useSnappy"
runTest "all_tests"
# runTest "tshh_connect"
runTest "tshh_connect_mocked"

View File

@ -128,7 +128,7 @@ proc addHandshake*(mock: MockConf, msg: auto) =
msgInfo.protocol = mock.addProtocol(msgInfo.protocol)
let expectedMsg = ExpectedMsg(msg: msgInfo, response: reply(msg))
when msg is p2p.hello:
when msg is devp2p.hello:
devp2pHandshake = expectedMsg
else:
mock.handshakes.add expectedMsg
@ -200,7 +200,7 @@ proc newMockPeer*(userConfigurator: proc (m: MockConf)): EthereumNode =
proc sendHello(p: Peer, data: Rlp) {.async.} =
await p.hello(devp2pVersion,
mockConf.clientId,
node.rlpxCapabilities,
node.capabilities,
uint(node.address.tcpPort),
node.keys.pubkey.getRaw())
@ -208,7 +208,6 @@ proc newMockPeer*(userConfigurator: proc (m: MockConf)): EthereumNode =
msg: makeProtoMsgPair(p2p.hello),
response: sendHello)
node.initProtocolStates()
node.startListening()
return node

View File

@ -18,8 +18,8 @@ type
peerPool*: PeerPool
# Private fields:
rlpxCapabilities*: seq[Capability]
rlpxProtocols*: seq[ProtocolInfo]
capabilities*: seq[Capability]
protocols*: seq[ProtocolInfo]
listeningServer*: StreamServer
protocolStates*: seq[RootRef]
discovery*: DiscoveryProtocol

View File

@ -30,12 +30,12 @@ when tracingEnabled:
var
gProtocols: seq[ProtocolInfo]
gDispatchers = initSet[Dispatcher]()
devp2p: ProtocolInfo
gDevp2pInfo: ProtocolInfo
# The variables above are immutable RTTI information. We need to tell
# Nim to not consider them GcSafe violations:
template rlpxProtocols*: auto = {.gcsafe.}: gProtocols
template devp2pProtocolInfo: auto = {.gcsafe.}: devp2p
template allProtocols*: auto = {.gcsafe.}: gProtocols
template devp2pInfo: auto = {.gcsafe.}: gDevp2pInfo
proc newFuture[T](location: var Future[T]) =
location = newFuture[T]()
@ -82,12 +82,12 @@ proc getDispatcher(node: EthereumNode,
# We should be able to find an existing dispatcher without allocating a new one
new(result)
newSeq(result.protocolOffsets, rlpxProtocols.len)
newSeq(result.protocolOffsets, allProtocols.len)
result.protocolOffsets.fill -1
var nextUserMsgId = 0x10
for localProtocol in node.rlpxProtocols:
for localProtocol in node.protocols:
let idx = localProtocol.index
block findMatchingProtocol:
for remoteCapability in otherPeerCapabilities:
@ -105,9 +105,9 @@ proc getDispatcher(node: EthereumNode,
dest[index + i] = addr src[i]
result.messages = newSeq[ptr MessageInfo](nextUserMsgId)
devp2pProtocolInfo.messages.copyTo(result.messages, 0)
devp2pInfo.messages.copyTo(result.messages, 0)
for localProtocol in node.rlpxProtocols:
for localProtocol in node.protocols:
let idx = localProtocol.index
if result.protocolOffsets[idx] != -1:
result.activeProtocols.add localProtocol
@ -131,15 +131,15 @@ proc getMsgName*(peer: Peer, msgId: int): string =
proc getMsgMetadata*(peer: Peer, msgId: int): (ProtocolInfo, ptr MessageInfo) =
doAssert msgId >= 0
if msgId <= devp2p.messages[^1].id:
return (devp2p, addr devp2p.messages[msgId])
if msgId <= devp2pInfo.messages[^1].id:
return (devp2pInfo, addr devp2pInfo.messages[msgId])
if msgId < peer.dispatcher.messages.len:
for i in 0 ..< rlpxProtocols.len:
for i in 0 ..< allProtocols.len:
let offset = peer.dispatcher.protocolOffsets[i]
if offset != -1 and
offset + rlpxProtocols[i].messages[^1].id >= msgId:
return (rlpxProtocols[i], peer.dispatcher.messages[msgId])
offset + allProtocols[i].messages[^1].id >= msgId:
return (allProtocols[i], peer.dispatcher.messages[msgId])
# Protocol info objects
#
@ -229,7 +229,7 @@ proc registerProtocol(protocol: ProtocolInfo) =
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
else:
devp2p = protocol
gDevp2pInfo = protocol
# Message composition and encryption
#
@ -592,19 +592,19 @@ proc verifyStateType(t: NimNode): NimNode =
if result.kind != nnkBracketExpr or $result[0] != "ref":
macros.error($result & " must be a ref type")
macro rlpxProtocolImpl(name: static[string],
version: static[uint],
body: untyped,
useRequestIds: static[bool] = true,
timeout: static[int] = defaultReqTimeout,
shortName: static[string] = "",
outgoingRequestDecorator: untyped = nil,
incomingRequestDecorator: untyped = nil,
incomingRequestThunkDecorator: untyped = nil,
incomingResponseDecorator: untyped = nil,
incomingResponseThunkDecorator: untyped = nil,
peerState = type(nil),
networkState = type(nil)): untyped =
macro p2pProtocolImpl(name: static[string],
version: static[uint],
body: untyped,
useRequestIds: static[bool] = true,
timeout: static[int] = defaultReqTimeout,
shortName: static[string] = "",
outgoingRequestDecorator: untyped = nil,
incomingRequestDecorator: untyped = nil,
incomingRequestThunkDecorator: untyped = nil,
incomingResponseDecorator: untyped = nil,
incomingResponseThunkDecorator: untyped = nil,
peerState = type(nil),
networkState = type(nil)): untyped =
## The macro used to defined RLPx sub-protocols. See README.
var
# XXX: deal with a Nim bug causing the macro params to be
@ -628,6 +628,7 @@ macro rlpxProtocolImpl(name: static[string],
protoNameIdent = ident(protoName)
resultIdent = ident "result"
perProtocolMsgId = ident"perProtocolMsgId"
currentProtocolSym = ident"CurrentProtocol"
protocol = ident(protoName & "Protocol")
isSubprotocol = version > 0'u
peerState = verifyStateType peerState.getType
@ -687,6 +688,9 @@ macro rlpxProtocolImpl(name: static[string],
var userHandlerDefinitions = newStmtList()
userHandlerDefinitions.add quote do:
type `currentProtocolSym` = `protoNameIdent`
if msgId >= 0:
userHandlerDefinitions.add quote do:
const `perProtocolMsgId` = `msgId`
@ -1080,10 +1084,10 @@ macro rlpxProtocolImpl(name: static[string],
when defined(debugRlpxProtocol) or defined(debugMacros):
echo repr(result)
macro rlpxProtocol*(protocolOptions: untyped, body: untyped): untyped =
macro p2pProtocol*(protocolOptions: untyped, body: untyped): untyped =
let protoName = $(protocolOptions[0])
result = protocolOptions
result[0] = bindSym"rlpxProtocolImpl"
result[0] = bindSym"p2pProtocolImpl"
result.add(newTree(nnkExprEqExpr,
ident("name"),
newLit(protoName)))
@ -1091,7 +1095,7 @@ macro rlpxProtocol*(protocolOptions: untyped, body: untyped): untyped =
ident("body"),
body))
rlpxProtocol p2p(version = 0):
p2pProtocol devp2p(version = 0, shortName = "p2p"):
proc hello(peer: Peer,
version: uint,
clientId: string,
@ -1120,7 +1124,7 @@ proc removePeer(network: EthereumNode, peer: Peer) =
observer.onPeerDisconnected(peer)
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[void] =
var futures = newSeqOfCap[Future[void]](rlpxProtocols.len)
var futures = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
if protocol.disconnectHandler != nil:
@ -1128,6 +1132,30 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[voi
return all(futures)
proc handshakeImpl(peer: Peer,
handshakeSendFut: Future[void],
timeout: int,
HandshakeType: type): Future[HandshakeType] {.async.} =
asyncCheck handshakeSendFut
var response = nextMsg(peer, HandshakeType)
if timeout > 0:
await response or sleepAsync(timeout)
if not response.finished:
discard disconnectAndRaise(peer, BreachOfProtocol,
"sub-protocol handshake was not received in time.")
else:
discard await response
return response.read
macro handshake*(peer: Peer, timeout = 0, sendCall: untyped): untyped =
let
msgName = $sendCall[0]
msgType = newDotExpr(ident"CurrentProtocol", ident(msgName))
sendCall.insert(1, peer)
result = newCall(bindSym"handshakeImpl", peer, sendCall, timeout, msgType)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.} =
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
@ -1142,7 +1170,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true
peer.connectionState = Disconnected
removePeer(peer.network, peer)
proc validatePubKeyInHello(msg: p2p.hello, pubKey: PublicKey): bool =
proc validatePubKeyInHello(msg: devp2p.hello, pubKey: PublicKey): bool =
var pk: PublicKey
recoverPublicKey(msg.nodeId, pk) == EthKeysStatus.Success and pk == pubKey
@ -1170,13 +1198,13 @@ proc initPeerState*(peer: Peer, capabilities: openarray[Capability]) =
peer.lastReqId = 0
# Initialize all the active protocol states
newSeq(peer.protocolStates, rlpxProtocols.len)
newSeq(peer.protocolStates, allProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
let peerStateInit = protocol.peerStateInitializer
if peerStateInit != nil:
peer.protocolStates[protocol.index] = peerStateInit(peer)
proc postHelloSteps(peer: Peer, h: p2p.hello) {.async.} =
proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} =
initPeerState(peer, h.capabilities)
# Please note that the ordering of operations here is important!
@ -1185,7 +1213,7 @@ proc postHelloSteps(peer: Peer, h: p2p.hello) {.async.} =
# chance to send any initial packages they might require over
# the network and to yield on their `nextMsg` waits.
#
var subProtocolsHandshakes = newSeqOfCap[Future[void]](rlpxProtocols.len)
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in peer.dispatcher.activeProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer))
@ -1249,7 +1277,7 @@ template baseProtocolVersion(peer: Peer): uint =
devp2pVersion
proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
initTracing(devp2p, rlpxProtocols)
initTracing(devp2pInfo, node.protocols)
new result
result.network = node
@ -1289,11 +1317,11 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
logConnectedPeer result
asyncCheck result.hello(handshake.getVersion(),
node.clientId,
node.rlpxCapabilities,
node.capabilities,
uint(node.address.tcpPort),
node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(p2p.hello)
var response = await result.waitSingleMsg(devp2p.hello)
if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
@ -1323,7 +1351,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
proc rlpxAccept*(node: EthereumNode,
transport: StreamTransport): Future[Peer] {.async.} =
initTracing(devp2p, rlpxProtocols)
initTracing(devp2pInfo, node.protocols)
new result
result.transport = transport
@ -1360,10 +1388,10 @@ proc rlpxAccept*(node: EthereumNode,
logAcceptedPeer result
await result.hello(result.baseProtocolVersion, node.clientId,
node.rlpxCapabilities, listenPort.uint,
node.capabilities, listenPort.uint,
node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(p2p.hello)
var response = await result.waitSingleMsg(devp2p.hello)
if not validatePubKeyInHello(response, handshake.remoteHPubkey):
warn "A Remote nodeId is not its public key" # XXX: Do we care?

View File

@ -36,9 +36,9 @@ const
maxHeadersFetch* = 192
protocolVersion* = 63
rlpxProtocol eth(version = protocolVersion,
peerState = PeerState,
useRequestIds = false):
p2pProtocol eth(version = protocolVersion,
peerState = PeerState,
useRequestIds = false):
onPeerConnected do (peer: Peer):
let

View File

@ -358,7 +358,7 @@ when defined(testing):
proc isMax(s: FlowControlState): bool =
s.bufValue == s.bufLimit
rlpxProtocol dummyLes(version = 1, shortName = "abc"):
p2pProtocol dummyLes(version = 1, shortName = "abc"):
proc a(p: Peer)
proc b(p: Peer)
proc c(p: Peer)

View File

@ -158,12 +158,12 @@ func getRequiredValue(values: openarray[KeyValuePair],
raise newException(HandshakeError,
"Required handshake field " & key & " missing")
rlpxProtocol les(version = lesVersion,
peerState = LesPeer,
networkState = LesNetwork,
outgoingRequestDecorator = outgoingRequestDecorator,
incomingRequestDecorator = incomingRequestDecorator,
incomingResponseThunkDecorator = incomingResponseDecorator):
p2pProtocol les(version = lesVersion,
peerState = LesPeer,
networkState = LesNetwork,
outgoingRequestDecorator = outgoingRequestDecorator,
incomingRequestDecorator = incomingRequestDecorator,
incomingResponseThunkDecorator = incomingResponseDecorator):
## Handshake
##

View File

@ -647,7 +647,7 @@ proc toBloom*(filters: Filters): Bloom =
result = result or filter.bloom
type
PeerState = ref object
WhisperPeer = ref object
initialized*: bool # when successfully completed the handshake
powRequirement*: float64
bloom*: Bloom
@ -656,15 +656,15 @@ type
received: HashSet[Message]
running*: bool
WhisperState = ref object
WhisperNetwork = ref object
queue*: Queue
filters*: Filters
config*: WhisperConfig
proc run(peer: Peer) {.async.}
proc run(node: EthereumNode, network: WhisperState) {.async.}
proc run(node: EthereumNode, network: WhisperNetwork) {.async.}
proc initProtocolState*(network: var WhisperState, node: EthereumNode) =
proc initProtocolState*(network: var WhisperNetwork, node: EthereumNode) =
network.queue = initQueue(defaultQueueCapacity)
network.filters = initTable[string, Filter]()
network.config.bloom = fullBloom()
@ -673,53 +673,47 @@ proc initProtocolState*(network: var WhisperState, node: EthereumNode) =
network.config.maxMsgSize = defaultMaxMsgSize
asyncCheck node.run(network)
rlpxProtocol shh(version = whisperVersion,
peerState = PeerState,
networkState = WhisperState):
p2pProtocol Whisper(version = whisperVersion,
shortName = "shh",
peerState = WhisperPeer,
networkState = WhisperNetwork):
onPeerConnected do (peer: Peer):
debug "onPeerConnected Whisper"
let
shhNetwork = peer.networkState
shhPeer = peer.state
whisperNet = peer.networkState
whisperPeer = peer.state
asyncCheck peer.status(whisperVersion,
cast[uint](shhNetwork.config.powRequirement),
@(shhNetwork.config.bloom),
shhNetwork.config.isLightNode)
var f = peer.nextMsg(shh.status)
# When the peer does not respond with status within 500 ms we disconnect
await f or sleepAsync(500)
if not f.finished:
raise newException(UselessPeerError, "No status message received")
let m = f.read()
let m = await handshake(peer, timeout = 500,
status(whisperVersion,
cast[uint](whisperNet.config.powRequirement),
@(whisperNet.config.bloom),
whisperNet.config.isLightNode))
if m.protocolVersion == whisperVersion:
debug "Suitable Whisper peer", peer, whisperVersion
else:
raise newException(UselessPeerError, "Incompatible Whisper version")
shhPeer.powRequirement = cast[float64](m.powConverted)
whisperPeer.powRequirement = cast[float64](m.powConverted)
if m.bloom.len > 0:
if m.bloom.len != bloomSize:
raise newException(UselessPeerError, "Bloomfilter size mismatch")
else:
shhPeer.bloom.bytesCopy(m.bloom)
whisperPeer.bloom.bytesCopy(m.bloom)
else:
# If no bloom filter is send we allow all
shhPeer.bloom = fullBloom()
whisperPeer.bloom = fullBloom()
shhPeer.isLightNode = m.isLightNode
if shhPeer.isLightNode and shhNetwork.config.isLightNode:
whisperPeer.isLightNode = m.isLightNode
if whisperPeer.isLightNode and whisperNet.config.isLightNode:
# No sense in connecting two light nodes so we disconnect
raise newException(UselessPeerError, "Two light nodes connected")
shhPeer.received.init()
shhPeer.trusted = false
shhPeer.initialized = true
whisperPeer.received.init()
whisperPeer.trusted = false
whisperPeer.initialized = true
asyncCheck peer.run()
debug "Whisper peer initialized"
@ -754,7 +748,7 @@ rlpxProtocol shh(version = whisperVersion,
continue
# This peer send it thus should not receive it again
peer.state(shh).received.incl(msg)
peer.state.received.incl(msg)
if peer.networkState.queue.add(msg):
# notify filters of this message
@ -790,48 +784,59 @@ rlpxProtocol shh(version = whisperVersion,
# 'Runner' calls ---------------------------------------------------------------
proc processQueue(peer: Peer) =
var envelopes: seq[Envelope] = @[]
for message in peer.networkState(shh).queue.items:
if peer.state(shh).received.contains(message):
var
envelopes: seq[Envelope] = @[]
whisperPeer = peer.state(Whisper)
whisperNet = peer.networkState(Whisper)
for message in whisperNet.queue.items:
if whisperPeer.received.contains(message):
# debug "message was already send to peer"
continue
if message.pow < peer.state(shh).powRequirement:
if message.pow < whisperPeer.powRequirement:
debug "Message PoW too low for peer"
continue
if not bloomFilterMatch(peer.state(shh).bloom, message.bloom):
if not bloomFilterMatch(whisperPeer.bloom, message.bloom):
debug "Message does not match peer bloom filter"
continue
debug "Adding envelope"
envelopes.add(message.env)
peer.state(shh).received.incl(message)
whisperPeer.received.incl(message)
debug "Sending envelopes", amount=envelopes.len
# await peer.messages(envelopes)
asyncCheck peer.messages(envelopes)
proc run(peer: Peer) {.async.} =
peer.state(shh).running = true
while peer.state(shh).running:
if not peer.networkState(shh).config.isLightNode:
var
whisperPeer = peer.state(Whisper)
whisperNet = peer.networkState(Whisper)
whisperPeer.running = true
while whisperPeer.running:
# XXX: shouldn't this be outside of the loop?
# In case we are runinng a light node, we have nothing to do here?
if not whisperNet.config.isLightNode:
peer.processQueue()
await sleepAsync(300)
proc pruneReceived(node: EthereumNode) =
if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
for peer in node.peers(shh):
if not peer.state(shh).initialized:
var whisperNet = node.protocolState(Whisper)
for peer in node.protocolPeers(Whisper):
if not peer.initialized:
continue
# NOTE: Perhaps alter the queue prune call to keep track of a HashSet
# of pruned messages (as these should be smaller), and diff this with
# the received sets.
peer.state(shh).received = intersection(peer.state(shh).received,
node.protocolState(shh).queue.itemHashes)
peer.received = intersection(peer.received, whisperNet.queue.itemHashes)
proc run(node: EthereumNode, network: WhisperState) {.async.} =
proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
while true:
# prune message queue every second
# TTL unit is in seconds, so this should be sufficient?
@ -844,7 +849,7 @@ proc run(node: EthereumNode, network: WhisperState) {.async.} =
# Public EthereumNode calls ----------------------------------------------------
proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
for peer in node.peers(shh):
for peer in node.peers(Whisper):
if peer.remote.id == peerId:
asyncCheck peer.p2pMessage(env)
return true
@ -853,17 +858,18 @@ proc sendMessage*(node: EthereumNode, env: var Envelope): bool =
if not env.valid(): # actually just ttl !=0 is sufficient
return false
var whisperNet = node.protocolState(Whisper)
# We have to do the same checks here as in the messages proc not to leak
# any information that the message originates from this node.
let msg = initMessage(env)
if not msg.allowed(node.protocolState(shh).config):
if not msg.allowed(whisperNet.config):
return false
debug "Adding message to queue"
if node.protocolState(shh).queue.add(msg):
if whisperNet.queue.add(msg):
# Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp
node.protocolState(shh).filters.notify(msg)
whisperNet.filters.notify(msg)
return true
@ -882,11 +888,14 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
# Allow lightnode to post only direct p2p messages
if targetPeer.isSome():
return node.sendP2PMessage(targetPeer.get(), env)
elif not node.protocolState(shh).config.isLightNode:
elif not node.protocolState(Whisper).config.isLightNode:
# XXX: make this non blocking or not?
# In its current blocking state, it could be noticed by a peer that no
# messages are send for a while, and thus that mining PoW is done, and that
# next messages contains a message originated from this peer
# messages are send for a while, and thus that mining PoW is done, and
# that next messages contains a message originated from this peer
# zah: It would be hard to execute this in a background thread at the
# moment. We'll need a way to send custom "tasks" to the async message
# loop (e.g. AD2 support for AsyncChannels).
env.nonce = env.minePow(powTime)
return node.sendMessage(env)
else:
@ -898,29 +907,29 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler = none[FilterMsgHandler]()): string =
return node.protocolState(shh).filters.subscribeFilter(filter, handler)
return node.protocolState(Whisper).filters.subscribeFilter(filter, handler)
proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
var filter: Filter
return node.protocolState(shh).filters.take(filterId, filter)
return node.protocolState(Whisper).filters.take(filterId, filter)
proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
return node.protocolState(shh).filters.getFilterMessages(filterId)
return node.protocolState(Whisper).filters.getFilterMessages(filterId)
proc filtersToBloom*(node: EthereumNode): Bloom =
return node.protocolState(shh).filters.toBloom()
return node.protocolState(Whisper).filters.toBloom()
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
# NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(shh).config.powRequirement = powReq
for peer in node.peers(shh):
node.protocolState(Whisper).config.powRequirement = powReq
for peer in node.peers(Whisper):
# asyncCheck peer.powRequirement(cast[uint](powReq))
await peer.powRequirement(cast[uint](powReq))
proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
# NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(shh).config.bloom = bloom
for peer in node.peers(shh):
node.protocolState(Whisper).config.bloom = bloom
for peer in node.peers(Whisper):
# asyncCheck peer.bloomFilterExchange(@bloom)
await peer.bloomFilterExchange(@bloom)
@ -928,17 +937,21 @@ proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
if size > defaultMaxMsgSize:
error "size > maxMsgSize"
return false
node.protocolState(shh).config.maxMsgSize = size
node.protocolState(Whisper).config.maxMsgSize = size
return true
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
for peer in node.peers(shh):
for peer in node.peers(Whisper):
if peer.remote.id == peerId:
peer.state(shh).trusted = true
peer.state(Whisper).trusted = true
return true
# XXX: should probably only be allowed before connection is made,
# as there exists no message to communicate to peers that it is a light node
# How to arrange that?
proc setLightNode*(node: EthereumNode, isLightNode: bool) =
node.protocolState(shh).config.isLightNode = isLightNode
node.protocolState(Whisper).config.isLightNode = isLightNode
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
node.protocolState(Whisper).config = config

View File

@ -1,4 +1,4 @@
import
testecies, testauth, testcrypt,
testecies, testauth, testcrypt, tshh,
les/test_flow_control

View File

@ -34,10 +34,10 @@ Options:
"enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303",
"enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303",
]
# shh nodes taken from:
# Whisper nodes taken from:
# https://github.com/status-im/status-react/blob/80aa0e92864c638777a45c3f2aeb66c3ae7c0b2e/resources/config/fleets.json
# These are probably not on the main network?
ShhNodes = [
WhisperNodes = [
"enode://66ba15600cda86009689354c3a77bdf1a97f4f4fb3ab50ffe34dbc904fac561040496828397be18d9744c75881ffc6ac53729ddbd2cdbdadc5f45c400e2622f7@206.189.243.176:30305",
"enode://0440117a5bc67c2908fad94ba29c7b7f2c1536e96a9df950f3265a9566bf3a7306ea8ab5a1f9794a0a641dcb1e4951ce7c093c61c0d255f4ed5d2ed02c8fce23@35.224.15.65:30305",
"enode://a80eb084f6bf3f98bf6a492fd6ba3db636986b17643695f67f543115d93d69920fb72e349e0c617a01544764f09375bb85f452b9c750a892d01d0e627d9c251e@47.89.16.125:30305",
@ -115,7 +115,7 @@ else:
let keys = newKeyPair()
var node = newEthereumNode(keys, address, netId, nil, addAllCapabilities = false)
node.addCapability shh
node.addCapability Whisper
# lets prepare some prearranged keypairs
let encPrivateKey = initPrivateKey("5dc5381cae54ba3174dc0d46040fe11614d0cc94d41185922585198b4fcef9d3")
@ -136,11 +136,11 @@ if config.main:
asyncCheck node.connectToNetwork(bootnodes, true, true)
# main network has mostly non SHH nodes, so we connect directly to SHH nodes
for nodeId in ShhNodes:
var shhENode: ENode
discard initENode(nodeId, shhENode)
var shhNode = newNode(shhENode)
asyncCheck node.peerPool.connectToNode(shhNode)
for nodeId in WhisperNodes:
var whisperENode: ENode
discard initENode(nodeId, whisperENode)
var whisperNode = newNode(whisperENode)
asyncCheck node.peerPool.connectToNode(whisperNode)
else:
var bootENode: ENode
discard initENode(DockerBootNode, bootENode)

View File

@ -26,10 +26,10 @@ type
AbcNetwork = ref object
peers: seq[string]
rlpxProtocol abc(version = 1,
peerState = AbcPeer,
networkState = AbcNetwork,
timeout = 100):
p2pProtocol abc(version = 1,
peerState = AbcPeer,
networkState = AbcNetwork,
timeout = 100):
onPeerConnected do (peer: Peer):
await peer.hi "Bob"
@ -58,10 +58,10 @@ rlpxProtocol abc(version = 1,
else:
p.state.lastResponse = "timeout"
rlpxProtocol xyz(version = 1,
peerState = XyzPeer,
useRequestIds = false,
timeout = 100):
p2pProtocol xyz(version = 1,
peerState = XyzPeer,
useRequestIds = false,
timeout = 100):
proc foo(p: Peer, s: string, a, z: int) =
p.state.messages += 1
@ -94,7 +94,6 @@ asyncTest "network with 3 peers using custom protocols":
let localKeys = newKeyPair()
let localAddress = localAddress(30303)
var localNode = newEthereumNode(localKeys, localAddress, 1, nil, useCompression = useCompression)
localNode.initProtocolStates()
localNode.startListening()
var mock1 = newMockPeer do (m: MockConf):

View File

@ -11,14 +11,14 @@ import
sequtils, options, unittest, times, tables,
nimcrypto/hash,
eth_keys, rlp,
eth_p2p/rlpx_protocols/shh_protocol as shh
eth_p2p/rlpx_protocols/shh_protocol as whisper
suite "Whisper payload":
test "should roundtrip without keys":
let payload = Payload(payload: @[byte 0, 1, 2])
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -27,9 +27,9 @@ suite "Whisper payload":
test "should roundtrip with symmetric encryption":
var symKey: SymKey
let payload = Payload(symKey: some(symKey), payload: @[byte 0, 1, 2])
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get(), symKey = some(symKey))
let decoded = whisper.decode(encoded.get(), symKey = some(symKey))
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -39,9 +39,9 @@ suite "Whisper payload":
let privKey = eth_keys.newPrivateKey()
let payload = Payload(src: some(privKey), payload: @[byte 0, 1, 2])
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -53,9 +53,9 @@ suite "Whisper payload":
let payload = Payload(dst: some(privKey.getPublicKey()),
payload: @[byte 0, 1, 2])
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get(), dst = some(privKey))
let decoded = whisper.decode(encoded.get(), dst = some(privKey))
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -73,9 +73,9 @@ suite "Whisper payload":
suite "Whisper payload padding":
test "should do max padding":
let payload = Payload(payload: repeat(byte 1, 254))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -86,9 +86,9 @@ suite "Whisper payload padding":
let privKey = eth_keys.newPrivateKey()
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 189))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -98,9 +98,9 @@ suite "Whisper payload padding":
test "should do min padding":
let payload = Payload(payload: repeat(byte 1, 253))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -111,9 +111,9 @@ suite "Whisper payload padding":
let privKey = eth_keys.newPrivateKey()
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 188))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -124,9 +124,9 @@ suite "Whisper payload padding":
test "should roundtrip custom padding":
let payload = Payload(payload: repeat(byte 1, 10),
padding: some(repeat(byte 2, 100)))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -137,9 +137,9 @@ suite "Whisper payload padding":
let padding: seq[byte] = @[]
let payload = Payload(payload: repeat(byte 1, 10),
padding: some(padding))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -149,9 +149,9 @@ suite "Whisper payload padding":
let privKey = eth_keys.newPrivateKey()
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 10),
padding: some(repeat(byte 2, 100)))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -164,9 +164,9 @@ suite "Whisper payload padding":
let privKey = eth_keys.newPrivateKey()
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 10),
padding: some(padding))
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let decoded = shh.decode(encoded.get())
let decoded = whisper.decode(encoded.get())
check:
decoded.isSome()
payload.payload == decoded.get().payload
@ -280,7 +280,7 @@ proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](),
src = none[PrivateKey](), topic: Topic): Message =
let payload = Payload(dst: pubKey, symKey: symKey, src: src,
payload: @[byte 0, 1, 2])
let encoded = shh.encode(payload)
let encoded = whisper.encode(payload)
let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(),
nonce: 0)
result = initMessage(env)

View File

@ -40,16 +40,16 @@ let
var node1 = newEthereumNode(keys1, localAddress(30303), 1, nil,
addAllCapabilities = false,
useCompression = useCompression)
node1.addCapability shh
node1.addCapability Whisper
var node2 = newEthereumNode(keys2, localAddress(30304), 1, nil,
addAllCapabilities = false,
useCompression = useCompression)
node2.addCapability shh
node2.addCapability Whisper
template waitForEmptyQueues() =
while node1.protocolState(shh).queue.items.len != 0 or
node2.protocolState(shh).queue.items.len != 0: poll()
while node1.protocolState(Whisper).queue.items.len != 0 or
node2.protocolState(Whisper).queue.items.len != 0: poll()
when not defined(directConnect):
let bootENode = waitFor setupBootNode()
@ -65,8 +65,6 @@ when not defined(directConnect):
node1.peerPool.connectedNodes.len() == 1
node2.peerPool.connectedNodes.len() == 1
else: # XXX: tricky without peerPool
node1.initProtocolStates()
node2.initProtocolStates()
node2.startListening()
discard waitFor node1.rlpxConnect(newNode(initENode(node2.keys.pubKey,
node2.address)))
@ -130,13 +128,13 @@ asyncTest "Filters with encryption and signing":
src = some(signKeyPair.seckey), ttl = 2,
topic = topic, payload = payloads[3])
check node2.protocolState(shh).queue.items.len == 4
check node2.protocolState(Whisper).queue.items.len == 4
var f = all(futures)
await f or sleepAsync(300)
check:
f.finished == true
node1.protocolState(shh).queue.items.len == 4
node1.protocolState(Whisper).queue.items.len == 4
for filter in filters:
check node1.unsubscribeFilter(filter) == true
@ -166,7 +164,7 @@ asyncTest "Filters with topics":
await f or sleepAsync(300)
check:
f.finished == true
node1.protocolState(shh).queue.items.len == 2
node1.protocolState(Whisper).queue.items.len == 2
node1.unsubscribeFilter(filter1) == true
node1.unsubscribeFilter(filter2) == true
@ -197,7 +195,7 @@ asyncTest "Filters with PoW":
check:
futures[0].finished == true
futures[1].finished == false
node1.protocolState(shh).queue.items.len == 1
node1.protocolState(Whisper).queue.items.len == 1
node1.unsubscribeFilter(filter1) == true
node1.unsubscribeFilter(filter2) == true
@ -238,8 +236,8 @@ asyncTest "Bloomfilter blocking":
await f or sleepAsync(300)
check:
f.finished == false
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 1
node1.protocolState(Whisper).queue.items.len == 0
node2.protocolState(Whisper).queue.items.len == 1
f = newFuture[int]()
waitForEmptyQueues()
@ -250,8 +248,8 @@ asyncTest "Bloomfilter blocking":
check:
f.finished == true
f.read() == 1
node1.protocolState(shh).queue.items.len == 1
node2.protocolState(shh).queue.items.len == 1
node1.protocolState(Whisper).queue.items.len == 1
node2.protocolState(Whisper).queue.items.len == 1
node1.unsubscribeFilter(filter) == true
@ -266,8 +264,8 @@ asyncTest "PoW blocking":
check true == node2.postMessage(ttl = 1, topic = topic, payload = payload)
await sleepAsync(300)
check:
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 1
node1.protocolState(Whisper).queue.items.len == 0
node2.protocolState(Whisper).queue.items.len == 1
waitForEmptyQueues()
@ -275,8 +273,8 @@ asyncTest "PoW blocking":
check true == node2.postMessage(ttl = 1, topic = topic, payload = payload)
await sleepAsync(300)
check:
node1.protocolState(shh).queue.items.len == 1
node2.protocolState(shh).queue.items.len == 1
node1.protocolState(Whisper).queue.items.len == 1
node2.protocolState(Whisper).queue.items.len == 1
waitForEmptyQueues()
@ -286,16 +284,16 @@ asyncTest "Queue pruning":
for i in countdown(10, 1):
check true == node2.postMessage(ttl = i.uint32, topic = topic,
payload = payload)
check node2.protocolState(shh).queue.items.len == 10
check node2.protocolState(Whisper).queue.items.len == 10
await sleepAsync(300)
check:
node1.protocolState(shh).queue.items.len == 10
node1.protocolState(Whisper).queue.items.len == 10
await sleepAsync(1000)
check:
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 0
node1.protocolState(Whisper).queue.items.len == 0
node2.protocolState(Whisper).queue.items.len == 0
asyncTest "Light node posting":
let topic = [byte 0, 0, 0, 0]
@ -304,7 +302,7 @@ asyncTest "Light node posting":
check:
result == false
node1.protocolState(shh).queue.items.len == 0
node1.protocolState(Whisper).queue.items.len == 0
node1.setLightNode(false)
@ -327,7 +325,7 @@ asyncTest "P2P":
check:
f.finished == true
f.read() == 1
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 0
node1.protocolState(Whisper).queue.items.len == 0
node2.protocolState(Whisper).queue.items.len == 0
node1.unsubscribeFilter(filter) == true

View File

@ -20,32 +20,30 @@ template asyncTest(name, body: untyped) =
proc scenario {.async.} = body
waitFor scenario()
asyncTest "network with 3 peers using shh protocol":
asyncTest "network with 3 peers using the Whisper protocol":
const useCompression = defined(useSnappy)
let localKeys = newKeyPair()
let localAddress = localAddress(30303)
var localNode = newEthereumNode(localKeys, localAddress, 1, nil,
addAllCapabilities = false,
useCompression = useCompression)
localNode.addCapability shh
localNode.initProtocolStates()
localNode.addCapability Whisper
localNode.startListening()
var mock1 = newMockPeer do (m: MockConf):
m.addHandshake shh.status(protocolVersion: whisperVersion, powConverted: 0,
bloom: @[], isLightNode: false)
m.expect(shh.messages)
m.addHandshake Whisper.status(protocolVersion: whisperVersion, powConverted: 0,
bloom: @[], isLightNode: false)
m.expect Whisper.messages
var mock2 = newMockPeer do (m: MockConf):
m.addHandshake shh.status(protocolVersion: whisperVersion,
powConverted: cast[uint](0.1),
bloom: @[], isLightNode: false)
m.expect(shh.messages)
m.addHandshake Whisper.status(protocolVersion: whisperVersion,
powConverted: cast[uint](0.1),
bloom: @[], isLightNode: false)
m.expect Whisper.messages
var mock1Peer = await localNode.rlpxConnect(mock1)
var mock2Peer = await localNode.rlpxConnect(mock2)
check:
mock1Peer.state(shh).powRequirement == 0
mock2Peer.state(shh).powRequirement == 0.1
mock1Peer.state(Whisper).powRequirement == 0
mock2Peer.state(Whisper).powRequirement == 0.1