diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6d0bcc..a02312e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,78 +9,49 @@ on: jobs: build: + timeout-minutes: 90 strategy: fail-fast: false - max-parallel: 20 matrix: target: - # Unit tests - os: linux cpu: amd64 - os: linux cpu: i386 - os: macos cpu: amd64 - - os: windows - cpu: i386 - os: windows cpu: amd64 + #- os: windows + #cpu: i386 + branch: [version-1-2, devel] include: - target: os: linux builder: ubuntu-20.04 + shell: bash - target: os: macos builder: macos-10.15 + shell: bash - target: os: windows builder: windows-2019 + shell: msys2 {0} defaults: run: - shell: bash + shell: ${{ matrix.shell }} - name: '${{ matrix.target.os }}-${{ matrix.target.cpu }}' + name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})' runs-on: ${{ matrix.builder }} + continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }} steps: - - name: Checkout nim-libp2p + - name: Checkout uses: actions/checkout@v2 with: submodules: true - - name: Derive environment variables - run: | - if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then - ARCH=64 - PLATFORM=x64 - else - ARCH=32 - PLATFORM=x86 - fi - echo "ARCH=$ARCH" >> $GITHUB_ENV - echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV - - ncpu= - ext= - MAKE_CMD="make" - case '${{ runner.os }}' in - 'Linux') - ncpu=$(nproc) - ;; - 'macOS') - ncpu=$(sysctl -n hw.ncpu) - ;; - 'Windows') - ncpu=$NUMBER_OF_PROCESSORS - ext=.exe - MAKE_CMD="mingw32-make" - ;; - esac - [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 - echo "ncpu=$ncpu" >> $GITHUB_ENV - echo "ext=$ext" >> $GITHUB_ENV - echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - - name: Install build dependencies (Linux i386) if: runner.os == 'Linux' && matrix.target.cpu == 'i386' run: | @@ -101,68 +72,83 @@ jobs: chmod 755 external/bin/gcc external/bin/g++ echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH - - name: Restore MinGW-W64 (Windows) from cache - if: runner.os == 'Windows' - id: windows-mingw-cache - uses: actions/cache@v2 + - name: MSYS2 (Windows i386) + if: runner.os == 'Windows' && matrix.target.cpu == 'i386' + uses: msys2/setup-msys2@v2 with: - path: external/mingw-${{ matrix.target.cpu }} - key: 'mingw-${{ matrix.target.cpu }}' + path-type: inherit + msystem: MINGW32 + install: >- + base-devel + git + mingw-w64-i686-toolchain + + - name: MSYS2 (Windows amd64) + if: runner.os == 'Windows' && matrix.target.cpu == 'amd64' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + install: >- + base-devel + git + mingw-w64-x86_64-toolchain - name: Restore Nim DLLs dependencies (Windows) from cache if: runner.os == 'Windows' id: windows-dlls-cache uses: actions/cache@v2 with: - path: external/dlls-${{ matrix.target.cpu }} - key: 'dlls-${{ matrix.target.cpu }}' + path: external/dlls + key: 'dlls' - - name: Install MinGW64 dependency (Windows) - if: > - steps.windows-mingw-cache.outputs.cache-hit != 'true' && - runner.os == 'Windows' - run: | - mkdir -p external - curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z" - 7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/ - mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }} - - - name: Install DLLs dependencies (Windows) + - name: Install DLL dependencies (Windows) if: > steps.windows-dlls-cache.outputs.cache-hit != 'true' && runner.os == 'Windows' run: | - mkdir -p external + mkdir external curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip - 7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }} + 7z x external/windeps.zip -oexternal/dlls - name: Path to cached dependencies (Windows) if: > runner.os == 'Windows' run: | - echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH - echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH + echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH - - name: Get latest Nim commit hash - id: versions + - name: Derive environment variables run: | - getHash() { - git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1 - } - nbsHash=$(getHash status-im/nimbus-build-system) - echo "::set-output name=nimbus_build_system::$nbsHash" + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + PLATFORM=x64 + else + PLATFORM=x86 + fi + echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV - - name: Restore prebuilt Nim from cache - id: nim-cache - uses: actions/cache@v2 - with: - path: NimBinaries - key: 'NimBinaries-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}' + ncpu= + MAKE_CMD="make" + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + MAKE_CMD="mingw32-make" + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + echo "ncpu=$ncpu" >> $GITHUB_ENV + echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - - name: Build Nim and associated tools + - name: Build Nim and Nimble run: | curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh - env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} CC=gcc bash build_nim.sh nim csources dist/nimble NimBinaries + env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ matrix.branch }} \ + QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \ + bash build_nim.sh nim csources dist/nimble NimBinaries echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH - name: Setup Go @@ -174,8 +160,14 @@ jobs: run: | V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 - - name: Run nim-libp2p tests + - name: Run tests run: | + if [[ "${{ matrix.target.os }}" == "windows" ]]; then + # https://github.com/status-im/nimbus-eth2/issues/3121 + export NIMFLAGS="-d:nimRawSetjmp" + fi + nim --version + nimble --version nimble install_pinned nimble test diff --git a/.github/workflows/multi_nim.yml b/.github/workflows/multi_nim.yml index 655ca7e..5d55afb 100644 --- a/.github/workflows/multi_nim.yml +++ b/.github/workflows/multi_nim.yml @@ -6,6 +6,7 @@ on: jobs: build: + timeout-minutes: 120 strategy: fail-fast: false matrix: @@ -16,69 +17,39 @@ jobs: cpu: i386 - os: macos cpu: amd64 - #- os: windows - #cpu: i386 - os: windows cpu: amd64 + #- os: windows + #cpu: i386 branch: [version-1-2, version-1-4, version-1-6, devel] include: - target: os: linux builder: ubuntu-20.04 + shell: bash - target: os: macos builder: macos-10.15 + shell: bash - target: os: windows builder: windows-2019 + shell: msys2 {0} defaults: run: - shell: bash + shell: ${{ matrix.shell }} name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})' runs-on: ${{ matrix.builder }} continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }} steps: - - name: Checkout nim-libp2p + - name: Checkout uses: actions/checkout@v2 with: - ref: master + ref: unstable submodules: true - - name: Derive environment variables - run: | - if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then - ARCH=64 - PLATFORM=x64 - else - ARCH=32 - PLATFORM=x86 - fi - echo "ARCH=$ARCH" >> $GITHUB_ENV - echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV - - ncpu= - ext= - MAKE_CMD="make" - case '${{ runner.os }}' in - 'Linux') - ncpu=$(nproc) - ;; - 'macOS') - ncpu=$(sysctl -n hw.ncpu) - ;; - 'Windows') - ncpu=$NUMBER_OF_PROCESSORS - ext=.exe - MAKE_CMD="mingw32-make" - ;; - esac - [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 - echo "ncpu=$ncpu" >> $GITHUB_ENV - echo "ext=$ext" >> $GITHUB_ENV - echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - - name: Install build dependencies (Linux i386) if: runner.os == 'Linux' && matrix.target.cpu == 'i386' run: | @@ -99,47 +70,76 @@ jobs: chmod 755 external/bin/gcc external/bin/g++ echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH - - name: Restore MinGW-W64 (Windows) from cache - if: runner.os == 'Windows' - id: windows-mingw-cache - uses: actions/cache@v2 + - name: MSYS2 (Windows i386) + if: runner.os == 'Windows' && matrix.target.cpu == 'i386' + uses: msys2/setup-msys2@v2 with: - path: external/mingw-${{ matrix.target.cpu }} - key: 'mingw-${{ matrix.target.cpu }}' + path-type: inherit + msystem: MINGW32 + install: >- + base-devel + git + mingw-w64-i686-toolchain + + - name: MSYS2 (Windows amd64) + if: runner.os == 'Windows' && matrix.target.cpu == 'amd64' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + install: >- + base-devel + git + mingw-w64-x86_64-toolchain - name: Restore Nim DLLs dependencies (Windows) from cache if: runner.os == 'Windows' id: windows-dlls-cache uses: actions/cache@v2 with: - path: external/dlls-${{ matrix.target.cpu }} - key: 'dlls-${{ matrix.target.cpu }}' + path: external/dlls + key: 'dlls' - - name: Install MinGW64 dependency (Windows) - if: > - steps.windows-mingw-cache.outputs.cache-hit != 'true' && - runner.os == 'Windows' - run: | - mkdir -p external - curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z" - 7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/ - mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }} - - - name: Install DLLs dependencies (Windows) + - name: Install DLL dependencies (Windows) if: > steps.windows-dlls-cache.outputs.cache-hit != 'true' && runner.os == 'Windows' run: | - mkdir -p external + mkdir external curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip - 7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }} + 7z x external/windeps.zip -oexternal/dlls - name: Path to cached dependencies (Windows) if: > runner.os == 'Windows' run: | - echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH - echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH + echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH + + - name: Derive environment variables + run: | + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + PLATFORM=x64 + else + PLATFORM=x86 + fi + echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV + + ncpu= + MAKE_CMD="make" + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + MAKE_CMD="mingw32-make" + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + echo "ncpu=$ncpu" >> $GITHUB_ENV + echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV - name: Build Nim and Nimble run: | @@ -149,12 +149,27 @@ jobs: bash build_nim.sh nim csources dist/nimble NimBinaries echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH - - name: Run nim-libp2p tests + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: '^1.15.5' + + - name: Install p2pd run: | + V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 + + - name: Run tests + run: | + if [[ "${{ matrix.target.os }}" == "windows" ]]; then + # https://github.com/status-im/nimbus-eth2/issues/3121 + export NIMFLAGS="-d:nimRawSetjmp" + fi + nim --version + nimble --version nimble install -y --depsOnly - nimble test_slim + nimble test if [[ "${{ matrix.branch }}" == "version-1-6" || "${{ matrix.branch }}" == "devel" ]]; then echo -e "\nTesting with '--gc:orc':\n" - export NIMFLAGS="--gc:orc" - nimble test_slim + export NIMFLAGS="${NIMFLAGS} --gc:orc" + nimble test fi diff --git a/.pinned b/.pinned index 43951a3..de01465 100644 --- a/.pinned +++ b/.pinned @@ -1,17 +1,17 @@ asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae0c6b13837fe798 bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7 chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882 -chronos;https://github.com/status-im/nim-chronos@#7dc58d42b6905a7fd7531875fa76060f8f744e4e -dnsclient;https://github.com/ba0f3/dnsclient.nim@#536cc6b7933e5f86590bb27083c0ffeab31255f9 -faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33 -httputils;https://github.com/status-im/nim-http-utils@#507bfb7dcb6244d76ce2567df7bf3756cbe88775 -json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841 -metrics;https://github.com/status-im/nim-metrics@#2c0c486c65f980e8387f86bed0b43d53161c8286 +chronos;https://github.com/status-im/nim-chronos@#87197230779002a2bfa8642f0e2ae07e2349e304 +dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be +faststreams;https://github.com/status-im/nim-faststreams@#37a183153c071539ab870f427c09a1376ba311b9 +httputils;https://github.com/status-im/nim-http-utils@#40048e8b3e69284bdb5d4daa0a16ad93402c55db +json_serialization;https://github.com/status-im/nim-json-serialization@#4b8f487d2dfdd941df7408ceaa70b174cce02180 +metrics;https://github.com/status-im/nim-metrics@#71e0f0e354e1f4c59e3dc92153989c8b723c3440 nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00 -secp256k1;https://github.com/status-im/nim-secp256k1@#d790c42206fab4b8008eaa91181ca8c8c68a0105 -serialization;https://github.com/status-im/nim-serialization@#11a8aa64d27d4fa92e266b9488500461da193c24 -stew;https://github.com/status-im/nim-stew@#2f9c61f485e1de6d7e163294008276c455d39da2 +secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13 +serialization;https://github.com/status-im/nim-serialization@#37bc0db558d85711967acb16e9bb822b06911d46 +stew;https://github.com/status-im/nim-stew@#bb705bf17b46d2c8f9bfb106d9cc7437009a2501 testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2 unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c -websock;https://github.com/status-im/nim-websock@#c2aae352f7fad7a8d333327c37e966969d3ee542 -zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd \ No newline at end of file +websock;https://github.com/status-im/nim-websock@#73edde4417f7b45003113b7a34212c3ccd95b9fd +zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2 \ No newline at end of file diff --git a/examples/directchat.nim b/examples/directchat.nim index d473a9f..121f031 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -162,7 +162,10 @@ proc main() {.async.} = stdinReader = fromPipe(rfd) var thread: Thread[AsyncFD] - thread.createThread(readInput, wfd) + try: + thread.createThread(readInput, wfd) + except Exception as exc: + quit("Failed to create thread: " & exc.msg) var localAddress = MultiAddress.init(DefaultAddr).tryGet() diff --git a/libp2p.nimble b/libp2p.nimble index 33be693..a5483ac 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -9,7 +9,7 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"] requires "nim >= 1.2.0", "nimcrypto >= 0.4.1", - "https://github.com/ba0f3/dnsclient.nim == 0.1.0", + "dnsclient >= 0.1.2", "bearssl >= 0.1.4", "chronicles >= 0.10.2", "chronos >= 3.0.6", @@ -18,11 +18,18 @@ requires "nim >= 1.2.0", "stew#head", "websock" +const nimflags = + "--verbosity:0 --hints:off " & + "--warning[CaseTransition]:off --warning[ObservableStores]:off " & + "--warning[LockLevel]:off " & + "-d:chronosStrictException " & + "--styleCheck:usages --styleCheck:hint " + proc runTest(filename: string, verify: bool = true, sign: bool = true, moreoptions: string = "") = - let env_nimflags = getEnv("NIMFLAGS") - var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics --verbosity:0 --hints:off --styleCheck:usages --styleCheck:hint " & env_nimflags - excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") + var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics " + excstr.add(" " & getEnv("NIMFLAGS") & " ") + excstr.add(" " & nimflags & " ") excstr.add(" -d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_verify=" & $verify) excstr.add(" " & moreoptions & " ") @@ -34,8 +41,8 @@ proc runTest(filename: string, verify: bool = true, sign: bool = true, rmFile "tests/" & filename.toExe proc buildSample(filename: string, run = false) = - var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off " - excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") + var excstr = "nim c --opt:speed --threads:on -d:debug " + excstr.add(" " & nimflags & " ") excstr.add(" examples/" & filename) exec excstr if run: @@ -44,7 +51,7 @@ proc buildSample(filename: string, run = false) = proc buildTutorial(filename: string) = discard gorge "cat " & filename & " | nim c -r --hints:off tools/markdown_runner.nim | " & - " nim --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off c -" + " nim " & nimflags & " c -" task testnative, "Runs libp2p native tests": runTest("testnative") @@ -83,7 +90,7 @@ task test, "Runs the test suite": exec "nimble testfilter" exec "nimble examples_build" -task test_slim, "Runs the test suite": +task test_slim, "Runs the (slimmed down) test suite": exec "nimble testnative" exec "nimble testpubsub_slim" exec "nimble testfilter" diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 9613874..f56c7b7 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -25,7 +25,7 @@ declareGauge(libp2p_peers, "total connected peers") const MaxConnections* = 50 - MaxConnectionsPerPeer* = 5 + MaxConnectionsPerPeer* = 1 type TooManyConnectionsError* = object of LPError @@ -65,7 +65,7 @@ type discard PeerEventHandler* = - proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} + proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, raises: [Defect].} MuxerHolder = object muxer: Muxer @@ -452,7 +452,8 @@ proc trackIncomingConn*(c: ConnManager, raise exc proc trackOutgoingConn*(c: ConnManager, - provider: ConnProvider): + provider: ConnProvider, + forceDial = false): Future[Connection] {.async.} = ## try acquiring a connection if all slots ## are already taken, raise TooManyConnectionsError @@ -462,7 +463,9 @@ proc trackOutgoingConn*(c: ConnManager, trace "Tracking outgoing connection", count = c.outSema.count, max = c.outSema.size - if not c.outSema.tryAcquire(): + if forceDial: + c.outSema.forceAcquire() + elif not c.outSema.tryAcquire(): trace "Too many outgoing connections!", count = c.outSema.count, max = c.outSema.size raise newTooManyConnectionsError() diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index ff9a502..c082a60 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -150,10 +150,10 @@ type key*: PublicKey P2PStreamCallback* = proc(api: DaemonAPI, - stream: P2PStream): Future[void] {.gcsafe.} + stream: P2PStream): Future[void] {.gcsafe, raises: [Defect, CatchableError].} P2PPubSubCallback* = proc(api: DaemonAPI, ticket: PubsubTicket, - message: PubSubMessage): Future[bool] {.gcsafe.} + message: PubSubMessage): Future[bool] {.gcsafe, raises: [Defect, CatchableError].} DaemonError* = object of LPError DaemonRemoteError* = object of DaemonError @@ -755,7 +755,13 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, # Starting daemon process # echo "Starting ", cmd, " ", args.join(" ") - api.process = startProcess(cmd, "", args, env, {poParentStreams}) + api.process = + try: + startProcess(cmd, "", args, env, {poParentStreams}) + except CatchableError as exc: + raise exc + except Exception as exc: + raiseAssert exc.msg # Waiting until daemon will not be bound to control socket. while true: if not api.process.running(): @@ -900,7 +906,7 @@ proc openStream*(api: DaemonAPI, peer: PeerId, stream.flags.incl(Outbound) stream.transp = transp result = stream - except Exception as exc: + except CatchableError as exc: await api.closeConnection(transp) raise exc @@ -936,7 +942,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], protocols)) pb.withMessage() do: api.servers.add(P2PServer(server: server, address: maddress)) - except Exception as exc: + except CatchableError as exc: for item in protocols: api.handlers.del(item) server.stop() @@ -1301,7 +1307,7 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string, ticket.transp = transp asyncSpawn pubsubLoop(api, ticket) result = ticket - except Exception as exc: + except CatchableError as exc: await api.closeConnection(transp) raise exc diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 7eb3a46..ea51270 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -19,7 +19,8 @@ type method connect*( self: Dial, peerId: PeerId, - addrs: seq[MultiAddress]) {.async, base.} = + addrs: seq[MultiAddress], + forceDial = false) {.async, base.} = ## connect remote peer without negotiating ## a protocol ## @@ -29,7 +30,8 @@ method connect*( method dial*( self: Dial, peerId: PeerId, - protos: seq[string]): Future[Connection] {.async, base.} = + protos: seq[string], + ): Future[Connection] {.async, base.} = ## create a protocol stream over an ## existing connection ## @@ -40,7 +42,8 @@ method dial*( self: Dial, peerId: PeerId, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] {.async, base.} = + protos: seq[string], + forceDial = false): Future[Connection] {.async, base.} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 3d02379..65cc1d6 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -47,7 +47,8 @@ type proc dialAndUpgrade( self: Dialer, peerId: PeerId, - addrs: seq[MultiAddress]): + addrs: seq[MultiAddress], + forceDial: bool): Future[Connection] {.async.} = debug "Dialing peer", peerId @@ -72,7 +73,8 @@ proc dialAndUpgrade( transportCopy = transport addressCopy = a await self.connManager.trackOutgoingConn( - () => transportCopy.dial(hostname, addressCopy) + () => transportCopy.dial(hostname, addressCopy), + forceDial ) except TooManyConnectionsError as exc: trace "Connection limit reached!" @@ -112,7 +114,8 @@ proc dialAndUpgrade( proc internalConnect( self: Dialer, peerId: PeerId, - addrs: seq[MultiAddress]): + addrs: seq[MultiAddress], + forceDial: bool): Future[Connection] {.async.} = if self.localPeerId == peerId: raise newException(CatchableError, "can't dial self!") @@ -136,7 +139,7 @@ proc internalConnect( trace "Reusing existing connection", conn, direction = $conn.dir return conn - conn = await self.dialAndUpgrade(peerId, addrs) + conn = await self.dialAndUpgrade(peerId, addrs, forceDial) if isNil(conn): # None of the addresses connected raise newException(DialFailedError, "Unable to establish outgoing link") @@ -159,7 +162,8 @@ proc internalConnect( method connect*( self: Dialer, peerId: PeerId, - addrs: seq[MultiAddress]) {.async.} = + addrs: seq[MultiAddress], + forceDial = false) {.async.} = ## connect remote peer without negotiating ## a protocol ## @@ -167,7 +171,7 @@ method connect*( if self.connManager.connCount(peerId) > 0: return - discard await self.internalConnect(peerId, addrs) + discard await self.internalConnect(peerId, addrs, forceDial) proc negotiateStream( self: Dialer, @@ -200,7 +204,8 @@ method dial*( self: Dialer, peerId: PeerId, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] {.async.} = + protos: seq[string], + forceDial = false): Future[Connection] {.async.} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## @@ -218,7 +223,7 @@ method dial*( try: trace "Dialing (new)", peerId, protos - conn = await self.internalConnect(peerId, addrs) + conn = await self.internalConnect(peerId, addrs, forceDial) trace "Opening stream", conn stream = await self.connManager.getStream(conn) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 83d4116..3b6f3ed 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -49,7 +49,7 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = for fut in args: futs &= fut proc call() {.async.} = - var first: ref Exception = nil + var first: ref CatchableError = nil futs = await allFinished(futs) for fut in futs: if fut.failed: diff --git a/libp2p/nameresolving/dnsresolver.nim b/libp2p/nameresolving/dnsresolver.nim index 1bb6660..d224afc 100644 --- a/libp2p/nameresolving/dnsresolver.nim +++ b/libp2p/nameresolving/dnsresolver.nim @@ -78,7 +78,12 @@ proc getDnsResponse( dataStream = newStringStream() dataStream.writeData(addr rawResponse[0], rawResponse.len) dataStream.setPosition(0) + # parseResponse can has a raises: [Exception, ..] because of + # https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008 + # it can't actually raise though return parseResponse(dataStream) + except CatchableError as exc: raise exc + except Exception as exc: raiseAssert exc.msg finally: await sock.closeWait() diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 28ae801..dfa0813 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -9,7 +9,7 @@ {.push raises: [Defect].} -import options +import std/[sequtils, options, strutils] import chronos, chronicles import ../protobuf/minprotobuf, ../peerinfo, @@ -107,11 +107,14 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] = iinfo.protoVersion = some(protoVersion) if r6.get(): iinfo.agentVersion = some(agentVersion) - debug "decodeMsg: decoded message", pubkey = ($pubkey).shortLog, - addresses = $iinfo.addrs, protocols = $iinfo.protos, - observable_address = $iinfo.observedAddr, - proto_version = $iinfo.protoVersion, - agent_version = $iinfo.agentVersion + debug "decodeMsg: decoded identify", pubkey = ($pubkey).shortLog, + addresses = iinfo.addrs.mapIt($it).join(","), + protocols = iinfo.protos.mapIt($it).join(","), + observable_address = + if iinfo.observedAddr.isSome(): $iinfo.observedAddr.get() + else: "None", + proto_version = iinfo.protoVersion.get("None"), + agent_version = iinfo.agentVersion.get("None") some(iinfo) else: trace "decodeMsg: failed to decode received message" diff --git a/libp2p/protocols/ping.nim b/libp2p/protocols/ping.nim index 6e5e6bc..c07b2a0 100644 --- a/libp2p/protocols/ping.nim +++ b/libp2p/protocols/ping.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [Defect].} + import chronos, chronicles, bearssl import ../protobuf/minprotobuf, ../peerinfo, diff --git a/libp2p/protocols/pubsub/errors.nim b/libp2p/protocols/pubsub/errors.nim new file mode 100644 index 0000000..cfb2ccc --- /dev/null +++ b/libp2p/protocols/pubsub/errors.nim @@ -0,0 +1,6 @@ +# this module will be further extended in PR +# https://github.com/status-im/nim-libp2p/pull/107/ + +type + ValidationResult* {.pure.} = enum + Accept, Reject, Ignore diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index eda35ee..54d6f28 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -96,7 +96,14 @@ method rpcHandler*(f: FloodSub, f.handleSubscribe(peer, sub.topic, sub.subscribe) for msg in rpcMsg.messages: # for every message - let msgId = f.msgIdProvider(msg) + let msgIdResult = f.msgIdProvider(msg) + if msgIdResult.isErr: + debug "Dropping message due to failed message id generation", + error = msgIdResult.error + # TODO: descore peers due to error during message validation (malicious?) + continue + + let msgId = msgIdResult.get if f.addSeen(msgId): trace "Dropping already-seen message", msgId, peer @@ -184,7 +191,14 @@ method publish*(f: FloodSub, Message.init(none(PeerInfo), data, topic, none(uint64), false) else: Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign) - msgId = f.msgIdProvider(msg) + msgIdResult = f.msgIdProvider(msg) + + if msgIdResult.isErr: + trace "Error generating message id, skipping publish", + error = msgIdResult.error + return 0 + + let msgId = msgIdResult.get trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f46260e..8261ac5 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -186,16 +186,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) = if s[].len == 0: g.peersInIP.del(pubSubPeer.address.get()) - for t in toSeq(g.gossipsub.keys): - g.gossipsub.removePeer(t, pubSubPeer) - # also try to remove from explicit table here - g.explicit.removePeer(t, pubSubPeer) - for t in toSeq(g.mesh.keys): trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score g.pruned(pubSubPeer, t) g.mesh.removePeer(t, pubSubPeer) + for t in toSeq(g.gossipsub.keys): + g.gossipsub.removePeer(t, pubSubPeer) + # also try to remove from explicit table here + g.explicit.removePeer(t, pubSubPeer) + for t in toSeq(g.fanout.keys): g.fanout.removePeer(t, pubSubPeer) @@ -237,9 +237,14 @@ proc handleSubscribe*(g: GossipSub, else: trace "peer unsubscribed from topic" + if g.mesh.hasPeer(topic, peer): + #against spec + g.mesh.removePeer(topic, peer) + g.pruned(peer, topic) + # unsubscribe remote peer from the topic g.gossipsub.removePeer(topic, peer) - g.mesh.removePeer(topic, peer) + g.fanout.removePeer(topic, peer) if peer.peerId in g.parameters.directPeers: g.explicit.removePeer(topic, peer) @@ -282,6 +287,64 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = peer, RPCMsg(control: some(respControl), messages: messages)) +proc validateAndRelay(g: GossipSub, + msg: Message, + msgId, msgIdSalted: MessageId, + peer: PubSubPeer) {.async.} = + try: + let validation = await g.validate(msg) + + var seenPeers: HashSet[PubSubPeer] + discard g.validationSeen.pop(msgIdSalted, seenPeers) + libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) + + case validation + of ValidationResult.Reject: + debug "Dropping message after validation, reason: reject", + msgId = shortLog(msgId), peer + g.punishInvalidMessage(peer, msg.topicIDs) + return + of ValidationResult.Ignore: + debug "Dropping message after validation, reason: ignore", + msgId = shortLog(msgId), peer + return + of ValidationResult.Accept: + discard + + # store in cache only after validation + g.mcache.put(msgId, msg) + + g.rewardDelivered(peer, msg.topicIDs, true) + + var toSendPeers = HashSet[PubSubPeer]() + for t in msg.topicIDs: # for every topic in the message + if t notin g.topics: + continue + + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) + + # Don't send it to source peer, or peers that + # sent it during validation + toSendPeers.excl(peer) + toSendPeers.excl(seenPeers) + + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer + for topic in msg.topicIDs: + if topic notin g.topics: continue + + if g.knownTopics.contains(topic): + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) + else: + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) + + await handleData(g, topic, msg.data) + except CatchableError as exc: + info "validateAndRelay failed", msg=exc.msg + method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = @@ -299,8 +362,16 @@ method rpcHandler*(g: GossipSub, for i in 0.. g.parameters.d: + fanoutPeers.setLen(g.parameters.d - peers.len) + + for fanPeer in fanoutPeers: + peers.incl(fanPeer) + if peers.len > g.parameters.d: break # even if we couldn't publish, # we still attempted to publish @@ -489,7 +523,15 @@ method publish*(g: GossipSub, Message.init(none(PeerInfo), data, topic, none(uint64), false) else: Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) - msgId = g.msgIdProvider(msg) + msgIdResult = g.msgIdProvider(msg) + + if msgIdResult.isErr: + trace "Error generating message id, skipping publish", + error = msgIdResult.error + libp2p_gossipsub_failed_publish.inc() + return 0 + + let msgId = msgIdResult.get logScope: msgId = shortLog(msgId) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 9bb2a94..e545846 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -489,9 +489,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} = logScope: topic trace "about to replenish fanout" + let currentMesh = g.mesh.getOrDefault(topic) if g.fanout.peers(topic) < g.parameters.dLow: trace "replenishing fanout", peers = g.fanout.peers(topic) for peer in g.gossipsub.getOrDefault(topic): + if peer in currentMesh: continue if g.fanout.addPeer(topic, peer): if g.fanout.peers(topic) == g.parameters.d: break diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 4c119ca..a79d15f 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -11,7 +11,8 @@ import std/[tables, sequtils, sets, strutils] import chronos, chronicles, metrics, bearssl -import ./pubsubpeer, +import ./errors as pubsub_errors, + ./pubsubpeer, ./rpc/[message, messages, protobuf], ../../switch, ../protocol, @@ -29,6 +30,7 @@ export results export PubSubPeer export PubSubObserver export protocol +export pubsub_errors logScope: topics = "libp2p pubsub" @@ -76,16 +78,13 @@ type TopicHandler* = proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} - ValidationResult* {.pure.} = enum - Accept, Reject, Ignore - ValidatorHandler* = proc(topic: string, message: Message): Future[ValidationResult] {.gcsafe, raises: [Defect].} TopicPair* = tuple[topic: string, handler: TopicHandler] MsgIdProvider* = - proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.} + proc(m: Message): Result[MessageID, ValidationResult] {.noSideEffect, raises: [Defect], gcsafe.} SubscriptionValidator* = proc(topic: string): bool {.raises: [Defect], gcsafe.} @@ -452,6 +451,11 @@ proc subscribe*(p: PubSub, ## on every received message ## + # Check that this is an allowed topic + if p.subscriptionValidator != nil and p.subscriptionValidator(topic) == false: + warn "Trying to subscribe to a topic not passing validation!", topic + return + p.topics.withValue(topic, handlers) do: # Already subscribed, just adding another handler handlers[].add(handler) diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 51cb3eb..2750c88 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -16,9 +16,10 @@ import ./messages, ../../../peerid, ../../../peerinfo, ../../../crypto/crypto, - ../../../protobuf/minprotobuf + ../../../protobuf/minprotobuf, + ../../../protocols/pubsub/errors -export messages +export errors, messages logScope: topics = "pubsubmessage" @@ -28,16 +29,12 @@ const PubSubPrefix = toBytes("libp2p-pubsub:") declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") -func defaultMsgIdProvider*(m: Message): MessageID = - let mid = - if m.seqno.len > 0 and m.fromPeer.data.len > 0: - byteutils.toHex(m.seqno) & $m.fromPeer - else: - # This part is irrelevant because it's not standard, - # We use it exclusively for testing basically and users should - # implement their own logic in the case they use anonymization - $m.data.hash & $m.topicIDs.hash - mid.toBytes() +func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] = + if m.seqno.len > 0 and m.fromPeer.data.len > 0: + let mid = byteutils.toHex(m.seqno) & $m.fromPeer + ok mid.toBytes() + else: + err ValidationResult.Reject proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes()) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index c7a0a5b..da68e86 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -99,8 +99,9 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe.} = method connect*( s: Switch, peerId: PeerId, - addrs: seq[MultiAddress]): Future[void] = - s.dialer.connect(peerId, addrs) + addrs: seq[MultiAddress], + forceDial = false): Future[void] = + s.dialer.connect(peerId, addrs, forceDial) method dial*( s: Switch, @@ -117,8 +118,9 @@ method dial*( s: Switch, peerId: PeerId, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] = - s.dialer.dial(peerId, addrs, protos) + protos: seq[string], + forceDial = false): Future[Connection] = + s.dialer.dial(peerId, addrs, protos, forceDial) proc dial*( s: Switch, diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 1c2fd3d..4238c08 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -225,7 +225,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = debug "Server was closed", exc = exc.msg raise newTransportClosedError(exc) except CancelledError as exc: - raise + raise exc except CatchableError as exc: debug "Unexpected error accepting connection", exc = exc.msg raise exc diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index c0b1427..4f64442 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -49,11 +49,24 @@ proc new*(T: type WsStream, stream.initStream() return stream +template mapExceptions(body: untyped) = + try: + body + except AsyncStreamIncompleteError: + raise newLPStreamEOFError() + except AsyncStreamUseClosedError: + raise newLPStreamEOFError() + except WSClosedError: + raise newLPStreamEOFError() + except AsyncStreamLimitError: + raise newLPStreamLimitError() + method readOnce*( s: WsStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = - let res = await s.session.recv(pbytes, nbytes) + let res = mapExceptions(await s.session.recv(pbytes, nbytes)) + if res == 0 and s.session.readyState == ReadyState.Closed: raise newLPStreamEOFError() return res @@ -61,10 +74,7 @@ method readOnce*( method write*( s: WsStream, msg: seq[byte]): Future[void] {.async.} = - try: - await s.session.send(msg, Opcode.Binary) - except WSClosedError: - raise newLPStreamEOFError() + mapExceptions(await s.session.send(msg, Opcode.Binary)) method closeImpl*(s: WsStream): Future[void] {.async.} = await s.session.close() diff --git a/libp2p/utils/semaphore.nim b/libp2p/utils/semaphore.nim index 8ded05e..e396f27 100644 --- a/libp2p/utils/semaphore.nim +++ b/libp2p/utils/semaphore.nim @@ -54,16 +54,21 @@ proc acquire*(s: AsyncSemaphore): Future[void] = fut.cancelCallback = nil if not fut.finished: s.queue.keepItIf( it != fut ) - s.count.inc fut.cancelCallback = cancellation s.queue.add(fut) - s.count.dec trace "Queued slot", available = s.count, queue = s.queue.len return fut +proc forceAcquire*(s: AsyncSemaphore) = + ## ForceAcquire will always succeed, + ## creating a temporary slot if required. + ## This temporary slot will stay usable until + ## there is less `acquire`s than `release`s + s.count.dec + proc release*(s: AsyncSemaphore) = ## Release a resource from the semaphore, ## by picking the first future from the queue @@ -77,13 +82,15 @@ proc release*(s: AsyncSemaphore) = trace "Releasing slot", available = s.count, queue = s.queue.len - if s.queue.len > 0: + s.count.inc + while s.queue.len > 0: var fut = s.queue[0] s.queue.delete(0) if not fut.finished(): + s.count.dec fut.complete() + break - s.count.inc # increment the resource count trace "Released slot", available = s.count, queue = s.queue.len return diff --git a/tests/asyncunit.nim b/tests/asyncunit.nim index 9da6838..fa10c9e 100644 --- a/tests/asyncunit.nim +++ b/tests/asyncunit.nim @@ -22,3 +22,25 @@ template asyncTest*(name: string, body: untyped): untyped = proc() {.async, gcsafe.} = body )()) + +template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped = + test name: + var attemptNumber = 0 + while attemptNumber < attempts: + let isLastAttempt = attemptNumber == attempts - 1 + inc attemptNumber + try: + waitFor(( + proc() {.async, gcsafe.} = + body + )()) + except Exception as e: + if isLastAttempt: raise e + else: testStatusIMPL = TestStatus.FAILED + finally: + if not isLastAttempt: + if testStatusIMPL == TestStatus.FAILED: + # Retry + testStatusIMPL = TestStatus.OK + else: + break diff --git a/tests/commontransport.nim b/tests/commontransport.nim index 92f817a..532689e 100644 --- a/tests/commontransport.nim +++ b/tests/commontransport.nim @@ -11,7 +11,7 @@ import ../libp2p/[stream/connection, import ./helpers -type TransportProvider* = proc(): Transport {.gcsafe.} +type TransportProvider* = proc(): Transport {.gcsafe, raises: [Defect].} proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = suite name & " common tests": @@ -137,6 +137,10 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = await transport1.stop() asyncTest "e2e should allow multiple local addresses": + when defined(windows): + # this randomly locks the Windows CI job + skip() + return let addrs = @[MultiAddress.init(ma).tryGet(), MultiAddress.init(ma).tryGet()] diff --git a/tests/helpers.nim b/tests/helpers.nim index c336291..a607df6 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -13,6 +13,8 @@ import ../libp2p/protocols/secure/secure import ./asyncunit export asyncunit +{.push raises: [Defect].} + const StreamTransportTrackerName = "stream.transport" StreamServerTrackerName = "stream.server" @@ -51,7 +53,9 @@ template checkTrackers*() = checkpoint tracker.dump() fail() # Also test the GC is not fooling with us - GC_fullCollect() + try: + GC_fullCollect() + except: discard type RngWrap = object rng: ref BrHmacDrbgContext diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 0ef5080..38d00d9 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -20,6 +20,7 @@ import utils, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, protocols/pubsub/peertable] +import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index d1ff8e9..6915793 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -39,6 +39,8 @@ proc randomPeerId(): PeerId = except CatchableError as exc: raise newException(Defect, exc.msg) +const MsgIdSuccess = "msg id gen success" + suite "GossipSub internal": teardown: checkTrackers() @@ -308,7 +310,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) check gossipSub.fanout[topic].len == 15 check gossipSub.mesh[topic].len == 15 @@ -355,7 +357,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) let peers = gossipSub.getGossipPeers() check peers.len == gossipSub.parameters.d @@ -396,7 +398,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) let peers = gossipSub.getGossipPeers() check peers.len == gossipSub.parameters.d @@ -437,7 +439,7 @@ suite "GossipSub internal": conn.peerId = peerId inc seqno let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) - gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) let peers = gossipSub.getGossipPeers() check peers.len == 0 diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index a1332ff..5986d47 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -24,6 +24,7 @@ import utils, ../../libp2p/[errors, protocols/pubsub/peertable, protocols/pubsub/timedcache, protocols/pubsub/rpc/messages] +import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers proc `$`(peer: PubSubPeer): string = shortLog(peer) @@ -564,6 +565,72 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) check observed == 2 + asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": + var passed = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete() + + let + nodes = generateNodes( + 2, + gossip = true, + unsubscribeBackoff = 10.minutes) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + nodes[1].subscribe("foobar", handler) + nodes[0].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + await waitSub(nodes[1], nodes[0], "foobar") + + nodes[0].unsubscribe("foobar", handler) + + let gsNode = GossipSub(nodes[1]) + check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0) + + nodes[0].subscribe("foobar", handler) + + check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check: + GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0 + GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 + + await passed.wait(2.seconds) + + trace "test done, stopping..." + + await nodes[0].stop() + await nodes[1].stop() + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub send over mesh A -> B": var passed: Future[bool] = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 62562c3..6e9c891 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -147,6 +147,9 @@ suite "GossipSub": nodes[1].start(), )) + # We must subscribe before setting the validator + nodes[0].subscribe("foobar", handler) + var gossip = GossipSub(nodes[0]) let invalidDetected = newFuture[void]() gossip.subscriptionValidator = @@ -162,7 +165,6 @@ suite "GossipSub": await subscribeNodes(nodes) - nodes[0].subscribe("foobar", handler) nodes[1].subscribe("foobar", handler) await invalidDetected.wait(10.seconds) diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim index 4b7b7ce..6aecb8a 100644 --- a/tests/pubsub/testmcache.nim +++ b/tests/pubsub/testmcache.nim @@ -5,19 +5,21 @@ import stew/byteutils import ../../libp2p/[peerid, crypto/crypto, protocols/pubsub/mcache, - protocols/pubsub/rpc/message, protocols/pubsub/rpc/messages] +import ./utils var rng = newRng() proc randomPeerId(): PeerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get() +const MsgIdGenSuccess = "msg id generation success" + suite "MCache": test "put/get": var mCache = MCache.init(3, 5) var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes()) - let msgId = defaultMsgIdProvider(msg) + let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenSuccess) mCache.put(msgId, msg) check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg @@ -28,13 +30,13 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) for i in 0..<5: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) var mids = mCache.window("foo") check mids.len == 3 @@ -49,7 +51,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("foo").len == 0 @@ -58,7 +60,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("bar").len == 0 @@ -67,7 +69,7 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("baz").len == 0 @@ -79,19 +81,19 @@ suite "MCache": var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) for i in 0..<3: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) for i in 0..<3: var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(defaultMsgIdProvider(msg), msg) + mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg) mCache.shift() check mCache.window("foo").len == 0 diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index d555d89..7bc4b26 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -3,8 +3,10 @@ import unittest2 {.used.} import options +import stew/byteutils import ../../libp2p/[peerid, peerinfo, crypto/crypto, + protocols/pubsub/errors, protocols/pubsub/rpc/message, protocols/pubsub/rpc/messages] @@ -18,3 +20,56 @@ suite "Message": msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) check verify(msg) + + test "defaultMsgIdProvider success": + let + seqno = 11'u64 + pkHex = + """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C + E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F + E731065A""" + seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) + .expect("valid private key bytes") + peer = PeerInfo.new(seckey) + msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) + msgIdResult = msg.defaultMsgIdProvider() + + check: + msgIdResult.isOk + string.fromBytes(msgIdResult.get) == + "000000000000000b12D3KooWGyLzSt9g4U9TdHYDvVWAs5Ht4WrocgoyqPxxvnqAL8qw" + + test "defaultMsgIdProvider error - no source peer id": + let + seqno = 11'u64 + pkHex = + """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C + E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F + E731065A""" + seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) + .expect("valid private key bytes") + peer = PeerInfo.new(seckey) + + var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true) + msg.fromPeer = PeerId() + let msgIdResult = msg.defaultMsgIdProvider() + + check: + msgIdResult.isErr + msgIdResult.error == ValidationResult.Reject + + test "defaultMsgIdProvider error - no source seqno": + let + pkHex = + """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C + E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F + E731065A""" + seckey = PrivateKey.init(fromHex(stripSpaces(pkHex))) + .expect("valid private key bytes") + peer = PeerInfo.new(seckey) + msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true) + msgIdResult = msg.defaultMsgIdProvider() + + check: + msgIdResult.isErr + msgIdResult.error == ValidationResult.Reject diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 50644de..846c8bb 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -4,29 +4,43 @@ const libp2p_pubsub_verify {.booldefine.} = true libp2p_pubsub_anonymize {.booldefine.} = false -import random, tables -import chronos +import hashes, random, tables +import chronos, stew/[byteutils, results] import ../../libp2p/[builders, + protocols/pubsub/errors, protocols/pubsub/pubsub, protocols/pubsub/gossipsub, protocols/pubsub/floodsub, + protocols/pubsub/rpc/messages, protocols/secure/secure] export builders randomize() +func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] = + let mid = + if m.seqno.len > 0 and m.fromPeer.data.len > 0: + byteutils.toHex(m.seqno) & $m.fromPeer + else: + # This part is irrelevant because it's not standard, + # We use it exclusively for testing basically and users should + # implement their own logic in the case they use anonymization + $m.data.hash & $m.topicIDs.hash + ok mid.toBytes() + proc generateNodes*( num: Natural, secureManagers: openArray[SecureProtocol] = [ SecureProtocol.Noise ], - msgIdProvider: MsgIdProvider = nil, + msgIdProvider: MsgIdProvider = defaultMsgIdProvider, gossip: bool = false, triggerSelf: bool = false, verifySignature: bool = libp2p_pubsub_verify, anonymize: bool = libp2p_pubsub_anonymize, sign: bool = libp2p_pubsub_sign, + unsubscribeBackoff = 1.seconds, maxMessageSize: int = 1024 * 1024): seq[PubSub] = for i in 0..