diff --git a/.github/workflows/multi_nim.yml b/.github/workflows/multi_nim.yml new file mode 100644 index 0000000..207e02d --- /dev/null +++ b/.github/workflows/multi_nim.yml @@ -0,0 +1,170 @@ +name: Daily +on: + schedule: + - cron: "30 6 * * *" + workflow_dispatch: + +jobs: + build: + strategy: + fail-fast: false + matrix: + target: + - os: linux + cpu: amd64 + - os: linux + cpu: i386 + - os: macos + cpu: amd64 + #- os: windows + #cpu: i386 + - os: windows + cpu: amd64 + branch: [version-1-2, version-1-4, version-1-6, devel] + include: + - target: + os: linux + builder: ubuntu-20.04 + - target: + os: macos + builder: macos-10.15 + - target: + os: windows + builder: windows-2019 + + defaults: + run: + shell: bash + + name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})' + runs-on: ${{ matrix.builder }} + continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }} + steps: + - name: Checkout nim-libp2p + uses: actions/checkout@v2 + with: + ref: master + submodules: true + + - name: Derive environment variables + run: | + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + ARCH=64 + PLATFORM=x64 + else + ARCH=32 + PLATFORM=x86 + fi + echo "ARCH=$ARCH" >> $GITHUB_ENV + echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV + + ncpu= + ext= + MAKE_CMD="make" + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + ext=.exe + MAKE_CMD="mingw32-make" + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + echo "ncpu=$ncpu" >> $GITHUB_ENV + echo "ext=$ext" >> $GITHUB_ENV + echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV + + - name: Install build dependencies (Linux i386) + if: runner.os == 'Linux' && matrix.target.cpu == 'i386' + run: | + sudo dpkg --add-architecture i386 + sudo apt-get update -qq + sudo DEBIAN_FRONTEND='noninteractive' apt-get install \ + --no-install-recommends -yq gcc-multilib g++-multilib \ + libssl-dev:i386 + mkdir -p external/bin + cat << EOF > external/bin/gcc + #!/bin/bash + exec $(which gcc) -m32 "\$@" + EOF + cat << EOF > external/bin/g++ + #!/bin/bash + exec $(which g++) -m32 "\$@" + EOF + chmod 755 external/bin/gcc external/bin/g++ + echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH + + - name: Restore MinGW-W64 (Windows) from cache + if: runner.os == 'Windows' + id: windows-mingw-cache + uses: actions/cache@v2 + with: + path: external/mingw-${{ matrix.target.cpu }} + key: 'mingw-${{ matrix.target.cpu }}' + + - name: Restore Nim DLLs dependencies (Windows) from cache + if: runner.os == 'Windows' + id: windows-dlls-cache + uses: actions/cache@v2 + with: + path: external/dlls-${{ matrix.target.cpu }} + key: 'dlls-${{ matrix.target.cpu }}' + + - name: Install MinGW64 dependency (Windows) + if: > + steps.windows-mingw-cache.outputs.cache-hit != 'true' && + runner.os == 'Windows' + run: | + mkdir -p external + curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z" + 7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/ + mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }} + + - name: Install DLLs dependencies (Windows) + if: > + steps.windows-dlls-cache.outputs.cache-hit != 'true' && + runner.os == 'Windows' + run: | + mkdir -p external + curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip + 7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }} + + - name: Path to cached dependencies (Windows) + if: > + runner.os == 'Windows' + run: | + echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH + echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH + + - name: Build Nim and Nimble + run: | + curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh + env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ matrix.branch }} \ + QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \ + bash build_nim.sh nim csources dist/nimble NimBinaries + echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH + + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: '^1.15.5' + + - name: Install p2pd + run: | + V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 + + - name: Run nim-libp2p tests + run: | + nimble install -y --depsOnly + nimble test + if [[ "${{ matrix.branch }}" == "version-1-6" || "${{ matrix.branch }}" == "devel" ]]; then + echo -e "\nTesting with '--gc:orc':\n" + export NIMFLAGS="--gc:orc" + nimble test + fi + diff --git a/docs/GETTING_STARTED.md b/docs/GETTING_STARTED.md index a5f5c10..a0cb72a 100644 --- a/docs/GETTING_STARTED.md +++ b/docs/GETTING_STARTED.md @@ -1,98 +1,7 @@ # Getting Started -Welcome to nim-libp2p! This guide will walk you through a peer to peer chat example.
-The full code can be found in [directchat.nim](examples/directchat.nim) under the examples folder. +Welcome to nim-libp2p! -### Direct Chat Example -To run nim-libp2p, add it to your project's nimble file and spawn a node as follows: +To get started, please look at the [tutorials](../examples/tutorial_1_connect.md) -```nim -import tables -import chronos -import ../libp2p/[switch, - multistream, - protocols/identify, - connection, - transports/transport, - transports/tcptransport, - multiaddress, - peerinfo, - crypto/crypto, - peerid, - protocols/protocol, - muxers/muxer, - muxers/mplex/mplex, - protocols/secure/secio, - protocols/secure/secure] - -const TestCodec = "/test/proto/1.0.0" # custom protocol string - -type - TestProto = ref object of LPProtocol # declare a custom protocol - -method init(p: TestProto) {.gcsafe.} = - # handle incoming connections in closure - proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - echo "Got from remote - ", cast[string](await conn.readLp(1024)) - await conn.writeLp("Hello!") - await conn.close() - - p.codec = TestCodec # init proto with the correct string id - p.handler = handle # set proto handler - -proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = - ## Helper to create a swith - - let seckey = PrivateKey.random(RSA) # use a random key for peer id - var peerInfo = PeerInfo.init(seckey) # create a peer id and assign - peerInfo.addrs.add(ma) # set this peer's multiaddresses (can be any number) - - let identify = newIdentify(peerInfo) # create the identify proto - - proc createMplex(conn: Connection): Muxer = - # helper proc to create multiplexers, - # use this to perform any custom setup up, - # such as adjusting timeout or anything else - # that the muxer requires - result = newMplex(conn) - - let mplexProvider = newMuxerProvider(createMplex, MplexCodec) # create multiplexer - let transports = @[Transport(newTransport(TcpTransport))] # add all transports (tcp only for now, but can be anything in the future) - let muxers = {MplexCodec: mplexProvider}.toTable() # add all muxers - let secureManagers = {SecioCodec: Secure(Secio.new(seckey))}.toTable() # setup the secio and any other secure provider - - # create the switch - let switch = newSwitch(peerInfo, - transports, - identify, - muxers, - secureManagers) - result = (switch, peerInfo) - -proc main() {.async, gcsafe.} = - let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - - var peerInfo1, peerInfo2: PeerInfo - var switch1, switch2: Switch - (switch1, peerInfo1) = createSwitch(ma1) # create node 1 - - # setup the custom proto - let testProto = new TestProto - testProto.init() # run it's init method to perform any required initialization - switch1.mount(testProto) # mount the proto - var switch1Fut = await switch1.start() # start the node - - (switch2, peerInfo2) = createSwitch(ma2) # create node 2 - var switch2Fut = await switch2.start() # start second node - let conn = await switch2.dial(switch1.peerInfo, TestCodec) # dial the first node - - await conn.writeLp("Hello!") # writeLp send a length prefixed buffer over the wire - # readLp reads length prefixed bytes and returns a buffer without the prefix - echo "Remote responded with - ", cast[string](await conn.readLp(1024)) - - await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports - await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown - -waitFor(main()) -``` +For more concrete examples, you can look at the [hello world example](../examples/helloworld.nim) or the [direct chat](../examples/directchat.nim) diff --git a/examples/directchat.nim b/examples/directchat.nim index 197be4a..225a9d5 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -1,54 +1,88 @@ when not(compileOption("threads")): {.fatal: "Please, compile this program with the --threads:on option!".} -import tables, strformat, strutils, bearssl -import chronos # an efficient library for async -import ../libp2p/[switch, # manage transports, a single entry point for dialing and listening - builders, # helper to build the switch object - multistream, # tag stream with short header to identify it - multicodec, # multicodec utilities - crypto/crypto, # cryptographic functions - errors, # error handling utilities - protocols/identify, # identify the peer info of a peer - stream/connection, # create and close stream read / write connections - transports/transport, # listen and dial to other peers using p2p protocol - transports/tcptransport, # listen and dial to other peers using client-server protocol - multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP - peerinfo, # manage the information of a peer, such as peer ID and public / private key - peerid, # Implement how peers interact - protocols/protocol, # define the protocol base type - protocols/secure/secure, # define the protocol of secure connection - protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS - muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection - muxers/mplex/mplex] # define some contants and message types for stream multiplexing +import + strformat, strutils, bearssl, + stew/byteutils, + chronos, + ../libp2p -const ChatCodec = "/nim-libp2p/chat/1.0.0" -const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" +const DefaultAddr = "/ip4/127.0.0.1/tcp/0" const Help = """ - Commands: /[?|hep|connect|disconnect|exit] + Commands: /[?|help|connect|disconnect|exit] help: Prints this help connect: dials a remote peer disconnect: ends current session exit: closes the chat """ -type ChatProto = ref object of LPProtocol - switch: Switch # a single entry point for dialing and listening to peer - transp: StreamTransport # transport streams between read & write file descriptor - conn: Connection # create and close read & write stream - connected: bool # if the node is connected to another peer - started: bool # if the node has started +type + Chat = ref object + switch: Switch # a single entry point for dialing and listening to peer + stdinReader: StreamTransport # transport streams between read & write file descriptor + conn: Connection # connection to the other peer + connected: bool # if the node is connected to another peer -proc readAndPrint(p: ChatProto) {.async.} = - while true: - var strData = await p.conn.readLp(1024) - strData &= '\0'.uint8 - var str = cast[cstring](addr strdata[0]) - echo $p.switch.peerInfo.peerId & ": " & $str - await sleepAsync(100.millis) +## +# Stdout helpers, to write the prompt +## +proc writePrompt(c: Chat) = + if c.connected: + stdout.write '\r' & $c.switch.peerInfo.peerId & ": " + stdout.flushFile() -proc dialPeer(p: ChatProto, address: string) {.async.} = +proc writeStdout(c: Chat, str: string) = + echo '\r' & str + c.writePrompt() + +## +# Chat Protocol +## +const ChatCodec = "/nim-libp2p/chat/1.0.0" + +type + ChatProto = ref object of LPProtocol + +proc new(T: typedesc[ChatProto], c: Chat): T = + let chatproto = T() + + # create handler for incoming connection + proc handle(stream: Connection, proto: string) {.async.} = + if c.connected and not c.conn.closed: + c.writeStdout "a chat session is already in progress - refusing incoming peer!" + await stream.close() + else: + await c.handlePeer(stream) + await stream.close() + + # assign the new handler + chatproto.handler = handle + chatproto.codec = ChatCodec + return chatproto + +## +# Chat application +## +proc handlePeer(c: Chat, conn: Connection) {.async.} = + # Handle a peer (incoming or outgoing) + try: + c.conn = conn + c.connected = true + c.writeStdout $conn.peerId & " connected" + + # Read loop + while true: + let + strData = await conn.readLp(1024) + str = string.fromBytes(strData) + c.writeStdout $conn.peerId & ": " & $str + + except LPStreamEOFError: + c.writeStdout $conn.peerId & " disconnected" + +proc dialPeer(c: Chat, address: string) {.async.} = + # Parse and dial address let multiAddr = MultiAddress.init(address).tryGet() # split the peerId part /p2p/... @@ -63,86 +97,53 @@ proc dialPeer(p: ChatProto, address: string) {.async.} = wireAddr = ip4Addr & tcpAddr echo &"dialing peer: {multiAddr}" - p.conn = await p.switch.dial(remotePeer, @[wireAddr], ChatCodec) - p.connected = true - asyncSpawn p.readAndPrint() + asyncSpawn c.handlePeer(await c.switch.dial(remotePeer, @[wireAddr], ChatCodec)) -proc writeAndPrint(p: ChatProto) {.async.} = +proc readLoop(c: Chat) {.async.} = while true: - if not p.connected: + if not c.connected: echo "type an address or wait for a connection:" echo "type /[help|?] for help" - let line = await p.transp.readLine() - if line.startsWith("/help") or line.startsWith("/?") or not p.started: + c.writePrompt() + + let line = await c.stdinReader.readLine() + if line.startsWith("/help") or line.startsWith("/?"): echo Help continue if line.startsWith("/disconnect"): - echo "Ending current session" - if p.connected and p.conn.closed.not: - await p.conn.close() - p.connected = false + c.writeStdout "Ending current session" + if c.connected and c.conn.closed.not: + await c.conn.close() + c.connected = false elif line.startsWith("/connect"): - if p.connected: - var yesno = "N" - echo "a session is already in progress, do you want end it [y/N]?" - yesno = await p.transp.readLine() - if yesno.cmpIgnoreCase("y") == 0: - await p.conn.close() - p.connected = false - elif yesno.cmpIgnoreCase("n") == 0: - continue - else: - echo "unrecognized response" - continue - - echo "enter address of remote peer" - let address = await p.transp.readLine() + c.writeStdout "enter address of remote peer" + let address = await c.stdinReader.readLine() if address.len > 0: - await p.dialPeer(address) + await c.dialPeer(address) elif line.startsWith("/exit"): - if p.connected and p.conn.closed.not: - await p.conn.close() - p.connected = false + if c.connected and c.conn.closed.not: + await c.conn.close() + c.connected = false - await p.switch.stop() - echo "quitting..." + await c.switch.stop() + c.writeStdout "quitting..." quit(0) else: - if p.connected: - await p.conn.writeLp(line) + if c.connected: + await c.conn.writeLp(line) else: try: if line.startsWith("/") and "p2p" in line: - await p.dialPeer(line) - except: + await c.dialPeer(line) + except CatchableError as exc: echo &"unable to dial remote peer {line}" - echo getCurrentExceptionMsg() - -proc readWriteLoop(p: ChatProto) {.async.} = - await p.writeAndPrint() - -proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto = - var chatproto = ChatProto(switch: switch, transp: transp, codecs: @[ChatCodec]) - - # create handler for incoming connection - proc handle(stream: Connection, proto: string) {.async.} = - if chatproto.connected and not chatproto.conn.closed: - echo "a chat session is already in progress - disconnecting!" - await stream.close() - else: - chatproto.conn = stream - chatproto.connected = true - await chatproto.readAndPrint() - - # assign the new handler - chatproto.handler = handle - return chatproto + echo exc.msg proc readInput(wfd: AsyncFD) {.thread.} = - ## This procedure performs reading from `stdin` and sends data over + ## This thread performs reading from `stdin` and sends data over ## pipe to main thread. let transp = fromPipe(wfd) @@ -150,36 +151,35 @@ proc readInput(wfd: AsyncFD) {.thread.} = let line = stdin.readLine() discard waitFor transp.write(line & "\r\n") -proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = - let transp = fromPipe(rfd) +proc main() {.async.} = + let + rng = newRng() # Single random number source for the whole application - let seckey = PrivateKey.random(RSA, rng[]).get() - var localAddress = DefaultAddr - while true: - echo &"Type an address to bind to or Enter to use the default {DefaultAddr}" - let a = await transp.readLine() - try: - if a.len > 0: - localAddress = a - break - # uise default - break - except: - echo "invalid address" - localAddress = DefaultAddr - continue + # Pipe to read stdin from main thread + (rfd, wfd) = createAsyncPipe() + stdinReader = fromPipe(rfd) + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + + var localAddress = MultiAddress.init(DefaultAddr).tryGet() var switch = SwitchBuilder - .init() - .withRng(rng) - .withPrivateKey(seckey) - .withAddress(MultiAddress.init(localAddress).tryGet()) + .new() + .withRng(rng) # Give the application RNG + .withAddress(localAddress) + .withTcpTransport() # Use TCP as transport + .withMplex() # Use Mplex as muxer + .withNoise() # Use Noise as secure manager .build() - let chatProto = newChatProto(switch, transp) - switch.mount(chatProto) + let chat = Chat( + switch: switch, + stdinReader: stdinReader) + + switch.mount(ChatProto.new(chat)) + let libp2pFuts = await switch.start() - chatProto.started = true let id = $switch.peerInfo.peerId echo "PeerID: " & id @@ -187,19 +187,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = for a in switch.peerInfo.addrs: echo &"{a}/p2p/{id}" - await chatProto.readWriteLoop() + await chat.readLoop() await allFuturesThrowing(libp2pFuts) -proc main() {.async.} = - let rng = newRng() # Singe random number source for the whole application - let (rfd, wfd) = createAsyncPipe() - if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: - raise newException(ValueError, "Could not initialize pipe!") - - var thread: Thread[AsyncFD] - thread.createThread(readInput, wfd) - - await processInput(rfd, rng) - -when isMainModule: # isMainModule = true when the module is compiled as the main file - waitFor(main()) +waitFor(main()) diff --git a/examples/helloworld.nim b/examples/helloworld.nim new file mode 100644 index 0000000..f679095 --- /dev/null +++ b/examples/helloworld.nim @@ -0,0 +1,92 @@ +import bearssl +import chronos # an efficient library for async +import stew/byteutils # various utils +import ../libp2p # when installed through nimble, just use `import libp2p` + +## +# Create our custom protocol +## +const TestCodec = "/test/proto/1.0.0" # custom protocol string identifier + +type + TestProto = ref object of LPProtocol # declare a custom protocol + +proc new(T: typedesc[TestProto]): T = + + # every incoming connections will be in handled in this closure + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + echo "Got from remote - ", string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("Roger p2p!") + + # We must close the connections ourselves when we're done with it + await conn.close() + + return T(codecs: @[TestCodec], handler: handle) + +## +# Helper to create a switch/node +## +proc createSwitch(ma: MultiAddress, rng: ref BrHmacDrbgContext): Switch = + var switch = SwitchBuilder + .new() + .withRng(rng) # Give the application RNG + .withAddress(ma) # Our local address(es) + .withTcpTransport() # Use TCP as transport + .withMplex() # Use Mplex as muxer + .withNoise() # Use Noise as secure manager + .build() + + result = switch + +## +# The actual application +## +proc main() {.async, gcsafe.} = + let + rng = newRng() # Single random number source for the whole application + # port 0 will take a random available port + # `tryGet` will throw an exception if the Multiaddress failed + # (for instance, if the address is not well formatted) + ma1 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + ma2 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + + # setup the custom proto + let testProto = TestProto.new() + + # setup the two nodes + let + switch1 = createSwitch(ma1, rng) #Create the two switches + switch2 = createSwitch(ma2, rng) + + # mount the proto on switch1 + # the node will now listen for this proto + # and call the handler everytime a client request it + switch1.mount(testProto) + + # Start the nodes. This will start the transports + # and listen on each local addresses + let + switch1Fut = await switch1.start() + switch2Fut = await switch2.start() + + # the node addrs is populated with it's + # actual port during the start + + # use the second node to dial the first node + # using the first node peerid and address + # and specify our custom protocol codec + let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec) + + # conn is now a fully setup connection, we talk directly to the node1 custom protocol handler + await conn.writeLp("Hello p2p!") # writeLp send a length prefixed buffer over the wire + + # readLp reads length prefixed bytes and returns a buffer without the prefix + echo "Remote responded with - ", string.fromBytes(await conn.readLp(1024)) + + # We must close the connection ourselves when we're done with it + await conn.close() + + await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports + await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown + +waitFor(main()) diff --git a/examples/tutorial_1_connect.md b/examples/tutorial_1_connect.md new file mode 100644 index 0000000..754683b --- /dev/null +++ b/examples/tutorial_1_connect.md @@ -0,0 +1,108 @@ +Hi all, welcome to the first article of the nim-libp2p's tutorial series! + +_This tutorial is for everyone who is interested in building peer-to-peer chatting applications. No Nim programming experience is needed._ + +To give you a quick overview, **Nim** is the programming language we are using and **nim-libp2p** is the Nim implementation of [libp2p](https://libp2p.io/), a modular library that enables the development of peer-to-peer network applications. + +Hope you'll find it helpful in your journey of learning. Happy coding! ;) + +# Before you start +The only prerequisite here is [Nim](https://nim-lang.org/), the programming language with a Python-like syntax and a performance similar to C. Detailed information can be found [here](https://nim-lang.org/docs/tut1.html). + +Install Nim via their official website: [https://nim-lang.org/install.html](https://nim-lang.org/install.html) +Check Nim's installation via `nim --version` and its package manager Nimble via `nimble --version`. + +You can now install the latest version of `nim-libp2p`: +```bash +nimble install libp2p@#master +``` + +# A simple ping application +We'll start by creating a simple application, which is starting two libp2p [switch](https://docs.libp2p.io/concepts/stream-multiplexing/#switch-swarm), and pinging each other using the [Ping](https://docs.libp2p.io/concepts/protocols/#ping) protocol. + +_TIP: You can extract the code from this tutorial by running `nim c -r tools/markdown_runner.nim examples/tutorial_1_connect.md` in the libp2p folder!_ + +Let's create a `part1.nim`, and import our dependencies: +```nim +import bearssl +import chronos + +import libp2p +import libp2p/protocols/ping +``` +[bearssl](https://github.com/status-im/nim-bearssl) is used as a [cryptographic pseudorandom number generator](https://en.wikipedia.org/wiki/Cryptographically-secure_pseudorandom_number_generator) +[chronos](https://github.com/status-im/nim-chronos) the asynchronous framework used by `nim-libp2p` + +Next, we'll create an helper procedure to create our switches. A switch needs a bit of configuration, and it will be easier to do this configuration only once: +```nim +proc createSwitch(ma: MultiAddress, rng: ref BrHmacDrbgContext): Switch = + var switch = SwitchBuilder + .new() + .withRng(rng) # Give the application RNG + .withAddress(ma) # Our local address(es) + .withTcpTransport() # Use TCP as transport + .withMplex() # Use Mplex as muxer + .withNoise() # Use Noise as secure manager + .build() + + return switch +``` +This will create a switch using [Mplex](https://docs.libp2p.io/concepts/stream-multiplexing/) as a multiplexer, Noise to secure the communication, and TCP as an underlying transport. + +You can of course tweak this, to use a different or multiple transport, or tweak the configuration of Mplex and Noise, but this is some sane defaults that we'll use going forward. + + +Let's now start to create our main procedure: +```nim +proc main() {.async, gcsafe.} = + let + rng = newRng() + localAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + pingProtocol = Ping.new(rng=rng) +``` +We created some variables that we'll need for the rest of the application: the global `rng` instance, our `localAddress`, and an instance of the `Ping` protocol. +The address is in the [MultiAddress](https://github.com/multiformats/multiaddr) format. The port `0` means "take any port available". + +`tryGet` is procedure which is part of the [nim-result](https://github.com/arnetheduck/nim-result/), that will throw an exception if the supplied MultiAddress is not valid. + +We can now create our two switches: +```nim + let + switch1 = createSwitch(localAddress, rng) + switch2 = createSwitch(localAddress, rng) + + switch1.mount(pingProtocol) + + let + switch1Fut = await switch1.start() + switch2Fut = await switch2.start() +``` +We've **mounted** the `pingProtocol` on our first switch. This means that the first switch will actually listen for any ping requests coming in, and handle them accordingly. + +Now that we've started the nodes, they are listening for incoming peers. +We can find out which port was attributed, and the resulting local addresses, by using `switch1.peerInfo.addrs`. + +We'll **dial** the first switch from the second one, by specifying it's **Peer ID**, it's **MultiAddress** and the **`Ping` protocol codec**: +```nim + let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, PingCodec) +``` +We now have a `Ping` connection setup between the second and the first switch, we can use it to actually ping the node: +```nim + # ping the other node and echo the ping duration + echo "ping: ", await pingProtocol.ping(conn) + + # We must close the connection ourselves when we're done with it + await conn.close() +``` + +And that's it! Just a little bit of cleanup: shutting down the switches, waiting for them to stop, and we'll call our `main` procedure: +```nim + await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports + await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown + +waitFor(main()) +``` + +You can now run this program using `nim c -r part1.nim`, and you should see the dialing sequence, ending with a ping output. + +In the [next tutorial](tutorial_2_customproto.md), we'll look at how to create our own custom protocol. diff --git a/examples/tutorial_2_customproto.md b/examples/tutorial_2_customproto.md new file mode 100644 index 0000000..8a5b9ea --- /dev/null +++ b/examples/tutorial_2_customproto.md @@ -0,0 +1,82 @@ +In the [previous tutorial](tutorial_1_connect.md), we've looked at how to create a simple ping program using the `nim-libp2p`. + +We'll now look at how to create a custom protocol inside the libp2p + +# Custom protocol in libp2p +Let's create a `part2.nim`, and import our dependencies: +```nim +import bearssl +import chronos +import stew/byteutils + +import libp2p +``` +This is similar to the first tutorial, except we don't need to import the `Ping` protocol. + +Next, we'll declare our custom protocol +```nim +const TestCodec = "/test/proto/1.0.0" + +type TestProto = ref object of LPProtocol +``` + +We've set a [protocol ID](https://docs.libp2p.io/concepts/protocols/#protocol-ids), and created a custom `LPProtocol`. In a more complex protocol, we could use this structure to store interesting variables. + +A protocol generally has two part: and handling/server part, and a dialing/client part. +Theses two parts can be identical, but in our trivial protocol, the server will wait for a message from the client, and the client will send a message, so we have to handle the two cases separately. + +Let's start with the server part: +```nim +proc new(T: typedesc[TestProto]): T = + # every incoming connections will in be handled in this closure + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + echo "Got from remote - ", string.fromBytes(await conn.readLp(1024)) + # We must close the connections ourselves when we're done with it + await conn.close() + + return T(codecs: @[TestCodec], handler: handle) +``` +This is a constructor for our `TestProto`, that will specify our `codecs` and a `handler`, which will be called for each incoming peer asking for this protocol. +In our handle, we simply read a message from the connection and `echo` it. + +We can now create our client part: +```nim +proc hello(p: TestProto, conn: Connection) {.async.} = + await conn.writeLp("Hello p2p!") +``` +Again, pretty straight-forward, we just send a message on the connection. + +We can now create our main procedure: +```nim +proc main() {.async, gcsafe.} = + let + rng = newRng() + testProto = TestProto.new() + switch1 = newStandardSwitch(rng=rng) + switch2 = newStandardSwitch(rng=rng) + + switch1.mount(testProto) + + let + switch1Fut = await switch1.start() + switch2Fut = await switch2.start() + + conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec) + + await testProto.hello(conn) + + # We must close the connection ourselves when we're done with it + await conn.close() + + await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports + await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown +``` + +This is very similar to the first tutorial's `main`, the only noteworthy difference is that we use `newStandardSwitch`, which is similar to `createSwitch` but is bundled directly in libp2p + +We can now wrap our program by calling our main proc: +```nim +waitFor(main()) +``` + +And that's it! diff --git a/libp2p.nimble b/libp2p.nimble index 21e1782..af37ab9 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -16,11 +16,12 @@ requires "nim >= 1.2.0", "metrics", "secp256k1", "stew#head", - "https://github.com/status-im/nim-websock" + "websock" proc runTest(filename: string, verify: bool = true, sign: bool = true, moreoptions: string = "") = - var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics --verbosity:0 --hints:off" + let env_nimflags = getEnv("NIMFLAGS") + var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics --verbosity:0 --hints:off " & env_nimflags excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") excstr.add(" -d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_verify=" & $verify) @@ -32,12 +33,18 @@ proc runTest(filename: string, verify: bool = true, sign: bool = true, exec excstr & " -d:chronicles_log_level=INFO -r" & " tests/" & filename rmFile "tests/" & filename.toExe -proc buildSample(filename: string) = +proc buildSample(filename: string, run = false) = var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off" excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") excstr.add(" examples/" & filename) exec excstr - rmFile "examples" & filename.toExe + if run: + exec "./examples/" & filename.toExe + rmFile "examples/" & filename.toExe + +proc buildTutorial(filename: string) = + discard gorge "cat " & filename & " | nim c -r --hints:off tools/markdown_runner.nim | " & + " nim --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off c -" task testnative, "Runs libp2p native tests": runTest("testnative") @@ -74,6 +81,7 @@ task test, "Runs the test suite": exec "nimble testdaemon" exec "nimble testinterop" exec "nimble testfilter" + exec "nimble examples_build" task test_slim, "Runs the test suite": exec "nimble testnative" @@ -83,3 +91,6 @@ task test_slim, "Runs the test suite": task examples_build, "Build the samples": buildSample("directchat") + buildSample("helloworld", true) + buildTutorial("examples/tutorial_1_connect.md") + buildTutorial("examples/tutorial_2_customproto.md") diff --git a/libp2p/builders.nim b/libp2p/builders.nim index f5ec7a7..f722c66 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -80,7 +80,7 @@ proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuild proc withMplex*(b: SwitchBuilder, inTimeout = 5.minutes, outTimeout = 5.minutes): SwitchBuilder = proc newMuxer(conn: Connection): Muxer = - Mplex.init( + Mplex.new( conn, inTimeout = inTimeout, outTimeout = outTimeout) @@ -151,7 +151,7 @@ proc build*(b: SwitchBuilder): Switch secureManagerInstances.add(Noise.new(b.rng, seckey).Secure) let - peerInfo = PeerInfo.init( + peerInfo = PeerInfo.new( seckey, b.addresses, protoVersion = b.protoVersion, @@ -166,9 +166,9 @@ proc build*(b: SwitchBuilder): Switch let identify = Identify.new(peerInfo) - connManager = ConnManager.init(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut) + connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut) ms = MultistreamSelect.new() - muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagerInstances, connManager, ms) + muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms) let transports = block: diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 56c6736..3f83055 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -84,7 +84,7 @@ type proc newTooManyConnectionsError(): ref TooManyConnectionsError {.inline.} = result = newException(TooManyConnectionsError, "Too many connections") -proc init*(C: type ConnManager, +proc new*(C: type ConnManager, maxConnsPerPeer = MaxConnectionsPerPeer, maxConnections = MaxConnections, maxIn = -1, diff --git a/libp2p/crypto/crypto.nim b/libp2p/crypto/crypto.nim index 7a1c4fb..7301bbb 100644 --- a/libp2p/crypto/crypto.nim +++ b/libp2p/crypto/crypto.nim @@ -174,6 +174,19 @@ proc newRng*(): ref BrHmacDrbgContext = return nil rng +proc shuffle*[T]( + rng: ref BrHmacDrbgContext, + x: var openArray[T]) = + + var randValues = newSeqUninitialized[byte](len(x) * 2) + brHmacDrbgGenerate(rng[], randValues) + + for i in countdown(x.high, 1): + let + rand = randValues[i * 2].int32 or (randValues[i * 2 + 1].int32 shl 8) + y = rand mod i + swap(x[i], x[y]) + proc random*(T: typedesc[PrivateKey], scheme: PKScheme, rng: var BrHmacDrbgContext, bits = RsaDefaultKeySize): CryptoResult[PrivateKey] = @@ -331,9 +344,6 @@ proc getPublicKey*(key: PrivateKey): CryptoResult[PublicKey] = else: err(SchemeError) -proc getKey*(key: PrivateKey): CryptoResult[PublicKey] {.deprecated: "use getPublicKey".} = - key.getPublicKey() - proc toRawBytes*(key: PrivateKey | PublicKey, data: var openarray[byte]): CryptoResult[int] = ## Serialize private key ``key`` (using scheme's own serialization) and store @@ -1013,39 +1023,6 @@ proc write*(pb: var ProtoBuffer, field: int, sig: Signature) {. inline, raises: [Defect].} = write(pb, field, sig.getBytes()) -proc initProtoField*(index: int, key: PublicKey|PrivateKey): ProtoField {. - deprecated, raises: [Defect, ResultError[CryptoError]].} = - ## Initialize ProtoField with PublicKey/PrivateKey ``key``. - result = initProtoField(index, key.getBytes().tryGet()) - -proc initProtoField*(index: int, sig: Signature): ProtoField {.deprecated.} = - ## Initialize ProtoField with Signature ``sig``. - result = initProtoField(index, sig.getBytes()) - -proc getValue*[T: PublicKey|PrivateKey](data: var ProtoBuffer, field: int, - value: var T): int {.deprecated.} = - ## Read PublicKey/PrivateKey from ProtoBuf's message and validate it. - var buf: seq[byte] - var key: PublicKey - result = getLengthValue(data, field, buf) - if result > 0: - if not key.init(buf): - result = -1 - else: - value = key - -proc getValue*(data: var ProtoBuffer, field: int, value: var Signature): int {. - deprecated.} = - ## Read ``Signature`` from ProtoBuf's message and validate it. - var buf: seq[byte] - var sig: Signature - result = getLengthValue(data, field, buf) - if result > 0: - if not sig.init(buf): - result = -1 - else: - value = sig - proc getField*[T: PublicKey|PrivateKey](pb: ProtoBuffer, field: int, value: var T): ProtoResult[bool] = ## Deserialize public/private key from protobuf's message ``pb`` using field diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index de4f835..3ae31f5 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -10,7 +10,7 @@ {.push raises: [Defect].} ## This module implementes API for `go-libp2p-daemon`. -import std/[os, osproc, strutils, tables, strtabs] +import std/[os, osproc, strutils, tables, strtabs, sequtils] import pkg/[chronos, chronicles] import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid import ../wire, ../multihash, ../protobuf/minprotobuf, ../errors @@ -35,7 +35,7 @@ type Critical, Error, Warning, Notice, Info, Debug, Trace RequestType* {.pure.} = enum - IDENTITY = 0, + IDENTIFY = 0, CONNECT = 1, STREAM_OPEN = 2, STREAM_HANDLER = 3, @@ -167,7 +167,7 @@ proc requestIdentity(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doIdentify(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) - result.write(initProtoField(1, cast[uint](RequestType.IDENTITY))) + result.write(1, cast[uint](RequestType.IDENTIFY)) result.finish() proc requestConnect(peerid: PeerID, @@ -177,13 +177,13 @@ proc requestConnect(peerid: PeerID, ## Processing function `doConnect(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, peerid)) + msg.write(1, peerid) for item in addresses: - msg.write(initProtoField(2, item.data.buffer)) + msg.write(2, item.data.buffer) if timeout > 0: - msg.write(initProtoField(3, hint64(timeout))) - result.write(initProtoField(1, cast[uint](RequestType.CONNECT))) - result.write(initProtoField(2, msg)) + msg.write(3, hint64(timeout)) + result.write(1, cast[uint](RequestType.CONNECT)) + result.write(2, msg) result.finish() proc requestDisconnect(peerid: PeerID): ProtoBuffer = @@ -191,9 +191,9 @@ proc requestDisconnect(peerid: PeerID): ProtoBuffer = ## Processing function `doDisconnect(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, peerid)) - result.write(initProtoField(1, cast[uint](RequestType.DISCONNECT))) - result.write(initProtoField(7, msg)) + msg.write(1, peerid) + result.write(1, cast[uint](RequestType.DISCONNECT)) + result.write(7, msg) result.finish() proc requestStreamOpen(peerid: PeerID, @@ -203,13 +203,13 @@ proc requestStreamOpen(peerid: PeerID, ## Processing function `doStreamOpen(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, peerid)) + msg.write(1, peerid) for item in protocols: - msg.write(initProtoField(2, item)) + msg.write(2, item) if timeout > 0: - msg.write(initProtoField(3, hint64(timeout))) - result.write(initProtoField(1, cast[uint](RequestType.STREAM_OPEN))) - result.write(initProtoField(3, msg)) + msg.write(3, hint64(timeout)) + result.write(1, cast[uint](RequestType.STREAM_OPEN)) + result.write(3, msg) result.finish() proc requestStreamHandler(address: MultiAddress, @@ -218,18 +218,18 @@ proc requestStreamHandler(address: MultiAddress, ## Processing function `doStreamHandler(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, address.data.buffer)) + msg.write(1, address.data.buffer) for item in protocols: - msg.write(initProtoField(2, item)) - result.write(initProtoField(1, cast[uint](RequestType.STREAM_HANDLER))) - result.write(initProtoField(4, msg)) + msg.write(2, item) + result.write(1, cast[uint](RequestType.STREAM_HANDLER)) + result.write(4, msg) result.finish() proc requestListPeers(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doListPeers(req *pb.Request)` result = initProtoBuffer({WithVarintLength}) - result.write(initProtoField(1, cast[uint](RequestType.LIST_PEERS))) + result.write(1, cast[uint](RequestType.LIST_PEERS)) result.finish() proc requestDHTFindPeer(peer: PeerID, timeout = 0): ProtoBuffer = @@ -238,13 +238,13 @@ proc requestDHTFindPeer(peer: PeerID, timeout = 0): ProtoBuffer = let msgid = cast[uint](DHTRequestType.FIND_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, peer)) + msg.write(1, msgid) + msg.write(2, peer) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTFindPeersConnectedToPeer(peer: PeerID, @@ -254,13 +254,13 @@ proc requestDHTFindPeersConnectedToPeer(peer: PeerID, let msgid = cast[uint](DHTRequestType.FIND_PEERS_CONNECTED_TO_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, peer)) + msg.write(1, msgid) + msg.write(2, peer) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTFindProviders(cid: Cid, @@ -270,14 +270,14 @@ proc requestDHTFindProviders(cid: Cid, let msgid = cast[uint](DHTRequestType.FIND_PROVIDERS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(3, cid.data.buffer)) - msg.write(initProtoField(6, count)) + msg.write(1, msgid) + msg.write(3, cid.data.buffer) + msg.write(6, count) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTGetClosestPeers(key: string, timeout = 0): ProtoBuffer = @@ -286,13 +286,13 @@ proc requestDHTGetClosestPeers(key: string, timeout = 0): ProtoBuffer = let msgid = cast[uint](DHTRequestType.GET_CLOSEST_PEERS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(4, key)) + msg.write(1, msgid) + msg.write(4, key) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTGetPublicKey(peer: PeerID, timeout = 0): ProtoBuffer = @@ -301,13 +301,13 @@ proc requestDHTGetPublicKey(peer: PeerID, timeout = 0): ProtoBuffer = let msgid = cast[uint](DHTRequestType.GET_PUBLIC_KEY) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, peer)) + msg.write(1, msgid) + msg.write(2, peer) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTGetValue(key: string, timeout = 0): ProtoBuffer = @@ -316,13 +316,13 @@ proc requestDHTGetValue(key: string, timeout = 0): ProtoBuffer = let msgid = cast[uint](DHTRequestType.GET_VALUE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(4, key)) + msg.write(1, msgid) + msg.write(4, key) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTSearchValue(key: string, timeout = 0): ProtoBuffer = @@ -331,13 +331,13 @@ proc requestDHTSearchValue(key: string, timeout = 0): ProtoBuffer = let msgid = cast[uint](DHTRequestType.SEARCH_VALUE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(4, key)) + msg.write(1, msgid) + msg.write(4, key) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTPutValue(key: string, value: openarray[byte], @@ -347,14 +347,14 @@ proc requestDHTPutValue(key: string, value: openarray[byte], let msgid = cast[uint](DHTRequestType.PUT_VALUE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(4, key)) - msg.write(initProtoField(5, value)) + msg.write(1, msgid) + msg.write(4, key) + msg.write(5, value) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer = @@ -363,13 +363,13 @@ proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer = let msgid = cast[uint](DHTRequestType.PROVIDE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(3, cid.data.buffer)) + msg.write(1, msgid) + msg.write(3, cid.data.buffer) if timeout > 0: - msg.write(initProtoField(7, hint64(timeout))) + msg.write(7, hint64(timeout)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.DHT))) - result.write(initProtoField(5, msg)) + result.write(1, cast[uint](RequestType.DHT)) + result.write(5, msg) result.finish() proc requestCMTagPeer(peer: PeerID, tag: string, weight: int): ProtoBuffer = @@ -377,13 +377,13 @@ proc requestCMTagPeer(peer: PeerID, tag: string, weight: int): ProtoBuffer = let msgid = cast[uint](ConnManagerRequestType.TAG_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, peer)) - msg.write(initProtoField(3, tag)) - msg.write(initProtoField(4, hint64(weight))) + msg.write(1, msgid) + msg.write(2, peer) + msg.write(3, tag) + msg.write(4, hint64(weight)) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.CONNMANAGER))) - result.write(initProtoField(6, msg)) + result.write(1, cast[uint](RequestType.CONNMANAGER)) + result.write(6, msg) result.finish() proc requestCMUntagPeer(peer: PeerID, tag: string): ProtoBuffer = @@ -391,12 +391,12 @@ proc requestCMUntagPeer(peer: PeerID, tag: string): ProtoBuffer = let msgid = cast[uint](ConnManagerRequestType.UNTAG_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, peer)) - msg.write(initProtoField(3, tag)) + msg.write(1, msgid) + msg.write(2, peer) + msg.write(3, tag) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.CONNMANAGER))) - result.write(initProtoField(6, msg)) + result.write(1, cast[uint](RequestType.CONNMANAGER)) + result.write(6, msg) result.finish() proc requestCMTrim(): ProtoBuffer = @@ -404,10 +404,10 @@ proc requestCMTrim(): ProtoBuffer = let msgid = cast[uint](ConnManagerRequestType.TRIM) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) + msg.write(1, msgid) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.CONNMANAGER))) - result.write(initProtoField(6, msg)) + result.write(1, cast[uint](RequestType.CONNMANAGER)) + result.write(6, msg) result.finish() proc requestPSGetTopics(): ProtoBuffer = @@ -416,10 +416,10 @@ proc requestPSGetTopics(): ProtoBuffer = let msgid = cast[uint](PSRequestType.GET_TOPICS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) + msg.write(1, msgid) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) - result.write(initProtoField(8, msg)) + result.write(1, cast[uint](RequestType.PUBSUB)) + result.write(8, msg) result.finish() proc requestPSListPeers(topic: string): ProtoBuffer = @@ -428,11 +428,11 @@ proc requestPSListPeers(topic: string): ProtoBuffer = let msgid = cast[uint](PSRequestType.LIST_PEERS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, topic)) + msg.write(1, msgid) + msg.write(2, topic) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) - result.write(initProtoField(8, msg)) + result.write(1, cast[uint](RequestType.PUBSUB)) + result.write(8, msg) result.finish() proc requestPSPublish(topic: string, data: openarray[byte]): ProtoBuffer = @@ -441,12 +441,12 @@ proc requestPSPublish(topic: string, data: openarray[byte]): ProtoBuffer = let msgid = cast[uint](PSRequestType.PUBLISH) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, topic)) - msg.write(initProtoField(3, data)) + msg.write(1, msgid) + msg.write(2, topic) + msg.write(3, data) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) - result.write(initProtoField(8, msg)) + result.write(1, cast[uint](RequestType.PUBSUB)) + result.write(8, msg) result.finish() proc requestPSSubscribe(topic: string): ProtoBuffer = @@ -455,25 +455,26 @@ proc requestPSSubscribe(topic: string): ProtoBuffer = let msgid = cast[uint](PSRequestType.SUBSCRIBE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() - msg.write(initProtoField(1, msgid)) - msg.write(initProtoField(2, topic)) + msg.write(1, msgid) + msg.write(2, topic) msg.finish() - result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) - result.write(initProtoField(8, msg)) + result.write(1, cast[uint](RequestType.PUBSUB)) + result.write(8, msg) result.finish() -proc checkResponse(pb: var ProtoBuffer): ResponseKind {.inline.} = +proc checkResponse(pb: ProtoBuffer): ResponseKind {.inline.} = result = ResponseKind.Malformed var value: uint64 - if getVarintValue(pb, 1, value) > 0: + if getRequiredField(pb, 1, value).isOk(): if value == 0: result = ResponseKind.Success else: result = ResponseKind.Error -proc getErrorMessage(pb: var ProtoBuffer): string {.inline, raises: [Defect, DaemonLocalError].} = - if pb.enterSubmessage() == cast[int](ResponseType.ERROR): - if pb.getString(1, result) == -1: +proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [Defect, DaemonLocalError].} = + var error: seq[byte] + if pb.getRequiredField(ResponseType.ERROR.int, error).isOk(): + if initProtoBuffer(error).getRequiredField(1, result).isErr(): raise newException(DaemonLocalError, "Error message is missing!") proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = @@ -830,26 +831,14 @@ proc transactMessage(transp: StreamTransport, raise newException(DaemonLocalError, "Incorrect or empty message received!") result = initProtoBuffer(message) -proc getPeerInfo(pb: var ProtoBuffer): PeerInfo +proc getPeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [Defect, DaemonLocalError].} = ## Get PeerInfo object from ``pb``. result.addresses = newSeq[MultiAddress]() - if pb.getValue(1, result.peer) == -1: - raise newException(DaemonLocalError, "Missing required field `peer`!") + if pb.getRequiredField(1, result.peer).isErr(): + raise newException(DaemonLocalError, "Incorrect or empty message received!") - var address = newSeq[byte]() - while pb.getBytes(2, address) != -1: - if len(address) != 0: - var copyaddr = address - let addrRes = MultiAddress.init(copyaddr) - - # TODO: for some reason `toException` doesn't - # work for this module - if addrRes.isErr: - raise newException(DaemonLocalError, addrRes.error) - - result.addresses.add(addrRes.get()) - address.setLen(0) + discard pb.getRepeatedField(2, result.addresses) proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = ## Get Node identity information @@ -857,9 +846,10 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = try: var pb = await transactMessage(transp, requestIdentity()) pb.withMessage() do: - let res = pb.enterSubmessage() - if res == cast[int](ResponseType.IDENTITY): - result = pb.getPeerInfo() + var res: seq[byte] + if pb.getRequiredField(ResponseType.IDENTITY.int, res).isOk(): + var resPb = initProtoBuffer(res) + result = getPeerInfo(resPb) finally: await api.closeConnection(transp) @@ -897,18 +887,16 @@ proc openStream*(api: DaemonAPI, peer: PeerID, var pb = await transp.transactMessage(requestStreamOpen(peer, protocols, timeout)) pb.withMessage() do: - var res = pb.enterSubmessage() - if res == cast[int](ResponseType.STREAMINFO): + var res: seq[byte] + if pb.getRequiredField(ResponseType.STREAMINFO.int, res).isOk(): + let resPb = initProtoBuffer(res) # stream.peer = newSeq[byte]() var raddress = newSeq[byte]() stream.protocol = "" - if pb.getValue(1, stream.peer) == -1: - raise newException(DaemonLocalError, "Missing `peer` field!") - if pb.getLengthValue(2, raddress) == -1: - raise newException(DaemonLocalError, "Missing `address` field!") + resPb.getRequiredField(1, stream.peer).tryGet() + resPb.getRequiredField(2, raddress).tryGet() stream.raddress = MultiAddress.init(raddress).tryGet() - if pb.getLengthValue(3, stream.protocol) == -1: - raise newException(DaemonLocalError, "Missing `proto` field!") + resPb.getRequiredField(3, stream.protocol).tryGet() stream.flags.incl(Outbound) stream.transp = transp result = stream @@ -923,13 +911,10 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = var stream = new P2PStream var raddress = newSeq[byte]() stream.protocol = "" - if pb.getValue(1, stream.peer) == -1: - raise newException(DaemonLocalError, "Missing `peer` field!") - if pb.getLengthValue(2, raddress) == -1: - raise newException(DaemonLocalError, "Missing `address` field!") + pb.getRequiredField(1, stream.peer).tryGet() + pb.getRequiredField(2, raddress).tryGet() stream.raddress = MultiAddress.init(raddress).tryGet() - if pb.getLengthValue(3, stream.protocol) == -1: - raise newException(DaemonLocalError, "Missing `proto` field!") + pb.getRequiredField(3, stream.protocol).tryGet() stream.flags.incl(Inbound) stream.transp = transp if len(stream.protocol) > 0: @@ -968,14 +953,11 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = var pb = await transp.transactMessage(requestListPeers()) pb.withMessage() do: result = newSeq[PeerInfo]() - var res = pb.enterSubmessage() - while res != 0: - if res == cast[int](ResponseType.PEERINFO): - var peer = pb.getPeerInfo() + var ress: seq[seq[byte]] + if pb.getRequiredRepeatedField(ResponseType.PEERINFO.int, ress).isOk(): + for p in ress: + let peer = initProtoBuffer(p).getPeerInfo() result.add(peer) - else: - pb.skipSubmessage() - res = pb.enterSubmessage() finally: await api.closeConnection(transp) @@ -1010,51 +992,61 @@ proc cmTrimPeers*(api: DaemonAPI) {.async.} = finally: await api.closeConnection(transp) -proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo +proc dhtGetSinglePeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [Defect, DaemonLocalError].} = - if pb.enterSubmessage() == 2: - result = pb.getPeerInfo() + var res: seq[byte] + if pb.getRequiredField(2, res).isOk(): + result = initProtoBuffer(res).getPeerInfo() else: raise newException(DaemonLocalError, "Missing required field `peer`!") -proc dhtGetSingleValue(pb: var ProtoBuffer): seq[byte] +proc dhtGetSingleValue(pb: ProtoBuffer): seq[byte] {.raises: [Defect, DaemonLocalError].} = result = newSeq[byte]() - if pb.getLengthValue(3, result) == -1: + if pb.getRequiredField(3, result).isErr(): raise newException(DaemonLocalError, "Missing field `value`!") -proc dhtGetSinglePublicKey(pb: var ProtoBuffer): PublicKey +proc dhtGetSinglePublicKey(pb: ProtoBuffer): PublicKey {.raises: [Defect, DaemonLocalError].} = - if pb.getValue(3, result) == -1: + if pb.getRequiredField(3, result).isErr(): raise newException(DaemonLocalError, "Missing field `value`!") -proc dhtGetSinglePeerID(pb: var ProtoBuffer): PeerID +proc dhtGetSinglePeerID(pb: ProtoBuffer): PeerID {.raises: [Defect, DaemonLocalError].} = - if pb.getValue(3, result) == -1: + if pb.getRequiredField(3, result).isErr(): raise newException(DaemonLocalError, "Missing field `value`!") -proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) +proc enterDhtMessage(pb: ProtoBuffer, rt: DHTResponseType): Protobuffer {.inline, raises: [Defect, DaemonLocalError].} = - var dtype: uint - var res = pb.enterSubmessage() - if res == cast[int](ResponseType.DHT): - if pb.getVarintValue(1, dtype) == 0: + var dhtResponse: seq[byte] + if pb.getRequiredField(ResponseType.DHT.int, dhtResponse).isOk(): + var pbDhtResponse = initProtoBuffer(dhtResponse) + var dtype: uint + if pbDhtResponse.getRequiredField(1, dtype).isErr(): raise newException(DaemonLocalError, "Missing required DHT field `type`!") if dtype != cast[uint](rt): raise newException(DaemonLocalError, "Wrong DHT answer type! ") + + var value: seq[byte] + if pbDhtResponse.getRequiredField(3, value).isErr(): + raise newException(DaemonLocalError, "Missing required DHT field `value`!") + + return initProtoBuffer(value) else: raise newException(DaemonLocalError, "Wrong message type!") -proc enterPsMessage(pb: var ProtoBuffer) +proc enterPsMessage(pb: ProtoBuffer): ProtoBuffer {.inline, raises: [Defect, DaemonLocalError].} = - var res = pb.enterSubmessage() - if res != cast[int](ResponseType.PUBSUB): + var res: seq[byte] + if pb.getRequiredField(ResponseType.PUBSUB.int, res).isErr(): raise newException(DaemonLocalError, "Wrong message type!") -proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType + initProtoBuffer(res) + +proc getDhtMessageType(pb: ProtoBuffer): DHTResponseType {.inline, raises: [Defect, DaemonLocalError].} = var dtype: uint - if pb.getVarintValue(1, dtype) == 0: + if pb.getRequiredField(1, dtype).isErr(): raise newException(DaemonLocalError, "Missing required DHT field `type`!") if dtype == cast[uint](DHTResponseType.VALUE): result = DHTResponseType.VALUE @@ -1073,8 +1065,7 @@ proc dhtFindPeer*(api: DaemonAPI, peer: PeerID, try: var pb = await transp.transactMessage(requestDHTFindPeer(peer, timeout)) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.VALUE) - result = pb.dhtGetSinglePeerInfo() + result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePeerInfo() finally: await api.closeConnection(transp) @@ -1088,8 +1079,7 @@ proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerID, try: var pb = await transp.transactMessage(requestDHTGetPublicKey(peer, timeout)) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.VALUE) - result = pb.dhtGetSinglePublicKey() + result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePublicKey() finally: await api.closeConnection(transp) @@ -1103,8 +1093,7 @@ proc dhtGetValue*(api: DaemonAPI, key: string, try: var pb = await transp.transactMessage(requestDHTGetValue(key, timeout)) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.VALUE) - result = pb.dhtGetSingleValue() + result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSingleValue() finally: await api.closeConnection(transp) @@ -1148,7 +1137,7 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID, let spb = requestDHTFindPeersConnectedToPeer(peer, timeout) var pb = await transp.transactMessage(spb) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.BEGIN) + discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: @@ -1173,7 +1162,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string, let spb = requestDHTGetClosestPeers(key, timeout) var pb = await transp.transactMessage(spb) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.BEGIN) + discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: @@ -1198,7 +1187,7 @@ proc dhtFindProviders*(api: DaemonAPI, cid: Cid, count: uint32, let spb = requestDHTFindProviders(cid, count, timeout) var pb = await transp.transactMessage(spb) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.BEGIN) + discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: @@ -1222,7 +1211,7 @@ proc dhtSearchValue*(api: DaemonAPI, key: string, try: var pb = await transp.transactMessage(requestDHTSearchValue(key, timeout)) withMessage(pb) do: - pb.enterDhtMessage(DHTResponseType.BEGIN) + discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: @@ -1241,12 +1230,9 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = try: var pb = await transp.transactMessage(requestPSGetTopics()) withMessage(pb) do: - pb.enterPsMessage() + let innerPb = pb.enterPsMessage() var topics = newSeq[string]() - var topic = "" - while pb.getString(1, topic) != -1: - topics.add(topic) - topic.setLen(0) + discard innerPb.getRepeatedField(1, topics) result = topics finally: await api.closeConnection(transp) @@ -1260,11 +1246,10 @@ proc pubsubListPeers*(api: DaemonAPI, var pb = await transp.transactMessage(requestPSListPeers(topic)) withMessage(pb) do: var peer: PeerID - pb.enterPsMessage() - var peers = newSeq[PeerID]() - while pb.getValue(2, peer) != -1: - peers.add(peer) - result = peers + let innerPb = pb.enterPsMessage() + var peers = newSeq[seq[byte]]() + discard innerPb.getRepeatedField(2, peers) + result = peers.mapIt(PeerId.init(it).get()) finally: await api.closeConnection(transp) @@ -1279,24 +1264,15 @@ proc pubsubPublish*(api: DaemonAPI, topic: string, finally: await api.closeConnection(transp) -proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage = +proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage = result.data = newSeq[byte]() result.seqno = newSeq[byte]() - discard pb.getValue(1, result.peer) - discard pb.getBytes(2, result.data) - discard pb.getBytes(3, result.seqno) - var item = newSeq[byte]() - while true: - if pb.getBytes(4, item) == -1: - break - var copyitem = item - var stritem = cast[string](copyitem) - if len(result.topics) == 0: - result.topics = newSeq[string]() - result.topics.add(stritem) - item.setLen(0) - discard pb.getValue(5, result.signature) - discard pb.getValue(6, result.key) + discard pb.getField(1, result.peer) + discard pb.getField(2, result.data) + discard pb.getField(3, result.seqno) + discard pb.getRepeatedField(4, result.topics) + discard pb.getField(5, result.signature) + discard pb.getField(6, result.key) proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = while true: diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index b952b75..e1c1709 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -20,6 +20,7 @@ import dial, connmanager, stream/connection, transports/transport, + nameresolving/nameresolver, errors export dial, errors @@ -41,6 +42,7 @@ type connManager: ConnManager dialLock: Table[PeerID, AsyncLock] transports: seq[Transport] + nameResolver: NameResolver proc dialAndUpgrade( self: Dialer, @@ -49,58 +51,63 @@ proc dialAndUpgrade( Future[Connection] {.async.} = debug "Dialing peer", peerId - # Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x. - var - transport: Transport - address: MultiAddress + for address in addrs: # for each address + let + hostname = address.getHostname() + resolvedAddresses = + if isNil(self.nameResolver): @[address] + else: await self.nameResolver.resolveMAddress(address) - for t in self.transports: # for each transport - transport = t - for a in addrs: # for each address - address = a - if t.handles(a): # check if it can dial it - trace "Dialing address", address = $a, peerId - let dialed = try: - libp2p_total_dial_attempts.inc() - # await a connection slot when the total - # connection count is equal to `maxConns` - await self.connManager.trackOutgoingConn( - () => transport.dial(address) - ) - except TooManyConnectionsError as exc: - trace "Connection limit reached!" - raise exc - except CancelledError as exc: - debug "Dialing canceled", msg = exc.msg, peerId - raise exc - except CatchableError as exc: - debug "Dialing failed", msg = exc.msg, peerId - libp2p_failed_dials.inc() - continue # Try the next address + for a in resolvedAddresses: # for each resolved address + for transport in self.transports: # for each transport + if transport.handles(a): # check if it can dial it + trace "Dialing address", address = $a, peerId, hostname + let dialed = try: + libp2p_total_dial_attempts.inc() + # await a connection slot when the total + # connection count is equal to `maxConns` + # + # Need to copy to avoid "cannot be captured" errors in Nim-1.4.x. + let + transportCopy = transport + addressCopy = a + await self.connManager.trackOutgoingConn( + () => transportCopy.dial(hostname, addressCopy) + ) + except TooManyConnectionsError as exc: + trace "Connection limit reached!" + raise exc + except CancelledError as exc: + debug "Dialing canceled", msg = exc.msg, peerId + raise exc + except CatchableError as exc: + debug "Dialing failed", msg = exc.msg, peerId + libp2p_failed_dials.inc() + continue # Try the next address - # make sure to assign the peer to the connection - dialed.peerId = peerId + # make sure to assign the peer to the connection + dialed.peerId = peerId - # also keep track of the connection's bottom unsafe transport direction - # required by gossipsub scoring - dialed.transportDir = Direction.Out + # also keep track of the connection's bottom unsafe transport direction + # required by gossipsub scoring + dialed.transportDir = Direction.Out - libp2p_successful_dials.inc() + libp2p_successful_dials.inc() - let conn = try: - await transport.upgradeOutgoing(dialed) - except CatchableError as exc: - # If we failed to establish the connection through one transport, - # we won't succeeded through another - no use in trying again - await dialed.close() - debug "Upgrade failed", msg = exc.msg, peerId - if exc isnot CancelledError: - libp2p_failed_upgrades_outgoing.inc() - raise exc + let conn = try: + await transport.upgradeOutgoing(dialed) + except CatchableError as exc: + # If we failed to establish the connection through one transport, + # we won't succeeded through another - no use in trying again + await dialed.close() + debug "Upgrade failed", msg = exc.msg, peerId + if exc isnot CancelledError: + libp2p_failed_upgrades_outgoing.inc() + raise exc - doAssert not isNil(conn), "connection died after upgradeOutgoing" - debug "Dial successful", conn, peerId = conn.peerId - return conn + doAssert not isNil(conn), "connection died after upgradeOutgoing" + debug "Dial successful", conn, peerId = conn.peerId + return conn proc internalConnect( self: Dialer, @@ -234,9 +241,11 @@ proc new*( localPeerId: PeerId, connManager: ConnManager, transports: seq[Transport], - ms: MultistreamSelect): Dialer = + ms: MultistreamSelect, + nameResolver: NameResolver = nil): Dialer = T(localPeerId: localPeerId, connManager: connManager, transports: transports, - ms: ms) + ms: ms, + nameResolver: nameResolver) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 322a499..096143a 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -426,7 +426,7 @@ const Unreliable* = mapOr(UDP) - Reliable* = mapOr(TCP, UTP, QUIC) + Reliable* = mapOr(TCP, UTP, QUIC, WebSockets) IPFS* = mapAnd(Reliable, mapEq("p2p")) @@ -944,59 +944,6 @@ proc `==`*(m1: var MultiAddress, m2: MultiAddress): bool = ## Check of two MultiAddress are equal m1.data == m2.data -proc isWire*(ma: MultiAddress): bool = - ## Returns ``true`` if MultiAddress ``ma`` is one of: - ## - {IP4}/{TCP, UDP} - ## - {IP6}/{TCP, UDP} - ## - {UNIX}/{PATH} - - var state = 0 - const - wireProtocols = toHashSet([ - multiCodec("ip4"), multiCodec("ip6"), - ]) - wireTransports = toHashSet([ - multiCodec("tcp"), multiCodec("udp") - ]) - try: - for rpart in ma.items(): - if rpart.isErr(): - return false - let part = rpart.get() - - if state == 0: - let rcode = part.protoCode() - if rcode.isErr(): - return false - let code = rcode.get() - - if code in wireProtocols: - inc(state) - continue - elif code == multiCodec("unix"): - result = true - break - else: - result = false - break - elif state == 1: - let rcode = part.protoCode() - if rcode.isErr(): - return false - let code = rcode.get() - - if code in wireTransports: - inc(state) - result = true - else: - result = false - break - else: - result = false - break - except: - result = false - proc matchPart(pat: MaPattern, protos: seq[MultiCodec]): MaPatResult = var empty: seq[MultiCodec] var pcs = protos @@ -1073,7 +1020,7 @@ proc getField*(pb: var ProtoBuffer, field: int, else: err(ProtoError.IncorrectBlob) -proc getRepeatedField*(pb: var ProtoBuffer, field: int, +proc getRepeatedField*(pb: ProtoBuffer, field: int, value: var seq[MultiAddress]): ProtoResult[bool] {. inline.} = var items: seq[seq[byte]] diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 2ce2fc9..95e40c0 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -43,9 +43,6 @@ type proc new*(T: typedesc[MultistreamSelect]): T = T(codec: MSCodec) -proc newMultistream*(): MultistreamSelect {.deprecated: "use MultistreamSelect.new".} = - MultistreamSelect.new() - template validateSuffix(str: string): untyped = if str.endsWith("\n"): str.removeSuffix("\n") diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 7d75942..d6eda76 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -192,7 +192,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = await m.close() trace "Stopped mplex handler", m -proc init*(M: type Mplex, +proc new*(M: type Mplex, conn: Connection, inTimeout, outTimeout: Duration = DefaultChanTimeout, maxChannCount: int = MaxChannelCount): Mplex = diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 9ecdba9..670a5c7 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -58,9 +58,6 @@ proc new*( muxerProvider.init() muxerProvider -proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe, deprecated: "use MuxerProvider.new".} = - MuxerProvider.new(creator, codec) - method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = trace "starting muxer handler", proto=proto, conn diff --git a/libp2p/nameresolving/nameresolver.nim b/libp2p/nameresolving/nameresolver.nim index 38efcde..6f1e975 100644 --- a/libp2p/nameresolving/nameresolver.nim +++ b/libp2p/nameresolving/nameresolver.nim @@ -40,12 +40,10 @@ method resolveIp*( doAssert(false, "Not implemented!") -proc getHostname(ma: MultiAddress): string = - var dnsbuf = newSeq[byte](256) - - let dnsLen = ma[0].get().protoArgument(dnsbuf).get() - dnsbuf.setLen(dnsLen) - return string.fromBytes(dnsbuf) +proc getHostname*(ma: MultiAddress): string = + let firstPart = ($ma[0].get()).split('/') + if firstPart.len > 1: firstPart[2] + else: "" proc resolveDnsAddress( self: NameResolver, @@ -122,27 +120,26 @@ proc resolveDnsAddr( return result -proc resolveMAddresses*( +proc resolveMAddress*( self: NameResolver, - addrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.async.} = + address: MultiAddress): Future[seq[MultiAddress]] {.async.} = var res = initOrderedSet[MultiAddress]() - for address in addrs: - if not DNS.matchPartial(address): - res.incl(address) - else: - let code = address[0].get().protoCode().get() - let seq = case code: - of multiCodec("dns"): - await self.resolveDnsAddress(address) - of multiCodec("dns4"): - await self.resolveDnsAddress(address, Domain.AF_INET) - of multiCodec("dns6"): - await self.resolveDnsAddress(address, Domain.AF_INET6) - of multiCodec("dnsaddr"): - await self.resolveDnsAddr(address) - else: - @[address] - for ad in seq: - res.incl(ad) + if not DNS.matchPartial(address): + res.incl(address) + else: + let code = address[0].get().protoCode().get() + let seq = case code: + of multiCodec("dns"): + await self.resolveDnsAddress(address) + of multiCodec("dns4"): + await self.resolveDnsAddress(address, Domain.AF_INET) + of multiCodec("dns6"): + await self.resolveDnsAddress(address, Domain.AF_INET6) + of multiCodec("dnsaddr"): + await self.resolveDnsAddr(address) + else: + @[address] + for ad in seq: + res.incl(ad) return res.toSeq diff --git a/libp2p/peerid.nim b/libp2p/peerid.nim index 0219b1a..0fd7daa 100644 --- a/libp2p/peerid.nim +++ b/libp2p/peerid.nim @@ -197,21 +197,6 @@ func write*(vb: var VBuffer, pid: PeerID) = ## Write PeerID value ``peerid`` to buffer ``vb``. vb.writeSeq(pid.data) -func initProtoField*(index: int, pid: PeerID): ProtoField {.deprecated.} = - ## Initialize ProtoField with PeerID ``value``. - initProtoField(index, pid.data) - -func getValue*(data: var ProtoBuffer, field: int, value: var PeerID): int {. - deprecated.} = - ## Read ``PeerID`` from ProtoBuf's message and validate it. - var pid: PeerID - result = getLengthValue(data, field, pid.data) - if result > 0: - if not pid.validate(): - result = -1 - else: - value = pid - func write*(pb: var ProtoBuffer, field: int, pid: PeerID) = ## Write PeerID value ``peerid`` to object ``pb`` using ProtoBuf's encoding. write(pb, field, pid.data) diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index ff75a71..b82626d 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -39,7 +39,7 @@ func shortLog*(p: PeerInfo): auto = ) chronicles.formatIt(PeerInfo): shortLog(it) -proc init*( +proc new*( p: typedesc[PeerInfo], key: PrivateKey, addrs: openarray[MultiAddress] = [], @@ -49,7 +49,7 @@ proc init*( {.raises: [Defect, PeerInfoError].} = let pubkey = try: - key.getKey().tryGet() + key.getPublicKey().tryGet() except CatchableError: raise newException(PeerInfoError, "invalid private key") diff --git a/libp2p/protobuf/minprotobuf.nim b/libp2p/protobuf/minprotobuf.nim index 02ea60b..2d72c97 100644 --- a/libp2p/protobuf/minprotobuf.nim +++ b/libp2p/protobuf/minprotobuf.nim @@ -58,7 +58,8 @@ type BufferOverflow, MessageTooBig, BadWireType, - IncorrectBlob + IncorrectBlob, + RequiredFieldMissing ProtoResult*[T] = Result[T, ProtoError] @@ -115,43 +116,6 @@ proc vsizeof*(field: ProtoField): int {.inline.} = else: 0 -proc initProtoField*(index: int, value: SomeVarint): ProtoField {.deprecated.} = - ## Initialize ProtoField with integer value. - result = ProtoField(kind: Varint, index: index) - when type(value) is uint64: - result.vint = value - else: - result.vint = cast[uint64](value) - -proc initProtoField*(index: int, value: bool): ProtoField {.deprecated.} = - ## Initialize ProtoField with integer value. - result = ProtoField(kind: Varint, index: index) - result.vint = byte(value) - -proc initProtoField*(index: int, - value: openarray[byte]): ProtoField {.deprecated.} = - ## Initialize ProtoField with bytes array. - result = ProtoField(kind: Length, index: index) - if len(value) > 0: - result.vbuffer = newSeq[byte](len(value)) - copyMem(addr result.vbuffer[0], unsafeAddr value[0], len(value)) - -proc initProtoField*(index: int, value: string): ProtoField {.deprecated.} = - ## Initialize ProtoField with string. - result = ProtoField(kind: Length, index: index) - if len(value) > 0: - result.vbuffer = newSeq[byte](len(value)) - copyMem(addr result.vbuffer[0], unsafeAddr value[0], len(value)) - -proc initProtoField*(index: int, - value: ProtoBuffer): ProtoField {.deprecated, inline.} = - ## Initialize ProtoField with nested message stored in ``value``. - ## - ## Note: This procedure performs shallow copy of ``value`` sequence. - result = ProtoField(kind: Length, index: index) - if len(value.buffer) > 0: - shallowCopy(result.vbuffer, value.buffer) - proc initProtoBuffer*(data: seq[byte], offset = 0, options: set[ProtoFlags] = {}): ProtoBuffer = ## Initialize ProtoBuffer with shallow copy of ``data``. @@ -299,51 +263,6 @@ proc write*(pb: var ProtoBuffer, field: int, value: ProtoBuffer) {.inline.} = ## ``pb`` with field number ``field``. write(pb, field, value.buffer) -proc write*(pb: var ProtoBuffer, field: ProtoField) {.deprecated.} = - ## Encode protobuf's field ``field`` and store it to protobuf's buffer ``pb``. - var length = 0 - var res: VarintResult[void] - pb.buffer.setLen(len(pb.buffer) + vsizeof(field)) - res = PB.putUVarint(pb.toOpenArray(), length, getProtoHeader(field)) - doAssert(res.isOk()) - pb.offset += length - case field.kind - of ProtoFieldKind.Varint: - res = PB.putUVarint(pb.toOpenArray(), length, field.vint) - doAssert(res.isOk()) - pb.offset += length - of ProtoFieldKind.Fixed64: - doAssert(pb.isEnough(8)) - var value = cast[uint64](field.vfloat64) - pb.buffer[pb.offset] = byte(value and 0xFF'u32) - pb.buffer[pb.offset + 1] = byte((value shr 8) and 0xFF'u64) - pb.buffer[pb.offset + 2] = byte((value shr 16) and 0xFF'u64) - pb.buffer[pb.offset + 3] = byte((value shr 24) and 0xFF'u64) - pb.buffer[pb.offset + 4] = byte((value shr 32) and 0xFF'u64) - pb.buffer[pb.offset + 5] = byte((value shr 40) and 0xFF'u64) - pb.buffer[pb.offset + 6] = byte((value shr 48) and 0xFF'u64) - pb.buffer[pb.offset + 7] = byte((value shr 56) and 0xFF'u64) - pb.offset += 8 - of ProtoFieldKind.Fixed32: - doAssert(pb.isEnough(4)) - var value = cast[uint32](field.vfloat32) - pb.buffer[pb.offset] = byte(value and 0xFF'u32) - pb.buffer[pb.offset + 1] = byte((value shr 8) and 0xFF'u32) - pb.buffer[pb.offset + 2] = byte((value shr 16) and 0xFF'u32) - pb.buffer[pb.offset + 3] = byte((value shr 24) and 0xFF'u32) - pb.offset += 4 - of ProtoFieldKind.Length: - res = PB.putUVarint(pb.toOpenArray(), length, uint(len(field.vbuffer))) - doAssert(res.isOk()) - pb.offset += length - doAssert(pb.isEnough(len(field.vbuffer))) - if len(field.vbuffer) > 0: - copyMem(addr pb.buffer[pb.offset], unsafeAddr field.vbuffer[0], - len(field.vbuffer)) - pb.offset += len(field.vbuffer) - else: - discard - proc finish*(pb: var ProtoBuffer) = ## Prepare protobuf's buffer ``pb`` for writing to stream. doAssert(len(pb.buffer) > 0) @@ -657,6 +576,17 @@ proc getField*(pb: ProtoBuffer, field: int, else: err(res.error) +proc getRequiredField*[T](pb: ProtoBuffer, field: int, + output: var T): ProtoResult[void] {.inline.} = + let res = pb.getField(field, output) + if res.isOk(): + if res.get(): + ok() + else: + err(RequiredFieldMissing) + else: + err(res.error) + proc getRepeatedField*[T: seq[byte]|string](data: ProtoBuffer, field: int, output: var seq[T]): ProtoResult[bool] = checkFieldNumber(field) @@ -733,6 +663,17 @@ proc getRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int, else: ok(false) +proc getRequiredRepeatedField*[T](pb: ProtoBuffer, field: int, + output: var seq[T]): ProtoResult[void] {.inline.} = + let res = pb.getRepeatedField(field, output) + if res.isOk(): + if res.get(): + ok() + else: + err(RequiredFieldMissing) + else: + err(res.error) + proc getPackedRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int, output: var seq[T]): ProtoResult[bool] = checkFieldNumber(field) @@ -787,93 +728,3 @@ proc getPackedRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int, ok(true) else: ok(false) - -proc getVarintValue*(data: var ProtoBuffer, field: int, - value: var SomeVarint): int {.deprecated.} = - ## Get value of `Varint` type. - var length = 0 - var header = 0'u64 - var soffset = data.offset - - if not data.isEmpty() and PB.getUVarint(data.toOpenArray(), - length, header).isOk(): - data.offset += length - if header == getProtoHeader(field, Varint): - if not data.isEmpty(): - when type(value) is int32 or type(value) is int64 or type(value) is int: - let res = getSVarint(data.toOpenArray(), length, value) - else: - let res = PB.getUVarint(data.toOpenArray(), length, value) - if res.isOk(): - data.offset += length - result = length - return - # Restore offset on error - data.offset = soffset - -proc getLengthValue*[T: string|seq[byte]](data: var ProtoBuffer, field: int, - buffer: var T): int {.deprecated.} = - ## Get value of `Length` type. - var length = 0 - var header = 0'u64 - var ssize = 0'u64 - var soffset = data.offset - result = -1 - buffer.setLen(0) - if not data.isEmpty() and PB.getUVarint(data.toOpenArray(), - length, header).isOk(): - data.offset += length - if header == getProtoHeader(field, Length): - if not data.isEmpty() and PB.getUVarint(data.toOpenArray(), - length, ssize).isOk(): - data.offset += length - if ssize <= MaxMessageSize and data.isEnough(int(ssize)): - buffer.setLen(ssize) - # Protobuf allow zero-length values. - if ssize > 0'u64: - copyMem(addr buffer[0], addr data.buffer[data.offset], ssize) - result = int(ssize) - data.offset += int(ssize) - return - # Restore offset on error - data.offset = soffset - -proc getBytes*(data: var ProtoBuffer, field: int, - buffer: var seq[byte]): int {.deprecated, inline.} = - ## Get value of `Length` type as bytes. - result = getLengthValue(data, field, buffer) - -proc getString*(data: var ProtoBuffer, field: int, - buffer: var string): int {.deprecated, inline.} = - ## Get value of `Length` type as string. - result = getLengthValue(data, field, buffer) - -proc enterSubmessage*(pb: var ProtoBuffer): int {.deprecated.} = - ## Processes protobuf's sub-message and adjust internal offset to enter - ## inside of sub-message. Returns field index of sub-message field or - ## ``0`` on error. - var length = 0 - var header = 0'u64 - var msize = 0'u64 - var soffset = pb.offset - - if not pb.isEmpty() and PB.getUVarint(pb.toOpenArray(), - length, header).isOk(): - pb.offset += length - if (header and 0x07'u64) == cast[uint64](ProtoFieldKind.Length): - if not pb.isEmpty() and PB.getUVarint(pb.toOpenArray(), - length, msize).isOk(): - pb.offset += length - if msize <= MaxMessageSize and pb.isEnough(int(msize)): - pb.length = int(msize) - result = int(header shr 3) - return - # Restore offset on error - pb.offset = soffset - -proc skipSubmessage*(pb: var ProtoBuffer) {.deprecated.} = - ## Skip current protobuf's sub-message and adjust internal offset to the - ## end of sub-message. - doAssert(pb.length != 0) - pb.offset += pb.length - pb.length = 0 diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index e19c5a7..cf04891 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -122,9 +122,6 @@ proc new*(T: typedesc[Identify], peerInfo: PeerInfo): T = identify.init() identify -proc newIdentify*(peerInfo: PeerInfo): Identify {.deprecated: "use Identify.new".} = - Identify.new(peerInfo) - method init*(p: Identify) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = try: diff --git a/libp2p/protocols/ping.nim b/libp2p/protocols/ping.nim index 23565f8..6e5e6bc 100644 --- a/libp2p/protocols/ping.nim +++ b/libp2p/protocols/ping.nim @@ -49,7 +49,7 @@ method init*(p: Ping) = var buf: array[PingSize, byte] await conn.readExactly(addr buf[0], PingSize) trace "echoing ping", conn - await conn.write(addr buf[0], PingSize) + await conn.write(@buf) if not isNil(p.pingHandler): await p.pingHandler(conn.peerId) except CancelledError as exc: @@ -79,7 +79,7 @@ proc ping*( let startTime = Moment.now() trace "sending ping", conn - await conn.write(addr randomBuf[0], randomBuf.len) + await conn.write(@randomBuf) await conn.readExactly(addr resultBuf[0], PingSize) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index ed29a72..36e71a6 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -16,6 +16,7 @@ import ./pubsub, ./timedcache, ./peertable, ./rpc/[message, messages], + ../../crypto/crypto, ../../stream/connection, ../../peerid, ../../peerinfo, @@ -207,8 +208,7 @@ method initPubSub*(f: FloodSub) {.raises: [Defect, InitializationError].} = procCall PubSub(f).initPubSub() f.seen = TimedCache[MessageID].init(2.minutes) - var rng = newRng() f.seenSalt = newSeqUninitialized[byte](sizeof(Hash)) - brHmacDrbgGenerate(rng[], f.seenSalt) + brHmacDrbgGenerate(f.rng[], f.seenSalt) f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f85057e..64181bf 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -9,7 +9,7 @@ {.push raises: [Defect].} -import std/[tables, sets, options, sequtils, random] +import std/[tables, sets, options, sequtils] import chronos, chronicles, metrics import ./pubsub, ./floodsub, @@ -37,6 +37,7 @@ logScope: declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened") +declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation") proc init*(_: type[GossipSubParams]): GossipSubParams = GossipSubParams( @@ -295,11 +296,12 @@ method rpcHandler*(g: GossipSub, for i in 0.. bool: x notin grafts # shuffle anyway, score might be not used - shuffle(prunes) + g.rng.shuffle(prunes) # sort peers by score (inverted), pruning, so low score peers are on top prunes.sort(byScore, SortOrder.Ascending) @@ -382,7 +381,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) if pruneLen > 0: # Ok we got some peers to prune, # for this heartbeat let's prune those - shuffle(prunes) + g.rng.shuffle(prunes) prunes.setLen(pruneLen) trace "pruning", prunes = prunes.len @@ -519,7 +518,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101 # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582 if midsSeq.len > IHaveMaxLength: - shuffle(midsSeq) + g.rng.shuffle(midsSeq) midsSeq.setLen(IHaveMaxLength) let @@ -540,7 +539,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: target = min(factor, allPeers.len) if target < allPeers.len: - shuffle(allPeers) + g.rng.shuffle(allPeers) allPeers.setLen(target) for peer in allPeers: diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 413b4fb..a59e185 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -139,6 +139,7 @@ type disconnectBadPeers*: bool BackoffTable* = Table[string, Table[PeerID, Moment]] + ValidationSeenTable* = Table[MessageID, HashSet[PubSubPeer]] GossipSub* = ref object of FloodSub mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic @@ -150,6 +151,7 @@ type gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: Table[string, ControlMessage] # pending control messages mcache*: MCache # messages cache + validationSeen*: ValidationSeenTable # peers who sent us message in validation heartbeatFut*: Future[void] # cancellation future for heartbeat interval heartbeatRunning*: bool diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index e10802c..ecfec82 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -10,11 +10,12 @@ {.push raises: [Defect].} import std/[tables, sequtils, sets, strutils] -import chronos, chronicles, metrics +import chronos, chronicles, metrics, bearssl import ./pubsubpeer, ./rpc/[message, messages, protobuf], ../../switch, ../protocol, + ../../crypto/crypto, ../../stream/connection, ../../peerid, ../../peerinfo, @@ -106,6 +107,15 @@ type anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions topicsHigh*: int # the maximum number of topics a peer is allowed to subscribe to + maxMessageSize*: int ##\ + ## the maximum raw message size we'll globally allow + ## for finer tuning, check message size on topic validator + ## + ## sending a big message to a peer with a lower size limit can + ## lead to issues, from descoring to connection drops + ## + ## defaults to 1mB + rng*: ref BrHmacDrbgContext knownTopics*: HashSet[string] @@ -283,7 +293,7 @@ proc getOrCreatePeer*( p.onPubSubPeerEvent(peer, event) # create new pubsub peer - let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0]) + let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxMessageSize) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer @@ -538,6 +548,8 @@ proc init*[PubParams: object | bool]( sign: bool = true, msgIdProvider: MsgIdProvider = defaultMsgIdProvider, subscriptionValidator: SubscriptionValidator = nil, + maxMessageSize: int = 1024 * 1024, + rng: ref BrHmacDrbgContext = newRng(), parameters: PubParams = false): P {.raises: [Defect, InitializationError].} = let pubsub = @@ -550,6 +562,8 @@ proc init*[PubParams: object | bool]( sign: sign, msgIdProvider: msgIdProvider, subscriptionValidator: subscriptionValidator, + maxMessageSize: maxMessageSize, + rng: rng, topicsHigh: int.high) else: P(switch: switch, @@ -561,6 +575,8 @@ proc init*[PubParams: object | bool]( msgIdProvider: msgIdProvider, subscriptionValidator: subscriptionValidator, parameters: parameters, + maxMessageSize: maxMessageSize, + rng: rng, topicsHigh: int.high) proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index bb6833f..af301f0 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -60,6 +60,7 @@ type score*: float64 iWantBudget*: int iHaveBudget*: int + maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score @@ -119,7 +120,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = while not conn.atEof: trace "waiting for data", conn, peer = p, closed = conn.closed - var data = await conn.readLp(64 * 1024) + var data = await conn.readLp(p.maxMessageSize) trace "read data from peer", conn, peer = p, closed = conn.closed, data = data.shortLog @@ -186,8 +187,8 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} = try: if p.onEvent != nil: p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Disconnected)) - except CancelledError: - raise + except CancelledError as exc: + raise exc except CatchableError as exc: debug "Errors during diconnection events", error = exc.msg @@ -243,6 +244,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} = debug "empty message, skipping", p, msg = shortLog(msg) return + if msg.len > p.maxMessageSize: + info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len + return + let conn = p.sendConn if conn == nil or conn.closed(): trace "No send connection, skipping message", p, msg = shortLog(msg) @@ -280,7 +285,8 @@ proc new*( getConn: GetConn, dropConn: DropConn, onEvent: OnEvent, - codec: string): T = + codec: string, + maxMessageSize: int): T = T( getConn: getConn, @@ -288,19 +294,5 @@ proc new*( onEvent: onEvent, codec: codec, peerId: peerId, - ) - -proc newPubSubPeer*( - peerId: PeerID, - getConn: GetConn, - dropConn: DropConn, - onEvent: OnEvent, - codec: string): PubSubPeer {.deprecated: "use PubSubPeer.new".} = - - PubSubPeer.new( - peerId, - getConn, - dropConn, - onEvent, - codec + maxMessageSize: maxMessageSize ) diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index dc110ca..79cedb2 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -565,7 +565,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon raise newException(NoiseHandshakeError, "Invalid remote peer id") conn.peerId = pid.get() - var tmp = NoiseConnection.init(conn, conn.peerId, conn.observedAddr) + var tmp = NoiseConnection.new(conn, conn.peerId, conn.observedAddr) if initiator: tmp.readCs = handshakeRes.cs2 @@ -615,10 +615,3 @@ proc new*( noise.init() noise - -proc newNoise*( - rng: ref BrHmacDrbgContext, - privateKey: PrivateKey, - outgoing: bool = true, - commonPrologue: seq[byte] = @[]): Noise {.deprecated: "use Noise.new".}= - Noise.new(rng, privateKey, outgoing, commonPrologue) diff --git a/libp2p/protocols/secure/plaintext.nim b/libp2p/protocols/secure/plaintext.nim index 8541f7c..7d712f8 100644 --- a/libp2p/protocols/secure/plaintext.nim +++ b/libp2p/protocols/secure/plaintext.nim @@ -29,6 +29,3 @@ proc new*(T: typedesc[PlainText]): T = let plainText = T() plainText.init() plainText - -proc newPlainText*(): PlainText {.deprecated: "use PlainText.new".} = - PlainText.new() diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 329675d..aa43d0e 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -263,7 +263,7 @@ proc newSecioConn(conn: Connection, ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## ``order``. - result = SecioConn.init(conn, conn.peerId, conn.observedAddr) + result = SecioConn.new(conn, conn.peerId, conn.observedAddr) let i0 = if order < 0: 1 else: 0 let i1 = if order < 0: 0 else: 1 @@ -441,6 +441,3 @@ proc new*( ) secio.init() secio - -proc newSecio*(rng: ref BrHmacDrbgContext, localPrivateKey: PrivateKey): Secio {.deprecated: "use Secio.new".} = - Secio.new(rng, localPrivateKey) diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index e045eba..c6fe8d7 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -42,7 +42,7 @@ func shortLog*(conn: SecureConn): auto = chronicles.formatIt(SecureConn): shortLog(it) -proc init*(T: type SecureConn, +proc new*(T: type SecureConn, conn: Connection, peerId: PeerId, observedAddr: Multiaddress, diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 9151029..6d31d3a 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -65,10 +65,6 @@ proc new*( bufferStream.initStream() bufferStream -proc newBufferStream*( - timeout: Duration = DefaultConnectionTimeout): BufferStream {.deprecated: "use BufferStream.new".} = - return BufferStream.new(timeout) - method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} = ## Write bytes to internal read buffer, use this to fill up the ## buffer with data. diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 7b974c3..419f52d 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -151,7 +151,7 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = if not await s.pollActivity(): return -proc init*(C: type Connection, +proc new*(C: type Connection, peerId: PeerId, dir: Direction, timeout: Duration = DefaultConnectionTimeout, diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 9e4ee73..168ef91 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -180,7 +180,7 @@ proc readExactly*(s: LPStream, proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] - {.async, deprecated: "todo".} = + {.async.} = # TODO replace with something that exploits buffering better var lim = if limit <= 0: -1 else: limit var state = 0 @@ -255,9 +255,6 @@ proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] = proc writeLp*(s: LPStream, msg: string): Future[void] = writeLp(s, msg.toOpenArrayByte(0, msg.high)) -proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} = - s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1))) - proc write*(s: LPStream, msg: string): Future[void] = s.write(msg.toBytes()) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 3d9b22a..8f19c5f 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -273,7 +273,7 @@ proc newSwitch*(peerInfo: PeerInfo, transports: transports, connManager: connManager, peerStore: PeerStore.new(), - dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms), + dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver), nameResolver: nameResolver) switch.connManager.peerStore = switch.peerStore diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index b0527df..9a19a69 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -114,13 +114,6 @@ proc connHandler*(self: TcpTransport, return conn -proc init*( - T: typedesc[TcpTransport], - flags: set[ServerFlags] = {}, - upgrade: Upgrade): T {.deprecated: "use .new".} = - - T.new(flags, upgrade) - proc new*( T: typedesc[TcpTransport], flags: set[ServerFlags] = {}, @@ -206,6 +199,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = method dial*( self: TcpTransport, + hostname: string, address: MultiAddress): Future[Connection] {.async, gcsafe.} = ## dial a peer ## diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index b9a0f8e..8ff1a6f 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -60,12 +60,18 @@ method accept*(self: Transport): Future[Connection] method dial*( self: Transport, + hostname: string, address: MultiAddress): Future[Connection] {.base, gcsafe.} = ## dial a peer ## doAssert(false, "Not implemented!") +proc dial*( + self: Transport, + address: MultiAddress): Future[Connection] {.gcsafe.} = + self.dial("", address) + method upgradeIncoming*( self: Transport, conn: Connection): Future[void] {.base, gcsafe.} = diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index a75cafc..7694e2f 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -34,7 +34,7 @@ type WsStream = ref object of Connection session: WSSession -proc init*(T: type WsStream, +proc new*(T: type WsStream, session: WSSession, dir: Direction, timeout = 10.minutes, @@ -170,7 +170,7 @@ proc connHandler(self: WsTransport, await stream.close() raise exc - let conn = WsStream.init(stream, dir) + let conn = WsStream.new(stream, dir) conn.observedAddr = observedAddr self.connections[dir].add(conn) @@ -207,6 +207,7 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} = method dial*( self: WsTransport, + hostname: string, address: MultiAddress): Future[Connection] {.async, gcsafe.} = ## dial a peer ## @@ -219,6 +220,7 @@ method dial*( address.initTAddress().tryGet(), "", secure = secure, + hostName = hostname, flags = self.tlsFlags) return await self.connHandler(transp, Direction.Out) diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index fc596a7..e92e5d3 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -194,7 +194,7 @@ proc muxerHandler( await muxer.close() trace "Exception in muxer handler", conn, msg = exc.msg -proc init*( +proc new*( T: type MuxedUpgrade, identity: Identify, muxers: Table[string, MuxerProvider], diff --git a/tests/helpers.nim b/tests/helpers.nim index 58f9cc6..c336291 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -83,9 +83,6 @@ proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T = testBufferStream.initStream() testBufferStream -proc newBufferStream*(writeHandler: WriteHandler): TestBufferStream {.deprecated: "use TestBufferStream.new".}= - TestBufferStream.new(writeHandler) - proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect].} ): Future[bool] {.async, gcsafe.} = {.gcsafe.}: let start = Moment.now() diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 5e167be..12bb288 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -382,3 +382,57 @@ suite "FloodSub": it.switch.stop()))) await allFuturesThrowing(nodesFut) + + asyncTest "FloodSub message size validation": + var messageReceived = 0 + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check data.len < 50 + inc(messageReceived) + + let + bigNode = generateNodes(1) + smallNode = generateNodes(1, maxMessageSize = 200) + + # start switches + nodesFut = await allFinished( + bigNode[0].switch.start(), + smallNode[0].switch.start(), + ) + + # start pubsubcon + await allFuturesThrowing( + allFinished( + bigNode[0].start(), + smallNode[0].start(), + )) + + await subscribeNodes(bigNode & smallNode) + bigNode[0].subscribe("foo", handler) + smallNode[0].subscribe("foo", handler) + await waitSub(bigNode[0], smallNode[0], "foo") + + let + bigMessage = newSeq[byte](1000) + smallMessage1 = @[1.byte] + smallMessage2 = @[3.byte] + + # Need two different messages, otherwise they are the same when anonymized + check (await smallNode[0].publish("foo", smallMessage1)) > 0 + check (await bigNode[0].publish("foo", smallMessage2)) > 0 + + check (await checkExpiring(messageReceived == 2)) == true + + check (await smallNode[0].publish("foo", bigMessage)) > 0 + check (await bigNode[0].publish("foo", bigMessage)) > 0 + + await allFuturesThrowing( + smallNode[0].switch.stop(), + bigNode[0].switch.stop() + ) + + await allFuturesThrowing( + smallNode[0].stop(), + bigNode[0].stop() + ) + + await allFuturesThrowing(nodesFut) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index d8ee5c5..450496a 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -25,7 +25,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer = proc dropConn(peer: PubSubPeer) = discard # we don't care about it here yet - let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec) + let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec, 1024 * 1024) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 15fa04d..8e83392 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -22,6 +22,7 @@ import utils, ../../libp2p/[errors, protocols/pubsub/gossipsub, protocols/pubsub/pubsubpeer, protocols/pubsub/peertable, + protocols/pubsub/timedcache, protocols/pubsub/rpc/messages] import ../helpers @@ -556,6 +557,89 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub should not send to source & peers who already seen": + # 3 nodes: A, B, C + # A publishes, B relays, C is having a long validation + # so C should not send to anyone + + let + nodes = generateNodes( + 3, + gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + nodes[2].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + nodes[2].start(), + )) + + await subscribeNodes(nodes) + + var cRelayed: Future[void] = newFuture[void]() + var bFinished: Future[void] = newFuture[void]() + var + aReceived = 0 + cReceived = 0 + proc handlerA(topic: string, data: seq[byte]) {.async, gcsafe.} = + inc aReceived + check aReceived < 2 + proc handlerB(topic: string, data: seq[byte]) {.async, gcsafe.} = discard + proc handlerC(topic: string, data: seq[byte]) {.async, gcsafe.} = + inc cReceived + check cReceived < 2 + cRelayed.complete() + + nodes[0].subscribe("foobar", handlerA) + nodes[1].subscribe("foobar", handlerB) + nodes[2].subscribe("foobar", handlerC) + await waitSub(nodes[0], nodes[1], "foobar") + await waitSub(nodes[0], nodes[2], "foobar") + await waitSub(nodes[2], nodes[1], "foobar") + await waitSub(nodes[1], nodes[2], "foobar") + + var gossip1: GossipSub = GossipSub(nodes[0]) + var gossip2: GossipSub = GossipSub(nodes[1]) + var gossip3: GossipSub = GossipSub(nodes[2]) + + proc slowValidator(topic: string, message: Message): Future[ValidationResult] {.async.} = + await cRelayed + # Empty A & C caches to detect duplicates + gossip1.seen = TimedCache[MessageId].init() + gossip3.seen = TimedCache[MessageId].init() + let msgId = toSeq(gossip2.validationSeen.keys)[0] + check await checkExpiring(try: gossip2.validationSeen[msgId].len > 0 except: false) + result = ValidationResult.Accept + bFinished.complete() + + nodes[1].addValidator("foobar", slowValidator) + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + await bFinished + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop(), + nodes[2].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop(), + nodes[2].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - GossipSub send over floodPublish A -> B": var passed: Future[bool] = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index 8a2b64b..d555d89 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -14,7 +14,7 @@ suite "Message": test "signature": var seqno = 11'u64 let - peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + peer = PeerInfo.new(PrivateKey.random(ECDSA, rng[]).get()) msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) check verify(msg) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 4d59149..452edd4 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -26,7 +26,8 @@ proc generateNodes*( triggerSelf: bool = false, verifySignature: bool = libp2p_pubsub_verify, anonymize: bool = libp2p_pubsub_anonymize, - sign: bool = libp2p_pubsub_sign): seq[PubSub] = + sign: bool = libp2p_pubsub_sign, + maxMessageSize: int = 1024 * 1024): seq[PubSub] = for i in 0.. 0: + readFile(paramStr(1)) + else: + stdin.readAll() +var index = 0 + +const startDelim = "```nim\n" +const endDelim = "\n```" +while true: + let startOfBlock = contents.find(startDelim, start = index) + if startOfBlock == -1: break + + let endOfBlock = contents.find(endDelim, start = startOfBlock + startDelim.len) + if endOfBlock == -1: + quit "Unfinished block!" + + let code = contents[startOfBlock + startDelim.len .. endOfBlock] + + echo code + + index = endOfBlock + endDelim.len