Add example on how to use multicodecs to encode/decode data on a stream
License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
7e11c6008b
commit
9706590149
|
@ -1,2 +1,3 @@
|
|||
*.swp
|
||||
examples/echo/echo
|
||||
examples/multicodecs/multicodecs
|
||||
|
|
|
@ -11,7 +11,8 @@ install: true
|
|||
|
||||
before_install:
|
||||
- make deps
|
||||
|
||||
- go get -u github.com/multiformats/go-multicodec
|
||||
- go get -u github.com/jbenet/go-msgio
|
||||
#- go vet ./...
|
||||
script:
|
||||
- go test ./... -v
|
||||
|
|
4
Makefile
4
Makefile
|
@ -2,6 +2,10 @@ gx:
|
|||
go get github.com/whyrusleeping/gx
|
||||
go get github.com/whyrusleeping/gx-go
|
||||
|
||||
deps-examples: deps
|
||||
go get -u github.com/multiformats/go-multicodec
|
||||
go get -u github.com/jbenet/go-msgio
|
||||
|
||||
deps: gx
|
||||
gx --verbose install --global
|
||||
gx-go rewrite
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
# Using multicodecs with LibP2P
|
||||
|
||||
This examples shows how to use multicodecs (i.e. json) to encode and transmit information between LibP2P hosts using LibP2P Streams.
|
||||
|
||||
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed.
|
||||
|
||||
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo).
|
||||
|
||||
## Build
|
||||
|
||||
From `go-libp2p` base folder:
|
||||
|
||||
```
|
||||
> make deps-examples
|
||||
> go build ./examples/multicodecs
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
```
|
||||
> ./multicodecs
|
||||
|
||||
```
|
||||
|
||||
## Details
|
||||
|
||||
The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example.
|
||||
|
||||
Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example:
|
||||
|
||||
```
|
||||
{
|
||||
"Msg": "This is the message",
|
||||
"Index": 3,
|
||||
"HangUp": false
|
||||
}
|
||||
```
|
||||
|
||||
The stream lasts until one of the sides closes it when the HangUp field is `true`.
|
|
@ -0,0 +1,181 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
crypto "github.com/libp2p/go-libp2p-crypto"
|
||||
host "github.com/libp2p/go-libp2p-host"
|
||||
inet "github.com/libp2p/go-libp2p-net"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
ps "github.com/libp2p/go-libp2p-peerstore"
|
||||
swarm "github.com/libp2p/go-libp2p-swarm"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
|
||||
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
|
||||
multicodec "github.com/multiformats/go-multicodec"
|
||||
json "github.com/multiformats/go-multicodec/json"
|
||||
)
|
||||
|
||||
const proto = "/example/1.0.0"
|
||||
|
||||
// Message is a serializable/encodable object that we will send
|
||||
// on a Stream.
|
||||
type Message struct {
|
||||
Msg string
|
||||
Index int
|
||||
HangUp bool
|
||||
}
|
||||
|
||||
// streamWrap wraps a libp2p stream. We encode/decode whenever we
|
||||
// write/read from a stream, so we can just carry the encoders
|
||||
// and bufios with us
|
||||
type WrappedStream struct {
|
||||
stream inet.Stream
|
||||
enc multicodec.Encoder
|
||||
dec multicodec.Decoder
|
||||
w *bufio.Writer
|
||||
r *bufio.Reader
|
||||
}
|
||||
|
||||
// wrapStream takes a stream and complements it with r/w bufios and
|
||||
// decoder/encoder. In order to write raw data to the stream we can use
|
||||
// wrap.w.Write(). To encode something into it we can wrap.enc.Encode().
|
||||
// Finally, we should wrap.w.Flush() to actually send the data. Handling
|
||||
// incoming data works similarly with wrap.r.Read() for raw-reading and
|
||||
// wrap.dec.Decode() to decode.
|
||||
func WrapStream(s inet.Stream) *WrappedStream {
|
||||
reader := bufio.NewReader(s)
|
||||
writer := bufio.NewWriter(s)
|
||||
// This is where we pick our specific multicodec. In order to change the
|
||||
// codec, we only need to change this place.
|
||||
// See https://godoc.org/github.com/multiformats/go-multicodec/json
|
||||
dec := json.Multicodec(false).Decoder(reader)
|
||||
enc := json.Multicodec(false).Encoder(writer)
|
||||
return &WrappedStream{
|
||||
stream: s,
|
||||
r: reader,
|
||||
w: writer,
|
||||
enc: enc,
|
||||
dec: dec,
|
||||
}
|
||||
}
|
||||
|
||||
// messages that will be sent between the hosts.
|
||||
var conversationMsgs = []string{
|
||||
"Hello!",
|
||||
"Hey!",
|
||||
"How are you doing?",
|
||||
"Very good! It is great that you can send data on a stream to me!",
|
||||
"Not only that, the data is encoded in a JSON object.",
|
||||
"Yeah, and we are using the multicodecs interface to encode and decode.",
|
||||
"This way we could swap it easily for, say, cbor, or msgpack!",
|
||||
"Let's leave that as an excercise for the reader...",
|
||||
"Agreed, our last message should activate the HangUp flag",
|
||||
"Yes, and the example code will close streams. So sad :(. Bye!",
|
||||
}
|
||||
|
||||
func makeRandomHost(port int) host.Host {
|
||||
// Ignoring most errors for brevity
|
||||
// See echo example for more details and better implementation
|
||||
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
|
||||
pid, _ := peer.IDFromPublicKey(pub)
|
||||
listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
|
||||
ps := ps.NewPeerstore()
|
||||
ps.AddPrivKey(pid, priv)
|
||||
ps.AddPubKey(pid, pub)
|
||||
n, _ := swarm.NewNetwork(context.Background(),
|
||||
[]ma.Multiaddr{listen}, pid, ps, nil)
|
||||
return bhost.New(n)
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Choose random ports between 10000-10100
|
||||
rand.Seed(666)
|
||||
port1 := rand.Intn(100) + 10000
|
||||
port2 := port1 + 1
|
||||
|
||||
// Make 2 hosts
|
||||
h1 := makeRandomHost(port1)
|
||||
h2 := makeRandomHost(port2)
|
||||
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL)
|
||||
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL)
|
||||
|
||||
log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID())
|
||||
|
||||
// Define a stream handler for host number 2
|
||||
h2.SetStreamHandler(proto, func(stream inet.Stream) {
|
||||
log.Printf("%s: Received a stream", h2.ID())
|
||||
wrappedStream := WrapStream(stream)
|
||||
defer stream.Close()
|
||||
handleStream(wrappedStream)
|
||||
})
|
||||
|
||||
// Create new stream from h1 to h2 and start the conversation
|
||||
stream, err := h1.NewStream(context.Background(), h2.ID(), proto)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
wrappedStream := WrapStream(stream)
|
||||
// This sends the first message
|
||||
sendMessage(0, wrappedStream)
|
||||
// We keep the conversation on the created stream so we launch
|
||||
// this to handle any responses
|
||||
handleStream(wrappedStream)
|
||||
// When we are done, close the stream on our side and exit.
|
||||
stream.Close()
|
||||
}
|
||||
|
||||
// receiveMessage reads and decodes a message from the stream
|
||||
func receiveMessage(ws *WrappedStream) (*Message, error) {
|
||||
var msg Message
|
||||
err := ws.dec.Decode(&msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// sendMessage encodes and writes a message to the stream
|
||||
func sendMessage(index int, ws *WrappedStream) error {
|
||||
msg := &Message{
|
||||
Msg: conversationMsgs[index],
|
||||
Index: index,
|
||||
HangUp: index >= len(conversationMsgs)-1,
|
||||
}
|
||||
|
||||
err := ws.enc.Encode(msg)
|
||||
// Because output is buffered with bufio, we need to flush!
|
||||
ws.w.Flush()
|
||||
return err
|
||||
}
|
||||
|
||||
// handleStream is a for loop which receives and then sends a message
|
||||
// an artificial delay of 500ms happens in-between.
|
||||
// When Message.HangUp is true, it exists. This will close the stream
|
||||
// on one of the sides. The other side's receiveMessage() will error
|
||||
// with EOF, thus also breaking out from the loop.
|
||||
func handleStream(ws *WrappedStream) {
|
||||
for {
|
||||
// Read
|
||||
msg, err := receiveMessage(ws)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
pid := ws.stream.Conn().LocalPeer()
|
||||
log.Printf("%s says: %s\n", pid, msg.Msg)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if msg.HangUp {
|
||||
break
|
||||
}
|
||||
// Send response
|
||||
err = sendMessage(msg.Index+1, ws)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue