Filter queue and other improvements:

- Add filter queue and getFilterMessages
- Improve some procs that can return failures
- Add + improve tests
This commit is contained in:
deme 2018-11-23 16:40:47 +01:00 committed by zah
parent 8c273b2a2d
commit 4f53236233
3 changed files with 235 additions and 87 deletions

View File

@ -21,9 +21,10 @@ const
signatureBits = 0b100'u8 ## payload flags signature mask
whisperVersion* = 6
defaultMinPow = 0.001'f64
defaultMaxMsgSize = 1024 * 1024 # * 10 # should be no higher than max RLPx size
defaultMaxMsgSize = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
bloomSize = 512 div 8
defaultQueueCapacity = 256
defaultFilterQueueCapacity = 64
type
Hash* = MDigest[256]
@ -47,6 +48,7 @@ type
src*: Option[PublicKey] ## If the message was signed, this is the public key
## of the source
payload*: Bytes ## Application data / message contents
padding*: Option[Bytes] # XXX: to be added still
Envelope* = object
## What goes on the wire in the whisper protocol - a payload and some
@ -68,6 +70,14 @@ type
bloom*: Bloom ## Filter sent to direct peers for topic-based filtering
isP2P: bool
ReceivedMessage* = object
decoded*: DecodedPayload
timestamp*: uint32
ttl*: uint32
topic*: Topic
pow*: float64
hash*: Hash
Queue* = object
## Bounded message repository
##
@ -88,8 +98,7 @@ type
## XXX: really big messages can cause excessive mem usage when using msg \
## count
# XXX: We have to return more than just the payload
FilterMsgHandler* = proc(payload: Bytes) {.closure.}
FilterMsgHandler* = proc(msg: ReceivedMessage) {.closure.}
Filter* = object
src: Option[PublicKey]
@ -100,8 +109,8 @@ type
allowP2P: bool
bloom: Bloom # cached bloom filter of all topics of filter
handler: FilterMsgHandler
# NOTE: could also have a queue here instead, or leave it to the actual client
handler: Option[FilterMsgHandler]
queue: seq[ReceivedMessage]
WhisperConfig* = object
powRequirement*: float64
@ -541,11 +550,11 @@ proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](),
Filter(src: src, privateKey: privateKey, symKey: symKey, topics: topics,
powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics))
proc notify(filters: Table[string, Filter], msg: Message) =
proc notify(filters: var Table[string, Filter], msg: Message) =
var decoded: Option[DecodedPayload]
var keyHash: Hash
for filter in filters.values:
for filter in filters.mvalues:
if not filter.allowP2P and msg.isP2P:
continue
@ -587,9 +596,17 @@ proc notify(filters: Table[string, Filter], msg: Message) =
elif src.get() != filter.src.get():
continue
# Run callback
# NOTE: could also add the message to a filter queue
filter.handler(decoded.get().payload)
var receivedMsg = ReceivedMessage(decoded: decoded.get(),
timestamp: msg.env.expiry - msg.env.ttl,
ttl: msg.env.ttl,
topic: msg.env.topic,
pow: msg.pow,
hash: msg.hash)
# Either run callback or add to queue
if filter.handler.isSome():
filter.handler.get()(receivedMsg)
else:
filter.queue.insert(receivedMsg)
type
PeerState = ref object
@ -788,21 +805,21 @@ proc run(node: EthereumNode, network: WhisperState) {.async.} =
# Public EthereumNode calls ----------------------------------------------------
proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope) =
proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
for peer in node.peers(shh):
if peer.remote.id == peerId:
asyncCheck peer.p2pMessage(env)
break
return true
proc sendMessage*(node: EthereumNode, env: var Envelope) =
proc sendMessage*(node: EthereumNode, env: var Envelope): bool =
if not env.valid(): # actually just ttl !=0 is sufficient
return
return false
# We have to do the same checks here as in the messages proc not to leak
# any information that the message originates from this node.
var msg = initMessage(env)
if not msg.allowed(node.protocolState(shh).config):
return
return false
debug "Adding message to queue"
if node.protocolState(shh).queue.add(msg):
@ -810,10 +827,13 @@ proc sendMessage*(node: EthereumNode, env: var Envelope) =
# e.g. msg from local Dapp to Dapp
node.protocolState(shh).filters.notify(msg)
return true
# XXX: add padding
proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
symKey = none[SymKey](), src = none[PrivateKey](),
ttl: uint32, topic: Topic, payload: Bytes, powTime = 1,
targetPeer = none[NodeId]()) =
targetPeer = none[NodeId]()): bool =
# NOTE: Allow a post without a key? Encryption is mandatory in v6?
var payload = encode(Payload(payload: payload, src: src, dst: pubKey,
symKey: symKey))
@ -821,29 +841,33 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
var env = Envelope(expiry:epochTime().uint32 + ttl + powTime.uint32,
ttl: ttl, topic: topic, data: payload.get(), nonce: 0)
# allow lightnode to post only direct p2p messages
# Allow lightnode to post only direct p2p messages
if targetPeer.isSome():
node.sendP2PMessage(targetPeer.get(), env)
return node.sendP2PMessage(targetPeer.get(), env)
elif not node.protocolState(shh).config.isLightNode:
# XXX: make this non blocking or not?
# In its current blocking state, it could be noticed by a peer that no
# messages are send for a while, and thus that mining PoW is done, and that
# next messages contains a message originated from this peer
env.nonce = env.minePow(powTime.float)
node.sendMessage(env)
return node.sendMessage(env)
else:
error "Light node not allowed to post messages"
return false
else:
error "Encoding of payload failed"
return false
proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler: FilterMsgHandler): string =
handler = none[FilterMsgHandler]()): string =
# NOTE: Should we allow a filter without a key? Encryption is mandatory in v6?
# Check if asymmetric _and_ symmetric key? Now asymmetric just has precedence.
var id = generateRandomID()
var filter = filter
if handler.isSome():
filter.handler = handler
else:
filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
node.protocolState(shh).filters.add(id, filter)
debug "Filter added", filter = id
return id
@ -852,6 +876,14 @@ proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
var filter: Filter
return node.protocolState(shh).filters.take(filterId, filter)
proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
result = @[]
if node.protocolState(shh).filters.contains(filterId):
if node.protocolState(shh).filters[filterId].handler.isNone():
result = node.protocolState(shh).filters[filterId].queue
node.protocolState(shh).filters[filterId].queue =
newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
# NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(shh).config.powRequirement = powReq
@ -871,14 +903,18 @@ proc filtersToBloom*(node: EthereumNode): Bloom =
if filter.topics.len > 0:
result = result or filter.bloom
proc setMaxMessageSize*(node: EthereumNode, size: uint32) =
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
if size > defaultMaxMsgSize:
error "size > maxMsgSize"
return false
node.protocolState(shh).config.maxMsgSize = size
return true
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId) =
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
for peer in node.peers(shh):
if peer.remote.id == peerId:
peer.state(shh).trusted = true
break
return true
# XXX: should probably only be allowed before connection is made,
# as there exists no message to communicate to peers that it is a light node

