deploy: 0db4107ae290e982ab82fa5fc42e52bed12ebaf9

This commit is contained in:
jm-clius 2021-08-25 12:29:52 +00:00
parent f6ee239574
commit e24a5d089a
194 changed files with 1978 additions and 92 deletions

5
.gitmodules vendored
View File

@ -135,3 +135,8 @@
url = https://github.com/status-im/nim-dnsdisc.git
ignore = untracked
branch = main
[submodule "vendor/dnsclient.nim"]
path = vendor/dnsclient.nim
url = https://github.com/jm-clius/dnsclient.nim.git
ignore = untracked
branch = master

View File

@ -19,8 +19,10 @@ import libp2p/[switch, # manage transports, a single entry poi
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/protocol, # define the protocol base type
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver,# define DNS resolution
muxers/muxer] # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
import ../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/node/./dnsdisc/waku_dnsdisc,
../../waku/v2/utils/peers,
../../waku/common/utils/nat,
./config_chat2
@ -344,6 +346,30 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
if conf.staticnodes.len > 0:
await connectToNodes(chat, conf.staticnodes)
elif conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
# Discover nodes via DNS
debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl
var nameServers: seq[TransportAddress]
for ip in conf.dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain=domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
resolver)
if wakuDnsDiscovery.isOk:
let discoveredPeers = wakuDnsDiscovery.get().findPeers()
if discoveredPeers.isOk:
info "Connecting to discovered peers"
waitFor chat.node.connectToNodes(discoveredPeers.get())
else:
warn "Failed to init Waku DNS discovery"
elif conf.fleet != Fleet.none:
# Connect to at least one random fleet node
echo "No static peers configured. Choosing one at random from " & $conf.fleet & " fleet..."

View File

@ -186,6 +186,23 @@ type
defaultValue: false
name: "metrics-logging" }: bool
## DNS discovery config
dnsDiscovery* {.
desc: "Enable discovering nodes via DNS"
defaultValue: false
name: "dns-discovery" }: bool
dnsDiscoveryUrl* {.
desc: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'",
defaultValue: ""
name: "dns-discovery-url" }: string
dnsDiscoveryNameServers* {.
desc: "DNS name server IPs to query. Argument may be repeated."
defaultValue: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")]
name: "dns-discovery-name-server" }: seq[ValidIpAddress]
## Chat2 configuration
fleet* {.

View File

@ -0,0 +1,12 @@
# These are supported funding model platforms
github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
patreon: # Replace with a single Patreon username
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
otechie: # Replace with a single Otechie username
custom: ['https://paypal.me/ba0f3']

8
vendor/dnsclient.nim/.travis.yml vendored Normal file
View File

@ -0,0 +1,8 @@
sudo: required
services:
- docker
before_install:
- docker pull nimlang/nim
script:
- docker run nimlang/nim nim --version
- docker run -v "$(pwd):/project" -w /project nimlang/nim sh -c "nimble install -dy && nimble test"

32
vendor/dnsclient.nim/README.md vendored Normal file
View File

@ -0,0 +1,32 @@
# dnsclient
Simple DNS Client and library in pure Nim
Installation
============
```
$ nimble install dnsclient
```
Usage
============
This is a hybird repo, contains a command line DNS client and library for DNS query.
For now, only some simple records are supported, but adding new records is very simple.
Feel free to make PR or raise an issue as your need!
### CLI
```
$ dnsclient TXT txt.example.huy.im
```
### Library
```nim
import dnsclient
let client = newDNSClient()
let resp = client.sendQuery("txt.example.huy.im", TXT)
assert resp.answers[0].kind == TXT
let rr = TXTRecord(resp.answers[0])
assert rr.data == "dnsclient.nim"
```

15
vendor/dnsclient.nim/dnsclient.nimble vendored Normal file
View File

@ -0,0 +1,15 @@
# Package
version = "0.1.0"
author = "Huy Doan"
description = "Simple DNS Client & Library"
license = "MIT"
srcDir = "src"
installExt = @["nim"]
bin = @["dnsclient"]
# Dependencies
requires "nim >= 0.20.0"

81
vendor/dnsclient.nim/src/dnsclient.nim vendored Normal file
View File

@ -0,0 +1,81 @@
# dnsclient
# Copyright Huy Doan
# Simple DNS client
import strutils, streams, net, nativesockets, endians
import dnsclientpkg/[protocol, records, types]
export records, types, TimeoutError
type
DNSClient = ref object of RootObj
server: string
port: Port
socket: Socket
proc newDNSClient*(server: string, port: Port): DNSClient =
## Create new DNS client
new(result)
result.socket = newSocket(sockType=SOCK_DGRAM,protocol=IPPROTO_UDP)
result.server = server
result.port = port
proc newDNSClient*(server = "8.8.8.8", port = 53): DNSClient =
## Create new DNS client with default dns server `8.8.8.8`
result = newDNSClient(server, Port(port))
proc sendQuery*(c: DNSClient, query: string, kind: QKind = A, timeout = 500): Response =
## send dns query to server
var
header = initHeader()
question = initQuestion(query, kind)
var buf = header.toStream()
question.toStream(buf)
var bufLen = buf.getPosition()
buf.setPosition(0)
var data = alloc0(bufLen)
defer: dealloc(data)
discard buf.readData(data, bufLen)
c.socket.sendTo(c.server, c.port, data, bufLen)
bufLen = 1024
var
resp = newStringOfCap(bufLen)
readFds = @[c.socket.getFd]
if selectRead(readFds, timeout) > 0:
discard c.socket.recvFrom(resp, bufLen, c.server, c.port)
else:
raise newException(TimeoutError, "Call to 'sendQuery' timed out.")
buf.setPosition(0)
buf.write(resp)
buf.setPosition(0)
result = parseResponse(buf)
proc close*(c: DNSClient) = c.socket.close()
when isMainModule:
import os
if paramCount() != 2:
quit("Usage: " & getAppFilename() & " q-type host")
let client = newDNSClient()
var
qtype = paramStr(1)
kind: QKind
# TODO: find a better way, this is inefficient especially if large values
# are added to `QKind`
for k2 in QKind.low.int..QKind.high.int:
let k = k2.QKind
if qtype == $k:
kind = k
break
if kind == UNUSED:
quit("unsupported q-type")
let resp = client.sendQuery(paramStr(2), kind)
dumpHeader(resp.header)
dumpQuestion(resp.question)
dumpRR(resp.answers)
dumpRR(resp.authorityRRs, "AUTHORITY")

View File

@ -0,0 +1,165 @@
import strutils, streams, endians, utils, random
import types, records
randomize()
proc dumpHeader*(h: Header) =
let
opcode = if h.qr == QR_QUERY: "QUERY" else: "RESPONSE"
rcode = if h.rcode == 0: "NOERROR" else: $(h.rcode)
echo ";; ->>HEADER<<- opcode: $#, status: $#, id: $#" % [opcode, rcode, $h.id]
echo ";; QUERY: $#, ANSWER: $#, AUTHORITY: $#, ADDITIONAL: $#" % [$h.qdcount, $h.ancount, $h.nscount, $h.arcount]
proc dumpQuestion*(q: Question) =
echo ";; QUESTION SECTION:"
echo ";$#.\t\t\t$#\t$#" % [q.name, $q.class, $q.kind]
proc dumpRR*(rr: seq[ResourceRecord], section = "ANSWER") =
if len(rr) <= 0:
return
echo ";; $# SECTION:" % section
for r in rr:
echo "$#.\t\t\t$#\t$#\t$#\t$#" % [r.name, $r.ttl, $r.class, $r.kind, r.toString()]
proc initHeader*(): Header =
result.id = rand(high(uint16).int).uint16
result.qr = QR_QUERY
result.opcode = OPCODE_QUERY
result.tc = 0
result.rd = 1
result.qdcount = 1
proc toStream*(h: var Header): StringStream =
result = newStringStream()
var flags: uint16
result.writeShort(h.id)
flags = 0
flags = flags or h.qr.uint16
flags = flags shl 1
flags = flags or h.opcode.uint16
flags = flags shl 4
flags = flags or h.aa.uint16
flags = flags shl 1
flags = flags or h.tc
flags = flags shl 1
flags = flags or h.rd
flags = flags shl 1
flags = flags or h.ra
flags = flags shl 7
flags = flags or h.rcode
result.writeShort(flags)
result.writeShort(h.qdcount)
result.writeShort(h.ancount)
result.writeShort(h.nscount)
result.writeShort(h.arcount)
proc initQuestion*(name: string, kind: QKind = A): Question =
result.name = name
result.kind = kind
result.class= IN
proc toStream*(q: var Question, data: StringStream) =
var labelLen: uint8
for label in q.name.split('.'):
labelLen = label.len.uint8
if labelLen < 1.uint8:
raise newException(ValueError, q.name & "is not a legal name (empty label)")
if labelLen >= 64.uint8:
raise newException(ValueError, q.name & "is not a legal name (label too long)")
data.write(labelLen)
data.write(label)
data.write('\0')
data.writeShort(q.kind.uint16)
data.writeShort(q.class.uint16)
proc parseHeader(data: StringStream): Header =
result.id = data.readShort()
var flags = data.readUint16()
result.rcode = flags and 15
flags = flags shr 7
result.ra = flags and 1
flags = flags shr 1
result.rd = flags and 1
flags = flags shr 1
result.tc = flags and 1
flags = flags shr 1
result.aa = QAuthority(flags and 1)
flags = flags shr 1
result.opcode = QOpCode(flags and 15)
flags = flags shr 4
result.qr = QQuery(flags)
result.qdcount = data.readShort()
result.ancount = data.readShort()
result.nscount = data.readShort()
result.arcount = data.readShort()
proc parseQuestion(data: StringStream): Question =
result.name = data.getName()
result.kind = QKind(data.readShort())
result.class = QClass(data.readShort())
proc parseRR(data: StringStream): ResourceRecord =
# name offset
new(result)
let
name = data.getName()
kind = QKind(data.readShort())
case kind
of A:
result = ARecord(name: name, kind: kind)
of AAAA:
result = AAAARecord(name: name, kind: kind)
of CNAME:
result = CNAMERecord(name: name, kind: kind)
of HINFO:
result = HINFORecord(name: name, kind: kind)
of MB:
result = MBRecord(name: name, kind: kind)
of MINFO:
result = MINFORecord(name: name, kind: kind)
of MR:
result = MRRecord(name: name, kind: kind)
of MX:
result = MXRecord(name: name, kind: kind)
of NS:
result = NSRecord(name: name, kind: kind)
of PTR:
result = PTRRecord(name: name, kind: kind)
of SOA:
result = SOARecord(name: name, kind: kind)
of TXT:
result = TXTRecord(name: name, kind: kind)
of SRV:
result = SRVRecord(name: name, kind: kind)
else:
raise newException(ValueError, "RR for " & $kind & " is not implemented yet")
result.class = QClass(data.readShort())
result.ttl = data.readTTL().uint32
result.rdlength = data.readShort()
result.parse(data)
proc parseResponse*(data: StringStream): Response =
result.header = parseHeader(data)
result.question = parseQuestion(data)
for _ in 0..<result.header.ancount.int:
var answer = parseRR(data)
result.answers.add(answer)
for _ in 0..<result.header.nscount.int:
var answer = parseRR(data)
result.authorityRRs.add(answer)
assert data.atEnd()
data.close()

View File

@ -0,0 +1,3 @@
import streams, strutils, types, utils
include records/[a, aaaa, cname, hinfo, mb, minfo, mr, mx, ns, ptrr, soa, srv, txt]

View File

@ -0,0 +1,7 @@
type ARecord* = ref object of ResourceRecord
address*: int32
method toString*(r: ARecord): string = ipv4ToString(r.address)
method parse*(r: ARecord, data: StringStream) =
r.address = data.readInt32()

View File

@ -0,0 +1,8 @@
type AAAARecord* = ref object of ResourceRecord
address_v6*: array[16, uint8]
method toString*(r: AAAARecord): string = ipv6ToString(r.address_v6)
method parse*(r: AAAARecord, data: StringStream) =
for i in 0..<16:
r.address_v6[i] = data.readUInt8()

View File

@ -0,0 +1,7 @@
type CNAMERecord* = ref object of ResourceRecord
cname*: string
method toString*(r: CNAMERecord): string = r.cname
method parse*(r: CNAMERecord, data: StringStream) =
r.cname = data.getName()

View File

@ -0,0 +1,9 @@
type HINFORecord* = ref object of ResourceRecord
cpu*: string
os*: string
method toString*(r: HINFORecord): string = r.cpu & " " & r.os
method parse*(r: HINFORecord, data: StringStream) =
r.cpu = data.readStr(data.readInt8())
r.os = data.readStr(data.readInt8())

View File

@ -0,0 +1,7 @@
type MBRecord* = ref object of ResourceRecord
madname*: string
method toString*(r: MBRecord): string = r.madname
method parse*(r: MBRecord, data: StringStream) =
r.madname = data.getName()

View File

@ -0,0 +1,9 @@
type MINFORecord* = ref object of ResourceRecord
rmailbx*: string
emailbx*: string
method toString*(r: MINFORecord): string = r.rmailbx & " " & r.emailbx
method parse*(r: MINFORecord, data: StringStream) =
r.rmailbx = data.getName()
r.emailbx = data.getName()

View File

@ -0,0 +1,7 @@
type MRRecord* = ref object of ResourceRecord
newname*: string
method toString*(r: MRRecord): string = r.newname
method parse*(r: MRRecord, data: StringStream) =
r.newname = data.getName()

View File

@ -0,0 +1,9 @@
type MXRecord* = ref object of ResourceRecord
preference*: uint16
exchange*: string
method toString*(r: MXRecord): string = $r.preference & " " & r.exchange
method parse*(r: MXRecord, data: StringStream) =
r.preference = data.readShort()
r.exchange = data.getName()

View File

@ -0,0 +1,7 @@
type NSRecord* = ref object of ResourceRecord
nsdname*: string
method toString*(r: NSRecord): string = r.nsdname
method parse*(r: NSRecord, data: StringStream) =
r.nsdname = data.getName()

View File

@ -0,0 +1,7 @@
type PTRRecord* = ref object of ResourceRecord
ptrdname*: string
method toString*(r: PTRRecord): string = r.ptrdname
method parse*(r: PTRRecord, data: StringStream) =
r.ptrdname = data.getName()

View File

@ -0,0 +1,19 @@
type SOARecord* = ref object of ResourceRecord
mname*: string
rname*: string
serial*: uint32
refresh*: int32
retry*: int32
expire*: int32
minimum*: uint32
method toString*(r: SOARecord): string = "$# $# $# $# $# $# $#" % [r.mname, r.rname, $r.serial, $r.refresh, $r.retry, $r.expire, $r.minimum]
method parse*(r: SOARecord, data: StringStream) =
r.mname = data.getName()
r.rname = data.getName()
r.serial = data.readTTL().uint32
r.refresh = data.readTTL()
r.retry = data.readTTL()
r.expire = data.readTTL()
r.minimum = data.readTTL().uint32

View File

@ -0,0 +1,13 @@
type SRVRecord* = ref object of ResourceRecord
priority*: uint16
weight*: uint16
port*: uint16
target*: string
method toString*(r: SRVRecord): string = $r.priority & " " & $r.weight & " " & $r.port & " " & r.target
method parse*(r: SRVRecord, data: StringStream) =
r.priority = data.readShort()
r.weight = data.readShort()
r.port = data.readShort()
r.target = data.getName()

View File

@ -0,0 +1,9 @@
type TXTRecord* = ref object of ResourceRecord
length*: uint8
data*: string
method toString*(r: TXTRecord): string = r.data
method parse*(r: TXTRecord, data: StringStream) =
r.length = data.readUint8()
r.data = data.readStr(r.length.int)

View File

@ -0,0 +1,113 @@
import streams
type
QQuery* = enum
QR_QUERY = 0
QR_RESPONSE = 1
QAuthority* = enum
AA_NONAUTHORITY = 0
AA_AUTHORITY = 1
QOpCode* = enum
OPCODE_QUERY = 0
OPCODE_IQUERY = 1
OPCODE_STATUS = 2
QClass* = enum
IN = 1
CS = 2
CH = 3
HS = 4
NONE = 254
ALL = 255
QKind* = enum
UNUSED = 0
A = 1
NS = 2
CNAME = 5
SOA = 6
MB = 7
MG = 8
MR = 9
WKS = 11
PTR = 12
HINFO = 13
MINFO = 14
MX = 15
TXT = 16
RP = 17
AFSDB = 18
SIG = 24
KEY = 25
AAAA = 28
LOC = 29
SRV = 33
NAPTR = 35
KX = 36
CERT = 37
DNAME = 39
APL = 42
DS = 43
SSHFP = 44
IPSECKEY = 45
RRSIG = 46
NSEC = 47
DNSKEY = 48
DHCID = 49
NSEC3 = 50
NSEC3PARAM = 51
TLSA = 52
HIP = 55
CDS = 59
CDNSKEY = 60
OPENPGPKEY = 61
TKEY = 249
TSIG = 250
ANY = 255
URI = 256
CAA = 257
# TA = 32768
# DLV = 32769
Header* = object
id*: uint16
qr* {.bitsize:1.}: QQuery
opcode* {.bitsize:4.}: QOpCode
aa* {.bitsize:1.}: QAuthority
tc* {.bitsize:1.}: uint16
rd* {.bitsize:1.}: uint16
ra* {.bitsize:1.}: uint16
z* {.bitsize:3.}: uint16
rcode* {.bitsize:4}: uint16
qdcount*: uint16
ancount*: uint16
nscount*: uint16
arcount*: uint16
Question* = object
name*: string
kind*: QKind
class*: QClass
ResourceRecord* = ref object of RootObj
name*: string
class*: QClass
ttl*: uint32
rdlength*: uint16
kind*: QKind
#rdata*: StringStream
Response* = object
header*: Header
question*: Question
answers*: seq[ResourceRecord]
authorityRRs*: seq[ResourceRecord]
method parse*(r: ResourceRecord, data: StringStream) {.base.} =
raise newException(LibraryError, "parser for " & $r.kind & " is not implemented yet")
method toString*(r: ResourceRecord): string {.base.} =
raise newException(LibraryError, "to override!")

View File

@ -0,0 +1,61 @@
import endians, streams, strutils
#proc pack*(inp: int16): uint16 {.inline.} =
# var inp = inp.uint16
# bigEndian16(addr result, addr inp)
#proc pack*(inp: uint16): uint16 {.inline.} =
# var inp = inp
# bigEndian16(addr result, addr inp)
proc readTTL*(s: StringStream): int32 {.inline.} =
var value = s.readInt32()
bigEndian32(addr result, addr value)
proc readShort*(s: StringStream): uint16 {.inline.} =
var value = s.readInt16()
bigEndian16(addr result, addr value)
proc writeShort*[T: int16|uint16](s: StringStream, value: T) {.inline.} =
var
value = value
input: T
bigEndian16(addr input, addr value)
s.write(input)
proc getBits(data: auto, offset: int, bits = 1): int =
let mask = ((1 shl bits) - 1) shl offset
result = (data.int and mask) shr offset
proc getName*(data: StringStream): string =
var labels: seq[string]
while true:
let
length = data.readUint8()
magic = length.getBits(6, 2)
if magic == 3:
data.setPosition(data.getPosition() - 1)
let offset = int(data.readShort() xor 0xC000)
let currentPosition = data.getPosition()
data.setPosition(offset)
labels.add(data.getName())
data.setPosition(currentPosition)
break
elif length.int > 0:
labels.add(data.readStr(length.int))
else:
break
result = labels.join(".")
proc ipv4ToString*(ip: int32): string =
let arr = cast[array[4, uint8]](ip)
arr.join(".")
proc ipv6ToString*(ip6: array[16, uint8]): string =
for i in 0..<8:
result &= ":"
result &= ip6[i * 2].toHex()
result &= ip6[i * 2 + 1].toHex()
result.removePrefix(":")

View File

@ -0,0 +1 @@
switch("path", "$projectDir/../src")

50
vendor/dnsclient.nim/tests/test1.nim vendored Normal file
View File

@ -0,0 +1,50 @@
# This is just an example to get you started. You may wish to put all of your
# tests into a single file, or separate them into multiple `test1`, `test2`
# etc. files (better names are recommended, just make sure the name starts with
# the letter 't').
#
# To run these tests, simply execute `nimble test`.
import unittest, dnsclient
let client = newDNSClient()
test "query A":
let resp = client.sendQuery("example.huy.im", A)
assert resp.answers[0].kind == A
let rr = ARecord(resp.answers[0])
assert rr.toString() == "8.8.8.8"
test "query AAAA":
let resp = client.sendQuery("google.fr", AAAA)
assert resp.answers[0].kind == AAAA
let rr = AAAARecord(resp.answers[0])
#assert rr.toString() == "0000:0000:0000:0000:0000:0000:0000:0001" ??
test "query TXT":
let resp = client.sendQuery("txt.example.huy.im", TXT)
assert resp.answers[0].kind == TXT
let rr = TXTRecord(resp.answers[0])
assert rr.data == "dnsclient.nim"
test "query MX":
let resp = client.sendQuery("mx.example.huy.im", MX)
assert resp.answers[0].kind == MX
let rr = MXRecord(resp.answers[0])
assert rr.preference == 5
assert rr.exchange == "8.8.8.8"
test "query CNAME":
let resp = client.sendQuery("cname.example.huy.im", CNAME)
assert resp.answers[0].kind == CNAME
let rr = CNAMERecord(resp.answers[0])
assert rr.cname == "example.huy.im"
test "query SRV":
let resp = client.sendQuery("_smtp._tcp.example.huy.im", SRV)
assert resp.answers[0].kind == SRV
let rr = SRVRecord(resp.answers[0])
assert rr.priority == 10
assert rr.weight == 15
assert rr.port == 25
assert rr.target == "smtp.yandex.ru"

1
vendor/dnsclient.nim/tests/test1.nims vendored Normal file
View File

@ -0,0 +1 @@
switch("path", "$projectDir/../src")

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az196-45:
# Libtool was configured on host fv-az278-14:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -3,6 +3,7 @@ on:
push:
branches:
- master
- unstable
pull_request:
workflow_dispatch:

View File

@ -9,16 +9,18 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
requires "nim >= 1.2.0",
"nimcrypto >= 0.4.1",
"https://github.com/ba0f3/dnsclient.nim == 0.1.0",
"bearssl >= 0.1.4",
"chronicles >= 0.7.2",
"chronicles#ba2817f1",
"chronos >= 2.5.2",
"metrics",
"secp256k1",
"stew#head"
"stew#head",
"https://github.com/status-im/nim-websock"
proc runTest(filename: string, verify: bool = true, sign: bool = true,
moreoptions: string = "") =
var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics --verbosity:0 --hints:off"
var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:lipp2p_network_protocols_metrics --verbosity:0 --hints:off"
excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off")
excstr.add(" -d:libp2p_pubsub_sign=" & $sign)
excstr.add(" -d:libp2p_pubsub_verify=" & $verify)

View File

@ -16,20 +16,19 @@ import
muxers/[muxer, mplex/mplex],
protocols/[identify, secure/secure, secure/noise],
connmanager, upgrademngrs/muxedupgrade,
nameresolving/nameresolver,
errors
export
switch, peerid, peerinfo, connection, multiaddress, crypto, errors
type
TransportProvider* = proc(upgr: Upgrade): Transport {.gcsafe, raises: [Defect].}
SecureProtocol* {.pure.} = enum
Noise,
Secio {.deprecated.}
TcpTransportOpts = object
enable: bool
flags: set[ServerFlags]
MplexOpts = object
enable: bool
newMuxer: MuxerConstructor
@ -39,7 +38,7 @@ type
addresses: seq[MultiAddress]
secureManagers: seq[SecureProtocol]
mplexOpts: MplexOpts
tcpTransportOpts: TcpTransportOpts
transports: seq[TransportProvider]
rng: ref BrHmacDrbgContext
maxConnections: int
maxIn: int
@ -47,6 +46,7 @@ type
maxConnsPerPeer: int
protoVersion: string
agentVersion: string
nameResolver: NameResolver
proc new*(T: type[SwitchBuilder]): T =
@ -58,7 +58,6 @@ proc new*(T: type[SwitchBuilder]): T =
privKey: none(PrivateKey),
addresses: @[address],
secureManagers: @[],
tcpTransportOpts: TcpTransportOpts(),
maxConnections: MaxConnections,
maxIn: -1,
maxOut: -1,
@ -97,11 +96,13 @@ proc withNoise*(b: SwitchBuilder): SwitchBuilder =
b.secureManagers.add(SecureProtocol.Noise)
b
proc withTcpTransport*(b: SwitchBuilder, flags: set[ServerFlags] = {}): SwitchBuilder =
b.tcpTransportOpts.enable = true
b.tcpTransportOpts.flags = flags
proc withTransport*(b: SwitchBuilder, prov: TransportProvider): SwitchBuilder =
b.transports.add(prov)
b
proc withTcpTransport*(b: SwitchBuilder, flags: set[ServerFlags] = {}): SwitchBuilder =
b.withTransport(proc(upgr: Upgrade): Transport = TcpTransport.new(flags, upgr))
proc withRng*(b: SwitchBuilder, rng: ref BrHmacDrbgContext): SwitchBuilder =
b.rng = rng
b
@ -130,6 +131,10 @@ proc withAgentVersion*(b: SwitchBuilder, agentVersion: string): SwitchBuilder =
b.agentVersion = agentVersion
b
proc withNameResolver*(b: SwitchBuilder, nameResolver: NameResolver): SwitchBuilder =
b.nameResolver = nameResolver
b
proc build*(b: SwitchBuilder): Switch
{.raises: [Defect, LPError].} =
@ -168,8 +173,8 @@ proc build*(b: SwitchBuilder): Switch
let
transports = block:
var transports: seq[Transport]
if b.tcpTransportOpts.enable:
transports.add(Transport(TcpTransport.new(b.tcpTransportOpts.flags, muxedUpgrade)))
for tProvider in b.transports:
transports.add(tProvider(muxedUpgrade))
transports
if b.secureManagers.len == 0:
@ -185,7 +190,8 @@ proc build*(b: SwitchBuilder): Switch
muxers = muxers,
secureManagers = secureManagerInstances,
connManager = connManager,
ms = ms)
ms = ms,
nameResolver = b.nameResolver)
return switch
@ -202,7 +208,8 @@ proc newStandardSwitch*(
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer): Switch
maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil): Switch
{.raises: [Defect, LPError].} =
if SecureProtocol.Secio in secureManagers:
quit("Secio is deprecated!") # use of secio is unsafe
@ -217,6 +224,7 @@ proc newStandardSwitch*(
.withMaxConnsPerPeer(maxConnsPerPeer)
.withMplex(inTimeout, outTimeout)
.withTcpTransport(transportFlags)
.withNameResolver(nameResolver)
.withNoise()
if privKey.isSome():

View File

@ -13,7 +13,7 @@
import pkg/chronos
import std/[nativesockets, hashes]
import tables, strutils, stew/shims/net
import tables, strutils, sets, stew/shims/net
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
protobuf/minprotobuf, errors
import stew/[base58, base32, endians2, results]
@ -362,6 +362,9 @@ const
MAProtocol(
mcodec: multiCodec("ws"), kind: Marker, size: 0
),
MAProtocol(
mcodec: multiCodec("wss"), kind: Marker, size: 0
),
MAProtocol(
mcodec: multiCodec("ipfs"), kind: Length, size: 0,
coder: TranscoderP2P
@ -374,6 +377,10 @@ const
mcodec: multiCodec("unix"), kind: Path, size: 0,
coder: TranscoderUnix
),
MAProtocol(
mcodec: multiCodec("dns"), kind: Length, size: 0,
coder: TranscoderDNS
),
MAProtocol(
mcodec: multiCodec("dns4"), kind: Length, size: 0,
coder: TranscoderDNS
@ -400,17 +407,22 @@ const
)
]
DNSANY* = mapEq("dns")
DNS4* = mapEq("dns4")
DNS6* = mapEq("dns6")
DNSADDR* = mapEq("dnsaddr")
IP4* = mapEq("ip4")
IP6* = mapEq("ip6")
DNS* = mapOr(mapEq("dnsaddr"), DNS4, DNS6)
DNS* = mapOr(DNSANY, DNS4, DNS6, DNSADDR)
IP* = mapOr(IP4, IP6)
TCP* = mapOr(mapAnd(DNS, mapEq("tcp")), mapAnd(IP, mapEq("tcp")))
UDP* = mapOr(mapAnd(DNS, mapEq("udp")), mapAnd(IP, mapEq("udp")))
UTP* = mapAnd(UDP, mapEq("utp"))
QUIC* = mapAnd(UDP, mapEq("quic"))
UNIX* = mapEq("unix")
WS* = mapAnd(TCP, mapEq("ws"))
WSS* = mapAnd(TCP, mapEq("wss"))
WebSockets* = mapOr(WS, WSS)
Unreliable* = mapOr(UDP)
@ -928,13 +940,24 @@ proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {.
m1.append(m2).tryGet()
proc `==`*(m1: var MultiAddress, m2: MultiAddress): bool =
## Check of two MultiAddress are equal
m1.data == m2.data
proc isWire*(ma: MultiAddress): bool =
## Returns ``true`` if MultiAddress ``ma`` is one of:
## - {IP4}/{TCP, UDP}
## - {IP6}/{TCP, UDP}
## - {UNIX}/{PATH}
var
state = 0
var state = 0
const
wireProtocols = toHashSet([
multiCodec("ip4"), multiCodec("ip6"),
])
wireTransports = toHashSet([
multiCodec("tcp"), multiCodec("udp")
])
try:
for rpart in ma.items():
if rpart.isErr():
@ -947,7 +970,7 @@ proc isWire*(ma: MultiAddress): bool =
return false
let code = rcode.get()
if code == multiCodec("ip4") or code == multiCodec("ip6"):
if code in wireProtocols:
inc(state)
continue
elif code == multiCodec("unix"):
@ -962,7 +985,7 @@ proc isWire*(ma: MultiAddress): bool =
return false
let code = rcode.get()
if code == multiCodec("tcp") or code == multiCodec("udp"):
if code in wireTransports:
inc(state)
result = true
else:
@ -978,11 +1001,14 @@ proc matchPart(pat: MaPattern, protos: seq[MultiCodec]): MaPatResult =
var empty: seq[MultiCodec]
var pcs = protos
if pat.operator == Or:
result = MaPatResult(flag: false, rem: empty)
for a in pat.args:
let res = a.matchPart(pcs)
if res.flag:
return MaPatResult(flag: true, rem: res.rem)
result = MaPatResult(flag: false, rem: empty)
#Greedy Or
if result.flag == false or
result.rem.len > res.rem.len:
result = res
elif pat.operator == And:
if len(pcs) < len(pat.args):
return MaPatResult(flag: false, rem: empty)

View File

@ -201,6 +201,7 @@ const MultiCodecList = [
("p2p-webrtc-direct", 0x0114), # not in multicodec list
("onion", 0x01BC),
("p2p-circuit", 0x0122),
("dns", 0x35),
("dns4", 0x36),
("dns6", 0x37),
("dnsaddr", 0x38),

View File

@ -80,6 +80,7 @@ proc select*(m: MultistreamSelect,
trace "reading first requested proto", conn
if s == proto[0]:
trace "successfully selected ", conn, proto = proto[0]
conn.tag = proto[0]
return proto[0]
elif proto.len > 1:
# Try to negotiate alternatives
@ -92,6 +93,7 @@ proc select*(m: MultistreamSelect,
validateSuffix(s)
if s == p:
trace "selected protocol", conn, protocol = s
conn.tag = s
return s
return ""
else:
@ -169,6 +171,7 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler", conn, protocol = ms
await conn.writeLp(ms & "\n")
conn.tag = ms
await h.protocol.handler(conn, ms)
return
debug "no handlers", conn, protocol = ms

View File

@ -21,6 +21,9 @@ export connection
logScope:
topics = "libp2p mplexchannel"
when defined(lipp2p_network_protocols_metrics):
declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
## Channel half-closed states
##
## | State | Closed local | Closed remote
@ -157,6 +160,10 @@ method readOnce*(s: LPChannel,
## or the reads will lock each other.
try:
let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
when defined(lipp2p_network_protocols_metrics):
if s.tag.len > 0:
libp2p_protocols_bytes.inc(bytes.int64, labelValues=[s.tag, "in"])
trace "readOnce", s, bytes
if bytes == 0:
await s.closeUnderlying()
@ -194,6 +201,11 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
trace "write msg", s, conn = s.conn, len = msg.len
await s.conn.writeMsg(s.id, s.msgCode, msg)
when defined(lipp2p_network_protocols_metrics):
if s.tag.len > 0:
libp2p_protocols_bytes.inc(msg.len.int64, labelValues=[s.tag, "out"])
s.activity = true
except CatchableError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg

View File

@ -0,0 +1,160 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import
std/[streams, strutils, sets, sequtils],
chronos, chronicles,
dnsclientpkg/[protocol, types]
import
nameresolver
logScope:
topics = "libp2p dnsresolver"
type
DnsResolver* = ref object of NameResolver
nameServers*: seq[TransportAddress]
proc questionToBuf(address: string, kind: QKind): seq[byte] =
try:
var
header = initHeader()
question = initQuestion(address, kind)
requestStream = header.toStream()
question.toStream(requestStream)
let dataLen = requestStream.getPosition()
requestStream.setPosition(0)
var buf = newSeq[byte](dataLen)
discard requestStream.readData(addr buf[0], dataLen)
return buf
except CatchableError as exc:
info "Failed to created DNS buffer", msg = exc.msg
return newSeq[byte](0)
proc getDnsResponse(
dnsServer: TransportAddress,
address: string,
kind: QKind): Future[Response] {.async.} =
var sendBuf = questionToBuf(address, kind)
if sendBuf.len == 0:
raise newException(ValueError, "Incorrect DNS query")
let receivedDataFuture = newFuture[void]()
proc datagramDataReceived(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async, closure.} =
receivedDataFuture.complete()
let sock =
if dnsServer.family == AddressFamily.IPv6:
newDatagramTransport6(datagramDataReceived)
else:
newDatagramTransport(datagramDataReceived)
try:
await sock.sendTo(dnsServer, addr sendBuf[0], sendBuf.len)
await receivedDataFuture or sleepAsync(5.seconds) #unix default
if not receivedDataFuture.finished:
raise newException(IOError, "DNS server timeout")
var
rawResponse = sock.getMessage()
dataStream = newStringStream()
dataStream.writeData(addr rawResponse[0], rawResponse.len)
dataStream.setPosition(0)
return parseResponse(dataStream)
finally:
await sock.closeWait()
method resolveIp*(
self: DnsResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async.} =
trace "Resolving IP using DNS", address, servers = self.nameservers.mapIt($it), domain
for _ in 0 ..< self.nameservers.len:
let server = self.nameservers[0]
var responseFutures: seq[Future[Response]]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
responseFutures.add(getDnsResponse(server, address, A))
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
let fut = getDnsResponse(server, address, AAAA)
if server.family == AddressFamily.IPv6:
trace "IPv6 DNS server, puting AAAA records first", server = $server
responseFutures.insert(fut)
else:
responseFutures.add(fut)
var
resolvedAddresses: OrderedSet[string]
resolveFailed = false
for fut in responseFutures:
try:
let resp = await fut
for answer in resp.answers:
resolvedAddresses.incl(answer.toString())
except CancelledError as e:
raise e
except ValueError as e:
info "Invalid DNS query", address, error=e.msg
return @[]
except CatchableError as e:
info "Failed to query DNS", address, error=e.msg
resolveFailed = true
break
if resolveFailed:
self.nameservers.add(self.nameservers[0])
self.nameservers.delete(0)
continue
trace "Got IPs from DNS server", resolvedAddresses, server = $server
return resolvedAddresses.toSeq().mapIt(initTAddress(it, port))
debug "Failed to resolve address, returning empty set"
return @[]
method resolveTxt*(
self: DnsResolver,
address: string): Future[seq[string]] {.async.} =
trace "Resolving TXT using DNS", address, servers = self.nameservers.mapIt($it)
for _ in 0 ..< self.nameservers.len:
let server = self.nameservers[0]
try:
let response = await getDnsResponse(server, address, TXT)
trace "Got TXT response", server = $server, answer=response.answers.mapIt(it.toString())
return response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except CatchableError as e:
info "Failed to query DNS", address, error=e.msg
self.nameservers.add(self.nameservers[0])
self.nameservers.delete(0)
continue
debug "Failed to resolve TXT, returning empty set"
return @[]
proc new*(
T: typedesc[DnsResolver],
nameServers: seq[TransportAddress]): T =
T(nameServers: nameServers)

View File

@ -0,0 +1,46 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import
std/[streams, strutils, tables],
chronos, chronicles
import nameresolver
export tables
logScope:
topics = "libp2p mockresolver"
type MockResolver* = ref object of NameResolver
txtResponses*: Table[string, seq[string]]
# key: address, isipv6?
ipResponses*: Table[(string, bool), seq[string]]
method resolveIp*(
self: MockResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async.} =
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, false)):
result.add(initTAddress(resp, port))
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, true)):
result.add(initTAddress(resp, port))
method resolveTxt*(
self: MockResolver,
address: string): Future[seq[string]] {.async.} =
return self.txtResponses.getOrDefault(address)
proc new*(T: typedesc[MockResolver]): T = T()

View File

@ -0,0 +1,148 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[sugar, sets, sequtils, strutils]
import
chronos,
chronicles,
stew/[endians2, byteutils]
import ".."/[multiaddress, multicodec]
logScope:
topics = "libp2p nameresolver"
type
NameResolver* = ref object of RootObj
method resolveTxt*(
self: NameResolver,
address: string): Future[seq[string]] {.async, base.} =
## Get TXT record
##
doAssert(false, "Not implemented!")
method resolveIp*(
self: NameResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async, base.} =
## Resolve the specified address
##
doAssert(false, "Not implemented!")
proc getHostname(ma: MultiAddress): string =
var dnsbuf = newSeq[byte](256)
let dnsLen = ma[0].get().protoArgument(dnsbuf).get()
dnsbuf.setLen(dnsLen)
return string.fromBytes(dnsbuf)
proc resolveDnsAddress(
self: NameResolver,
ma: MultiAddress,
domain: Domain = Domain.AF_UNSPEC,
prefix = ""): Future[seq[MultiAddress]]
{.async, raises: [Defect, MaError, TransportAddressError].} =
#Resolve a single address
var pbuf: array[2, byte]
var dnsval = getHostname(ma)
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
raise newException(MaError, "Incorrect port number")
let
port = Port(fromBytesBE(uint16, pbuf))
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
var addressSuffix = ma
return collect(newSeqOfCap(4)):
for address in resolvedAddresses:
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
for part in ma:
if DNS.match(part.get()): continue
createdAddress &= part.tryGet()
createdAddress
func matchDnsSuffix(m1, m2: MultiAddress): MaResult[bool] =
for partMaybe in m1:
let part = ?partMaybe
if DNS.match(part): continue
let entryProt = ?m2[?part.protoCode()]
if entryProt != part:
return ok(false)
return ok(true)
proc resolveDnsAddr(
self: NameResolver,
ma: MultiAddress,
depth: int = 0): Future[seq[MultiAddress]]
{.async.} =
trace "Resolving dnsaddr", ma
if depth > 6:
info "Stopping DNSADDR recursion, probably malicious", ma
return @[]
var dnsval = getHostname(ma)
let txt = await self.resolveTxt("_dnsaddr." & dnsval)
trace "txt entries", txt
var result: seq[MultiAddress]
for entry in txt:
if not entry.startsWith("dnsaddr="): continue
let entryValue = MultiAddress.init(entry[8..^1]).tryGet()
if not matchDnsSuffix(ma, entryValue).tryGet(): continue
# The spec is not clear wheter only DNSADDR can be recursived
# or any DNS addr. Only handling DNSADDR because it's simpler
# to avoid infinite recursion
if DNSADDR.matchPartial(entryValue):
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
for r in resolved:
result.add(r)
else:
result.add(entryValue)
if result.len == 0:
debug "Failed to resolve any DNSADDR", ma
return @[ma]
return result
proc resolveMAddresses*(
self: NameResolver,
addrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.async.} =
var res = initOrderedSet[MultiAddress]()
for address in addrs:
if not DNS.matchPartial(address):
res.incl(address)
else:
let code = address[0].get().protoCode().get()
let seq = case code:
of multiCodec("dns"):
await self.resolveDnsAddress(address)
of multiCodec("dns4"):
await self.resolveDnsAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveDnsAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
@[address]
for ad in seq:
res.incl(ad)
return res.toSeq

View File

@ -33,6 +33,8 @@ when defined(libp2p_agents_metrics):
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"])
func shortLog*(conn: ChronosStream): auto =
try:
if conn.isNil: "ChronosStream(nil)"
@ -105,6 +107,7 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
withExceptions:
result = await s.client.readOnce(pbytes, nbytes)
s.activity = true # reset activity flag
libp2p_network_bytes.inc(nbytes.int64, labelValues = ["in"])
when defined(libp2p_agents_metrics):
s.trackPeerIdentity()
if s.tracked:
@ -127,6 +130,7 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
s.activity = true # reset activity flag
libp2p_network_bytes.inc(msg.len.int64, labelValues = ["out"])
when defined(libp2p_agents_metrics):
s.trackPeerIdentity()
if s.tracked:

View File

@ -36,6 +36,7 @@ type
peerInfo*: PeerInfo
observedAddr*: Multiaddress
upgraded*: Future[void]
tag*: string # debug tag for metrics (generally ms protocol)
transportDir*: Direction # The bottom level transport (generally the socket) direction
proc timeoutMonitor(s: Connection) {.async, gcsafe.}

View File

@ -32,6 +32,7 @@ import stream/connection,
muxers/muxer,
utils/semaphore,
connmanager,
nameresolving/nameresolver,
peerid,
peerstore,
errors,
@ -62,6 +63,7 @@ type
acceptFuts: seq[Future[void]]
dialer*: Dial
peerStore*: PeerStore
nameResolver*: NameResolver
proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler,
@ -256,7 +258,8 @@ proc newSwitch*(peerInfo: PeerInfo,
muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = [],
connManager: ConnManager,
ms: MultistreamSelect): Switch
ms: MultistreamSelect,
nameResolver: NameResolver = nil): Switch
{.raises: [Defect, LPError].} =
if secureManagers.len == 0:
raise newException(LPError, "Provide at least one secure manager")
@ -267,7 +270,8 @@ proc newSwitch*(peerInfo: PeerInfo,
transports: transports,
connManager: connManager,
peerStore: PeerStore.new(),
dialer: Dialer.new(peerInfo, connManager, transports, ms))
dialer: Dialer.new(peerInfo, connManager, transports, ms),
nameResolver: nameResolver)
switch.mount(identity)
return switch

View File

@ -201,7 +201,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
debug "Server was closed", exc = exc.msg
raise newTransportClosedError(exc)
except CatchableError as exc:
warn "Unexpected error creating connection", exc = exc.msg
debug "Unexpected error accepting connection", exc = exc.msg
raise exc
method dial*(
@ -218,8 +218,4 @@ method dial*(
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return address.protocols
.get()
.filterIt(
it == multiCodec("tcp")
).len > 0
return TCP.match(address)

View File

@ -93,4 +93,8 @@ method handles*(
# by default we skip circuit addresses to avoid
# having to repeat the check in every transport
if address.protocols.isOk:
return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0
return address.protocols
.get()
.filterIt(
it == multiCodec("p2p-circuit")
).len == 0

View File

@ -0,0 +1,244 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[sequtils]
import chronos, chronicles
import transport,
../errors,
../wire,
../multicodec,
../multistream,
../connmanager,
../multiaddress,
../stream/connection,
../upgrademngrs/upgrade,
websock/websock
logScope:
topics = "libp2p wstransport"
export transport, websock
const
WsTransportTrackerName* = "libp2p.wstransport"
type
WsStream = ref object of Connection
session: WSSession
proc init*(T: type WsStream,
session: WSSession,
dir: Direction,
timeout = 10.minutes,
observedAddr: MultiAddress = MultiAddress()): T =
let stream = T(
session: session,
timeout: timeout,
dir: dir,
observedAddr: observedAddr)
stream.initStream()
return stream
method readOnce*(
s: WsStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
let res = await s.session.recv(pbytes, nbytes)
if res == 0 and s.session.readyState == ReadyState.Closed:
raise newLPStreamEOFError()
return res
method write*(
s: WsStream,
msg: seq[byte]): Future[void] {.async.} =
try:
await s.session.send(msg, Opcode.Binary)
except WSClosedError:
raise newLPStreamEOFError()
method closeImpl*(s: WsStream): Future[void] {.async.} =
await s.session.close()
await procCall Connection(s).closeImpl()
type
WsTransport* = ref object of Transport
httpserver: HttpServer
wsserver: WSServer
connections: array[Direction, seq[WsStream]]
tlsPrivateKey: TLSPrivateKey
tlsCertificate: TLSCertificate
tlsFlags: set[TLSFlags]
flags: set[ServerFlags]
factories: seq[ExtFactory]
rng: Rng
proc secure*(self: WsTransport): bool =
not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate))
method start*(
self: WsTransport,
ma: MultiAddress) {.async.} =
## listen on the transport
##
if self.running:
trace "WS transport already running"
return
await procCall Transport(self).start(ma)
trace "Starting WS transport"
self.httpserver =
if self.secure:
TlsHttpServer.create(
address = self.ma.initTAddress().tryGet(),
tlsPrivateKey = self.tlsPrivateKey,
tlsCertificate = self.tlsCertificate,
flags = self.flags)
else:
HttpServer.create(self.ma.initTAddress().tryGet())
self.wsserver = WSServer.new(
factories = self.factories,
rng = self.rng)
let codec = if self.secure:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
# always get the resolved address in case we're bound to 0.0.0.0:0
self.ma = MultiAddress.init(
self.httpserver.localAddress()).tryGet() & codec.tryGet()
self.running = true
trace "Listening on", address = self.ma
method stop*(self: WsTransport) {.async, gcsafe.} =
## stop the transport
##
self.running = false # mark stopped as soon as possible
try:
trace "Stopping WS transport"
await procCall Transport(self).stop() # call base
checkFutures(
await allFinished(
self.connections[Direction.In].mapIt(it.close()) &
self.connections[Direction.Out].mapIt(it.close())))
# server can be nil
if not isNil(self.httpserver):
self.httpserver.stop()
await self.httpserver.closeWait()
self.httpserver = nil
trace "Transport stopped"
except CatchableError as exc:
trace "Error shutting down ws transport", exc = exc.msg
proc trackConnection(self: WsTransport, conn: WsStream, dir: Direction) =
self.connections[dir].add(conn)
proc onClose() {.async.} =
await conn.session.stream.reader.join()
self.connections[dir].keepItIf(it != conn)
trace "Cleaned up client"
asyncSpawn onClose()
method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
## accept a new WS connection
##
if not self.running:
raise newTransportClosedError()
try:
let
req = await self.httpserver.accept()
wstransp = await self.wsserver.handleRequest(req)
stream = WsStream.init(wstransp, Direction.In)
self.trackConnection(stream, Direction.In)
return stream
except TransportOsError as exc:
debug "OS Error", exc = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", exc = exc.msg
raise newTransportClosedError(exc)
except CatchableError as exc:
warn "Unexpected error accepting connection", exc = exc.msg
raise exc
method dial*(
self: WsTransport,
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
## dial a peer
##
trace "Dialing remote peer", address = $address
let
secure = WSS.match(address)
transp = await WebSocket.connect(
address.initTAddress().tryGet(),
"",
secure = secure,
flags = self.tlsFlags)
stream = WsStream.init(transp, Direction.Out)
self.trackConnection(stream, Direction.Out)
return stream
method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return WebSockets.match(address)
proc new*(
T: typedesc[WsTransport],
upgrade: Upgrade,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate,
tlsFlags: set[TLSFlags] = {},
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: Rng = nil): T =
T(
upgrader: upgrade,
tlsPrivateKey: tlsPrivateKey,
tlsCertificate: tlsCertificate,
tlsFlags: tlsFlags,
flags: flags,
factories: @factories,
rng: rng)
proc new*(
T: typedesc[WsTransport],
upgrade: Upgrade,
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: Rng = nil): T =
T.new(
upgrade = upgrade,
tlsPrivateKey = nil,
tlsCertificate = nil,
flags = flags,
factories = @factories,
rng = rng)

View File

@ -19,17 +19,18 @@ else:
import posix
const
TRANSPMA* = mapOr(
mapAnd(IP, mapEq("udp")),
mapAnd(IP, mapEq("tcp")),
mapAnd(mapEq("unix"))
RTRANSPMA* = mapOr(
TCP,
WebSockets,
UNIX
)
RTRANSPMA* = mapOr(
mapAnd(IP, mapEq("tcp")),
mapAnd(mapEq("unix"))
TRANSPMA* = mapOr(
RTRANSPMA,
UDP
)
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
##

View File

@ -11,14 +11,24 @@ import ../libp2p/[stream/connection,
import ./helpers
proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
suite $transportType & " common":
type TransportProvider* = proc(): Transport {.gcsafe.}
proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
suite name & " common tests":
teardown:
checkTrackers()
asyncTest "can handle local address":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1 = prov()
await transport1.start(ma)
check transport1.handles(transport1.ma)
await transport1.stop()
asyncTest "e2e: handle write":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
let transport1 = prov()
await transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -28,23 +38,25 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
let handlerWait = acceptHandler()
let transport2: transportType = transportType.new(upgrade = Upgrade())
let transport2 = prov()
let conn = await transport2.dial(transport1.ma)
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
await conn.close() #for some protocols, closing requires actively, so we must close here
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await conn.close() #for some protocols, closing requires actively reading, so we must close here
await transport2.stop()
await transport1.stop()
await allFuturesThrowing(
allFinished(
transport1.stop(),
transport2.stop()))
check string.fromBytes(msg) == "Hello!"
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
asyncTest "e2e: handle read":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
asyncSpawn transport1.start(ma)
let transport1 = prov()
await transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
@ -55,35 +67,39 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
let handlerWait = acceptHandler()
let transport2: transportType = transportType.new(upgrade = Upgrade())
let transport2 = prov()
let conn = await transport2.dial(transport1.ma)
await conn.write("Hello!")
await conn.close() #for some protocols, closing requires actively, so we must close here
await conn.close() #for some protocols, closing requires actively reading, so we must close here
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await transport2.stop()
await transport1.stop()
await allFuturesThrowing(
allFinished(
transport1.stop(),
transport2.stop()))
asyncTest "e2e: handle dial cancellation":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
let transport1 = prov()
await transport1.start(ma)
let transport2: transportType = transportType.new(upgrade = Upgrade())
let transport2 = prov()
let cancellation = transport2.dial(transport1.ma)
await cancellation.cancelAndWait()
check cancellation.cancelled
await transport2.stop()
await transport1.stop()
await allFuturesThrowing(
allFinished(
transport1.stop(),
transport2.stop()))
asyncTest "e2e: handle accept cancellation":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
let transport1 = prov()
await transport1.start(ma)
let acceptHandler = transport1.accept()
@ -91,3 +107,55 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
check acceptHandler.cancelled
await transport1.stop()
asyncTest "e2e: stopping transport kills connections":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1 = prov()
await transport1.start(ma)
let transport2 = prov()
let acceptHandler = transport1.accept()
let conn = await transport2.dial(transport1.ma)
let serverConn = await acceptHandler
await allFuturesThrowing(
allFinished(
transport1.stop(),
transport2.stop()))
check serverConn.closed()
check conn.closed()
asyncTest "read or write on closed connection":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1 = prov()
await transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
await conn.close()
let handlerWait = acceptHandler()
let conn = await transport1.dial(transport1.ma)
var msg = newSeq[byte](6)
try:
await conn.readExactly(addr msg[0], 6)
check false
except CatchableError as exc:
check true
# we don't HAVE to throw on write on EOF
# (at least TCP doesn't)
try:
await conn.write(msg)
except CatchableError as exc:
check true
await conn.close() #for some protocols, closing requires actively reading, so we must close here
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await transport1.stop()

View File

@ -16,6 +16,7 @@ export asyncunit
const
StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server"
DgramTransportTrackerName = "datagram.transport"
trackerNames = [
LPStreamTrackerName,
@ -25,8 +26,9 @@ const
BufferStreamTrackerName,
TcpTransportTrackerName,
StreamTransportTrackerName,
ChronosStreamTrackerName,
StreamServerTrackerName
StreamServerTrackerName,
DgramTransportTrackerName,
ChronosStreamTrackerName
]
iterator testTrackers*(extras: openArray[string] = []): TrackerBase =

View File

@ -151,11 +151,11 @@ suite "Identify":
storedInfo1.peerId == switch2.peerInfo.peerId
storedInfo2.peerId == switch1.peerInfo.peerId
storedInfo1.addrs.toSeq() == switch2.peerInfo.addrs
storedInfo2.addrs.toSeq() == switch1.peerInfo.addrs
storedInfo1.addrs == switch2.peerInfo.addrs.toHashSet()
storedInfo2.addrs == switch1.peerInfo.addrs.toHashSet()
storedInfo1.protos.toSeq() == switch2.peerInfo.protocols
storedInfo2.protos.toSeq() == switch1.peerInfo.protocols
storedInfo1.protos == switch2.peerInfo.protocols.toHashSet()
storedInfo2.protos == switch1.peerInfo.protocols.toHashSet()
proc closeAll() {.async.} =
await conn.close()
@ -171,21 +171,17 @@ suite "Identify":
switch2.peerInfo.addrs.add(MultiAddress.init("/ip4/127.0.0.1/tcp/5555").tryGet())
check:
switch1.peerStore.get(switch2.peerInfo.peerId).addrs.toSeq() != switch2.peerInfo.addrs
switch1.peerStore.get(switch2.peerInfo.peerId).protos != switch2.peerInfo.protocols.toSet()
switch1.peerStore.get(switch2.peerInfo.peerId).addrs != switch2.peerInfo.addrs.toHashSet()
switch1.peerStore.get(switch2.peerInfo.peerId).protos != switch2.peerInfo.protocols.toHashSet()
await identifyPush2.push(switch2.peerInfo, conn)
await closeAll()
# Wait the very end to be sure that the push has been processed
var aprotos = switch1.peerStore.get(switch2.peerInfo.peerId).protos.toSeq()
var bprotos = switch2.peerInfo.protocols
aprotos.sort()
bprotos.sort()
check:
aprotos == bprotos
switch1.peerStore.get(switch2.peerInfo.peerId).addrs == switch2.peerInfo.addrs.toSet()
switch1.peerStore.get(switch2.peerInfo.peerId).protos == switch2.peerInfo.protocols.toHashSet()
switch1.peerStore.get(switch2.peerInfo.peerId).addrs == switch2.peerInfo.addrs.toHashSet()
asyncTest "wrong peer id push identify":
@ -193,8 +189,8 @@ suite "Identify":
switch2.peerInfo.addrs.add(MultiAddress.init("/ip4/127.0.0.1/tcp/5555").tryGet())
check:
switch1.peerStore.get(switch2.peerInfo.peerId).addrs != switch2.peerInfo.addrs.toSet()
switch1.peerStore.get(switch2.peerInfo.peerId).protos.toSeq() != switch2.peerInfo.protocols
switch1.peerStore.get(switch2.peerInfo.peerId).addrs != switch2.peerInfo.addrs.toHashSet()
switch1.peerStore.get(switch2.peerInfo.peerId).protos != switch2.peerInfo.protocols.toHashSet()
let oldPeerId = switch2.peerInfo.peerId
switch2.peerInfo = PeerInfo.init(PrivateKey.random(newRng()[]).get())
@ -204,10 +200,6 @@ suite "Identify":
await closeAll()
# Wait the very end to be sure that the push has been processed
var aprotos = switch1.peerStore.get(oldPeerId).protos.toSeq()
var bprotos = switch2.peerInfo.protocols
aprotos.sort()
bprotos.sort()
check:
aprotos != bprotos
switch1.peerStore.get(oldPeerId).addrs.toSeq() != switch2.peerInfo.addrs
switch1.peerStore.get(oldPeerId).protos != switch2.peerInfo.protocols.toHashSet()
switch1.peerStore.get(oldPeerId).addrs != switch2.peerInfo.addrs.toHashSet()

View File

@ -2,7 +2,7 @@ import options, tables
import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p
import ../libp2p/[daemon/daemonapi, varint]
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
type
# TODO: Unify both PeerInfo structs
@ -288,6 +288,99 @@ suite "Interop":
await daemonNode.close()
await sleepAsync(1.seconds)
asyncTest "native -> daemon websocket connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
var testFuture = newFuture[string]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
var line = string.fromBytes(await conn.readLp(1024))
check line == test
testFuture.complete(line)
await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
let nativeNode = SwitchBuilder
.new()
.withAddress(wsAddress)
.withRng(crypto.newRng())
.withMplex()
.withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr))
.withNoise()
.build()
nativeNode.mount(proto)
let awaiters = await nativeNode.start()
let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
discard await stream.transp.writeLp(test)
check test == (await wait(testFuture, 10.secs))
await stream.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
await sleepAsync(1.seconds)
asyncTest "daemon -> native websocket connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
# We are preparing expect string, which should be prefixed with varint
# length and do not have `\r\n` suffix, because we going to use
# readLine().
var buffer = initVBuffer()
buffer.writeSeq(test & "\r\n")
buffer.finish()
var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
let nativeNode = SwitchBuilder
.new()
.withAddress(wsAddress)
.withRng(crypto.newRng())
.withMplex()
.withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr))
.withNoise()
.build()
let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
# We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string.
var line = await stream.transp.readLine()
check line == expect
testFuture.complete(line)
await stream.close()
await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses),
protos[0])
await conn.writeLp(test & "\r\n")
check expect == (await wait(testFuture, 10.secs))
await conn.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
asyncTest "daemon -> multiple reads and writes":
var protos = @["/test-stream"]

