mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-01 16:40:32 +00:00
Add streaming protocol tutorials
Add two new tutorials showing users how they can use libp2p to stream data across a connection. Add tutorial 3 and 4 to CI via libp2p.nimble. Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com> Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
parent
fffa7e8cc2
commit
34aa4045fd
@ -83,16 +83,27 @@ We can find out which port was attributed, and the resulting local addresses, by
|
||||
|
||||
We'll **dial** the first switch from the second one, by specifying it's **Peer ID**, it's **MultiAddress** and the **`Ping` protocol codec**:
|
||||
```nim
|
||||
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, PingCodec)
|
||||
let stream = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, PingCodec)
|
||||
```
|
||||
We now have a `Ping` connection setup between the second and the first switch, we can use it to actually ping the node:
|
||||
This will connect to the peer using a transport compatible with one of the peer MultiAddress.
|
||||
The connection will next be upgraded, meaning it will be secured using our SecureManager (Noise in our case),
|
||||
and multiplexed using our Multiplexer (Mplex here).
|
||||
|
||||
The "physical" connection will be handled by libp2p, which will keep it opened for as long as it's used.
|
||||
If you want to keep a connection around, you will have to actively use it, eg, by pinging it regularly.
|
||||
|
||||
Because we asked for the `PingCodec`, a multiplexed Channel (or Stream) will be opened on this connection
|
||||
and the Codec will be negotiated. Once this is done, the libp2p will give us back the stream.
|
||||
|
||||
We now have a `Ping` stream setup between the second and the first switch, we can use it to actually ping the node:
|
||||
```nim
|
||||
# ping the other node and echo the ping duration
|
||||
echo "ping: ", await pingProtocol.ping(conn)
|
||||
echo "ping: ", await pingProtocol.ping(stream)
|
||||
|
||||
# We must close the connection ourselves when we're done with it
|
||||
await conn.close()
|
||||
# We must close the stream ourselves when we're done with it
|
||||
await stream.close()
|
||||
```
|
||||
Closing the stream will not close the underlying connection.
|
||||
|
||||
And that's it! Just a little bit of cleanup: shutting down the switches, waiting for them to stop, and we'll call our `main` procedure:
|
||||
```nim
|
||||
|
@ -13,7 +13,7 @@ import libp2p
|
||||
```
|
||||
This is similar to the first tutorial, except we don't need to import the `Ping` protocol.
|
||||
|
||||
Next, we'll declare our custom protocol
|
||||
Next, we'll declare our custom protocol:
|
||||
```nim
|
||||
const TestCodec = "/test/proto/1.0.0"
|
||||
|
||||
@ -22,29 +22,35 @@ type TestProto = ref object of LPProtocol
|
||||
|
||||
We've set a [protocol ID](https://docs.libp2p.io/concepts/protocols/#protocol-ids), and created a custom `LPProtocol`. In a more complex protocol, we could use this structure to store interesting variables.
|
||||
|
||||
A protocol generally has two part: and handling/server part, and a dialing/client part.
|
||||
Theses two parts can be identical, but in our trivial protocol, the server will wait for a message from the client, and the client will send a message, so we have to handle the two cases separately.
|
||||
A protocol generally has two parts: a handling/server part, and a dialing/client part. These two parts can be identical, but in our trivial protocol, the server will wait for a message from the client, and the client will send a message, so we have to handle the two cases separately.
|
||||
|
||||
Let's start with the server part:
|
||||
```nim
|
||||
proc new(T: typedesc[TestProto]): T =
|
||||
# every incoming connections will in be handled in this closure
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||
echo "Got from remote - ", string.fromBytes(await conn.readLp(1024))
|
||||
# We must close the connections ourselves when we're done with it
|
||||
await conn.close()
|
||||
# every incoming connections will be handled in this closure
|
||||
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
|
||||
echo "[server] received from client: ", string.fromBytes(await stream.readLp(1024))
|
||||
await stream.writeLp("Hi back!")
|
||||
# We must close the stream ourselves when we're done with it
|
||||
await stream.close()
|
||||
|
||||
return T(codecs: @[TestCodec], handler: handle)
|
||||
```
|
||||
This is a constructor for our `TestProto`, that will specify our `codecs` and a `handler`, which will be called for each incoming peer asking for this protocol.
|
||||
In our handle, we simply read a message from the connection and `echo` it.
|
||||
|
||||
In our handle, we simply read a message from the stream and `echo` it.
|
||||
|
||||
We can now create our client part:
|
||||
```nim
|
||||
proc hello(p: TestProto, conn: Connection) {.async.} =
|
||||
await conn.writeLp("Hello p2p!")
|
||||
proc hello(p: TestProto, stream: Connection) {.async.} =
|
||||
await stream.writeLp("Hello p2p!")
|
||||
let
|
||||
# readLp reads length prefixed bytes and returns a buffer without the prefix
|
||||
strData = await stream.readLp(1024)
|
||||
str = string.fromBytes(strData)
|
||||
echo "[client] received from server: ", str
|
||||
```
|
||||
Again, pretty straight-forward, we just send a message on the connection.
|
||||
Again, pretty straight-forward, we just send a message on the stream.
|
||||
|
||||
We can now create our main procedure:
|
||||
```nim
|
||||
@ -64,17 +70,17 @@ proc main() {.async, gcsafe.} =
|
||||
|
||||
await testProto.hello(conn)
|
||||
|
||||
# We must close the connection ourselves when we're done with it
|
||||
await conn.close()
|
||||
# We must close the stream ourselves when we're done with it
|
||||
await stream.close()
|
||||
|
||||
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
|
||||
```
|
||||
|
||||
This is very similar to the first tutorial's `main`, the only noteworthy difference is that we use `newStandardSwitch`, which is similar to `createSwitch` but is bundled directly in libp2p
|
||||
This is very similar to the first tutorial's `main`. The only noteworthy difference is that we use `newStandardSwitch`, which is similar to `createSwitch` but is bundled directly in libp2p.
|
||||
|
||||
We can now wrap our program by calling our main proc:
|
||||
```nim
|
||||
waitFor(main())
|
||||
```
|
||||
|
||||
And that's it!
|
||||
And that's it! In the [next tutorial](tutorial_3_streamproto.md), we'll look at how to create a protocol for streaming data.
|
||||
|
102
examples/tutorial_3_streamproto.md
Normal file
102
examples/tutorial_3_streamproto.md
Normal file
@ -0,0 +1,102 @@
|
||||
In the [previous tutorial](tutorial_2_customproto.md), we dug a little deeper in to how the ping protocol works by creating a custom ping protocol ourselves.
|
||||
|
||||
This tutorial will focus on how to create a custom streaming protocol using libp2p.
|
||||
|
||||
# Custom protocol in libp2p
|
||||
Let's create a `part3.nim`, and import our dependencies:
|
||||
```nim
|
||||
import bearssl
|
||||
import chronos
|
||||
import stew/byteutils
|
||||
|
||||
import libp2p
|
||||
```
|
||||
This is similar to the second tutorial. Next, we'll declare our custom protocol.
|
||||
```nim
|
||||
const TestStreamCodec = "/test/stream-proto/1.0.0"
|
||||
|
||||
type TestStreamProto = ref object of LPProtocol
|
||||
```
|
||||
|
||||
Just as we did in the last tutorial, we've set a [protocol ID](https://docs.libp2p.io/concepts/protocols/#protocol-ids), and created a custom `LPProtocol`.
|
||||
|
||||
As in the last tutorial, we are going to handle our client and server parts separately. The server will wait for an instruction from the client, then send a stream of data to the client and finish by closing the connection. The client will listen in a loop until the server has finished and closed the connection. Then the client will close its end of the connection as well.
|
||||
|
||||
Let's start with the server part:
|
||||
```nim
|
||||
# Set the chunk length >= any data being streamed. Our longest chunk of data in
|
||||
# either direction is 16 bytes ("please send data").
|
||||
const CHUNK_LEN_BYTES = 16
|
||||
|
||||
proc new(T: typedesc[TestStreamProto]): T =
|
||||
# every incoming connections will in be handled in this closure
|
||||
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
|
||||
# readLp reads length prefixed bytes and returns a buffer without the
|
||||
# prefix
|
||||
var bytes = await stream.readLp(CHUNK_LEN_BYTES)
|
||||
echo "[server] received from client: ", string.fromBytes(bytes)
|
||||
|
||||
# send stream of data
|
||||
for i in 0..10:
|
||||
let data = "Part " & $i
|
||||
await stream.writeLp(data)
|
||||
|
||||
# We must close the connections ourselves when we're done with it
|
||||
await stream.close()
|
||||
|
||||
return T(codecs: @[TestStreamCodec], handler: handle)
|
||||
```
|
||||
Again, `handle`, will be called for each incoming peer asking for this protocol. In our simple example, the client sends `"please send data"` and once received, the server echos the message and sends a stream of data back. In this example, we are simply sending a series of strings as the stream of data, but this can be any data in a practical application.
|
||||
|
||||
We can now create our client part:
|
||||
```nim
|
||||
proc streamData(p: TestStreamProto, stream: Connection) {.async.} =
|
||||
let data = "please send data"
|
||||
await stream.writeLp(data)
|
||||
|
||||
# Read loop
|
||||
try:
|
||||
|
||||
while true:
|
||||
# readLp reads length prefixed bytes and returns a buffer without the
|
||||
# prefix
|
||||
let bytes = await stream.readLp(CHUNK_LEN_BYTES)
|
||||
|
||||
echo "[client] received from server: ", string.fromBytes(bytes)
|
||||
|
||||
except LPStreamEOFError:
|
||||
# close the stream from the client's side
|
||||
await stream.close()
|
||||
```
|
||||
As a client, we want to keep reading data in a loop until the server has finished sending its stream, then we can close the stream from the client's side as well. We know that the client has closed its stream when `LPStreamEOFError` is raised.
|
||||
|
||||
We can now create our main procedure:
|
||||
```nim
|
||||
proc main() {.async, gcsafe.} =
|
||||
let
|
||||
rng = newRng()
|
||||
TestStreamProto = TestStreamProto.new()
|
||||
switch1 = newStandardSwitch(rng=rng)
|
||||
switch2 = newStandardSwitch(rng=rng)
|
||||
|
||||
switch1.mount(TestStreamProto)
|
||||
|
||||
await switch1.start()
|
||||
await switch2.start()
|
||||
|
||||
let stream = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestStreamCodec)
|
||||
|
||||
# stream is now a fully setup connection, we talk directly to the switch1 custom protocol handler
|
||||
await TestStreamProto.streamData(stream)
|
||||
|
||||
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
|
||||
```
|
||||
|
||||
This is very similar to the second tutorial's `main`, except that we use our newly created stream protocol and stream codec.
|
||||
|
||||
We can now wrap our program by calling our main proc:
|
||||
```nim
|
||||
waitFor(main())
|
||||
```
|
||||
|
||||
In the [next tutorial](tutorial_4_streamproto2.md), we will get a glimpse of an alternative way to stream data.
|
107
examples/tutorial_4_streamproto2.md
Normal file
107
examples/tutorial_4_streamproto2.md
Normal file
@ -0,0 +1,107 @@
|
||||
In the [previous tutorial](tutorial_3_streamproto.md), we learned how to create a custom streaming protocol.
|
||||
|
||||
This tutorial is very similar to the [previous tutorial](tutorial_3_streamproto.md) in that a stream of data will be consumed, however in this tutorial we will learn how to use alternative libp2p streaming semantics.
|
||||
|
||||
# Custom protocol in libp2p
|
||||
Let's create a `part4.nim`, and import our dependencies:
|
||||
```nim
|
||||
import bearssl
|
||||
import chronos
|
||||
import stew/byteutils
|
||||
|
||||
import libp2p
|
||||
```
|
||||
This is similar to the third tutorial. Next, we'll declare our custom protocol.
|
||||
```nim
|
||||
const TestStreamCodec = "/test/stream-proto/2.0.0"
|
||||
|
||||
type TestStreamProto = ref object of LPProtocol
|
||||
```
|
||||
|
||||
Just as we did in the last tutorial, we've set a [protocol ID](https://docs.libp2p.io/concepts/protocols/#protocol-ids), and created a custom `LPProtocol`.
|
||||
|
||||
As in the last tutorial, we are going to handle our client and server parts separately. The server will wait for an instruction from the client, then send a stream of data to the client and finish by closing the connection. The client will listen in a loop until the server has finished and closed the connection. Then the client will close its end of the connection as well.
|
||||
|
||||
Let's start with the server part:
|
||||
```nim
|
||||
# Set the chunk length >= any data being streamed. Our longest chunk of data in
|
||||
# either direction is 16 bytes ("please send data").
|
||||
const CHUNK_LEN_BYTES = 16
|
||||
|
||||
proc new(T: typedesc[TestStreamProto]): T =
|
||||
# every incoming connections will in be handled in this closure
|
||||
var buf = newSeq[byte](CHUNK_LEN_BYTES)
|
||||
proc handle(stream: Connection, proto: string) {.async, gcsafe.} =
|
||||
|
||||
discard await stream.readOnce(addr buf[0], CHUNK_LEN_BYTES)
|
||||
echo "[server] received from client: ", string.fromBytes(buf)
|
||||
|
||||
# send stream of data
|
||||
for i in 0..10:
|
||||
let data = "Part " & $i
|
||||
await stream.write(data)
|
||||
|
||||
# We must close the connections ourselves when we're done with it
|
||||
await stream.close()
|
||||
|
||||
return T(codecs: @[TestStreamCodec], handler: handle)
|
||||
```
|
||||
Instead of using `readLp` to read data from the connection (as in the previous tutorial), we are going to use `readOnce`, which allows us to read bytes from the buffer or wait for bytes to be received.
|
||||
|
||||
We can now create our client part:
|
||||
```nim
|
||||
proc streamData(p: TestStreamProto, stream: Connection) {.async.} =
|
||||
let data = "please send data"
|
||||
await stream.write(data)
|
||||
|
||||
# Read loop
|
||||
while not stream.atEof:
|
||||
|
||||
var buf = newSeq[byte](CHUNK_LEN_BYTES)
|
||||
# readOnce reads bytes from internal buffer if present, otherwise waits for
|
||||
# bytes to be received. Note that it will return as soon as it receive
|
||||
# bytes, even if the message is not complete.
|
||||
let len = await stream.readOnce(addr buf[0], CHUNK_LEN_BYTES)
|
||||
|
||||
# When the connection is closed from the server end, len will be 0 as we
|
||||
# will not have read any data. We could check len == 0 and break the while
|
||||
# loop, or we can let the loop continue at which point stream.atEof will be
|
||||
# true and the loop will break. The only difference is that by continuing,
|
||||
# we will see an extra line echoed with no data received.
|
||||
echo "[client] received from server: ", string.fromBytes(buf)
|
||||
|
||||
# close the stream from the client's side
|
||||
await stream.close()
|
||||
```
|
||||
As a client, we want to keep reading data in a loop until the server has finished sending its stream, then we can close the stream from the client's side as well. We know that the client has closed its stream when `stream.atEof` is `true`. The difference when using `readOnce` instead of `readLp` is that when using `readLp` (to read length-prefixed data), we won't get the chance to read an EOF signaled by a remote closure, requiring us to read again and catch an exception. `readOnce` allows us to finish our read loop once `atEof` is `true`.
|
||||
|
||||
We can now create our main procedure:
|
||||
```nim
|
||||
proc main() {.async, gcsafe.} =
|
||||
let
|
||||
rng = newRng()
|
||||
TestStreamProto = TestStreamProto.new()
|
||||
switch1 = newStandardSwitch(rng=rng)
|
||||
switch2 = newStandardSwitch(rng=rng)
|
||||
|
||||
switch1.mount(TestStreamProto)
|
||||
|
||||
await switch1.start()
|
||||
await switch2.start()
|
||||
|
||||
let stream = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestStreamCodec)
|
||||
|
||||
# stream is now a fully setup connection, we talk directly to the switch1 custom protocol handler
|
||||
await TestStreamProto.streamData(stream)
|
||||
|
||||
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
|
||||
```
|
||||
|
||||
This is very similar to the second tutorial's `main`, except that we use our newly created stream protocol and stream codec.
|
||||
|
||||
We can now wrap our program by calling our main proc:
|
||||
```nim
|
||||
waitFor(main())
|
||||
```
|
||||
|
||||
And that's it!
|
@ -94,3 +94,5 @@ task examples_build, "Build the samples":
|
||||
buildSample("helloworld", true)
|
||||
buildTutorial("examples/tutorial_1_connect.md")
|
||||
buildTutorial("examples/tutorial_2_customproto.md")
|
||||
buildTutorial("examples/tutorial_3_streamproto.md")
|
||||
buildTutorial("examples/tutorial_4_streamproto2.md")
|
||||
|
Loading…
x
Reference in New Issue
Block a user