Noise and eth2/nbc fixes (#226)

* Remove noise padding payload (spec removed it)

* add log scope in secure

* avoid defect array out of range in switch secure when "na"

* improve identify traces

* wip noise fixes

* noise protobuf adjustments (trying)

* add more debugging messages/traces, improve their actual contents

* re-enable ID check in noise

* bump go daemon tag version

* bump go daemon tag version

* enable noise in daemonapi

* interop testing, (both secio and noise will be tested)

* azure cache bump (p2pd)

* CI changes

- Travis: use Go 1.14
- azure-pipelines.yml: big cleanup
- Azure: bump cache keys
- build 64-bit p2pd on 32-bit Windows
- install both Mingw-w64 architectures

* noise logging fixes

* alternate testing between noise and secio

* increase timeout to avoid VM errors in CI (multistream tests)

* refactor heartbeat management in gossipsub

* remove locking within heartbeat

* refactor heartbeat management in gossipsub

* remove locking within heartbeat

Co-authored-by: Ștefan Talpalaru <stefantalpalaru@yahoo.com>
This commit is contained in:
Giovanni Petrantoni 2020-06-20 19:56:55 +09:00 committed by GitHub
parent 7a1c1c2ea6
commit 7852c6dd0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 173 additions and 240 deletions

View File

@ -40,7 +40,7 @@ install:
# install and build go-libp2p-daemon # install and build go-libp2p-daemon
- curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh - curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh
- bash build_p2pd.sh p2pdCache - bash build_p2pd.sh p2pdCache v0.2.4
build_script: build_script:
- nimble install -y --depsOnly - nimble install -y --depsOnly

View File

@ -12,7 +12,7 @@ git:
# when multiple CI builds are queued, the tested commit needs to be in the last X commits cloned with "--depth X" # when multiple CI builds are queued, the tested commit needs to be in the last X commits cloned with "--depth X"
depth: 10 depth: 10
go: "1.12.x" go: "1.14.x"
matrix: matrix:
include: include:
@ -45,9 +45,10 @@ install:
# install and build go-libp2p-daemon # install and build go-libp2p-daemon
- curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh - curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh
- bash build_p2pd.sh p2pdCache v0.2.1 - bash build_p2pd.sh p2pdCache v0.2.4
script: script:
- nimble install -y --depsOnly - nimble install -y --depsOnly
- nimble test - nimble test
- nimble examples_build - nimble examples_build

View File

@ -13,186 +13,108 @@ strategy:
VM: 'windows-latest' VM: 'windows-latest'
PLATFORM: x64 PLATFORM: x64
TEST_LANG: c TEST_LANG: c
# Windows_cpp_64bit:
# VM: 'windows-latest'
# UCPU: amd64
# PLATFORM: x64
# TEST_LANG: cpp
#Linux_64bit:
#VM: 'ubuntu-16.04'
#PLATFORM: x64
#TEST_LANG: c
# Linux_cpp_64bit:
# VM: 'ubuntu-16.04'
# UCPU: amd64
# PLATFORM: x64
# TEST_LANG: cpp
# TODO this requires gcc -m32 as the Ubuntu image is 64-bit
# Linux_32bit:
# VM: 'ubuntu-16.04'
# ARCH: x86
# PLATFORM: x86
# TEST_LANG: c
# TODO: go version on Azure is problematic,
# we have xerrors package issue
# MacOS_64bit:
# VM: 'macOS-10.14'
# PLATFORM: x64
# TEST_LANG: c
pool: pool:
vmImage: $(VM) vmImage: $(VM)
variables: variables:
GOPATH: '$(system.defaultWorkingDirectory)/go'
V: 0 # Scripts verbosity, 1 for debugging build scripts V: 0 # Scripts verbosity, 1 for debugging build scripts
steps: steps:
- task: CacheBeta@1 - task: CacheBeta@1
displayName: 'cache Nim binaries' displayName: 'cache Nim binaries'
inputs: inputs:
key: NimBinaries | $(Agent.OS) | $(PLATFORM) | "$(Build.SourceBranchName)" | "v3" key: NimBinaries | $(Agent.OS) | $(PLATFORM) | "$(Build.SourceBranchName)" | "v4"
path: NimBinaries path: NimBinaries
- task: CacheBeta@1 - task: CacheBeta@1
displayName: 'cache Go libp2p daemon' displayName: 'cache Go libp2p daemon'
inputs: inputs:
key: p2pdCache | $(Agent.OS) | $(PLATFORM) key: p2pdCache | $(Agent.OS) | $(PLATFORM) | "v3"
path: p2pdCache path: p2pdCache
- task: CacheBeta@1 - task: CacheBeta@1
displayName: 'cache MinGW-w64' displayName: 'cache MinGW-w64'
inputs: inputs:
key: mingwCache | 8_1_0 | $(PLATFORM) key: mingwCache | 8_1_0 | $(PLATFORM) | "v1"
path: mingwCache path: mingwCache
condition: eq(variables['Agent.OS'], 'Windows_NT')
- powershell: | - powershell: |
Set-ItemProperty -Path 'HKLM:\SYSTEM\CurrentControlSet\Control\FileSystem' -Name 'LongPathsEnabled' -Value 1 Set-ItemProperty -Path 'HKLM:\SYSTEM\CurrentControlSet\Control\FileSystem' -Name 'LongPathsEnabled' -Value 1
displayName: 'long path support' displayName: 'long path support'
condition: eq(variables['Agent.OS'], 'Windows_NT')
- bash: | - bash: |
set -e set -e
echo "PATH=${PATH}"
# custom MinGW-w64 versions for both 32-bit and 64-bit, since we need a 64-bit build of p2pd
echo "Installing MinGW-w64" echo "Installing MinGW-w64"
install_mingw() {
mkdir -p mingwCache
cd mingwCache
if [[ ! -e "$MINGW_FILE" ]]; then
rm -f *.7z
curl -OLsS "$MINGW_URL"
fi
7z x -y -bd "$MINGW_FILE" >/dev/null
mkdir -p /c/custom
mv "$MINGW_DIR" /c/custom/
cd ..
}
# 32-bit
MINGW_FILE="i686-8.1.0-release-posix-dwarf-rt_v6-rev0.7z"
MINGW_URL="https://sourceforge.net/projects/mingw-w64/files/Toolchains%20targetting%20Win32/Personal%20Builds/mingw-builds/8.1.0/threads-posix/dwarf/${MINGW_FILE}"
MINGW_DIR="mingw32"
install_mingw
# 64-bit
MINGW_FILE="x86_64-8.1.0-release-posix-seh-rt_v6-rev0.7z"
MINGW_URL="https://sourceforge.net/projects/mingw-w64/files/Toolchains%20targetting%20Win64/Personal%20Builds/mingw-builds/8.1.0/threads-posix/seh/${MINGW_FILE}"
MINGW_DIR="mingw64"
install_mingw
if [[ $PLATFORM == "x86" ]]; then if [[ $PLATFORM == "x86" ]]; then
MINGW_FILE="i686-8.1.0-release-posix-dwarf-rt_v6-rev0.7z"
MINGW_URL="https://sourceforge.net/projects/mingw-w64/files/Toolchains%20targetting%20Win32/Personal%20Builds/mingw-builds/8.1.0/threads-posix/dwarf/${MINGW_FILE}"
MINGW_DIR="mingw32" MINGW_DIR="mingw32"
else else
MINGW_FILE="x86_64-8.1.0-release-posix-seh-rt_v6-rev0.7z"
MINGW_URL="https://sourceforge.net/projects/mingw-w64/files/Toolchains%20targetting%20Win64/Personal%20Builds/mingw-builds/8.1.0/threads-posix/seh/${MINGW_FILE}"
MINGW_DIR="mingw64" MINGW_DIR="mingw64"
fi fi
mkdir -p mingwCache export PATH="/c/custom/${MINGW_DIR}/bin:${PATH}"
pushd mingwCache
if [[ ! -e "$MINGW_FILE" ]]; then
rm -f *.7z
curl -OLsS "$MINGW_URL"
fi
7z x -y -bd "$MINGW_FILE" >/dev/null
mkdir -p /c/custom
mv "$MINGW_DIR" /c/custom/
popd
# Workaround https://developercommunity.visualstudio.com/content/problem/891929/windows-2019-cygheap-base-mismatch-detected-git-ba.html
echo "##vso[task.prependpath]/usr/bin"
echo "##vso[task.prependpath]/mingw64/bin"
echo "##vso[task.setvariable variable=MINGW_DIR;]$MINGW_DIR"
displayName: 'Install dependencies (Windows)'
condition: eq(variables['Agent.OS'], 'Windows_NT')
- powershell: |
# export custom mingw PATH to other tasks
echo "##vso[task.prependpath]c:\custom\$(MINGW_DIR)\bin"
displayName: 'Mingw PATH (Windows)'
condition: eq(variables['Agent.OS'], 'Windows_NT')
- bash: |
set -e
echo "PATH=${PATH}"
export ncpu=
case '$(Agent.OS)' in
'Linux')
ncpu=$(nproc)
;;
'Darwin')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows_NT')
ncpu=$NUMBER_OF_PROCESSORS
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "Found ${ncpu} cores"
echo "##vso[task.setvariable variable=ncpu;]$ncpu"
displayName: 'Detecting number of cores'
- bash: |
set -e
echo "PATH=${PATH}"
# build nim from our own branch - this to avoid the day-to-day churn and
# regressions of the fast-paced Nim development while maintaining the
# flexibility to apply patches
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
env MAKE="mingw32-make -j${ncpu}" ARCH_OVERRIDE=$(PLATFORM) bash build_nim.sh Nim csources dist/nimble NimBinaries
displayName: 'Building Nim (Windows)'
condition: eq(variables['Agent.OS'], 'Windows_NT')
- powershell: |
echo "##vso[task.prependpath]$pwd\Nim\bin"
displayName: 'Set env variable (Windows)'
condition: eq(variables['Agent.OS'], 'Windows_NT')
- bash: |
set -e
echo "PATH=${PATH}"
# build nim from our own branch - this to avoid the day-to-day churn and
# regressions of the fast-paced Nim development while maintaining the
# flexibility to apply patches
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
env MAKE="make -j${ncpu}" ARCH_OVERRIDE=$(PLATFORM) bash build_nim.sh Nim csources dist/nimble NimBinaries
displayName: 'Building Nim (POSIX)'
condition: ne(variables['Agent.OS'], 'Windows_NT')
- bash: |
echo "##vso[task.prependpath]$PWD/Nim/bin"
displayName: 'Set env variable (Posix)'
condition: ne(variables['Agent.OS'], 'Windows_NT')
- bash: |
set -e
# install and build go-libp2p-daemon
go version
echo "##vso[task.prependpath]$(GOPATH)/bin"
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh
bash build_p2pd.sh p2pdCache
displayName: 'Building Go libp2p daemon'
- bash: |
set -e
echo "PATH=${PATH}"
nimble refresh
nimble install -y --depsOnly
displayName: 'Building dependencies (Posix)'
condition: ne(variables['Agent.OS'], 'Windows_NT')
- powershell: |
echo $Env:Path
nimble refresh
nimble install -y --depsOnly
displayName: 'Building dependencies (Windows)'
condition: eq(variables['Agent.OS'], 'Windows_NT')
- bash: |
set -e
echo "PATH=${PATH}" echo "PATH=${PATH}"
which gcc which gcc
gcc -v gcc -v
# detect number of cores
export ncpu="$NUMBER_OF_PROCESSORS"
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=2
echo "Found ${ncpu} cores"
# build nim from our own branch - this to avoid the day-to-day churn and
# regressions of the fast-paced Nim development while maintaining the
# flexibility to apply patches
curl -OLsS https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
env MAKE="mingw32-make -j${ncpu}" ARCH_OVERRIDE=$(PLATFORM) bash build_nim.sh Nim csources dist/nimble NimBinaries
export PATH="${PWD}/Nim/bin:${PATH}"
echo "PATH=${PATH}"
# install and build go-libp2p-daemon
go version
export GOPATH="${PWD}/go"
export PATH="${GOPATH}/bin:${PATH}"
echo "PATH=${PATH}"
curl -OLsS https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh
# we can't seem to be able to build a 32-bit p2pd
env PATH="/c/custom/mingw64/bin:${PATH}" bash build_p2pd.sh p2pdCache v0.2.4
# install dependencies
nimble refresh
nimble install -y --depsOnly
# run tests
nimble test nimble test
nimble examples_build nimble examples_build
displayName: 'Testing the package' displayName: 'build and test'