View File

@ -53,6 +53,10 @@ const
"/ip4/1.2.3.4/tcp/80/unix/a/b/c/d/e/f",
"/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234/unix/stdio",
"/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234/unix/stdio",
"/dns/example.io/udp/65535",
"/dns4/example.io/udp/65535",
"/dns6/example.io/udp/65535",
"/dnsaddr/example.io/udp/65535",
]
FailureVectors = [
@ -257,7 +261,7 @@ const
]
),
PatternVector(pattern: DNS,
good: @["/dnsaddr/example.io", "/dns4/example.io", "/dns6/example.io"],
good: @["/dns/example.io", "/dnsaddr/example.io", "/dns4/example.io", "/dns6/example.io"],
bad: @["/ip4/127.0.0.1"],
),
PatternVector(pattern: WebRTCDirect,

View File

@ -0,0 +1,247 @@
{.used.}
import std/[streams, strutils, sets, sequtils, tables, algorithm]
import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,
transports/tcptransport,
upgrademngrs/upgrade,
multiaddress,
errors,
nameresolving/nameresolver,
nameresolving/dnsresolver,
nameresolving/mockresolver,
wire]
import ./helpers
#
#Cloudflare
const fallbackDnsServers = @[
initTAddress("1.1.1.1:53"),
initTAddress("1.0.0.1:53"),
initTAddress("[2606:4700:4700::1111]:53")
]
const unixPlatform = defined(linux) or defined(solaris) or
defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly)
proc guessOsNameServers(): seq[TransportAddress] =
when unixPlatform:
var resultSeq = newSeqOfCap[TransportAddress](3)
try:
for l in lines("/etc/resolv.conf"):
let lineParsed = l.strip().split(seps = Whitespace + {'%'}, maxsplit = 2)
if lineParsed.len < 2: continue
if lineParsed[0].startsWith('#'): continue
if lineParsed[0] == "nameserver":
resultSeq.add(initTAddress(lineParsed[1], Port(53)))
if resultSeq.len > 2: break #3 nameserver max on linux
except Exception as e:
echo "Failed to get unix nameservers ", e.msg
finally:
if resultSeq.len > 0:
return resultSeq
return fallbackDnsServers
elif defined(windows):
#TODO
return fallbackDnsServers
else:
return fallbackDnsServers
suite "Name resolving":
suite "Generic Resolving":
var resolver {.threadvar.}: MockResolver
proc testOne(input: string, output: seq[Multiaddress]): bool =
let resolved = waitFor resolver.resolveMAddresses(@[Multiaddress.init(input).tryGet()])
if resolved != output:
echo "Expected ", output
echo "Got ", resolved
return false
return true
proc testOne(input: string, output: seq[string]): bool =
testOne(input, output.mapIt(Multiaddress.init(it).tryGet()))
proc testOne(input, output: string): bool =
testOne(input, @[Multiaddress.init(output).tryGet()])
asyncSetup:
resolver = MockResolver.new()
asyncTest "test multi address dns resolve":
resolver.ipResponses[("localhost", false)] = @["127.0.0.1"]
resolver.ipResponses[("localhost", true)] = @["::1"]
check testOne("/dns/localhost/udp/0", @["/ip4/127.0.0.1/udp/0", "/ip6/::1/udp/0"])
check testOne("/dns4/localhost/tcp/0", "/ip4/127.0.0.1/tcp/0")
check testOne("/dns6/localhost/tcp/0", "/ip6/::1/tcp/0")
check testOne("/dns6/localhost/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", "/ip6/::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
asyncTest "test non dns resolve":
resolver.ipResponses[("localhost", false)] = @["127.0.0.1"]
resolver.ipResponses[("localhost", true)] = @["::1"]
check testOne("/ip6/::1/tcp/0", "/ip6/::1/tcp/0")
asyncTest "test multiple resolve":
resolver.ipResponses[("localhost", false)] = @["127.0.0.1"]
resolver.ipResponses[("localhost", true)] = @["::1"]
let resolved = waitFor resolver.resolveMAddresses(@[
Multiaddress.init("/dns/localhost/udp/0").tryGet(),
Multiaddress.init("/dns4/localhost/udp/0").tryGet(),
Multiaddress.init("/dns6/localhost/udp/0").tryGet(),
])
check resolved == @[Multiaddress.init("/ip4/127.0.0.1/udp/0").tryGet(), Multiaddress.init("/ip6/::1/udp/0").tryGet()]
asyncTest "dnsaddr recursive test":
resolver.txtResponses["_dnsaddr.bootstrap.libp2p.io"] = @[
"dnsaddr=/dnsaddr/sjc-1.bootstrap.libp2p.io/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"dnsaddr=/dnsaddr/ams-2.bootstrap.libp2p.io/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb"
]
resolver.txtResponses["_dnsaddr.sjc-1.bootstrap.libp2p.io"] = @[
"dnsaddr=/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"dnsaddr=/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"
]
resolver.txtResponses["_dnsaddr.ams-2.bootstrap.libp2p.io"] = @[
"dnsaddr=/ip4/147.75.83.83/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"dnsaddr=/ip6/2604:1380:2000:7a00::1/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb"
]
check testOne("/dnsaddr/bootstrap.libp2p.io/", @[
"/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/ip4/147.75.83.83/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/ip6/2604:1380:2000:7a00::1/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
])
asyncTest "dnsaddr suffix matching test":
resolver.txtResponses["_dnsaddr.bootstrap.libp2p.io"] = @[
"dnsaddr=/dnsaddr/ams-2.bootstrap.libp2p.io/tcp/4001/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"dnsaddr=/dnsaddr/sjc-1.bootstrap.libp2p.io/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"dnsaddr=/dnsaddr/nrt-1.bootstrap.libp2p.io/tcp/4001/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
"dnsaddr=/dnsaddr/ewr-1.bootstrap.libp2p.io/tcp/4001/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
]
resolver.txtResponses["_dnsaddr.sjc-1.bootstrap.libp2p.io"] = @[
"dnsaddr=/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"dnsaddr=/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
]
resolver.txtResponses["_dnsaddr.ams-1.bootstrap.libp2p.io"] = @[
"dnsaddr=/ip4/147.75.69.143/tcp/4001/p2p/shouldbefiltered",
"dnsaddr=/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/shouldbefiltered",
]
check testOne("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", @[
"/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
])
asyncTest "dnsaddr infinite recursion":
resolver.txtResponses["_dnsaddr.bootstrap.libp2p.io"] = @["dnsaddr=/dnsaddr/bootstrap.libp2p.io"]
check testOne("/dnsaddr/bootstrap.libp2p.io/", "/dnsaddr/bootstrap.libp2p.io/")
suite "DNS Resolving":
teardown:
checkTrackers()
asyncTest "test manual dns ip resolve":
## DNS mock server
proc clientMark1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var msg = transp.getMessage()
let
resp = if msg[24] == 1: #AAAA or A
"\xae\xbf\x81\x80\x00\x01\x00\x03\x00\x00\x00\x00\x06\x73\x74\x61" &
"\x74\x75\x73\x02\x69\x6d\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00" &
"\x01\x00\x00\x00\x4f\x00\x04\x68\x16\x18\xb5\xc0\x0c\x00\x01\x00" &
"\x01\x00\x00\x00\x4f\x00\x04\xac\x43\x0a\xa1\xc0\x0c\x00\x01\x00" &
"\x01\x00\x00\x00\x4f\x00\x04\x68\x16\x19\xb5"
else:
"\xe8\xc5\x81\x80\x00\x01\x00\x03\x00\x00\x00\x00\x06\x73\x74\x61" &
"\x74\x75\x73\x02\x69\x6d\x00\x00\x1c\x00\x01\xc0\x0c\x00\x1c\x00" &
"\x01\x00\x00\x00\x4f\x00\x10\x26\x06\x47\x00\x00\x10\x00\x00\x00" &
"\x00\x00\x00\x68\x16\x19\xb5\xc0\x0c\x00\x1c\x00\x01\x00\x00\x00" &
"\x4f\x00\x10\x26\x06\x47\x00\x00\x10\x00\x00\x00\x00\x00\x00\x68" &
"\x16\x18\xb5\xc0\x0c\x00\x1c\x00\x01\x00\x00\x00\x4f\x00\x10\x26" &
"\x06\x47\x00\x00\x10\x00\x00\x00\x00\x00\x00\xac\x43\x0a\xa1"
await transp.sendTo(raddr, resp)
let server = newDatagramTransport(clientMark1)
# The test
var dnsresolver = DnsResolver.new(@[server.localAddress])
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_UNSPEC)) ==
mapIt(
@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0",
"[2606:4700:10::6816:19b5]:0", "[2606:4700:10::6816:18b5]:0", "[2606:4700:10::ac43:aa1]:0"
], initTAddress(it))
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET)) ==
mapIt(@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0"], initTAddress(it))
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET6)) ==
mapIt(@["[2606:4700:10::6816:19b5]:0", "[2606:4700:10::6816:18b5]:0", "[2606:4700:10::ac43:aa1]:0"], initTAddress(it))
await server.closeWait()
asyncTest "test unresponsive dns server":
var unresponsiveTentatives = 0
## DNS mock server
proc clientMark1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
unresponsiveTentatives.inc()
check unresponsiveTentatives == 1
proc clientMark2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var msg = transp.getMessage()
let resp =
"\xae\xbf\x81\x80\x00\x01\x00\x03\x00\x00\x00\x00\x06\x73\x74\x61" &
"\x74\x75\x73\x02\x69\x6d\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00" &
"\x01\x00\x00\x00\x4f\x00\x04\x68\x16\x18\xb5\xc0\x0c\x00\x01\x00" &
"\x01\x00\x00\x00\x4f\x00\x04\xac\x43\x0a\xa1\xc0\x0c\x00\x01\x00" &
"\x01\x00\x00\x00\x4f\x00\x04\x68\x16\x19\xb5"
await transp.sendTo(raddr, resp)
let
unresponsiveServer = newDatagramTransport(clientMark1)
server = newDatagramTransport(clientMark2)
# The test
var dnsresolver = DnsResolver.new(@[unresponsiveServer.localAddress, server.localAddress])
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET)) ==
mapIt(@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0"], initTAddress(it))
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET)) ==
mapIt(@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0"], initTAddress(it))
await server.closeWait()
await unresponsiveServer.closeWait()
asyncTest "inexisting domain resolving":
var dnsresolver = DnsResolver.new(guessOsNameServers())
let invalid = await dnsresolver.resolveIp("thisdomain.doesnot.exist", 0.Port)
check invalid.len == 0
asyncTest "wrong domain resolving":
var dnsresolver = DnsResolver.new(guessOsNameServers())
let invalid = await dnsresolver.resolveIp("", 0.Port)
check invalid.len == 0
asyncTest "unreachable dns server":
var dnsresolver = DnsResolver.new(@[initTAddress("172.67.10.161:53")])
let invalid = await dnsresolver.resolveIp("google.fr", 0.Port)
check invalid.len == 0

View File

@ -17,6 +17,8 @@ import testmultibase,
testpeerid
import testtcptransport,
testnameresolve,
testwstransport,
testmultistream,
testbufferstream,
testidentify,

View File

@ -18,6 +18,8 @@ import ../libp2p/[errors,
muxers/muxer,
muxers/mplex/lpchannel,
stream/lpstream,
nameresolving/nameresolver,
nameresolving/mockresolver,
stream/chronosstream,
transports/tcptransport]
import ./helpers
@ -898,8 +900,8 @@ suite "Switch":
storedInfo1.peerId == switch2.peerInfo.peerId
storedInfo2.peerId == switch1.peerInfo.peerId
storedInfo1.addrs.toSeq() == switch2.peerInfo.addrs
storedInfo2.addrs.toSeq() == switch1.peerInfo.addrs
storedInfo1.addrs == switch2.peerInfo.addrs.toHashSet()
storedInfo2.addrs == switch1.peerInfo.addrs.toHashSet()
storedInfo1.protos.toSeq() == switch2.peerInfo.protocols
storedInfo2.protos.toSeq() == switch1.peerInfo.protocols
storedInfo1.protos == switch2.peerInfo.protocols.toHashSet()
storedInfo2.protos == switch1.peerInfo.protocols.toHashSet()

View File

@ -125,4 +125,7 @@ suite "TCP transport":
server.close()
await server.join()
TcpTransport.commonTransportTest("/ip4/0.0.0.0/tcp/0")
commonTransportTest(
"TcpTransport",
proc (): Transport = TcpTransport.new(upgrade = Upgrade()),
"/ip4/0.0.0.0/tcp/0")

View File

@ -0,0 +1,88 @@
{.used.}
import sequtils
import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,
transports/wstransport,
upgrademngrs/upgrade,
multiaddress,
errors,
wire]
import ./helpers, ./commontransport
const
SecureKey* = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCdNv0SX02aeZ4/
Yc+p/Kwd5UVOHlpmK7/TVC/kcjFbdoUuKNn8pnX/fyhgSKpUYut+te7YRiZhqlaL
EZKjfy8GBZwXZnJCevFkTvGTTebXXExLIsLGfJqKeLAdFCQkX8wV3jV1DT5JLV+D
5+HWaiiBr38gsl4ZbfyedTF40JvzokCmcdlx9bpzX1j/b84L/zSwUyyEcgp5G28F
Jh5TnxAeDHJpOVjr8XMb/xoNqiDF6NwF96hvOZC14mZ1TxxW5bUzXprsy0l52pmh
dN3Crz11+t2h519hRKHxT6/l5pTx/+dApXiP6hMV04CQJNnas3NyRxTDR9dNel+3
+wD7/PRTAgMBAAECggEBAJuXPEbegxMKog7gYoE9S6oaqchySc0sJyCjBPL2ANsg
JRZV38cnh0hhNDh2MfxqGd7Bd6wbYQjvZ88iiRm+WW+ARcby4MnimtxHNNYwFvG0
qt0BffqqftfkMYfV0x8coAJUdFtvy+DoQstsxhlJ3uTaJtrZLD/GlmjMWzXSX0Vy
FXiLDO7/LoSjsjaf4e4aLofIyLJS3H1T+5cr/d2mdpRzkeWkxShODsK4cRLOlZ5I
pz4Wm2770DTbiYph8ixl/CnmYn6T7V0F5VYujALknipUBeQY4e/A9vrQ/pvqJV+W
JjFUne6Rxg/lJjh8vNJp2bK1ZbzpwmZLaZIoEz8t/qECgYEAzvCCA48uQPaurSQ3
cvHDhcVwYmEaH8MW8aIW/5l8XJK60GsUHPFhEsfD/ObI5PJJ9aOqgabpRHkvD4ZY
a8QJBxCy6UeogUeKvGks8VQ34SZXLimmgrL9Mlljv0v9PloEkVYbztYyX4GVO0ov
3oH+hKO+/MclzNDyeXZx3Vv4K+UCgYEAwnyb7tqp7fRqm/8EymIZV5pa0p6h609p
EhCBi9ii6d/ewEjsBhs7bPDBO4PO9ylvOvryYZH1hVbQja2anOCBjO8dAHRHWM86
964TFriywBQkYxp6dsB8nUjLBDza2xAM3m+OGi9/ATuhEAe5sXp/fZL3tkfSaOXI
A7Gzro+kS9cCgYEAtKScSfEeBlWQa9H2mV9UN5z/mtF61YkeqTW+b8cTGVh4vWEL
wKww+gzqGAV6Duk2CLijKeSDMmO64gl7fC83VjSMiTklbhz+jbQeKFhFI0Sty71N
/j+y6NXBTgdOfLRl0lzhj2/JrzdWBtie6tR9UloCaXSKmb04PTFY+kvDWsUCgYBR
krJUnKJpi/qrM2tu93Zpp/QwIxkG+We4i/PKFDNApQVo4S0d4o4qQ1DJBZ/pSxe8
RUUkZ3PzWVZgFlCjPAcadbBUYHEMbt7sw7Z98ToIFmqspo53AIVD8yQzwtKIz1KW
eXPAx+sdOUV008ivCBIxOVNswPMfzED4S7Bxpw3iQQKBgGJhct2nBsgu0l2/wzh9
tpKbalW1RllgptNQzjuBEZMTvPF0L+7BE09/exKtt4N9s3yAzi8o6Qo7RHX5djVc
SNgafV4jj7jt2Ilh6KOy9dshtLoEkS1NmiqfVe2go2auXZdyGm+I2yzKWdKGDO0J
diTtYf1sA0PgNXdSyDC03TZl
-----END PRIVATE KEY-----
"""
SecureCert* = """
-----BEGIN CERTIFICATE-----
MIIDazCCAlOgAwIBAgIUe9fr78Dz9PedQ5Sq0uluMWQhX9wwDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCSU4xEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMTAzMTcwOTMzMzZaFw0zMTAz
MTUwOTMzMzZaMEUxCzAJBgNVBAYTAklOMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQCdNv0SX02aeZ4/Yc+p/Kwd5UVOHlpmK7/TVC/kcjFb
doUuKNn8pnX/fyhgSKpUYut+te7YRiZhqlaLEZKjfy8GBZwXZnJCevFkTvGTTebX
XExLIsLGfJqKeLAdFCQkX8wV3jV1DT5JLV+D5+HWaiiBr38gsl4ZbfyedTF40Jvz
okCmcdlx9bpzX1j/b84L/zSwUyyEcgp5G28FJh5TnxAeDHJpOVjr8XMb/xoNqiDF
6NwF96hvOZC14mZ1TxxW5bUzXprsy0l52pmhdN3Crz11+t2h519hRKHxT6/l5pTx
/+dApXiP6hMV04CQJNnas3NyRxTDR9dNel+3+wD7/PRTAgMBAAGjUzBRMB0GA1Ud
DgQWBBRkSY1AkGUpVNxG5fYocfgFODtQmTAfBgNVHSMEGDAWgBRkSY1AkGUpVNxG
5fYocfgFODtQmTAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBt
D71VH7F8GOQXITFXCrHwEq1Fx3ScuSnL04NJrXw/e9huzLVQOchAYp/EIn4x2utN
S31dt94wvi/IysOVbR1LatYNF5kKgGj2Wc6DH0PswBMk8R1G8QMeCz+hCjf1VDHe
AAW1x2q20rJAvUrT6cRBQqeiMzQj0OaJbvfnd2hu0/d0DFkcuGVgBa2zlbG5rbdU
Jnq7MQfSaZHd0uBgiKkS+Zw6XaYfWfByCAGSnUqRdOChiJ2stFVLvu+9oQ+PJjJt
Er1u9bKTUyeuYpqXr2BP9dqphwu8R4NFVUg6DIRpMFMsybaL7KAd4hD22RXCvc0m
uLu7KODi+eW62MHqs4N2
-----END CERTIFICATE-----
"""
suite "WebSocket transport":
teardown:
checkTrackers()
commonTransportTest(
"WebSocket",
proc (): Transport = WsTransport.new(Upgrade()),
"/ip4/0.0.0.0/tcp/0/ws")
commonTransportTest(
"WebSocket Secure",
proc (): Transport =
WsTransport.new(
Upgrade(),
TLSPrivateKey.init(SecureKey),
TLSCertificate.init(SecureCert),
{TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}),
"/ip4/0.0.0.0/tcp/0/wss")

Some files were not shown because too many files have changed in this diff Show More