# # Ethereum P2P # (c) Copyright 2018 # Status Research & Development GmbH # # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) import sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys, eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode] const useCompression = defined(useSnappy) var nextPort = 30303 proc localAddress(port: int): Address = let port = Port(port) result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) proc startDiscoveryNode(privKey: PrivateKey, address: Address, bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} = result = newDiscoveryProtocol(privKey, address, bootnodes) result.open() await result.bootstrap() proc setupBootNode(): Future[ENode] {.async.} = let bootNodeKey = newPrivateKey() bootNodeAddr = localAddress(30301) bootNode = await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[]) result = initENode(bootNodeKey.getPublicKey, bootNodeAddr) template asyncTest(name, body: untyped) = test name: proc scenario {.async.} = body waitFor scenario() proc resetMessageQueues(nodes: varargs[EthereumNode]) = for node in nodes: node.resetMessageQueue() proc prepTestNode(): EthereumNode = let keys1 = newKeyPair() result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, addAllCapabilities = false, useCompression = useCompression) nextPort.inc result.addCapability Whisper let bootENode = waitFor setupBootNode() var node1 = prepTestNode() var node2 = prepTestNode() # node2 listening and node1 not, to avoid many incoming vs outgoing var node1Connected = node1.connectToNetwork(@[bootENode], false, true) var node2Connected = node2.connectToNetwork(@[bootENode], true, true) waitFor node1Connected waitFor node2Connected suite "Whisper connections": asyncTest "Two peers connected": check: node1.peerPool.connectedNodes.len() == 1 node2.peerPool.connectedNodes.len() == 1 asyncTest "Filters with encryption and signing": let encryptKeyPair = newKeyPair() let signKeyPair = newKeyPair() var symKey: SymKey let topic = [byte 0x12, 0, 0, 0] var filters: seq[string] = @[] var payloads = [repeat(byte 1, 10), repeat(byte 2, 10), repeat(byte 3, 10), repeat(byte 4, 10)] var futures = [newFuture[int](), newFuture[int](), newFuture[int](), newFuture[int]()] proc handler1(msg: ReceivedMessage) = var count {.global.}: int check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1] count += 1 if count == 2: futures[0].complete(1) proc handler2(msg: ReceivedMessage) = check msg.decoded.payload == payloads[1] futures[1].complete(1) proc handler3(msg: ReceivedMessage) = var count {.global.}: int check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3] count += 1 if count == 2: futures[2].complete(1) proc handler4(msg: ReceivedMessage) = check msg.decoded.payload == payloads[3] futures[3].complete(1) # Filters # filter for encrypted asym filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey), topics = @[topic]), handler1)) # filter for encrypted asym + signed filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), privateKey = some(encryptKeyPair.seckey), topics = @[topic]), handler2)) # filter for encrypted sym filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey), topics = @[topic]), handler3)) # filter for encrypted sym + signed filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), symKey = some(symKey), topics = @[topic]), handler4)) var safeTTL = 5'u32 # Messages check: # encrypted asym node2.postMessage(some(encryptKeyPair.pubkey), ttl = safeTTL, topic = topic, payload = payloads[0]) == true # encrypted asym + signed node2.postMessage(some(encryptKeyPair.pubkey), src = some(signKeyPair.seckey), ttl = safeTTL, topic = topic, payload = payloads[1]) == true # encrypted sym node2.postMessage(symKey = some(symKey), ttl = safeTTL, topic = topic, payload = payloads[2]) == true # encrypted sym + signed node2.postMessage(symKey = some(symKey), src = some(signKeyPair.seckey), ttl = safeTTL, topic = topic, payload = payloads[3]) == true node2.protocolState(Whisper).queue.items.len == 4 var f = all(futures) await f or sleepAsync(messageInterval) check: f.finished == true node1.protocolState(Whisper).queue.items.len == 4 for filter in filters: check node1.unsubscribeFilter(filter) == true resetMessageQueues(node1, node2) asyncTest "Filters with topics": let topic1 = [byte 0x12, 0, 0, 0] let topic2 = [byte 0x34, 0, 0, 0] var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] var futures = [newFuture[int](), newFuture[int]()] proc handler1(msg: ReceivedMessage) = check msg.decoded.payload == payloads[0] futures[0].complete(1) proc handler2(msg: ReceivedMessage) = check msg.decoded.payload == payloads[1] futures[1].complete(1) var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) var safeTTL = 3'u32 check: node2.postMessage(ttl = safeTTL + 1, topic = topic1, payload = payloads[0]) == true node2.postMessage(ttl = safeTTL, topic = topic2, payload = payloads[1]) == true node2.protocolState(Whisper).queue.items.len == 2 var f = all(futures) await f or sleepAsync(messageInterval) check: f.finished == true node1.protocolState(Whisper).queue.items.len == 2 node1.unsubscribeFilter(filter1) == true node1.unsubscribeFilter(filter2) == true resetMessageQueues(node1, node2) asyncTest "Filters with PoW": let topic = [byte 0x12, 0, 0, 0] var payload = repeat(byte 0, 10) var futures = [newFuture[int](), newFuture[int]()] proc handler1(msg: ReceivedMessage) = check msg.decoded.payload == payload futures[0].complete(1) proc handler2(msg: ReceivedMessage) = check msg.decoded.payload == payload futures[1].complete(1) var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0), handler1) var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 1_000_000), handler2) let safeTTL = 2'u32 check: node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true await futures[0] or sleepAsync(messageInterval) await futures[1] or sleepAsync(messageInterval) check: futures[0].finished == true futures[1].finished == false node1.protocolState(Whisper).queue.items.len == 1 node1.unsubscribeFilter(filter1) == true node1.unsubscribeFilter(filter2) == true resetMessageQueues(node1, node2) asyncTest "Filters with queues": let topic = [byte 0, 0, 0, 0] let payload = repeat(byte 0, 10) var filter = node1.subscribeFilter(newFilter(topics = @[topic])) for i in countdown(10, 1): check node2.postMessage(ttl = i.uint32, topic = topic, payload = payload) == true await sleepAsync(messageInterval) check: node1.getFilterMessages(filter).len() == 10 node1.getFilterMessages(filter).len() == 0 node1.unsubscribeFilter(filter) == true resetMessageQueues(node1, node2) asyncTest "Local filter notify": let topic = [byte 0, 0, 0, 0] var filter = node1.subscribeFilter(newFilter(topics = @[topic])) let safeTTL = 2'u32 check: node1.postMessage(ttl = safeTTL, topic = topic, payload = repeat(byte 4, 10)) == true node1.getFilterMessages(filter).len() == 1 node1.unsubscribeFilter(filter) == true await sleepAsync(messageInterval) resetMessageQueues(node1, node2) asyncTest "Bloomfilter blocking": let sendTopic1 = [byte 0x12, 0, 0, 0] let sendTopic2 = [byte 0x34, 0, 0, 0] let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]] let payload = repeat(byte 0, 10) var f: Future[int] = newFuture[int]() proc handler(msg: ReceivedMessage) = check msg.decoded.payload == payload f.complete(1) var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) await node1.setBloomFilter(node1.filtersToBloom()) let safeTTL = 2'u32 check: node2.postMessage(ttl = safeTTL, topic = sendTopic1, payload = payload) == true node2.protocolState(Whisper).queue.items.len == 1 await f or sleepAsync(messageInterval) check: f.finished == false node1.protocolState(Whisper).queue.items.len == 0 resetMessageQueues(node1, node2) f = newFuture[int]() check: node2.postMessage(ttl = safeTTL, topic = sendTopic2, payload = payload) == true node2.protocolState(Whisper).queue.items.len == 1 await f or sleepAsync(messageInterval) check: f.finished == true f.read() == 1 node1.protocolState(Whisper).queue.items.len == 1 node1.unsubscribeFilter(filter) == true await node1.setBloomFilter(fullBloom()) resetMessageQueues(node1, node2) asyncTest "PoW blocking": let topic = [byte 0, 0, 0, 0] let payload = repeat(byte 0, 10) let safeTTL = 2'u32 await node1.setPowRequirement(1_000_000) check: node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true node2.protocolState(Whisper).queue.items.len == 1 await sleepAsync(messageInterval) check: node1.protocolState(Whisper).queue.items.len == 0 resetMessageQueues(node1, node2) await node1.setPowRequirement(0.0) check: node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true node2.protocolState(Whisper).queue.items.len == 1 await sleepAsync(messageInterval) check: node1.protocolState(Whisper).queue.items.len == 1 resetMessageQueues(node1, node2) asyncTest "Queue pruning": let topic = [byte 0, 0, 0, 0] let payload = repeat(byte 0, 10) # We need a minimum TTL of 2 as when set to 1 there is a small chance that # it is already expired after messageInterval due to rounding down of float # to uint32 in postMessage() let minTTL = 2'u32 for i in countdown(minTTL + 9, minTTL): check node2.postMessage(ttl = i, topic = topic, payload = payload) == true check node2.protocolState(Whisper).queue.items.len == 10 await sleepAsync(messageInterval) check node1.protocolState(Whisper).queue.items.len == 10 await sleepAsync(int(minTTL*1000)) check node1.protocolState(Whisper).queue.items.len == 0 check node2.protocolState(Whisper).queue.items.len == 0 resetMessageQueues(node1, node2) asyncTest "P2P post": let topic = [byte 0, 0, 0, 0] var f: Future[int] = newFuture[int]() proc handler(msg: ReceivedMessage) = check msg.decoded.payload == repeat(byte 4, 10) f.complete(1) var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true), handler) check: node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true node2.postMessage(ttl = 10, topic = topic, payload = repeat(byte 4, 10), targetPeer = some(toNodeId(node1.keys.pubkey))) == true await f or sleepAsync(messageInterval) check: f.finished == true f.read() == 1 node1.protocolState(Whisper).queue.items.len == 0 node2.protocolState(Whisper).queue.items.len == 0 node1.unsubscribeFilter(filter) == true test "Light node posting": var ln1 = prepTestNode() ln1.setLightNode(true) # not listening, so will only connect to others that are listening (node2) waitFor ln1.connectToNetwork(@[bootENode], false, true) let topic = [byte 0, 0, 0, 0] let safeTTL = 2'u32 check: # normal post ln1.postMessage(ttl = safeTTL, topic = topic, payload = repeat(byte 0, 10)) == false ln1.protocolState(Whisper).queue.items.len == 0 # P2P post ln1.postMessage(ttl = safeTTL, topic = topic, payload = repeat(byte 0, 10), targetPeer = some(toNodeId(node2.keys.pubkey))) == true ln1.protocolState(Whisper).queue.items.len == 0 test "Connect two light nodes": var ln1 = prepTestNode() var ln2 = prepTestNode() ln1.setLightNode(true) ln2.setLightNode(true) ln2.startListening() let peer = waitFor ln1.rlpxConnect(newNode(initENode(ln2.keys.pubKey, ln2.address))) check peer.isNil == true