View File

@ -17,18 +17,19 @@ requires "nim >= 1.2.0",
"stew >= 0.1.0" "stew >= 0.1.0"
proc runTest(filename: string, verify: bool = true, sign: bool = true) = proc runTest(filename: string, verify: bool = true, sign: bool = true) =
var excstr: string = "nim c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info" var excstr = "nim c -r --opt:speed -d:debug --verbosity:0 --hints:off -d:chronicles_log_level=info"
excstr.add(" ") excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off")
excstr.add("-d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_sign=" & $sign)
excstr.add(" ") excstr.add(" -d:libp2p_pubsub_verify=" & $verify)
excstr.add("-d:libp2p_pubsub_verify=" & $verify) excstr.add(" tests/" & filename)
excstr.add(" ")
excstr.add("tests/" & filename)
exec excstr exec excstr
rmFile "tests/" & filename.toExe rmFile "tests/" & filename.toExe
proc buildSample(filename: string) = proc buildSample(filename: string) =
exec "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off examples/" & filename var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off"
excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off")
excstr.add(" examples/" & filename)
exec excstr
rmFile "examples" & filename.toExe rmFile "examples" & filename.toExe
task testnative, "Runs libp2p native tests": task testnative, "Runs libp2p native tests":

View File

@ -736,6 +736,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
if i > 0: opt.add "," if i > 0: opt.add ","
opt.add $address opt.add $address
args.add(opt) args.add(opt)
args.add("-noise=true")
args.add("-listen=" & $api.address) args.add("-listen=" & $api.address)
# We are trying to get absolute daemon path. # We are trying to get absolute daemon path.

