Improvements and some cleanup

- Added sendP2PMessage + test
- Timeout on waiting for status message
This commit is contained in:
deme 2018-11-22 12:09:43 +01:00 committed by zah
parent 70fc6874be
commit 8c273b2a2d
3 changed files with 64 additions and 40 deletions

View File

@ -39,7 +39,6 @@ type
src*: Option[PrivateKey] ## Optional key used for signing message
dst*: Option[PublicKey] ## Optional key used for asymmetric encryption
symKey*: Option[SymKey] ## Optional key used for symmetric encryption
payload*: Bytes ## Application data / message contents
padding*: Option[Bytes] ## Padding - if unset, will automatically pad up to
@ -461,7 +460,6 @@ proc cmpPow(a, b: Message): int =
proc initMessage*(env: Envelope): Message =
result.env = env
result.hash = env.calcPowHash()
debug "PoW hash", hash = result.hash
result.size = env.toRlp().len().uint32 # XXX: calc len without creating RLP
result.pow = calcPow(result.size, result.env.ttl, result.hash)
result.bloom = topicBloom(env.topic)
@ -551,8 +549,8 @@ proc notify(filters: Table[string, Filter], msg: Message) =
if not filter.allowP2P and msg.isP2P:
continue
# NOTE: should we still check PoW if msg.isP2P? Not much sense to it?
if filter.powReq > 0 and msg.pow < filter.powReq:
# if message is direct p2p PoW doesn't matter
if msg.pow < filter.powReq and not msg.isP2P:
continue
if filter.topics.len > 0:
@ -635,8 +633,14 @@ rlpxProtocol shh(version = whisperVersion,
@(shhNetwork.config.bloom),
shhNetwork.config.isLightNode)
# XXX: we should allow this to timeout and disconnect if so
let m = await peer.nextMsg(shh.status)
var f = peer.nextMsg(shh.status)
# When the peer does not respond with status within 500 ms we disconnect
await f or sleepAsync(500)
if not f.finished:
raise newException(UselessPeerError, "No status message received")
let m = f.read()
if m.protocolVersion == whisperVersion:
debug "Suitable Whisper peer", peer, whisperVersion
else:
@ -784,44 +788,54 @@ proc run(node: EthereumNode, network: WhisperState) {.async.} =
# Public EthereumNode calls ----------------------------------------------------
# XXX: add targetPeer option
proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope) =
for peer in node.peers(shh):
if peer.remote.id == peerId:
asyncCheck peer.p2pMessage(env)
break
proc sendMessage*(node: EthereumNode, env: var Envelope) =
if not env.valid(): # actually just ttl !=0 is sufficient
return
# We have to do the same checks here as in the messages proc not to leak
# any information that the message originates from this node.
var msg = initMessage(env)
if not msg.allowed(node.protocolState(shh).config):
return
debug "Adding message to queue"
if node.protocolState(shh).queue.add(msg):
# Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp
node.protocolState(shh).filters.notify(msg)
proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
symKey = none[SymKey](), src = none[PrivateKey](),
ttl: uint32, topic: Topic, payload: Bytes, powTime = 1) =
ttl: uint32, topic: Topic, payload: Bytes, powTime = 1,
targetPeer = none[NodeId]()) =
# NOTE: Allow a post without a key? Encryption is mandatory in v6?
# NOTE: Do we allow a light node to add messages to queue?
# if node.protocolState(shh).isLightNode:
# error "Light node not allowed to post messages"
# return
var payload = encode(Payload(payload: payload, src: src, dst: pubKey,
symKey: symKey))
if payload.isSome():
var env = Envelope(expiry:epochTime().uint32 + ttl + powTime.uint32,
ttl: ttl, topic: topic, data: payload.get(), nonce: 0)
# XXX: make this non blocking or not?
# In its current blocking state, it could be noticed by a peer that no
# messages are send for a while, and thus that mining PoW is done, and that
# next messages contains a message originated from this peer
env.nonce = env.minePow(powTime.float)
if not env.valid(): # actually just ttl !=0 is sufficient
return
# allow lightnode to post only direct p2p messages
if targetPeer.isSome():
node.sendP2PMessage(targetPeer.get(), env)
elif not node.protocolState(shh).config.isLightNode:
# XXX: make this non blocking or not?
# In its current blocking state, it could be noticed by a peer that no
# messages are send for a while, and thus that mining PoW is done, and that
# next messages contains a message originated from this peer
env.nonce = env.minePow(powTime.float)
node.sendMessage(env)
else:
error "Light node not allowed to post messages"
# We have to do the same checks here as in the messages proc not to leak
# any information that the message originates from this node.
var msg = initMessage(env)
if not msg.allowed(node.protocolState(shh).config):
return
debug "Adding message to queue"
if node.protocolState(shh).queue.add(msg):
# Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp
node.protocolState(shh).filters.notify(msg)
else:
error "encoding failed"
error "Encoding of payload failed"
proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler: FilterMsgHandler): string =

View File

@ -122,7 +122,6 @@ let encPrivateKey = initPrivateKey("5dc5381cae54ba3174dc0d46040fe11614d0cc94d411
let encPublicKey = encPrivateKey.getPublicKey()
let signPrivateKey = initPrivateKey("365bda0757d22212b04fada4b9222f8c3da59b49398fa04cf612481cd893b0a3")
let signPublicKey = signPrivateKey.getPublicKey()
# var symKey: SymKey = [byte 234, 86, 75, 97, 0, 214, 53, 41, 62, 204, 78, 253, 220, 134, 78, 203, 58, 35, 51, 61, 95, 218, 42, 78, 146, 142, 229, 232, 151, 219, 224, 32]
var symKey: SymKey
let topic = [byte 0x12, 0, 0, 0]
@ -166,11 +165,6 @@ if config.watch:
symKey = some(symKey),
topics = @[topic]), handler)
# discard node.setBloomFilter(node.filtersToBloom())
# discard node.setBloomFilter(emptyBloom())
# waitFor sleepAsync(10000)
# echo data.repr
if config.post:
# encrypted asym
node.postMessage(some(encPublicKey), ttl = 5, topic = topic,

View File

@ -203,5 +203,21 @@ asyncTest "Lightnode":
1 == 1
asyncTest "P2P":
let topic = [byte 0, 0, 0, 0]
var f: Future[int] = newFuture[int]()
proc handler(payload: Bytes) =
check payload == repeat(byte 4, 10)
f.complete(1)
var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true),
handler)
node1.setPeerTrusted(toNodeId(node2.keys.pubkey))
node2.postMessage(ttl = 2, topic = topic, payload = repeat(byte 4, 10),
targetPeer = some(toNodeId(node1.keys.pubkey)))
await f or sleepAsync(300)
check:
1 == 1
f.finished == true
f.read() == 1
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 0