diff --git a/eth_p2p/rlpx_protocols/shh_protocol.nim b/eth_p2p/rlpx_protocols/shh_protocol.nim index 2910f49..d299697 100644 --- a/eth_p2p/rlpx_protocols/shh_protocol.nim +++ b/eth_p2p/rlpx_protocols/shh_protocol.nim @@ -112,6 +112,8 @@ type handler: Option[FilterMsgHandler] queue: seq[ReceivedMessage] + Filters* = Table[string, Filter] + WhisperConfig* = object powRequirement*: float64 bloom*: Bloom @@ -558,7 +560,22 @@ proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](), Filter(src: src, privateKey: privateKey, symKey: symKey, topics: topics, powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics)) -proc notify(filters: var Table[string, Filter], msg: Message) = +proc subscribeFilter*(filters: var Filters, filter: Filter, + handler = none[FilterMsgHandler]()): string = + # NOTE: Should we allow a filter without a key? Encryption is mandatory in v6? + # Check if asymmetric _and_ symmetric key? Now asymmetric just has precedence. + let id = generateRandomID() + var filter = filter + if handler.isSome(): + filter.handler = handler + else: + filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) + + filters.add(id, filter) + debug "Filter added", filter = id + return id + +proc notify*(filters: var Filters, msg: Message) = var decoded: Option[DecodedPayload] var keyHash: Hash @@ -616,6 +633,19 @@ proc notify(filters: var Table[string, Filter], msg: Message) = else: filter.queue.insert(receivedMsg) +proc getFilterMessages*(filters: var Filters, filterId: string): seq[ReceivedMessage] = + result = @[] + if filters.contains(filterId): + if filters[filterId].handler.isNone(): + result = filters[filterId].queue + filters[filterId].queue = + newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) + +proc toBloom*(filters: Filters): Bloom = + for filter in filters.values: + if filter.topics.len > 0: + result = result or filter.bloom + type PeerState = ref object initialized*: bool # when successfully completed the handshake @@ -628,7 +658,7 @@ type WhisperState = ref object queue*: Queue - filters*: Table[string, Filter] + filters*: Filters config*: WhisperConfig proc run(peer: Peer) {.async.} @@ -868,29 +898,17 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), proc subscribeFilter*(node: EthereumNode, filter: Filter, handler = none[FilterMsgHandler]()): string = - # NOTE: Should we allow a filter without a key? Encryption is mandatory in v6? - # Check if asymmetric _and_ symmetric key? Now asymmetric just has precedence. - let id = generateRandomID() - var filter = filter - if handler.isSome(): - filter.handler = handler - else: - filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) - node.protocolState(shh).filters.add(id, filter) - debug "Filter added", filter = id - return id + return node.protocolState(shh).filters.subscribeFilter(filter, handler) proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool = var filter: Filter return node.protocolState(shh).filters.take(filterId, filter) proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] = - result = @[] - if node.protocolState(shh).filters.contains(filterId): - if node.protocolState(shh).filters[filterId].handler.isNone(): - result = node.protocolState(shh).filters[filterId].queue - node.protocolState(shh).filters[filterId].queue = - newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) + return node.protocolState(shh).filters.getFilterMessages(filterId) + +proc filtersToBloom*(node: EthereumNode): Bloom = + return node.protocolState(shh).filters.toBloom() proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = # NOTE: do we need a tolerance of old PoW for some time? @@ -906,11 +924,6 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = # asyncCheck peer.bloomFilterExchange(@bloom) await peer.bloomFilterExchange(@bloom) -proc filtersToBloom*(node: EthereumNode): Bloom = - for filter in node.protocolState(shh).filters.values: - if filter.topics.len > 0: - result = result or filter.bloom - proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = if size > defaultMaxMsgSize: error "size > maxMsgSize" diff --git a/tests/tshh.nim b/tests/tshh.nim index 05dd2c6..0d01957 100644 --- a/tests/tshh.nim +++ b/tests/tshh.nim @@ -8,7 +8,7 @@ # MIT license (LICENSE-MIT) import - sequtils, options, unittest, times, + sequtils, options, unittest, times, tables, nimcrypto/hash, eth_keys, rlp, eth_p2p/rlpx_protocols/shh_protocol as shh @@ -274,3 +274,95 @@ suite "Whisper queue": test "check field order against expected rlp order": check rlp.encode(env0) == rlp.encodeList(env0.expiry, env0.ttl, env0.topic, env0.data, env0.nonce) + +# To test filters we do not care if the msg is valid or allowed +proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](), + src = none[PrivateKey](), topic: Topic): Message = + let payload = Payload(dst: pubKey, symKey: symKey, src: src, + payload: @[byte 0, 1, 2]) + let encoded = shh.encode(payload) + let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(), + nonce: 0) + result = initMessage(env) + +suite "Whisper filter": + test "should notify filter on message with symmetric encryption": + var symKey: SymKey + let topic = [byte 0, 0, 0, 0] + let msg = prepFilterTestMsg(symKey = some(symKey), topic = topic) + + var filters = initTable[string, Filter]() + let filter = newFilter(symKey = some(symKey), topics = @[topic]) + let filterId = filters.subscribeFilter(filter) + + notify(filters, msg) + + let messages = filters.getFilterMessages(filterId) + check messages.len == 1 + + test "should notify filter on message with asymmetric encryption": + let privKey = eth_keys.newPrivateKey() + let topic = [byte 0, 0, 0, 0] + let msg = prepFilterTestMsg(pubKey = some(privKey.getPublicKey()), + topic = topic) + + var filters = initTable[string, Filter]() + let filter = newFilter(privateKey = some(privKey), topics = @[topic]) + let filterId = filters.subscribeFilter(filter) + + notify(filters, msg) + + let messages = filters.getFilterMessages(filterId) + check messages.len == 1 + + test "should notify filter on message with signature": + let privKey = eth_keys.newPrivateKey() + let topic = [byte 0, 0, 0, 0] + let msg = prepFilterTestMsg(src = some(privKey), topic = topic) + + var filters = initTable[string, Filter]() + let filter = newFilter(src = some(privKey.getPublicKey()), + topics = @[topic]) + let filterId = filters.subscribeFilter(filter) + + notify(filters, msg) + + let messages = filters.getFilterMessages(filterId) + check messages.len == 1 + + test "test notify of filter against PoW requirement": + let topic = [byte 0, 0, 0, 0] + # this message has a PoW of 0.02962962962962963, number should be updated + # in case PoW algorithm changes + let msg = prepFilterTestMsg(topic = topic) + + var filters = initTable[string, Filter]() + let + filterId1 = filters.subscribeFilter( + newFilter(topics = @[topic], powReq = 0.02962962962962963)) + filterId2 = filters.subscribeFilter( + newFilter(topics = @[topic], powReq = 0.02962962962962964)) + + notify(filters, msg) + + check: + filters.getFilterMessages(filterId1).len == 1 + filters.getFilterMessages(filterId2).len == 0 + + test "test notify of filter on message with certain topic": + let + topic1 = [byte 0xAB, 0x12, 0xCD, 0x34] + topic2 = [byte 0, 0, 0, 0] + + let msg = prepFilterTestMsg(topic = topic1) + + var filters = initTable[string, Filter]() + let + filterId1 = filters.subscribeFilter(newFilter(topics = @[topic1])) + filterId2 = filters.subscribeFilter(newFilter(topics = @[topic2])) + + notify(filters, msg) + + check: + filters.getFilterMessages(filterId1).len == 1 + filters.getFilterMessages(filterId2).len == 0