View File

@ -47,6 +47,7 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider
method init(c: MuxerProvider) = method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
trace "starting muxer handler", proto=proto, peer=conn
try: try:
let let
muxer = c.newMuxer(conn) muxer = c.newMuxer(conn)
@ -63,7 +64,7 @@ method init(c: MuxerProvider) =
checkFutures(await allFinished(futs)) checkFutures(await allFinished(futs))
except CatchableError as exc: except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg trace "exception in muxer handler", exc = exc.msg, peer=conn, proto=proto
c.handler = handler c.handler = handler

View File

@ -129,7 +129,7 @@ method init*(p: Identify) =
proc identify*(p: Identify, proc identify*(p: Identify,
conn: Connection, conn: Connection,
remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} =
trace "initiating identify" trace "initiating identify", peer=conn
var message = await conn.readLp(64*1024) var message = await conn.readLp(64*1024)
if len(message) == 0: if len(message) == 0:
trace "identify: Invalid or empty message received!" trace "identify: Invalid or empty message received!"

View File

@ -53,7 +53,8 @@ type
gossip*: Table[string, seq[ControlIHave]] # pending gossip gossip*: Table[string, seq[ControlIHave]] # pending gossip
control*: Table[string, ControlMessage] # pending control messages control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache mcache*: MCache # messages cache
heartbeatCancel*: Future[void] # cancellation future for heartbeat interval heartbeatFut: Future[void] # cancellation future for heartbeat interval
heartbeatRunning: bool
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
@ -211,9 +212,8 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
while true: while g.heartbeatRunning:
try: try:
await g.heartbeatLock.acquire()
trace "running heartbeat" trace "running heartbeat"
for t in toSeq(g.topics.keys): for t in toSeq(g.topics.keys):
@ -230,9 +230,10 @@ proc heartbeat(g: GossipSub) {.async.} =
g.mcache.shift() # shift the cache g.mcache.shift() # shift the cache
except CatchableError as exc: except CatchableError as exc:
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
# sleep less in the case of an error
# but still throttle
await sleepAsync(100.millis)
continue continue
finally:
g.heartbeatLock.release()
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
@ -497,21 +498,33 @@ method publish*(g: GossipSub,
libp2p_pubsub_messages_published.inc(labelValues = [topic]) libp2p_pubsub_messages_published.inc(labelValues = [topic])
method start*(g: GossipSub) {.async.} = method start*(g: GossipSub) {.async.} =
debug "gossipsub start"
## start pubsub ## start pubsub
## start long running/repeating procedures ## start long running/repeating procedures
# interlock start to to avoid overlapping to stops
await g.heartbeatLock.acquire()
# setup the heartbeat interval # setup the heartbeat interval
g.heartbeatCancel = g.heartbeat() g.heartbeatRunning = true
g.heartbeatFut = g.heartbeat()
g.heartbeatLock.release()
method stop*(g: GossipSub) {.async.} = method stop*(g: GossipSub) {.async.} =
## stopt pubsub debug "gossipsub stop"
## stop pubsub
## stop long running tasks ## stop long running tasks
await g.heartbeatLock.acquire() await g.heartbeatLock.acquire()
# stop heartbeat interval # stop heartbeat interval
if not g.heartbeatCancel.finished: g.heartbeatRunning = false
g.heartbeatCancel.complete() if not g.heartbeatFut.finished:
debug "awaiting last heartbeat"
await g.heartbeatFut
g.heartbeatLock.release() g.heartbeatLock.release()

View File

@ -285,27 +285,6 @@ proc sendHSMessage(sconn: Connection; buf: seq[byte]) {.async.} =
outbuf &= buf outbuf &= buf
await sconn.write(outbuf) await sconn.write(outbuf)
proc packNoisePayload(payload: openarray[byte]): seq[byte] =
result &= payload.len.uint16.toBytesBE
result &= payload
if result.len > uint16.high.int:
raise newException(NoiseOversizedPayloadError, "Trying to send an unsupported oversized payload over Noise")
trace "packed noise payload", size = payload.len
proc unpackNoisePayload(payload: var seq[byte]) =
let
besize = payload[0..1]
size = uint16.fromBytesBE(besize).int
if size != (payload.len - 2):
raise newException(NoiseOversizedPayloadError, "Received a wrong payload size")
payload = payload[2..^1]
trace "unpacked noise payload", size = payload.len
proc handshakeXXOutbound(p: Noise, conn: Connection, p2pProof: ProtoBuffer): Future[HandshakeResult] {.async.} = proc handshakeXXOutbound(p: Noise, conn: Connection, p2pProof: ProtoBuffer): Future[HandshakeResult] {.async.} =
const initiator = true const initiator = true
@ -336,8 +315,7 @@ proc handshakeXXOutbound(p: Noise, conn: Connection, p2pProof: ProtoBuffer): Fut
read_s() read_s()
dh_es() dh_es()
var remoteP2psecret = hs.ss.decryptAndHash(msg) let remoteP2psecret = hs.ss.decryptAndHash(msg)
unpackNoisePayload(remoteP2psecret)
# -> s, se # -> s, se
@ -347,8 +325,7 @@ proc handshakeXXOutbound(p: Noise, conn: Connection, p2pProof: ProtoBuffer): Fut
dh_se() dh_se()
# last payload must follow the ecrypted way of sending # last payload must follow the ecrypted way of sending
var packed = packNoisePayload(p2psecret) msg &= hs.ss.encryptAndHash(p2psecret)
msg &= hs.ss.encryptAndHash(packed)
await conn.sendHSMessage(msg) await conn.sendHSMessage(msg)
@ -384,8 +361,7 @@ proc handshakeXXInbound(p: Noise, conn: Connection, p2pProof: ProtoBuffer): Futu
write_s() write_s()
dh_es() dh_es()
var packedSecret = packNoisePayload(p2psecret) msg &= hs.ss.encryptAndHash(p2psecret)
msg &= hs.ss.encryptAndHash(packedSecret)
await conn.sendHSMessage(msg) await conn.sendHSMessage(msg)
@ -396,8 +372,7 @@ proc handshakeXXInbound(p: Noise, conn: Connection, p2pProof: ProtoBuffer): Futu
read_s() read_s()
dh_se() dh_se()
var remoteP2psecret = hs.ss.decryptAndHash(msg) let remoteP2psecret = hs.ss.decryptAndHash(msg)
unpackNoisePayload(remoteP2psecret)
let (cs1, cs2) = hs.ss.split() let (cs1, cs2) = hs.ss.split()
return HandshakeResult(cs1: cs1, cs2: cs2, remoteP2psecret: remoteP2psecret, rs: hs.rs) return HandshakeResult(cs1: cs1, cs2: cs2, remoteP2psecret: remoteP2psecret, rs: hs.rs)
@ -411,9 +386,7 @@ method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} =
if size > 0: if size > 0:
var buffer = newSeq[byte](size) var buffer = newSeq[byte](size)
await sconn.stream.readExactly(addr buffer[0], buffer.len) await sconn.stream.readExactly(addr buffer[0], buffer.len)
var plain = sconn.readCs.decryptWithAd([], buffer) return sconn.readCs.decryptWithAd([], buffer)
unpackNoisePayload(plain)
return plain
else: else:
trace "Received 0-length message", conn = $sconn trace "Received 0-length message", conn = $sconn
@ -427,8 +400,7 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.
while left > 0: while left > 0:
let let
chunkSize = if left > MaxPlainSize: MaxPlainSize else: left chunkSize = if left > MaxPlainSize: MaxPlainSize else: left
packed = packNoisePayload(message.toOpenArray(offset, offset + chunkSize - 1)) cipher = sconn.writeCs.encryptWithAd([], message.toOpenArray(offset, offset + chunkSize - 1))
cipher = sconn.writeCs.encryptWithAd([], packed)
left = left - chunkSize left = left - chunkSize
offset = offset + chunkSize offset = offset + chunkSize
var var
@ -440,8 +412,8 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.
outbuf &= cipher outbuf &= cipher
await sconn.stream.write(outbuf) await sconn.stream.write(outbuf)
method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[SecureConn] {.async.} = method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} =
trace "Starting Noise handshake", initiator debug "Starting Noise handshake", initiator, peer=conn
# https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages # https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages
let let
@ -450,9 +422,8 @@ method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[S
var var
libp2pProof = initProtoBuffer() libp2pProof = initProtoBuffer()
libp2pProof.write(initProtoField(1, p.localPublicKey.getBytes.tryGet()))
libp2pProof.write(initProtoField(1, p.localPublicKey)) libp2pProof.write(initProtoField(2, signedPayload.getBytes()))
libp2pProof.write(initProtoField(2, signedPayload))
# data field also there but not used! # data field also there but not used!
libp2pProof.finish() libp2pProof.finish()
@ -465,23 +436,35 @@ method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[S
var var
remoteProof = initProtoBuffer(handshakeRes.remoteP2psecret) remoteProof = initProtoBuffer(handshakeRes.remoteP2psecret)
remotePubKey: PublicKey remotePubKey: PublicKey
remotePubKeyBytes: seq[byte]
remoteSig: Signature remoteSig: Signature
if remoteProof.getValue(1, remotePubKey) <= 0: remoteSigBytes: seq[byte]
raise newException(NoiseHandshakeError, "Failed to deserialize remote public key.")
if remoteProof.getValue(2, remoteSig) <= 0: if remoteProof.getLengthValue(1, remotePubKeyBytes) <= 0:
raise newException(NoiseHandshakeError, "Failed to deserialize remote public key.") raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
if remoteProof.getLengthValue(2, remoteSigBytes) <= 0:
raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
if not remotePubKey.init(remotePubKeyBytes):
raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
if not remoteSig.init(remoteSigBytes):
raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
let verifyPayload = PayloadString.toBytes & handshakeRes.rs.getBytes let verifyPayload = PayloadString.toBytes & handshakeRes.rs.getBytes
if not remoteSig.verify(verifyPayload, remotePubKey): if not remoteSig.verify(verifyPayload, remotePubKey):
raise newException(NoiseHandshakeError, "Noise handshake signature verify failed.") raise newException(NoiseHandshakeError, "Noise handshake signature verify failed.")
else: else:
trace "Remote signature verified" debug "Remote signature verified", peer=conn
if initiator and not isNil(conn.peerInfo): if initiator and not isNil(conn.peerInfo):
let pid = PeerID.init(remotePubKey) let pid = PeerID.init(remotePubKey)
if not conn.peerInfo.peerId.validate(): if not conn.peerInfo.peerId.validate():
raise newException(NoiseHandshakeError, "Failed to validate peerId.") raise newException(NoiseHandshakeError, "Failed to validate peerId.")
if pid != conn.peerInfo.peerId: if pid != conn.peerInfo.peerId:
var
failedKey: PublicKey
discard extractPublicKey(conn.peerInfo.peerId, failedKey)
debug "Noise handshake, peer infos don't match!", initiator, dealt_peer=conn.peerInfo.peerId, dealt_key=failedKey, received_peer=pid, received_key=remotePubKey
raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId) raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId)
var secure = new NoiseConnection var secure = new NoiseConnection
@ -498,7 +481,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[S
secure.readCs = handshakeRes.cs1 secure.readCs = handshakeRes.cs1
secure.writeCs = handshakeRes.cs2 secure.writeCs = handshakeRes.cs2
trace "Noise handshake completed!" debug "Noise handshake completed!", initiator, peer=secure.peerInfo
return secure return secure
@ -511,7 +494,7 @@ proc newNoise*(privateKey: PrivateKey; outgoing: bool = true; commonPrologue: se
result.outgoing = outgoing result.outgoing = outgoing
result.localPrivateKey = privateKey result.localPrivateKey = privateKey
result.localPublicKey = privateKey.getKey().tryGet() result.localPublicKey = privateKey.getKey().tryGet()
discard randomBytes(result.noisePrivateKey) result.noisePrivateKey = Curve25519Key.random().tryGet()
result.noisePublicKey = result.noisePrivateKey.public() result.noisePublicKey = result.noisePrivateKey.public()
result.commonPrologue = commonPrologue result.commonPrologue = commonPrologue
result.init() result.init()

View File

@ -14,6 +14,9 @@ import ../protocol,
../../stream/connection, ../../stream/connection,
../../peerinfo ../../peerinfo
logScope:
topics = "secure"
type type
Secure* = ref object of LPProtocol # base type for secure managers Secure* = ref object of LPProtocol # base type for secure managers

View File

@ -28,9 +28,9 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
triggerSelf = false, triggerSelf = false,
gossip = false, gossip = false,
secureManagers: openarray[SecureProtocol] = [ secureManagers: openarray[SecureProtocol] = [
# TODO investigate why we're getting fewer peers on public testnets with noise # array cos order matters
SecureProtocol.Secio, SecureProtocol.Secio,
SecureProtocol.Noise, # array cos order matters SecureProtocol.Noise,
], ],
verifySignature = libp2p_pubsub_verify, verifySignature = libp2p_pubsub_verify,
sign = libp2p_pubsub_sign, sign = libp2p_pubsub_sign,

View File

@ -63,10 +63,15 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
raise newException(CatchableError, "No secure managers registered!") raise newException(CatchableError, "No secure managers registered!")
let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec)) let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec))
if manager.len == 0: if manager.len == 0 or manager == "na":
raise newException(CatchableError, "Unable to negotiate a secure channel!") raise newException(CatchableError, "Unable to negotiate a secure channel!")
result = await s.secureManagers.filterIt(it.codec == manager)[0].secure(conn, true) trace "securing connection", codec=manager
let secureProtocol = s.secureManagers.filterIt(it.codec == manager)
# ms.select should deal with the correctness of this
# let's avoid duplicating checks but detect if it fails to do it properly
doAssert(secureProtocol.len > 0)
result = await secureProtocol[0].secure(conn, true)
proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} = proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
## identify the connection ## identify the connection
@ -107,7 +112,7 @@ proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
## mux incoming connection ## mux incoming connection
trace "muxing connection" trace "muxing connection", peer=conn
let muxers = toSeq(s.muxers.keys) let muxers = toSeq(s.muxers.keys)
if muxers.len == 0: if muxers.len == 0:
warn "no muxers registered, skipping upgrade flow" warn "no muxers registered, skipping upgrade flow"
@ -115,10 +120,14 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
let muxerName = await s.ms.select(conn, muxers) let muxerName = await s.ms.select(conn, muxers)
if muxerName.len == 0 or muxerName == "na": if muxerName.len == 0 or muxerName == "na":
debug "no muxer available, early exit", peer=conn
return return
# create new muxer for connection # create new muxer for connection
let muxer = s.muxers[muxerName].newMuxer(conn) let muxer = s.muxers[muxerName].newMuxer(conn)
trace "found a muxer", name=muxerName, peer=conn
# install stream handler # install stream handler
muxer.streamHandler = s.streamHandler muxer.streamHandler = s.streamHandler

