Unstable (#695)

Commit: cba3ca3c3e
@@ -9,78 +9,49 @@ on:

 jobs:
   build:
+    timeout-minutes: 90
     strategy:
       fail-fast: false
-      max-parallel: 20
       matrix:
         target:
-          # Unit tests
           - os: linux
             cpu: amd64
           - os: linux
             cpu: i386
           - os: macos
             cpu: amd64
-          - os: windows
-            cpu: i386
           - os: windows
             cpu: amd64
+          #- os: windows
+          #  cpu: i386
+        branch: [version-1-2, devel]
         include:
           - target:
               os: linux
             builder: ubuntu-20.04
+            shell: bash
           - target:
               os: macos
             builder: macos-10.15
+            shell: bash
           - target:
               os: windows
             builder: windows-2019
+            shell: msys2 {0}

     defaults:
       run:
-        shell: bash
+        shell: ${{ matrix.shell }}

-    name: '${{ matrix.target.os }}-${{ matrix.target.cpu }}'
+    name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
     runs-on: ${{ matrix.builder }}
+    continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
     steps:
-      - name: Checkout nim-libp2p
+      - name: Checkout
        uses: actions/checkout@v2
        with:
          submodules: true

-      - name: Derive environment variables
-        run: |
-          if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
-            ARCH=64
-            PLATFORM=x64
-          else
-            ARCH=32
-            PLATFORM=x86
-          fi
-          echo "ARCH=$ARCH" >> $GITHUB_ENV
-          echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
-
-          ncpu=
-          ext=
-          MAKE_CMD="make"
-          case '${{ runner.os }}' in
-          'Linux')
-            ncpu=$(nproc)
-            ;;
-          'macOS')
-            ncpu=$(sysctl -n hw.ncpu)
-            ;;
-          'Windows')
-            ncpu=$NUMBER_OF_PROCESSORS
-            ext=.exe
-            MAKE_CMD="mingw32-make"
-            ;;
-          esac
-          [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
-          echo "ncpu=$ncpu" >> $GITHUB_ENV
-          echo "ext=$ext" >> $GITHUB_ENV
-          echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
-
       - name: Install build dependencies (Linux i386)
         if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
         run: |
@@ -101,68 +72,83 @@ jobs:
           chmod 755 external/bin/gcc external/bin/g++
           echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH

-      - name: Restore MinGW-W64 (Windows) from cache
-        if: runner.os == 'Windows'
-        id: windows-mingw-cache
-        uses: actions/cache@v2
+      - name: MSYS2 (Windows i386)
+        if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
+        uses: msys2/setup-msys2@v2
         with:
-          path: external/mingw-${{ matrix.target.cpu }}
-          key: 'mingw-${{ matrix.target.cpu }}'
+          path-type: inherit
+          msystem: MINGW32
+          install: >-
+            base-devel
+            git
+            mingw-w64-i686-toolchain
+
+      - name: MSYS2 (Windows amd64)
+        if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
+        uses: msys2/setup-msys2@v2
+        with:
+          path-type: inherit
+          install: >-
+            base-devel
+            git
+            mingw-w64-x86_64-toolchain

       - name: Restore Nim DLLs dependencies (Windows) from cache
         if: runner.os == 'Windows'
         id: windows-dlls-cache
         uses: actions/cache@v2
         with:
-          path: external/dlls-${{ matrix.target.cpu }}
-          key: 'dlls-${{ matrix.target.cpu }}'
+          path: external/dlls
+          key: 'dlls'

-      - name: Install MinGW64 dependency (Windows)
-        if: >
-          steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
-          runner.os == 'Windows'
-        run: |
-          mkdir -p external
-          curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
-          7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
-          mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
-
-      - name: Install DLLs dependencies (Windows)
+      - name: Install DLL dependencies (Windows)
         if: >
           steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
           runner.os == 'Windows'
         run: |
-          mkdir -p external
+          mkdir external
           curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
-          7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
+          7z x external/windeps.zip -oexternal/dlls

       - name: Path to cached dependencies (Windows)
         if: >
           runner.os == 'Windows'
         run: |
-          echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
-          echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
+          echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH

-      - name: Get latest Nim commit hash
-        id: versions
+      - name: Derive environment variables
         run: |
-          getHash() {
-            git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1
-          }
-          nbsHash=$(getHash status-im/nimbus-build-system)
-          echo "::set-output name=nimbus_build_system::$nbsHash"
-
-      - name: Restore prebuilt Nim from cache
-        id: nim-cache
-        uses: actions/cache@v2
-        with:
-          path: NimBinaries
-          key: 'NimBinaries-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}'
+          if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
+            PLATFORM=x64
+          else
+            PLATFORM=x86
+          fi
+          echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
+
+          ncpu=
+          MAKE_CMD="make"
+          case '${{ runner.os }}' in
+          'Linux')
+            ncpu=$(nproc)
+            ;;
+          'macOS')
+            ncpu=$(sysctl -n hw.ncpu)
+            ;;
+          'Windows')
+            ncpu=$NUMBER_OF_PROCESSORS
+            MAKE_CMD="mingw32-make"
+            ;;
+          esac
+          [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
+          echo "ncpu=$ncpu" >> $GITHUB_ENV
+          echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV

-      - name: Build Nim and associated tools
+      - name: Build Nim and Nimble
         run: |
           curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
-          env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} CC=gcc bash build_nim.sh nim csources dist/nimble NimBinaries
+          env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ matrix.branch }} \
+            QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
+            bash build_nim.sh nim csources dist/nimble NimBinaries
           echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH

       - name: Setup Go
@@ -174,8 +160,14 @@ jobs:
         run: |
           V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3

-      - name: Run nim-libp2p tests
+      - name: Run tests
         run: |
+          if [[ "${{ matrix.target.os }}" == "windows" ]]; then
+            # https://github.com/status-im/nimbus-eth2/issues/3121
+            export NIMFLAGS="-d:nimRawSetjmp"
+          fi
+          nim --version
+          nimble --version
           nimble install_pinned
           nimble test
@@ -6,6 +6,7 @@ on:

 jobs:
   build:
+    timeout-minutes: 120
     strategy:
       fail-fast: false
       matrix:
@@ -16,69 +17,39 @@ jobs:
             cpu: i386
           - os: macos
             cpu: amd64
-          #- os: windows
-          #cpu: i386
           - os: windows
             cpu: amd64
+          #- os: windows
+          #cpu: i386
         branch: [version-1-2, version-1-4, version-1-6, devel]
         include:
           - target:
               os: linux
             builder: ubuntu-20.04
+            shell: bash
           - target:
               os: macos
             builder: macos-10.15
+            shell: bash
           - target:
               os: windows
             builder: windows-2019
+            shell: msys2 {0}

     defaults:
       run:
-        shell: bash
+        shell: ${{ matrix.shell }}

     name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
     runs-on: ${{ matrix.builder }}
     continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
     steps:
-      - name: Checkout nim-libp2p
+      - name: Checkout
         uses: actions/checkout@v2
         with:
-          ref: master
+          ref: unstable
           submodules: true

-      - name: Derive environment variables
-        run: |
-          if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
-            ARCH=64
-            PLATFORM=x64
-          else
-            ARCH=32
-            PLATFORM=x86
-          fi
-          echo "ARCH=$ARCH" >> $GITHUB_ENV
-          echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
-
-          ncpu=
-          ext=
-          MAKE_CMD="make"
-          case '${{ runner.os }}' in
-          'Linux')
-            ncpu=$(nproc)
-            ;;
-          'macOS')
-            ncpu=$(sysctl -n hw.ncpu)
-            ;;
-          'Windows')
-            ncpu=$NUMBER_OF_PROCESSORS
-            ext=.exe
-            MAKE_CMD="mingw32-make"
-            ;;
-          esac
-          [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
-          echo "ncpu=$ncpu" >> $GITHUB_ENV
-          echo "ext=$ext" >> $GITHUB_ENV
-          echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
-
       - name: Install build dependencies (Linux i386)
         if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
         run: |
@@ -99,47 +70,76 @@ jobs:
           chmod 755 external/bin/gcc external/bin/g++
           echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH

-      - name: Restore MinGW-W64 (Windows) from cache
-        if: runner.os == 'Windows'
-        id: windows-mingw-cache
-        uses: actions/cache@v2
+      - name: MSYS2 (Windows i386)
+        if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
+        uses: msys2/setup-msys2@v2
         with:
-          path: external/mingw-${{ matrix.target.cpu }}
-          key: 'mingw-${{ matrix.target.cpu }}'
+          path-type: inherit
+          msystem: MINGW32
+          install: >-
+            base-devel
+            git
+            mingw-w64-i686-toolchain
+
+      - name: MSYS2 (Windows amd64)
+        if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
+        uses: msys2/setup-msys2@v2
+        with:
+          path-type: inherit
+          install: >-
+            base-devel
+            git
+            mingw-w64-x86_64-toolchain

       - name: Restore Nim DLLs dependencies (Windows) from cache
         if: runner.os == 'Windows'
         id: windows-dlls-cache
         uses: actions/cache@v2
         with:
-          path: external/dlls-${{ matrix.target.cpu }}
-          key: 'dlls-${{ matrix.target.cpu }}'
+          path: external/dlls
+          key: 'dlls'

-      - name: Install MinGW64 dependency (Windows)
-        if: >
-          steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
-          runner.os == 'Windows'
-        run: |
-          mkdir -p external
-          curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
-          7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
-          mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
-
-      - name: Install DLLs dependencies (Windows)
+      - name: Install DLL dependencies (Windows)
         if: >
           steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
           runner.os == 'Windows'
         run: |
-          mkdir -p external
+          mkdir external
           curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
-          7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
+          7z x external/windeps.zip -oexternal/dlls

       - name: Path to cached dependencies (Windows)
         if: >
           runner.os == 'Windows'
         run: |
-          echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
-          echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
+          echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
+
+      - name: Derive environment variables
+        run: |
+          if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
+            PLATFORM=x64
+          else
+            PLATFORM=x86
+          fi
+          echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
+
+          ncpu=
+          MAKE_CMD="make"
+          case '${{ runner.os }}' in
+          'Linux')
+            ncpu=$(nproc)
+            ;;
+          'macOS')
+            ncpu=$(sysctl -n hw.ncpu)
+            ;;
+          'Windows')
+            ncpu=$NUMBER_OF_PROCESSORS
+            MAKE_CMD="mingw32-make"
+            ;;
+          esac
+          [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
+          echo "ncpu=$ncpu" >> $GITHUB_ENV
+          echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV

       - name: Build Nim and Nimble
         run: |
@@ -149,12 +149,27 @@ jobs:
           bash build_nim.sh nim csources dist/nimble NimBinaries
           echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH

-      - name: Run nim-libp2p tests
+      - name: Setup Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: '^1.15.5'
+
+      - name: Install p2pd
         run: |
+          V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
+
+      - name: Run tests
+        run: |
+          if [[ "${{ matrix.target.os }}" == "windows" ]]; then
+            # https://github.com/status-im/nimbus-eth2/issues/3121
+            export NIMFLAGS="-d:nimRawSetjmp"
+          fi
+          nim --version
+          nimble --version
           nimble install -y --depsOnly
-          nimble test_slim
+          nimble test
           if [[ "${{ matrix.branch }}" == "version-1-6" || "${{ matrix.branch }}" == "devel" ]]; then
             echo -e "\nTesting with '--gc:orc':\n"
-            export NIMFLAGS="--gc:orc"
-            nimble test_slim
+            export NIMFLAGS="${NIMFLAGS} --gc:orc"
+            nimble test
           fi
.pinned (22 changed lines)
@@ -1,17 +1,17 @@
 asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae0c6b13837fe798
 bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7
 chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882
-chronos;https://github.com/status-im/nim-chronos@#7dc58d42b6905a7fd7531875fa76060f8f744e4e
+chronos;https://github.com/status-im/nim-chronos@#87197230779002a2bfa8642f0e2ae07e2349e304
-dnsclient;https://github.com/ba0f3/dnsclient.nim@#536cc6b7933e5f86590bb27083c0ffeab31255f9
+dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be
-faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33
+faststreams;https://github.com/status-im/nim-faststreams@#37a183153c071539ab870f427c09a1376ba311b9
-httputils;https://github.com/status-im/nim-http-utils@#507bfb7dcb6244d76ce2567df7bf3756cbe88775
+httputils;https://github.com/status-im/nim-http-utils@#40048e8b3e69284bdb5d4daa0a16ad93402c55db
-json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841
+json_serialization;https://github.com/status-im/nim-json-serialization@#4b8f487d2dfdd941df7408ceaa70b174cce02180
-metrics;https://github.com/status-im/nim-metrics@#2c0c486c65f980e8387f86bed0b43d53161c8286
+metrics;https://github.com/status-im/nim-metrics@#71e0f0e354e1f4c59e3dc92153989c8b723c3440
 nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00
-secp256k1;https://github.com/status-im/nim-secp256k1@#d790c42206fab4b8008eaa91181ca8c8c68a0105
+secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13
-serialization;https://github.com/status-im/nim-serialization@#11a8aa64d27d4fa92e266b9488500461da193c24
+serialization;https://github.com/status-im/nim-serialization@#37bc0db558d85711967acb16e9bb822b06911d46
-stew;https://github.com/status-im/nim-stew@#2f9c61f485e1de6d7e163294008276c455d39da2
+stew;https://github.com/status-im/nim-stew@#bb705bf17b46d2c8f9bfb106d9cc7437009a2501
 testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2
 unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c
-websock;https://github.com/status-im/nim-websock@#c2aae352f7fad7a8d333327c37e966969d3ee542
+websock;https://github.com/status-im/nim-websock@#73edde4417f7b45003113b7a34212c3ccd95b9fd
-zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd
+zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2
@@ -162,7 +162,10 @@ proc main() {.async.} =
     stdinReader = fromPipe(rfd)

   var thread: Thread[AsyncFD]
-  thread.createThread(readInput, wfd)
+  try:
+    thread.createThread(readInput, wfd)
+  except Exception as exc:
+    quit("Failed to create thread: " & exc.msg)

   var localAddress = MultiAddress.init(DefaultAddr).tryGet()
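Note: `createThread` carries no `raises` annotation, so under strict exception tracking the caller has to handle a plain `Exception` itself; the only sensible recovery in the chat example is to exit. A minimal, self-contained sketch of the same pattern (`readerLoop`/`startReader` are hypothetical stand-ins for the example's `readInput`):

    # Build with: nim c --threads:on sketch.nim
    proc readerLoop(arg: int) {.thread.} =
      discard arg  # stand-in for the blocking stdin reader

    proc startReader(arg: int) =
      var thread: Thread[int]
      try:
        thread.createThread(readerLoop, arg)
      except Exception as exc:
        # thread creation failing is unrecoverable here, so mirror the diff and bail out
        quit("Failed to create thread: " & exc.msg)

    startReader(0)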
@@ -9,7 +9,7 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]

 requires "nim >= 1.2.0",
          "nimcrypto >= 0.4.1",
-         "https://github.com/ba0f3/dnsclient.nim == 0.1.0",
+         "dnsclient >= 0.1.2",
          "bearssl >= 0.1.4",
          "chronicles >= 0.10.2",
          "chronos >= 3.0.6",
@@ -18,11 +18,18 @@ requires "nim >= 1.2.0",
          "stew#head",
          "websock"

+const nimflags =
+  "--verbosity:0 --hints:off " &
+  "--warning[CaseTransition]:off --warning[ObservableStores]:off " &
+  "--warning[LockLevel]:off " &
+  "-d:chronosStrictException " &
+  "--styleCheck:usages --styleCheck:hint "
+
 proc runTest(filename: string, verify: bool = true, sign: bool = true,
              moreoptions: string = "") =
-  let env_nimflags = getEnv("NIMFLAGS")
-  var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics --verbosity:0 --hints:off --styleCheck:usages --styleCheck:hint " & env_nimflags
-  excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off")
+  var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics "
+  excstr.add(" " & getEnv("NIMFLAGS") & " ")
+  excstr.add(" " & nimflags & " ")
   excstr.add(" -d:libp2p_pubsub_sign=" & $sign)
   excstr.add(" -d:libp2p_pubsub_verify=" & $verify)
   excstr.add(" " & moreoptions & " ")
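Note: the new `nimflags` constant deduplicates the compiler switches that `runTest`, `buildSample`, and `buildTutorial` each previously spelled out. A rough sketch of how the command string is assembled (flag values abbreviated; the echoed line is illustrative only):

    import std/os  # getEnv

    const nimflags = "--verbosity:0 --hints:off "

    var excstr = "nim c --opt:speed -d:debug "
    excstr.add(" " & getEnv("NIMFLAGS") & " ")  # user-supplied flags first
    excstr.add(" " & nimflags & " ")            # then the shared defaults
    excstr.add(" -d:libp2p_pubsub_sign=true")
    echo excstr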
@@ -34,8 +41,8 @@ proc runTest(filename: string, verify: bool = true, sign: bool = true,
   rmFile "tests/" & filename.toExe

 proc buildSample(filename: string, run = false) =
-  var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off "
-  excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off")
+  var excstr = "nim c --opt:speed --threads:on -d:debug "
+  excstr.add(" " & nimflags & " ")
   excstr.add(" examples/" & filename)
   exec excstr
   if run:
@@ -44,7 +51,7 @@ proc buildSample(filename: string, run = false) =

 proc buildTutorial(filename: string) =
   discard gorge "cat " & filename & " | nim c -r --hints:off tools/markdown_runner.nim | " &
-    " nim --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off c -"
+    " nim " & nimflags & " c -"

 task testnative, "Runs libp2p native tests":
   runTest("testnative")
@@ -83,7 +90,7 @@ task test, "Runs the test suite":
   exec "nimble testfilter"
   exec "nimble examples_build"

-task test_slim, "Runs the test suite":
+task test_slim, "Runs the (slimmed down) test suite":
   exec "nimble testnative"
   exec "nimble testpubsub_slim"
   exec "nimble testfilter"
@@ -25,7 +25,7 @@ declareGauge(libp2p_peers, "total connected peers")

 const
   MaxConnections* = 50
-  MaxConnectionsPerPeer* = 5
+  MaxConnectionsPerPeer* = 1

 type
   TooManyConnectionsError* = object of LPError
@@ -65,7 +65,7 @@ type
       discard

   PeerEventHandler* =
-    proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.}
+    proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, raises: [Defect].}

   MuxerHolder = object
     muxer: Muxer
@@ -452,7 +452,8 @@ proc trackIncomingConn*(c: ConnManager,
     raise exc

 proc trackOutgoingConn*(c: ConnManager,
-                        provider: ConnProvider):
+                        provider: ConnProvider,
+                        forceDial = false):
                         Future[Connection] {.async.} =
   ## try acquiring a connection if all slots
   ## are already taken, raise TooManyConnectionsError
@@ -462,7 +463,9 @@ proc trackOutgoingConn*(c: ConnManager,
   trace "Tracking outgoing connection", count = c.outSema.count,
                                         max = c.outSema.size

-  if not c.outSema.tryAcquire():
+  if forceDial:
+    c.outSema.forceAcquire()
+  elif not c.outSema.tryAcquire():
     trace "Too many outgoing connections!", count = c.outSema.count,
                                             max = c.outSema.size
     raise newTooManyConnectionsError()
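Note: `forceAcquire` takes a slot even when the semaphore is exhausted, so a `forceDial = true` dial can exceed the outgoing-connection limit instead of raising `TooManyConnectionsError`; the count simply goes negative until releases catch up. A toy model of the branch (not chronos' real `AsyncSemaphore`):

    type Sema = object
      count: int

    proc tryAcquire(s: var Sema): bool =
      if s.count > 0: (dec s.count; true) else: false

    proc forceAcquire(s: var Sema) =
      dec s.count  # may deliberately drive the count below zero

    proc track(s: var Sema, forceDial: bool) =
      if forceDial:
        s.forceAcquire()  # priority dial: bypass the limit
      elif not s.tryAcquire():
        raise newException(ValueError, "too many outgoing connections")

    var s = Sema(count: 0)
    s.track(forceDial = true)  # succeeds even with no free slots
    echo s.count               # -1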
@@ -150,10 +150,10 @@ type
     key*: PublicKey

   P2PStreamCallback* = proc(api: DaemonAPI,
-                            stream: P2PStream): Future[void] {.gcsafe.}
+                            stream: P2PStream): Future[void] {.gcsafe, raises: [Defect, CatchableError].}
   P2PPubSubCallback* = proc(api: DaemonAPI,
                             ticket: PubsubTicket,
-                            message: PubSubMessage): Future[bool] {.gcsafe.}
+                            message: PubSubMessage): Future[bool] {.gcsafe, raises: [Defect, CatchableError].}

   DaemonError* = object of LPError
   DaemonRemoteError* = object of DaemonError
@@ -755,7 +755,13 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},

   # Starting daemon process
   # echo "Starting ", cmd, " ", args.join(" ")
-  api.process = startProcess(cmd, "", args, env, {poParentStreams})
+  api.process =
+    try:
+      startProcess(cmd, "", args, env, {poParentStreams})
+    except CatchableError as exc:
+      raise exc
+    except Exception as exc:
+      raiseAssert exc.msg
   # Waiting until daemon will not be bound to control socket.
   while true:
     if not api.process.running():
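Note: `startProcess` is annotated upstream as possibly raising bare `Exception`, which strict exception tracking cannot accept. The wrapper re-raises what is genuinely catchable and converts anything else into a `Defect` via `raiseAssert`. The idiom in a self-contained form:

    proc risky(): int =
      raise newException(IOError, "boom")

    proc safeCall(): int =
      try:
        result = risky()
      except CatchableError as exc:
        raise exc            # genuine runtime errors propagate unchanged
      except Exception as exc:
        raiseAssert exc.msg  # anything else is treated as a bug (Defect)

    try:
      discard safeCall()
    except IOError as exc:
      echo "caught: ", exc.msg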
@@ -900,7 +906,7 @@ proc openStream*(api: DaemonAPI, peer: PeerId,
     stream.flags.incl(Outbound)
     stream.transp = transp
     result = stream
-  except Exception as exc:
+  except CatchableError as exc:
     await api.closeConnection(transp)
     raise exc
@@ -936,7 +942,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string],
                                              protocols))
     pb.withMessage() do:
       api.servers.add(P2PServer(server: server, address: maddress))
-  except Exception as exc:
+  except CatchableError as exc:
     for item in protocols:
       api.handlers.del(item)
     server.stop()
@@ -1301,7 +1307,7 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string,
     ticket.transp = transp
     asyncSpawn pubsubLoop(api, ticket)
     result = ticket
-  except Exception as exc:
+  except CatchableError as exc:
     await api.closeConnection(transp)
     raise exc
@@ -19,7 +19,8 @@ type
 method connect*(
   self: Dial,
   peerId: PeerId,
-  addrs: seq[MultiAddress]) {.async, base.} =
+  addrs: seq[MultiAddress],
+  forceDial = false) {.async, base.} =
   ## connect remote peer without negotiating
   ## a protocol
   ##
@@ -29,7 +30,8 @@ method connect*(
 method dial*(
   self: Dial,
   peerId: PeerId,
-  protos: seq[string]): Future[Connection] {.async, base.} =
+  protos: seq[string],
+  ): Future[Connection] {.async, base.} =
   ## create a protocol stream over an
   ## existing connection
   ##
@@ -40,7 +42,8 @@ method dial*(
   self: Dial,
   peerId: PeerId,
   addrs: seq[MultiAddress],
-  protos: seq[string]): Future[Connection] {.async, base.} =
+  protos: seq[string],
+  forceDial = false): Future[Connection] {.async, base.} =
   ## create a protocol stream and establish
   ## a connection if one doesn't exist already
   ##
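Note: because `forceDial` is defaulted, every existing caller of `connect`/`dial` keeps compiling; only priority call sites opt in. A hedged usage sketch (the surrounding `Switch` construction is elided):

    import chronos
    import libp2p  # Switch, PeerId, MultiAddress

    proc example(switch: Switch, peerId: PeerId,
                 addrs: seq[MultiAddress]) {.async.} =
      await switch.connect(peerId, addrs)                    # may raise TooManyConnectionsError
      await switch.connect(peerId, addrs, forceDial = true)  # bypasses the connection limit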
@@ -47,7 +47,8 @@ type
 proc dialAndUpgrade(
   self: Dialer,
   peerId: PeerId,
-  addrs: seq[MultiAddress]):
+  addrs: seq[MultiAddress],
+  forceDial: bool):
   Future[Connection] {.async.} =
   debug "Dialing peer", peerId
@@ -72,7 +73,8 @@ proc dialAndUpgrade(
           transportCopy = transport
           addressCopy = a
         await self.connManager.trackOutgoingConn(
-          () => transportCopy.dial(hostname, addressCopy)
+          () => transportCopy.dial(hostname, addressCopy),
+          forceDial
         )
       except TooManyConnectionsError as exc:
         trace "Connection limit reached!"
@@ -112,7 +114,8 @@ proc dialAndUpgrade(
 proc internalConnect(
   self: Dialer,
   peerId: PeerId,
-  addrs: seq[MultiAddress]):
+  addrs: seq[MultiAddress],
+  forceDial: bool):
   Future[Connection] {.async.} =
   if self.localPeerId == peerId:
     raise newException(CatchableError, "can't dial self!")
@@ -136,7 +139,7 @@ proc internalConnect(
       trace "Reusing existing connection", conn, direction = $conn.dir
       return conn

-    conn = await self.dialAndUpgrade(peerId, addrs)
+    conn = await self.dialAndUpgrade(peerId, addrs, forceDial)
     if isNil(conn): # None of the addresses connected
       raise newException(DialFailedError, "Unable to establish outgoing link")
@@ -159,7 +162,8 @@ proc internalConnect(
 method connect*(
   self: Dialer,
   peerId: PeerId,
-  addrs: seq[MultiAddress]) {.async.} =
+  addrs: seq[MultiAddress],
+  forceDial = false) {.async.} =
   ## connect remote peer without negotiating
   ## a protocol
   ##
@@ -167,7 +171,7 @@ method connect*(
   if self.connManager.connCount(peerId) > 0:
     return

-  discard await self.internalConnect(peerId, addrs)
+  discard await self.internalConnect(peerId, addrs, forceDial)

 proc negotiateStream(
   self: Dialer,
|
@ -200,7 +204,8 @@ method dial*(
|
||||||
self: Dialer,
|
self: Dialer,
|
||||||
peerId: PeerId,
|
peerId: PeerId,
|
||||||
addrs: seq[MultiAddress],
|
addrs: seq[MultiAddress],
|
||||||
protos: seq[string]): Future[Connection] {.async.} =
|
protos: seq[string],
|
||||||
|
forceDial = false): Future[Connection] {.async.} =
|
||||||
## create a protocol stream and establish
|
## create a protocol stream and establish
|
||||||
## a connection if one doesn't exist already
|
## a connection if one doesn't exist already
|
||||||
##
|
##
|
||||||
|
@@ -218,7 +223,7 @@ method dial*(

   try:
     trace "Dialing (new)", peerId, protos
-    conn = await self.internalConnect(peerId, addrs)
+    conn = await self.internalConnect(peerId, addrs, forceDial)
     trace "Opening stream", conn
     stream = await self.connManager.getStream(conn)
@@ -49,7 +49,7 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
   for fut in args:
     futs &= fut
   proc call() {.async.} =
-    var first: ref Exception = nil
+    var first: ref CatchableError = nil
     futs = await allFinished(futs)
     for fut in futs:
       if fut.failed:
@@ -78,7 +78,12 @@ proc getDnsResponse(
       dataStream = newStringStream()
     dataStream.writeData(addr rawResponse[0], rawResponse.len)
     dataStream.setPosition(0)
+    # parseResponse can has a raises: [Exception, ..] because of
+    # https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
+    # it can't actually raise though
     return parseResponse(dataStream)
+  except CatchableError as exc: raise exc
+  except Exception as exc: raiseAssert exc.msg
   finally:
     await sock.closeWait()
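Note: the ordering of the two new handlers matters: the `CatchableError` branch must come before `Exception`, otherwise the broad `except Exception` would also swallow ordinary, recoverable errors and turn them into Defects. A compact demonstration:

    proc demo() =
      try:
        raise newException(ValueError, "ordinary error")
      except CatchableError as exc:
        raise exc            # taken: ValueError is catchable
      except Exception as exc:
        raiseAssert exc.msg  # reached only for non-catchable escapes

    try:
      demo()
    except ValueError:
      echo "propagated as expected"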
@@ -9,7 +9,7 @@

 {.push raises: [Defect].}

-import options
+import std/[sequtils, options, strutils]
 import chronos, chronicles
 import ../protobuf/minprotobuf,
        ../peerinfo,
@@ -107,11 +107,14 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
     iinfo.protoVersion = some(protoVersion)
     if r6.get():
       iinfo.agentVersion = some(agentVersion)
-    debug "decodeMsg: decoded message", pubkey = ($pubkey).shortLog,
-          addresses = $iinfo.addrs, protocols = $iinfo.protos,
-          observable_address = $iinfo.observedAddr,
-          proto_version = $iinfo.protoVersion,
-          agent_version = $iinfo.agentVersion
+    debug "decodeMsg: decoded identify", pubkey = ($pubkey).shortLog,
+          addresses = iinfo.addrs.mapIt($it).join(","),
+          protocols = iinfo.protos.mapIt($it).join(","),
+          observable_address =
+            if iinfo.observedAddr.isSome(): $iinfo.observedAddr.get()
+            else: "None",
+          proto_version = iinfo.protoVersion.get("None"),
+          agent_version = iinfo.agentVersion.get("None")
     some(iinfo)
   else:
     trace "decodeMsg: failed to decode received message"
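Note: the new log fields use `Option.get(default)` so absent identify fields render as "None" instead of Nim's `Some(...)`/`None` debug formatting. Self-contained illustration with std/options only:

    import std/options

    let proto = none(string)
    let agent = some("nim-libp2p/x.y")  # hypothetical agent string

    echo "proto_version=", proto.get("None")  # -> proto_version=None
    echo "agent_version=", agent.get("None")  # -> agent_version=nim-libp2p/x.y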
@@ -7,6 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

+{.push raises: [Defect].}
+
 import chronos, chronicles, bearssl
 import ../protobuf/minprotobuf,
        ../peerinfo,
@@ -0,0 +1,6 @@
+# this module will be further extended in PR
+# https://github.com/status-im/nim-libp2p/pull/107/
+
+type
+  ValidationResult* {.pure.} = enum
+    Accept, Reject, Ignore
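Note: moving `ValidationResult` into its own module lets `message.nim` return it from the message-id provider without pulling in all of `pubsub.nim`. Sketch of an application validator built on the enum (handler wiring elided; `Message` and `ValidationResult` come from libp2p):

    import chronos
    import libp2p

    proc myValidator(topic: string,
                     msg: Message): Future[ValidationResult] {.async.} =
      if msg.data.len == 0:
        return ValidationResult.Reject  # malformed: penalize the sender
      if topic != "/myapp/1.0.0":       # hypothetical topic name
        return ValidationResult.Ignore  # drop silently, no score penalty
      return ValidationResult.Accept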
@@ -96,7 +96,14 @@ method rpcHandler*(f: FloodSub,
     f.handleSubscribe(peer, sub.topic, sub.subscribe)

   for msg in rpcMsg.messages: # for every message
-    let msgId = f.msgIdProvider(msg)
+    let msgIdResult = f.msgIdProvider(msg)
+    if msgIdResult.isErr:
+      debug "Dropping message due to failed message id generation",
+        error = msgIdResult.error
+      # TODO: descore peers due to error during message validation (malicious?)
+      continue
+
+    let msgId = msgIdResult.get

     if f.addSeen(msgId):
       trace "Dropping already-seen message", msgId, peer
@@ -184,7 +191,14 @@ method publish*(f: FloodSub,
         Message.init(none(PeerInfo), data, topic, none(uint64), false)
       else:
         Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
-    msgId = f.msgIdProvider(msg)
+    msgIdResult = f.msgIdProvider(msg)
+
+  if msgIdResult.isErr:
+    trace "Error generating message id, skipping publish",
+      error = msgIdResult.error
+    return 0
+
+  let msgId = msgIdResult.get

   trace "Created new message",
     msg = shortLog(msg), peers = peers.len, topic, msgId
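Note: with `msgIdProvider` now returning a `Result`, id-generation failure is an explicit, non-raising outcome: `rpcHandler` skips the message and `publish` returns 0. Minimal shape of the `Result` handling (stew/results; the enum is a stand-in for the real one):

    import stew/results

    type ValidationResult = enum Accept, Reject, Ignore  # stand-in

    proc msgIdOf(seqno: seq[byte]): Result[seq[byte], ValidationResult] =
      if seqno.len > 0: ok(seqno)
      else: err(Reject)

    let r = msgIdOf(@[])
    if r.isErr:
      echo "dropping message: ", r.error  # -> dropping message: Reject
    else:
      echo "msgId len: ", r.get.len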
@@ -186,16 +186,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
         if s[].len == 0:
           g.peersInIP.del(pubSubPeer.address.get())

-    for t in toSeq(g.gossipsub.keys):
-      g.gossipsub.removePeer(t, pubSubPeer)
-      # also try to remove from explicit table here
-      g.explicit.removePeer(t, pubSubPeer)
-
     for t in toSeq(g.mesh.keys):
       trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score
       g.pruned(pubSubPeer, t)
       g.mesh.removePeer(t, pubSubPeer)

+    for t in toSeq(g.gossipsub.keys):
+      g.gossipsub.removePeer(t, pubSubPeer)
+      # also try to remove from explicit table here
+      g.explicit.removePeer(t, pubSubPeer)
+
     for t in toSeq(g.fanout.keys):
       g.fanout.removePeer(t, pubSubPeer)
@@ -237,9 +237,14 @@ proc handleSubscribe*(g: GossipSub,
     else:
       trace "peer unsubscribed from topic"

+      if g.mesh.hasPeer(topic, peer):
+        #against spec
+        g.mesh.removePeer(topic, peer)
+        g.pruned(peer, topic)
+
       # unsubscribe remote peer from the topic
       g.gossipsub.removePeer(topic, peer)
-      g.mesh.removePeer(topic, peer)
       g.fanout.removePeer(topic, peer)
       if peer.peerId in g.parameters.directPeers:
         g.explicit.removePeer(topic, peer)
@@ -282,6 +287,64 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
       peer,
       RPCMsg(control: some(respControl), messages: messages))

+proc validateAndRelay(g: GossipSub,
+                      msg: Message,
+                      msgId, msgIdSalted: MessageId,
+                      peer: PubSubPeer) {.async.} =
+  try:
+    let validation = await g.validate(msg)
+
+    var seenPeers: HashSet[PubSubPeer]
+    discard g.validationSeen.pop(msgIdSalted, seenPeers)
+    libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
+
+    case validation
+    of ValidationResult.Reject:
+      debug "Dropping message after validation, reason: reject",
+        msgId = shortLog(msgId), peer
+      g.punishInvalidMessage(peer, msg.topicIDs)
+      return
+    of ValidationResult.Ignore:
+      debug "Dropping message after validation, reason: ignore",
+        msgId = shortLog(msgId), peer
+      return
+    of ValidationResult.Accept:
+      discard
+
+    # store in cache only after validation
+    g.mcache.put(msgId, msg)
+
+    g.rewardDelivered(peer, msg.topicIDs, true)
+
+    var toSendPeers = HashSet[PubSubPeer]()
+    for t in msg.topicIDs: # for every topic in the message
+      if t notin g.topics:
+        continue
+
+      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
+      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
+
+    # Don't send it to source peer, or peers that
+    # sent it during validation
+    toSendPeers.excl(peer)
+    toSendPeers.excl(seenPeers)
+
+    # In theory, if topics are the same in all messages, we could batch - we'd
+    # also have to be careful to only include validated messages
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
+    for topic in msg.topicIDs:
+      if topic notin g.topics: continue
+
+      if g.knownTopics.contains(topic):
+        libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
+      else:
+        libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
+
+      await handleData(g, topic, msg.data)
+  except CatchableError as exc:
+    info "validateAndRelay failed", msg=exc.msg
+
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
@@ -299,8 +362,16 @@ method rpcHandler*(g: GossipSub,

   for i in 0..<rpcMsg.messages.len(): # for every message
     template msg: untyped = rpcMsg.messages[i]
+    let msgIdResult = g.msgIdProvider(msg)
+
+    if msgIdResult.isErr:
+      debug "Dropping message due to failed message id generation",
+        error = msgIdResult.error
+      # TODO: descore peers due to error during message validation (malicious?)
+      continue
+
     let
-      msgId = g.msgIdProvider(msg)
+      msgId = msgIdResult.get
       msgIdSalted = msgId & g.seenSalt

     # addSeen adds salt to msgId to avoid
@@ -343,54 +414,7 @@ method rpcHandler*(g: GossipSub,
     # (eg, pop everything you put in it)
     g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]()

-    let validation = await g.validate(msg)
-
-    var seenPeers: HashSet[PubSubPeer]
-    discard g.validationSeen.pop(msgIdSalted, seenPeers)
-    libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
-
-    case validation
-    of ValidationResult.Reject:
-      debug "Dropping message after validation, reason: reject",
-        msgId = shortLog(msgId), peer
-      g.punishInvalidMessage(peer, msg.topicIDs)
-      continue
-    of ValidationResult.Ignore:
-      debug "Dropping message after validation, reason: ignore",
-        msgId = shortLog(msgId), peer
-      continue
-    of ValidationResult.Accept:
-      discard
-
-    # store in cache only after validation
-    g.mcache.put(msgId, msg)
-
-    g.rewardDelivered(peer, msg.topicIDs, true)
-
-    var toSendPeers = HashSet[PubSubPeer]()
-    for t in msg.topicIDs: # for every topic in the message
-      if t notin g.topics:
-        continue
-
-      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
-      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
-
-      await handleData(g, t, msg.data)
-
-    # Don't send it to source peer, or peers that
-    # sent it during validation
-    toSendPeers.excl(peer)
-    toSendPeers.excl(seenPeers)
-
-    # In theory, if topics are the same in all messages, we could batch - we'd
-    # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
-    trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
-    for topic in msg.topicIDs:
-      if g.knownTopics.contains(topic):
-        libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
-      else:
-        libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
+    asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, peer)

   if rpcMsg.control.isSome():
     g.handleControl(peer, rpcMsg.control.unsafeGet())
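Note: relaying moved into `validateAndRelay`, which `rpcHandler` now launches with `asyncSpawn` instead of awaiting `g.validate(msg)` inline, so a slow application validator no longer stalls the per-peer RPC loop. The spawned proc must catch its own `CatchableError`s, since an exception escaping an `asyncSpawn`ed future is fatal in chronos. A toy model of the shape:

    import chronos

    proc relay(i: int) {.async.} =
      try:
        await sleepAsync(10.milliseconds)  # stands in for validation + broadcast
        echo "relayed ", i
      except CatchableError as exc:
        echo "relay failed: ", exc.msg     # contain errors: never escape asyncSpawn

    proc rpcLoop() {.async.} =
      for i in 0 ..< 3:
        asyncSpawn relay(i)                # fire and forget; the loop keeps going

    waitFor rpcLoop()
    waitFor sleepAsync(50.milliseconds)    # give spawned futures time to finish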
@@ -458,13 +482,23 @@ method publish*(g: GossipSub,

   if topic in g.topics: # if we're subscribed use the mesh
     peers.incl(g.mesh.getOrDefault(topic))
-  else: # not subscribed, send to fanout peers
-    # try optimistically
-    peers.incl(g.fanout.getOrDefault(topic))
-    if peers.len == 0:
-      # ok we had nothing.. let's try replenish inline
+
+  if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
+    # not subscribed or bad mesh, send to fanout peers
+    # disable for floodPublish, since we already sent to every good peer
+    #
+    var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
+    if fanoutPeers.len == 0:
       g.replenishFanout(topic)
-      peers.incl(g.fanout.getOrDefault(topic))
+      fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
+
+    g.rng.shuffle(fanoutPeers)
+    if fanoutPeers.len + peers.len > g.parameters.d:
+      fanoutPeers.setLen(g.parameters.d - peers.len)
+
+    for fanPeer in fanoutPeers:
+      peers.incl(fanPeer)
+      if peers.len > g.parameters.d: break

   # even if we couldn't publish,
   # we still attempted to publish
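Note: publish now also tops up from fanout when the mesh is merely degraded (fewer than `dLow` peers), not only when unsubscribed, and it shuffles then truncates the fanout candidates so the total never exceeds `d`. A toy sketch of that capping step:

    import std/[random, sets]

    var peers = toHashSet(["mesh-a", "mesh-b"])
    var fanoutPeers = @["fan-1", "fan-2", "fan-3", "fan-4"]
    let d = 3

    shuffle(fanoutPeers)  # spread load across fanout peers
    if fanoutPeers.len + peers.len > d:
      fanoutPeers.setLen(max(0, d - peers.len))
    for p in fanoutPeers:
      peers.incl(p)
      if peers.len > d: break

    echo peers.len  # stays at or near d (3) here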
@@ -489,7 +523,15 @@ method publish*(g: GossipSub,
         Message.init(none(PeerInfo), data, topic, none(uint64), false)
       else:
         Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
-    msgId = g.msgIdProvider(msg)
+    msgIdResult = g.msgIdProvider(msg)
+
+  if msgIdResult.isErr:
+    trace "Error generating message id, skipping publish",
+      error = msgIdResult.error
+    libp2p_gossipsub_failed_publish.inc()
+    return 0
+
+  let msgId = msgIdResult.get

   logScope: msgId = shortLog(msgId)
@@ -489,9 +489,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} =
   logScope: topic
   trace "about to replenish fanout"

+  let currentMesh = g.mesh.getOrDefault(topic)
   if g.fanout.peers(topic) < g.parameters.dLow:
     trace "replenishing fanout", peers = g.fanout.peers(topic)
     for peer in g.gossipsub.getOrDefault(topic):
+      if peer in currentMesh: continue
       if g.fanout.addPeer(topic, peer):
         if g.fanout.peers(topic) == g.parameters.d:
           break
@@ -11,7 +11,8 @@

 import std/[tables, sequtils, sets, strutils]
 import chronos, chronicles, metrics, bearssl
-import ./pubsubpeer,
+import ./errors as pubsub_errors,
+       ./pubsubpeer,
        ./rpc/[message, messages, protobuf],
        ../../switch,
        ../protocol,
@@ -29,6 +30,7 @@ export results
 export PubSubPeer
 export PubSubObserver
 export protocol
+export pubsub_errors

 logScope:
   topics = "libp2p pubsub"
@@ -76,16 +78,13 @@ type
   TopicHandler* = proc(topic: string,
                        data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}

-  ValidationResult* {.pure.} = enum
-    Accept, Reject, Ignore
-
   ValidatorHandler* = proc(topic: string,
                            message: Message): Future[ValidationResult] {.gcsafe, raises: [Defect].}

   TopicPair* = tuple[topic: string, handler: TopicHandler]

   MsgIdProvider* =
-    proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.}
+    proc(m: Message): Result[MessageID, ValidationResult] {.noSideEffect, raises: [Defect], gcsafe.}

   SubscriptionValidator* =
     proc(topic: string): bool {.raises: [Defect], gcsafe.}
@@ -452,6 +451,11 @@ proc subscribe*(p: PubSub,
   ## on every received message
   ##

+  # Check that this is an allowed topic
+  if p.subscriptionValidator != nil and p.subscriptionValidator(topic) == false:
+    warn "Trying to subscribe to a topic not passing validation!", topic
+    return
+
   p.topics.withValue(topic, handlers) do:
     # Already subscribed, just adding another handler
     handlers[].add(handler)
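Note: `subscriptionValidator` was already consulted for remote subscriptions; this hunk applies the same check to local `subscribe` calls. A hedged sketch of installing one (the allowlist and setup line are hypothetical; the field is on the `PubSub` instance per `pubsub.nim`):

    proc allowTopic(topic: string): bool {.raises: [Defect], gcsafe.} =
      # hypothetical allowlist
      topic in ["/myapp/blocks", "/myapp/tx"]

    # during node setup, on a constructed PubSub instance:
    #   myPubsub.subscriptionValidator = allowTopic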
@@ -16,9 +16,10 @@ import ./messages,
        ../../../peerid,
        ../../../peerinfo,
        ../../../crypto/crypto,
-       ../../../protobuf/minprotobuf
+       ../../../protobuf/minprotobuf,
+       ../../../protocols/pubsub/errors

-export messages
+export errors, messages

 logScope:
   topics = "pubsubmessage"
@@ -28,16 +29,12 @@ const PubSubPrefix = toBytes("libp2p-pubsub:")
 declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
 declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")

-func defaultMsgIdProvider*(m: Message): MessageID =
-  let mid =
-    if m.seqno.len > 0 and m.fromPeer.data.len > 0:
-      byteutils.toHex(m.seqno) & $m.fromPeer
-    else:
-      # This part is irrelevant because it's not standard,
-      # We use it exclusively for testing basically and users should
-      # implement their own logic in the case they use anonymization
-      $m.data.hash & $m.topicIDs.hash
-  mid.toBytes()
+func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
+  if m.seqno.len > 0 and m.fromPeer.data.len > 0:
+    let mid = byteutils.toHex(m.seqno) & $m.fromPeer
+    ok mid.toBytes()
+  else:
+    err ValidationResult.Reject

 proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
   ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())
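
Call sites now have to unwrap the result; the tests further down use `.expect(...)` where a failure would be a bug. A tolerant caller might drop the message instead — a sketch, with `mcache` and `msg` assumed in scope:

# Sketch: skip caching when no standard message id can be derived.
let midRes = defaultMsgIdProvider(msg)
if midRes.isOk:
  mcache.put(midRes.get(), msg)
else:
  trace "Dropping message without a usable message id"
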
@@ -99,8 +99,9 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe.} =
 method connect*(
   s: Switch,
   peerId: PeerId,
-  addrs: seq[MultiAddress]): Future[void] =
-  s.dialer.connect(peerId, addrs)
+  addrs: seq[MultiAddress],
+  forceDial = false): Future[void] =
+  s.dialer.connect(peerId, addrs, forceDial)

 method dial*(
   s: Switch,
|
@ -117,8 +118,9 @@ method dial*(
|
||||||
s: Switch,
|
s: Switch,
|
||||||
peerId: PeerId,
|
peerId: PeerId,
|
||||||
addrs: seq[MultiAddress],
|
addrs: seq[MultiAddress],
|
||||||
protos: seq[string]): Future[Connection] =
|
protos: seq[string],
|
||||||
s.dialer.dial(peerId, addrs, protos)
|
forceDial = false): Future[Connection] =
|
||||||
|
s.dialer.dial(peerId, addrs, protos, forceDial)
|
||||||
|
|
||||||
proc dial*(
|
proc dial*(
|
||||||
s: Switch,
|
s: Switch,
|
||||||
|
|
|
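
Both entry points simply thread `forceDial` through to the dialer, which hands it to the connection manager so the dial may exceed `maxConnections` (see the "allow force dial" test below). A usage sketch, with `switch`, `peerId` and `addrs` assumed to exist in scope:

# Sketch: bypass the connection limit for a high-priority peer.
await switch.connect(peerId, addrs, forceDial = true)
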
@@ -225,7 +225,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
     debug "Server was closed", exc = exc.msg
     raise newTransportClosedError(exc)
   except CancelledError as exc:
-    raise
+    raise exc
   except CatchableError as exc:
     debug "Unexpected error accepting connection", exc = exc.msg
     raise exc
@@ -49,11 +49,24 @@ proc new*(T: type WsStream,
   stream.initStream()
   return stream

+template mapExceptions(body: untyped) =
+  try:
+    body
+  except AsyncStreamIncompleteError:
+    raise newLPStreamEOFError()
+  except AsyncStreamUseClosedError:
+    raise newLPStreamEOFError()
+  except WSClosedError:
+    raise newLPStreamEOFError()
+  except AsyncStreamLimitError:
+    raise newLPStreamLimitError()
+
 method readOnce*(
   s: WsStream,
   pbytes: pointer,
   nbytes: int): Future[int] {.async.} =
-  let res = await s.session.recv(pbytes, nbytes)
+  let res = mapExceptions(await s.session.recv(pbytes, nbytes))

   if res == 0 and s.session.readyState == ReadyState.Closed:
     raise newLPStreamEOFError()
   return res
@@ -61,10 +74,7 @@ method readOnce*(
 method write*(
   s: WsStream,
   msg: seq[byte]): Future[void] {.async.} =
-  try:
-    await s.session.send(msg, Opcode.Binary)
-  except WSClosedError:
-    raise newLPStreamEOFError()
+  mapExceptions(await s.session.send(msg, Opcode.Binary))

 method closeImpl*(s: WsStream): Future[void] {.async.} =
   await s.session.close()
@@ -54,16 +54,21 @@ proc acquire*(s: AsyncSemaphore): Future[void] =
     fut.cancelCallback = nil
     if not fut.finished:
       s.queue.keepItIf( it != fut )
-      s.count.inc

   fut.cancelCallback = cancellation

   s.queue.add(fut)
-  s.count.dec

   trace "Queued slot", available = s.count, queue = s.queue.len
   return fut

+proc forceAcquire*(s: AsyncSemaphore) =
+  ## ForceAcquire will always succeed,
+  ## creating a temporary slot if required.
+  ## This temporary slot will stay usable until
+  ## there are fewer `acquire`s than `release`s
+  s.count.dec
+
 proc release*(s: AsyncSemaphore) =
   ## Release a resource from the semaphore,
   ## by picking the first future from the queue
@@ -77,13 +82,15 @@ proc release*(s: AsyncSemaphore) =
     trace "Releasing slot", available = s.count,
                             queue = s.queue.len

-    if s.queue.len > 0:
+    s.count.inc
+    while s.queue.len > 0:
       var fut = s.queue[0]
       s.queue.delete(0)
       if not fut.finished():
+        s.count.dec
         fut.complete()
+        break

-    s.count.inc # increment the resource count
     trace "Released slot", available = s.count,
                            queue = s.queue.len
     return
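
Moving the `inc`/`dec` bookkeeping into `release` keeps `count` meaningful while waiters sit in the queue, and lets `forceAcquire` borrow a slot by simply decrementing. A worked sketch of the accounting (comments show the expected `count` after each call):

# Sketch: forceAcquire borrows a slot that a later release repays.
let sema = newAsyncSemaphore(1)
await sema.acquire()  # count: 1 -> 0, the only slot is taken
sema.forceAcquire()   # count: 0 -> -1, always succeeds immediately
sema.release()        # count: -1 -> 0, repays the borrowed slot
sema.release()        # count: 0 -> 1, semaphore is full again
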
@@ -22,3 +22,25 @@ template asyncTest*(name: string, body: untyped): untyped =
       proc() {.async, gcsafe.} =
         body
     )())
+
+template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped =
+  test name:
+    var attemptNumber = 0
+    while attemptNumber < attempts:
+      let isLastAttempt = attemptNumber == attempts - 1
+      inc attemptNumber
+      try:
+        waitFor((
+          proc() {.async, gcsafe.} =
+            body
+        )())
+      except Exception as e:
+        if isLastAttempt: raise e
+        else: testStatusIMPL = TestStatus.FAILED
+      finally:
+        if not isLastAttempt:
+          if testStatusIMPL == TestStatus.FAILED:
+            # Retry
+            testStatusIMPL = TestStatus.OK
+          else:
+            break
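
Usage mirrors `asyncTest`, re-running the body until it passes or attempts run out; the daemon pubsub tests below adopt it. A minimal sketch, where `fetchValue` is a hypothetical flaky async call:

# Sketch: retry a timing-sensitive check up to 3 times before failing.
flakyAsyncTest "eventually consistent lookup", attempts = 3:
  check (await fetchValue()) == 42
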
@@ -11,7 +11,7 @@ import ../libp2p/[stream/connection,

 import ./helpers

-type TransportProvider* = proc(): Transport {.gcsafe.}
+type TransportProvider* = proc(): Transport {.gcsafe, raises: [Defect].}

 proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
   suite name & " common tests":
|
@ -137,6 +137,10 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
await transport1.stop()
|
await transport1.stop()
|
||||||
|
|
||||||
asyncTest "e2e should allow multiple local addresses":
|
asyncTest "e2e should allow multiple local addresses":
|
||||||
|
when defined(windows):
|
||||||
|
# this randomly locks the Windows CI job
|
||||||
|
skip()
|
||||||
|
return
|
||||||
let addrs = @[MultiAddress.init(ma).tryGet(),
|
let addrs = @[MultiAddress.init(ma).tryGet(),
|
||||||
MultiAddress.init(ma).tryGet()]
|
MultiAddress.init(ma).tryGet()]
|
||||||
|
|
||||||
|
|
|
@@ -13,6 +13,8 @@ import ../libp2p/protocols/secure/secure
 import ./asyncunit
 export asyncunit

+{.push raises: [Defect].}
+
 const
   StreamTransportTrackerName = "stream.transport"
   StreamServerTrackerName = "stream.server"
@@ -51,7 +53,9 @@ template checkTrackers*() =
       checkpoint tracker.dump()
       fail()
   # Also test the GC is not fooling with us
-  GC_fullCollect()
+  try:
+    GC_fullCollect()
+  except: discard

 type RngWrap = object
   rng: ref BrHmacDrbgContext
@@ -20,6 +20,7 @@ import utils,
                   protocols/pubsub/floodsub,
                   protocols/pubsub/rpc/messages,
                   protocols/pubsub/peertable]
+import ../../libp2p/protocols/pubsub/errors as pubsub_errors

 import ../helpers
@@ -39,6 +39,8 @@ proc randomPeerId(): PeerId =
   except CatchableError as exc:
     raise newException(Defect, exc.msg)

+const MsgIdSuccess = "msg id gen success"
+
 suite "GossipSub internal":
   teardown:
     checkTrackers()
@@ -308,7 +310,7 @@ suite "GossipSub internal":
       conn.peerId = peerId
       inc seqno
       let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
-      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)

     check gossipSub.fanout[topic].len == 15
     check gossipSub.mesh[topic].len == 15

@@ -355,7 +357,7 @@ suite "GossipSub internal":
       conn.peerId = peerId
       inc seqno
       let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
-      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)

     let peers = gossipSub.getGossipPeers()
     check peers.len == gossipSub.parameters.d

@@ -396,7 +398,7 @@ suite "GossipSub internal":
       conn.peerId = peerId
       inc seqno
       let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
-      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)

     let peers = gossipSub.getGossipPeers()
     check peers.len == gossipSub.parameters.d

@@ -437,7 +439,7 @@ suite "GossipSub internal":
       conn.peerId = peerId
       inc seqno
       let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
-      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)

     let peers = gossipSub.getGossipPeers()
     check peers.len == 0
@@ -24,6 +24,7 @@ import utils, ../../libp2p/[errors,
                             protocols/pubsub/peertable,
                             protocols/pubsub/timedcache,
                             protocols/pubsub/rpc/messages]
+import ../../libp2p/protocols/pubsub/errors as pubsub_errors
 import ../helpers

 proc `$`(peer: PubSubPeer): string = shortLog(peer)
@@ -564,6 +565,72 @@ suite "GossipSub":
     await allFuturesThrowing(nodesFut.concat())
     check observed == 2

+  asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
+    var passed = newFuture[void]()
+    proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
+      check topic == "foobar"
+      passed.complete()
+
+    let
+      nodes = generateNodes(
+        2,
+        gossip = true,
+        unsubscribeBackoff = 10.minutes)
+
+      # start switches
+      nodesFut = await allFinished(
+        nodes[0].switch.start(),
+        nodes[1].switch.start(),
+      )
+
+    # start pubsub
+    await allFuturesThrowing(
+      allFinished(
+        nodes[0].start(),
+        nodes[1].start(),
+      ))
+
+    await subscribeNodes(nodes)
+
+    nodes[1].subscribe("foobar", handler)
+    nodes[0].subscribe("foobar", handler)
+    await waitSub(nodes[0], nodes[1], "foobar")
+    await waitSub(nodes[1], nodes[0], "foobar")
+
+    nodes[0].unsubscribe("foobar", handler)
+
+    let gsNode = GossipSub(nodes[1])
+    check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0)
+
+    nodes[0].subscribe("foobar", handler)
+
+    check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
+
+    tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
+
+    check:
+      GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0
+      GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
+
+    await passed.wait(2.seconds)
+
+    trace "test done, stopping..."
+
+    await nodes[0].stop()
+    await nodes[1].stop()
+
+    await allFuturesThrowing(
+      nodes[0].switch.stop(),
+      nodes[1].switch.stop()
+    )
+
+    await allFuturesThrowing(
+      nodes[0].stop(),
+      nodes[1].stop()
+    )
+
+    await allFuturesThrowing(nodesFut.concat())
+
   asyncTest "e2e - GossipSub send over mesh A -> B":
     var passed: Future[bool] = newFuture[bool]()
     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@@ -147,6 +147,9 @@ suite "GossipSub":
         nodes[1].start(),
       ))

+    # We must subscribe before setting the validator
+    nodes[0].subscribe("foobar", handler)
+
     var gossip = GossipSub(nodes[0])
     let invalidDetected = newFuture[void]()
     gossip.subscriptionValidator =
@@ -162,7 +165,6 @@ suite "GossipSub":

     await subscribeNodes(nodes)

-    nodes[0].subscribe("foobar", handler)
     nodes[1].subscribe("foobar", handler)

     await invalidDetected.wait(10.seconds)
@@ -5,19 +5,21 @@ import stew/byteutils
 import ../../libp2p/[peerid,
                      crypto/crypto,
                      protocols/pubsub/mcache,
-                     protocols/pubsub/rpc/message,
                      protocols/pubsub/rpc/messages]
+import ./utils

 var rng = newRng()

 proc randomPeerId(): PeerId =
   PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()

+const MsgIdGenSuccess = "msg id generation success"
+
 suite "MCache":
   test "put/get":
     var mCache = MCache.init(3, 5)
     var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes())
-    let msgId = defaultMsgIdProvider(msg)
+    let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenSuccess)
     mCache.put(msgId, msg)
     check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
@@ -28,13 +30,13 @@ suite "MCache":
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["foo"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     for i in 0..<5:
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["bar"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     var mids = mCache.window("foo")
     check mids.len == 3

@@ -49,7 +51,7 @@ suite "MCache":
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["foo"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     mCache.shift()
     check mCache.window("foo").len == 0

@@ -58,7 +60,7 @@ suite "MCache":
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["bar"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     mCache.shift()
     check mCache.window("bar").len == 0

@@ -67,7 +69,7 @@ suite "MCache":
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["baz"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     mCache.shift()
     check mCache.window("baz").len == 0

@@ -79,19 +81,19 @@ suite "MCache":
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["foo"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     for i in 0..<3:
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["bar"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     for i in 0..<3:
       var msg = Message(fromPeer: randomPeerId(),
                         seqno: "12345".toBytes(),
                         topicIDs: @["baz"])
-      mCache.put(defaultMsgIdProvider(msg), msg)
+      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

     mCache.shift()
     check mCache.window("foo").len == 0
@@ -3,8 +3,10 @@ import unittest2
 {.used.}

 import options
+import stew/byteutils
 import ../../libp2p/[peerid, peerinfo,
                      crypto/crypto,
+                     protocols/pubsub/errors,
                      protocols/pubsub/rpc/message,
                      protocols/pubsub/rpc/messages]
@@ -18,3 +20,56 @@ suite "Message":
       msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)

     check verify(msg)
+
+  test "defaultMsgIdProvider success":
+    let
+      seqno = 11'u64
+      pkHex =
+        """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
+           E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
+           E731065A"""
+      seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
+        .expect("valid private key bytes")
+      peer = PeerInfo.new(seckey)
+      msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
+      msgIdResult = msg.defaultMsgIdProvider()
+
+    check:
+      msgIdResult.isOk
+      string.fromBytes(msgIdResult.get) ==
+        "000000000000000b12D3KooWGyLzSt9g4U9TdHYDvVWAs5Ht4WrocgoyqPxxvnqAL8qw"
+
+  test "defaultMsgIdProvider error - no source peer id":
+    let
+      seqno = 11'u64
+      pkHex =
+        """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
+           E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
+           E731065A"""
+      seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
+        .expect("valid private key bytes")
+      peer = PeerInfo.new(seckey)
+
+    var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true)
+    msg.fromPeer = PeerId()
+    let msgIdResult = msg.defaultMsgIdProvider()
+
+    check:
+      msgIdResult.isErr
+      msgIdResult.error == ValidationResult.Reject
+
+  test "defaultMsgIdProvider error - no source seqno":
+    let
+      pkHex =
+        """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
+           E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
+           E731065A"""
+      seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
+        .expect("valid private key bytes")
+      peer = PeerInfo.new(seckey)
+      msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true)
+      msgIdResult = msg.defaultMsgIdProvider()
+
+    check:
+      msgIdResult.isErr
+      msgIdResult.error == ValidationResult.Reject
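
The expected id in the success test is the 8-byte big-endian seqno rendered as lowercase hex, concatenated with the text form of the peer id ($m.fromPeer), so seqno 11 contributes the `000000000000000b` prefix. A quick sanity check of that prefix (sketch):

# Sketch: the id prefix is just the hex of the 8-byte big-endian seqno.
import stew/[byteutils, endians2]

let seqno = 11'u64
assert byteutils.toHex(seqno.toBytesBE()) == "000000000000000b"
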
@@ -4,29 +4,43 @@ const
   libp2p_pubsub_verify {.booldefine.} = true
   libp2p_pubsub_anonymize {.booldefine.} = false

-import random, tables
-import chronos
+import hashes, random, tables
+import chronos, stew/[byteutils, results]
 import ../../libp2p/[builders,
+                     protocols/pubsub/errors,
                      protocols/pubsub/pubsub,
                      protocols/pubsub/gossipsub,
                      protocols/pubsub/floodsub,
+                     protocols/pubsub/rpc/messages,
                      protocols/secure/secure]

 export builders

 randomize()

+func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
+  let mid =
+    if m.seqno.len > 0 and m.fromPeer.data.len > 0:
+      byteutils.toHex(m.seqno) & $m.fromPeer
+    else:
+      # This part is irrelevant because it's not standard,
+      # We use it exclusively for testing basically and users should
+      # implement their own logic in the case they use anonymization
+      $m.data.hash & $m.topicIDs.hash
+  ok mid.toBytes()
+
 proc generateNodes*(
   num: Natural,
   secureManagers: openArray[SecureProtocol] = [
     SecureProtocol.Noise
   ],
-  msgIdProvider: MsgIdProvider = nil,
+  msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
   gossip: bool = false,
   triggerSelf: bool = false,
   verifySignature: bool = libp2p_pubsub_verify,
   anonymize: bool = libp2p_pubsub_anonymize,
   sign: bool = libp2p_pubsub_sign,
+  unsubscribeBackoff = 1.seconds,
   maxMessageSize: int = 1024 * 1024): seq[PubSub] =

   for i in 0..<num:
@@ -40,7 +54,7 @@ proc generateNodes*(
         msgIdProvider = msgIdProvider,
         anonymize = anonymize,
         maxMessageSize = maxMessageSize,
-        parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = 1.seconds; p))
+        parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = unsubscribeBackoff; p))
       # set some testing params, to enable scores
       g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
       g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
@@ -463,3 +463,33 @@ suite "Connection Manager":
     await connMngr.close()
     await allFuturesThrowing(
       allFutures(conns.mapIt( it.close() )))
+
+  asyncTest "allow force dial":
+    let connMngr = ConnManager.new(maxConnections = 2)
+
+    var conns: seq[Connection]
+    for i in 0..<3:
+      let conn = connMngr.trackOutgoingConn(
+        (proc(): Future[Connection] {.async.} =
+          return Connection.new(
+            PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
+            Direction.In)
+        ), true
+      )
+
+      check await conn.withTimeout(10.millis)
+      conns.add(await conn)
+
+    # should throw adding a connection over the limit
+    expect TooManyConnectionsError:
+      discard await connMngr.trackOutgoingConn(
+        (proc(): Future[Connection] {.async.} =
+          return Connection.new(
+            PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
+            Direction.In)
+        ), false
+      )
+
+    await connMngr.close()
+    await allFuturesThrowing(
+      allFutures(conns.mapIt( it.close() )))
@@ -1,4 +1,4 @@
-import chronos, unittest2
+import chronos, unittest2, helpers
 import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec,
        ../libp2p/cid, ../libp2p/multihash, ../libp2p/peerid
@@ -140,9 +140,7 @@ when isMainModule:
   # test "Provide CID test":
   #   check:
   #     waitFor(provideCidTest()) == true
-  test "GossipSub test":
-    check:
-      waitFor(pubsubTest({PSGossipSub})) == true
-  test "FloodSub test":
-    check:
-      waitFor(pubsubTest({PSFloodSub})) == true
+  flakyAsyncTest "GossipSub test", attempts=4:
+    check (await pubsubTest({PSGossipSub})) == true
+  flakyAsyncTest "FloodSub test", attempts=4:
+    check (await pubsubTest({PSFloodSub})) == true
@@ -10,6 +10,9 @@ import ../libp2p/errors,
        ../libp2p/protocols/protocol,
        ../libp2p/upgrademngrs/upgrade

+
+{.push raises: [Defect].}
+
 import ./helpers

 when defined(nimHasUsed): {.used.}
@@ -53,7 +56,7 @@ method readOnce*(s: TestSelectStream,

 method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard

-method close(s: TestSelectStream) {.async, gcsafe.} =
+method close(s: TestSelectStream) {.async, gcsafe, raises: [Defect].} =
   s.isClosed = true
   s.isEof = true

@@ -63,7 +66,7 @@ proc newTestSelectStream(): TestSelectStream =

 ## Mock stream for handles `ls` test
 type
-  LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe.}
+  LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}

   TestLsStream = ref object of Connection
     step*: int

@@ -115,7 +118,7 @@ proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =

 ## Mock stream for handles `na` test
 type
-  NaHandler = proc(procs: string): Future[void] {.gcsafe.}
+  NaHandler = proc(procs: string): Future[void] {.gcsafe, raises: [Defect].}

   TestNaStream = ref object of Connection
     step*: int
@@ -195,14 +198,14 @@ suite "Multistream select":
   asyncTest "test handle `ls`":
     let ms = MultistreamSelect.new()

-    proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration
-    let conn = Connection(newTestLsStream(testLsHandler))
+    var conn: Connection = nil
     let done = newFuture[void]()
     proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} =
       var strProto: string = string.fromBytes(proto)
       check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
       await conn.close()
       done.complete()
+    conn = Connection(newTestLsStream(testLsHandler))

     proc testHandler(conn: Connection, proto: string): Future[void]
       {.async, gcsafe.} = discard

@@ -216,13 +219,12 @@ suite "Multistream select":
   asyncTest "test handle `na`":
     let ms = MultistreamSelect.new()

-    proc testNaHandler(msg: string): Future[void] {.async, gcsafe.}
-    let conn = newTestNaStream(testNaHandler)
+    var conn: Connection = nil

     proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
       echo msg
       check msg == Na
       await conn.close()
+    conn = newTestNaStream(testNaHandler)

     var protocol: LPProtocol = new LPProtocol
     proc testHandler(conn: Connection,
@@ -40,7 +40,7 @@ const
 type
   TestProto = ref object of LPProtocol

-method init(p: TestProto) {.gcsafe.} =
+method init(p: TestProto) {.gcsafe, raises: [Defect].} =
   proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
     let msg = string.fromBytes(await conn.readLp(1024))
     check "Hello!" == msg
@@ -36,7 +36,7 @@ suite "AsyncSemaphore":
     await sema.acquire()
     let fut = sema.acquire()

-    check sema.count == -1
+    check sema.count == 0
     sema.release()
     sema.release()
     check sema.count == 1

@@ -66,7 +66,7 @@ suite "AsyncSemaphore":

     let fut = sema.acquire()
     check fut.finished == false
-    check sema.count == -1
+    check sema.count == 0

     sema.release()
     sema.release()
@@ -104,12 +104,20 @@ suite "AsyncSemaphore":

     await sema.acquire()

-    let tmp = sema.acquire()
-    check not tmp.finished()
+    let
+      tmp = sema.acquire()
+      tmp2 = sema.acquire()
+    check:
+      not tmp.finished()
+      not tmp2.finished()

     tmp.cancel()
     sema.release()

+    check tmp2.finished()
+
+    sema.release()
+
     check await sema.acquire().withTimeout(10.millis)

   asyncTest "should handle out of order cancellations":
@@ -145,3 +153,43 @@ suite "AsyncSemaphore":
     sema.release()

     check await sema.acquire().withTimeout(10.millis)
+
+  asyncTest "should handle forceAcquire properly":
+    let sema = newAsyncSemaphore(1)
+
+    await sema.acquire()
+    check not(await sema.acquire().withTimeout(1.millis)) # should not acquire but cancel
+
+    let
+      fut1 = sema.acquire()
+      fut2 = sema.acquire()
+
+    sema.forceAcquire()
+    sema.release()
+
+    await fut1 or fut2 or sleepAsync(1.millis)
+    check:
+      fut1.finished()
+      not fut2.finished()
+
+    sema.release()
+    await fut1 or fut2 or sleepAsync(1.millis)
+    check:
+      fut1.finished()
+      fut2.finished()
+
+    sema.forceAcquire()
+    sema.forceAcquire()
+
+    let
+      fut3 = sema.acquire()
+      fut4 = sema.acquire()
+      fut5 = sema.acquire()
+    sema.release()
+    sema.release()
+    await sleepAsync(1.millis)
+    check:
+      fut3.finished()
+      fut4.finished()
+      not fut5.finished()
@@ -562,6 +562,7 @@ suite "Switch":
         conns.dec

     switches.add(newStandardSwitch(
+      maxConnsPerPeer = 10,
       rng = rng))

     switches[0].addConnEventHandler(hook, ConnEventKind.Connected)
@@ -840,6 +841,10 @@ suite "Switch":
       switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet()

   asyncTest "e2e should allow multiple local addresses":
+    when defined(windows):
+      # this randomly locks the Windows CI job
+      skip()
+      return
     proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
       try:
         let msg = string.fromBytes(await conn.readLp(1024))
@@ -13,7 +13,7 @@ import ../libp2p/[stream/connection,
 import ./helpers, ./commontransport

 const
-  SecureKey* = """
+  SecureKey = """
 -----BEGIN PRIVATE KEY-----
 MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAP0yH7F7FtGunC91
 IPkU+u8B4gdxiwYW0J3PrixtB1Xz3e4dfjwQqhIJlG6BxQ4myCxmSPjxP/eOOYp+

@@ -32,7 +32,7 @@ NABr5ec1FxuJa/8=
 -----END PRIVATE KEY-----
 """

-  SecureCert* = """
+  SecureCert = """
 -----BEGIN CERTIFICATE-----
 MIICjDCCAfWgAwIBAgIURjeiJmkNbBVktqXvnXh44DKx364wDQYJKoZIhvcNAQEL
 BQAwVzELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
@@ -62,13 +62,16 @@ suite "WebSocket transport":

   commonTransportTest(
     "WebSocket Secure",
-    proc (): Transport =
-      WsTransport.new(
-        Upgrade(),
-        TLSPrivateKey.init(SecureKey),
-        TLSCertificate.init(SecureCert),
-        {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}),
-    "/ip4/0.0.0.0/tcp/0/wss")
+    (proc (): Transport {.gcsafe.} =
+      try:
+        return WsTransport.new(
+          Upgrade(),
+          TLSPrivateKey.init(SecureKey),
+          TLSCertificate.init(SecureCert),
+          {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName})
+      except Exception: check(false)
+    ),
+    "/ip4/0.0.0.0/tcp/0/wss")

   asyncTest "Hostname verification":
     let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0/wss").tryGet()]