View File

@ -145,42 +145,44 @@ else:
waitFor node.connectToNetwork(@[bootENode], true, true)
if config.watch:
var data: seq[Bytes] = @[]
proc handler(payload: Bytes) =
echo payload.repr
data.add(payload)
proc handler(msg: ReceivedMessage) =
echo msg.decoded.payload.repr
# filter encrypted asym
discard node.subscribeFilter(newFilter(privateKey = some(encPrivateKey),
topics = @[topic]), handler)
topics = @[topic]),
some((FilterMsgHandler)handler))
# filter encrypted asym + signed
discard node.subscribeFilter(newFilter(some(signPublicKey),
privateKey = some(encPrivateKey),
topics = @[topic]), handler)
topics = @[topic]),
some((FilterMsgHandler)handler))
# filter encrypted sym
discard node.subscribeFilter(newFilter(symKey = some(symKey),
topics = @[topic]), handler)
topics = @[topic]),
some((FilterMsgHandler)handler))
# filter encrypted sym + signed
discard node.subscribeFilter(newFilter(some(signPublicKey),
symKey = some(symKey),
topics = @[topic]), handler)
topics = @[topic]),
some((FilterMsgHandler)handler))
if config.post:
# encrypted asym
node.postMessage(some(encPublicKey), ttl = 5, topic = topic,
discard node.postMessage(some(encPublicKey), ttl = 5, topic = topic,
payload = repeat(byte 65, 10))
poll()
# # encrypted asym + signed
node.postMessage(some(encPublicKey), src = some(signPrivateKey), ttl = 5,
topic = topic, payload = repeat(byte 66, 10))
discard node.postMessage(some(encPublicKey), src = some(signPrivateKey),
ttl = 5, topic = topic, payload = repeat(byte 66, 10))
poll()
# # encrypted sym
node.postMessage(symKey = some(symKey), ttl = 5, topic = topic,
discard node.postMessage(symKey = some(symKey), ttl = 5, topic = topic,
payload = repeat(byte 67, 10))
poll()
# # encrypted sym + signed
node.postMessage(symKey = some(symKey), src = some(signPrivateKey), ttl = 5,
topic = topic, payload = repeat(byte 68, 10))
discard node.postMessage(symKey = some(symKey), src = some(signPrivateKey),
ttl = 5, topic = topic, payload = repeat(byte 68, 10))
while true:
poll()

View File

@ -77,50 +77,66 @@ asyncTest "Filters with encryption and signing":
var symKey: SymKey
let topic = [byte 0x12, 0, 0, 0]
var filters: seq[string] = @[]
var payloads = [repeat(byte 1, 10), repeat(byte 2, 10),
repeat(byte 3, 10), repeat(byte 4, 10)]
var futures = [newFuture[int](), newFuture[int](),
newFuture[int](), newFuture[int]()]
proc handler1(payload: Bytes) =
check payload == repeat(byte 1, 10) or payload == repeat(byte 2, 10)
proc handler2(payload: Bytes) =
check payload == repeat(byte 2, 10)
proc handler3(payload: Bytes) =
check payload == repeat(byte 3, 10) or payload == repeat(byte 4, 10)
proc handler4(payload: Bytes) =
check payload == repeat(byte 4, 10)
proc handler1(msg: ReceivedMessage) =
var count {.global.}: int
check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1]
count += 1
if count == 2: futures[0].complete(1)
proc handler2(msg: ReceivedMessage) =
check msg.decoded.payload == payloads[1]
futures[1].complete(1)
proc handler3(msg: ReceivedMessage) =
var count {.global.}: int
check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3]
count += 1
if count == 2: futures[2].complete(1)
proc handler4(msg: ReceivedMessage) =
check msg.decoded.payload == payloads[3]
futures[3].complete(1)
# Filters
# filter for encrypted asym
filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey),
topics = @[topic]), handler1))
topics = @[topic]), some(handler1)))
# filter for encrypted asym + signed
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
privateKey = some(encryptKeyPair.seckey),
topics = @[topic]), handler2))
topics = @[topic]), some(handler2)))
# filter for encrypted sym
filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey),
topics = @[topic]), handler3))
topics = @[topic]), some(handler3)))
# filter for encrypted sym + signed
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
symKey = some(symKey),
topics = @[topic]), handler4))
topics = @[topic]), some(handler4)))
# Messages
# encrypted asym
node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5, topic = topic,
payload = repeat(byte 1, 10))
check true == node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5,
topic = topic, payload = payloads[0])
# encrypted asym + signed
node2.postMessage(some(encryptKeyPair.pubkey), src = some(signKeyPair.seckey),
ttl = 4, topic = topic, payload = repeat(byte 2, 10))
check true == node2.postMessage(some(encryptKeyPair.pubkey),
src = some(signKeyPair.seckey), ttl = 4,
topic = topic, payload = payloads[1])
# encrypted sym
node2.postMessage(symKey = some(symKey), ttl = 3, topic = topic,
payload = repeat(byte 3, 10))
check true == node2.postMessage(symKey = some(symKey), ttl = 3, topic = topic,
payload = payloads[2])
# encrypted sym + signed
node2.postMessage(symKey = some(symKey), src = some(signKeyPair.seckey),
ttl = 2, topic = topic, payload = repeat(byte 4, 10))
check true == node2.postMessage(symKey = some(symKey),
src = some(signKeyPair.seckey), ttl = 2,
topic = topic, payload = payloads[3])
check node2.protocolState(shh).queue.items.len == 4
# XXX: improve the dumb sleep
await sleepAsync(300)
check node1.protocolState(shh).queue.items.len == 4
var f = all(futures)
await f or sleepAsync(300)
check:
f.finished == true
node1.protocolState(shh).queue.items.len == 4
for filter in filters:
check node1.unsubscribeFilter(filter) == true
@ -128,45 +144,126 @@ asyncTest "Filters with encryption and signing":
waitForEmptyQueues()
asyncTest "Filters with topics":
let topic1 = [byte 0x12, 0, 0, 0]
let topic2 = [byte 0x34, 0, 0, 0]
var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
var futures = [newFuture[int](), newFuture[int]()]
proc handler1(msg: ReceivedMessage) =
check msg.decoded.payload == payloads[0]
futures[0].complete(1)
proc handler2(msg: ReceivedMessage) =
check msg.decoded.payload == payloads[1]
futures[1].complete(1)
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), some(handler1))
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), some(handler2))
check:
1 == 1
true == node2.postMessage(ttl = 3, topic = topic1, payload = payloads[0])
true == node2.postMessage(ttl = 2, topic = topic2, payload = payloads[1])
var f = all(futures)
await f or sleepAsync(300)
check:
f.finished == true
node1.protocolState(shh).queue.items.len == 2
node1.unsubscribeFilter(filter1) == true
node1.unsubscribeFilter(filter2) == true
waitForEmptyQueues()
asyncTest "Filters with PoW":
let topic = [byte 0x12, 0, 0, 0]
var payload = repeat(byte 0, 10)
var futures = [newFuture[int](), newFuture[int]()]
proc handler1(msg: ReceivedMessage) =
check msg.decoded.payload == payload
futures[0].complete(1)
proc handler2(msg: ReceivedMessage) =
check msg.decoded.payload == payload
futures[1].complete(1)
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
some(handler1))
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 10),
some(handler2))
check:
1 == 1
true == node2.postMessage(ttl = 2, topic = topic, payload = payload)
await futures[0] or sleepAsync(300)
await futures[1] or sleepAsync(300)
check:
futures[0].finished == true
futures[1].finished == false
node1.protocolState(shh).queue.items.len == 1
node1.unsubscribeFilter(filter1) == true
node1.unsubscribeFilter(filter2) == true
waitForEmptyQueues()
asyncTest "Filters with queues":
let topic = [byte 0, 0, 0, 0]
let payload = repeat(byte 0, 10)
var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
for i in countdown(10, 1):
check true == node2.postMessage(ttl = i.uint32, topic = topic,
payload = payload)
await sleepAsync(300)
check:
node1.getFilterMessages(filter).len() == 10
node1.getFilterMessages(filter).len() == 0
node1.unsubscribeFilter(filter) == true
waitForEmptyQueues()
asyncTest "Bloomfilter blocking":
let sendTopic1 = [byte 0x12, 0, 0, 0]
let sendTopic2 = [byte 0x34, 0, 0, 0]
let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]]
proc handler(payload: Bytes) = discard
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
let payload = repeat(byte 0, 10)
var f: Future[int] = newFuture[int]()
proc handler(msg: ReceivedMessage) =
check msg.decoded.payload == payload
f.complete(1)
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), some(handler))
await node1.setBloomFilter(node1.filtersToBloom())
node2.postMessage(ttl = 1, topic = sendTopic1, payload = repeat(byte 0, 10))
# XXX: improve the dumb sleep
await sleepAsync(300)
check true == node2.postMessage(ttl = 1, topic = sendTopic1, payload = payload)
await f or sleepAsync(300)
check:
f.finished == false
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 1
f = newFuture[int]()
waitForEmptyQueues()
node2.postMessage(ttl = 1, topic = sendTopic2, payload = repeat(byte 0, 10))
# XXX: improve the dumb sleep
await sleepAsync(300)
check true == node2.postMessage(ttl = 1, topic = sendTopic2, payload = payload)
await f or sleepAsync(300)
check:
f.finished == true
f.read() == 1
node1.protocolState(shh).queue.items.len == 1
node2.protocolState(shh).queue.items.len == 1
node1.unsubscribeFilter(filter) == true
await node1.setBloomFilter(fullBloom())
waitForEmptyQueues()
asyncTest "PoW blocking":
let topic = [byte 0, 0, 0, 0]
let payload = repeat(byte 0, 10)
await node1.setPowRequirement(1.0)
node2.postMessage(ttl = 1, topic = topic, payload = repeat(byte 0, 10))
check true == node2.postMessage(ttl = 1, topic = topic, payload = payload)
await sleepAsync(300)
check:
node1.protocolState(shh).queue.items.len == 0
@ -175,7 +272,7 @@ asyncTest "PoW blocking":
waitForEmptyQueues()
await node1.setPowRequirement(0.0)
node2.postMessage(ttl = 1, topic = topic, payload = repeat(byte 0, 10))
check true == node2.postMessage(ttl = 1, topic = topic, payload = payload)
await sleepAsync(300)
check:
node1.protocolState(shh).queue.items.len == 1
@ -185,34 +282,45 @@ asyncTest "PoW blocking":
asyncTest "Queue pruning":
let topic = [byte 0, 0, 0, 0]
let payload = repeat(byte 0, 10)
for i in countdown(10, 1):
node2.postMessage(ttl = i.uint32, topic = topic, payload = repeat(byte 0, 10))
check true == node2.postMessage(ttl = i.uint32, topic = topic,
payload = payload)
check node2.protocolState(shh).queue.items.len == 10
await sleepAsync(300)
check:
node1.protocolState(shh).queue.items.len == 10
node2.protocolState(shh).queue.items.len == 10
await sleepAsync(1000)
check:
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 0
asyncTest "Lightnode":
asyncTest "Light node posting":
let topic = [byte 0, 0, 0, 0]
node1.setLightNode(true)
var result = node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10))
check:
1 == 1
result == false
node1.protocolState(shh).queue.items.len == 0
node1.setLightNode(false)
asyncTest "P2P":
let topic = [byte 0, 0, 0, 0]
var f: Future[int] = newFuture[int]()
proc handler(payload: Bytes) =
check payload == repeat(byte 4, 10)
proc handler(msg: ReceivedMessage) =
check msg.decoded.payload == repeat(byte 4, 10)
f.complete(1)
var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true),
handler)
node1.setPeerTrusted(toNodeId(node2.keys.pubkey))
node2.postMessage(ttl = 2, topic = topic, payload = repeat(byte 4, 10),
some(handler))
check:
true == node1.setPeerTrusted(toNodeId(node2.keys.pubkey))
true == node2.postMessage(ttl = 2, topic = topic,
payload = repeat(byte 4, 10),
targetPeer = some(toNodeId(node1.keys.pubkey)))
await f or sleepAsync(300)
@ -221,3 +329,5 @@ asyncTest "P2P":
f.read() == 1
node1.protocolState(shh).queue.items.len == 0
node2.protocolState(shh).queue.items.len == 0
node1.unsubscribeFilter(filter) == true