View File

@ -232,7 +232,7 @@ suite "FloodSub":
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<runs: for i in 0..<runs:
nodes.add newStandardSwitch() nodes.add newStandardSwitch(secureManagers = [SecureProtocol.Noise])
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
for i in 0..<runs: for i in 0..<runs:
@ -285,7 +285,7 @@ suite "FloodSub":
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<runs: for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true) nodes.add newStandardSwitch(triggerSelf = true, secureManagers = [SecureProtocol.Secio])
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]

View File

@ -170,7 +170,7 @@ suite "GossipSub":
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<2: for i in 0..<2:
nodes.add newStandardSwitch(gossip = true) nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Noise])
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
for node in nodes: for node in nodes:
@ -203,7 +203,7 @@ suite "GossipSub":
var nodes: seq[Switch] = newSeq[Switch]() var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<2: for i in 0..<2:
nodes.add newStandardSwitch(gossip = true) nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Secio])
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
for node in nodes: for node in nodes:
@ -327,7 +327,7 @@ suite "GossipSub":
var runs = 10 var runs = 10
for i in 0..<runs: for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true, gossip = true) nodes.add newStandardSwitch(triggerSelf = true, gossip = true, secureManagers = [SecureProtocol.Noise])
awaitters.add((await nodes[i].start())) awaitters.add((await nodes[i].start()))
await subscribeRandom(nodes) await subscribeRandom(nodes)
@ -376,7 +376,7 @@ suite "GossipSub":
var runs = 10 var runs = 10
for i in 0..<runs: for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true, gossip = true) nodes.add newStandardSwitch(triggerSelf = true, gossip = true, secureManagers = [SecureProtocol.Secio])
awaitters.add((await nodes[i].start())) awaitters.add((await nodes[i].start()))
await subscribeSparseNodes(nodes, 4) await subscribeSparseNodes(nodes, 4)

View File

@ -25,7 +25,6 @@ import ../libp2p/[daemon/daemonapi,
transports/transport, transports/transport,
transports/tcptransport, transports/tcptransport,
protocols/secure/secure, protocols/secure/secure,
protocols/secure/secio,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
protocols/pubsub/gossipsub, protocols/pubsub/gossipsub,
protocols/pubsub/floodsub] protocols/pubsub/floodsub]
@ -72,7 +71,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags) let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch(gossip = gossip) let nativeNode = newStandardSwitch(gossip = gossip, secureManagers = [SecureProtocol.Noise])
let awaiters = nativeNode.start() let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
@ -123,7 +122,7 @@ proc testPubSubNodePublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags) let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch(gossip = gossip) let nativeNode = newStandardSwitch(gossip = gossip, secureManagers = [SecureProtocol.Secio])
let awaiters = nativeNode.start() let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
@ -176,7 +175,7 @@ suite "Interop":
proc runTests(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var protos = @["/test-stream"] var protos = @["/test-stream"]
let nativeNode = newStandardSwitch() let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
@ -228,7 +227,7 @@ suite "Interop":
var expect = newString(len(buffer) - 2) var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let nativeNode = newStandardSwitch() let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
@ -274,7 +273,7 @@ suite "Interop":
proto.handler = nativeHandler proto.handler = nativeHandler
proto.codec = protos[0] # codec proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch() let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
nativeNode.mount(proto) nativeNode.mount(proto)
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
@ -313,7 +312,7 @@ suite "Interop":
proto.handler = nativeHandler proto.handler = nativeHandler
proto.codec = protos[0] # codec proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch() let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
nativeNode.mount(proto) nativeNode.mount(proto)
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
@ -361,7 +360,7 @@ suite "Interop":
proto.handler = nativeHandler proto.handler = nativeHandler
proto.codec = protos[0] # codec proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch() let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
nativeNode.mount(proto) nativeNode.mount(proto)
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()

View File

@ -276,8 +276,8 @@ suite "Multistream select":
await transport1.close() await transport1.close()
await allFuturesThrowing( await allFuturesThrowing(
handlerWait1.wait(5000.millis), handlerWait1.wait(30.seconds),
handlerWait2.wait(5000.millis)) handlerWait2.wait(30.seconds))
check: check:
waitFor(endToEnd()) == true waitFor(endToEnd()) == true

View File

@ -53,10 +53,10 @@ suite "Switch":
testProto.codec = TestCodec testProto.codec = TestCodec
testProto.handler = handle testProto.handler = handle
let switch1 = newStandardSwitch() let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
switch1.mount(testProto) switch1.mount(testProto)
let switch2 = newStandardSwitch() let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
var awaiters: seq[Future[void]] var awaiters: seq[Future[void]]
awaiters.add(await switch1.start()) awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
@ -93,10 +93,10 @@ suite "Switch":
testProto.codec = TestCodec testProto.codec = TestCodec
testProto.handler = handle testProto.handler = handle
let switch1 = newStandardSwitch() let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
switch1.mount(testProto) switch1.mount(testProto)
let switch2 = newStandardSwitch() let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
var awaiters: seq[Future[void]] var awaiters: seq[Future[void]]
awaiters.add(await switch1.start()) awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
@ -151,10 +151,10 @@ suite "Switch":
testProto.codec = TestCodec testProto.codec = TestCodec
testProto.handler = handle testProto.handler = handle
let switch1 = newStandardSwitch() let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
switch1.mount(testProto) switch1.mount(testProto)
let switch2 = newStandardSwitch() let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
awaiters.add(await switch1.start()) awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
@ -185,8 +185,8 @@ suite "Switch":
proc testSwitch() {.async, gcsafe.} = proc testSwitch() {.async, gcsafe.} =
var awaiters: seq[Future[void]] var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch() let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
let switch2 = newStandardSwitch() let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
awaiters.add(await switch1.start()) awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())