From 9706590149656aac0d7324f27a38634765d59cfb Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Wed, 15 Mar 2017 16:24:31 +0100 Subject: [PATCH] Add example on how to use multicodecs to encode/decode data on a stream License: MIT Signed-off-by: Hector Sanjuan --- .gitignore | 1 + .travis.yml | 3 +- Makefile | 4 + examples/multicodecs/README.md | 39 +++++++ examples/multicodecs/main.go | 181 +++++++++++++++++++++++++++++++++ 5 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 examples/multicodecs/README.md create mode 100644 examples/multicodecs/main.go diff --git a/.gitignore b/.gitignore index 3fe82efc..cd7b1cea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.swp examples/echo/echo +examples/multicodecs/multicodecs diff --git a/.travis.yml b/.travis.yml index 22556c24..386f384f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,8 @@ install: true before_install: - make deps - + - go get -u github.com/multiformats/go-multicodec + - go get -u github.com/jbenet/go-msgio #- go vet ./... script: - go test ./... -v diff --git a/Makefile b/Makefile index 0344fadd..910bcd34 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,10 @@ gx: go get github.com/whyrusleeping/gx go get github.com/whyrusleeping/gx-go +deps-examples: deps + go get -u github.com/multiformats/go-multicodec + go get -u github.com/jbenet/go-msgio + deps: gx gx --verbose install --global gx-go rewrite diff --git a/examples/multicodecs/README.md b/examples/multicodecs/README.md new file mode 100644 index 00000000..bd1229f4 --- /dev/null +++ b/examples/multicodecs/README.md @@ -0,0 +1,39 @@ +# Using multicodecs with LibP2P + +This examples shows how to use multicodecs (i.e. json) to encode and transmit information between LibP2P hosts using LibP2P Streams. + +Multicodecs present a common interface, making it very easy to swap the codec implementation if needed. + +This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo). + +## Build + +From `go-libp2p` base folder: + +``` +> make deps-examples +> go build ./examples/multicodecs +``` + +## Usage + +``` +> ./multicodecs + +``` + +## Details + +The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example. + +Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example: + +``` +{ + "Msg": "This is the message", + "Index": 3, + "HangUp": false +} +``` + +The stream lasts until one of the sides closes it when the HangUp field is `true`. diff --git a/examples/multicodecs/main.go b/examples/multicodecs/main.go new file mode 100644 index 00000000..86c069b2 --- /dev/null +++ b/examples/multicodecs/main.go @@ -0,0 +1,181 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "log" + "math/rand" + "time" + + crypto "github.com/libp2p/go-libp2p-crypto" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + ps "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" + ma "github.com/multiformats/go-multiaddr" + + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + multicodec "github.com/multiformats/go-multicodec" + json "github.com/multiformats/go-multicodec/json" +) + +const proto = "/example/1.0.0" + +// Message is a serializable/encodable object that we will send +// on a Stream. +type Message struct { + Msg string + Index int + HangUp bool +} + +// streamWrap wraps a libp2p stream. We encode/decode whenever we +// write/read from a stream, so we can just carry the encoders +// and bufios with us +type WrappedStream struct { + stream inet.Stream + enc multicodec.Encoder + dec multicodec.Decoder + w *bufio.Writer + r *bufio.Reader +} + +// wrapStream takes a stream and complements it with r/w bufios and +// decoder/encoder. In order to write raw data to the stream we can use +// wrap.w.Write(). To encode something into it we can wrap.enc.Encode(). +// Finally, we should wrap.w.Flush() to actually send the data. Handling +// incoming data works similarly with wrap.r.Read() for raw-reading and +// wrap.dec.Decode() to decode. +func WrapStream(s inet.Stream) *WrappedStream { + reader := bufio.NewReader(s) + writer := bufio.NewWriter(s) + // This is where we pick our specific multicodec. In order to change the + // codec, we only need to change this place. + // See https://godoc.org/github.com/multiformats/go-multicodec/json + dec := json.Multicodec(false).Decoder(reader) + enc := json.Multicodec(false).Encoder(writer) + return &WrappedStream{ + stream: s, + r: reader, + w: writer, + enc: enc, + dec: dec, + } +} + +// messages that will be sent between the hosts. +var conversationMsgs = []string{ + "Hello!", + "Hey!", + "How are you doing?", + "Very good! It is great that you can send data on a stream to me!", + "Not only that, the data is encoded in a JSON object.", + "Yeah, and we are using the multicodecs interface to encode and decode.", + "This way we could swap it easily for, say, cbor, or msgpack!", + "Let's leave that as an excercise for the reader...", + "Agreed, our last message should activate the HangUp flag", + "Yes, and the example code will close streams. So sad :(. Bye!", +} + +func makeRandomHost(port int) host.Host { + // Ignoring most errors for brevity + // See echo example for more details and better implementation + priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048) + pid, _ := peer.IDFromPublicKey(pub) + listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + ps := ps.NewPeerstore() + ps.AddPrivKey(pid, priv) + ps.AddPubKey(pid, pub) + n, _ := swarm.NewNetwork(context.Background(), + []ma.Multiaddr{listen}, pid, ps, nil) + return bhost.New(n) +} + +func main() { + // Choose random ports between 10000-10100 + rand.Seed(666) + port1 := rand.Intn(100) + 10000 + port2 := port1 + 1 + + // Make 2 hosts + h1 := makeRandomHost(port1) + h2 := makeRandomHost(port2) + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL) + h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL) + + log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID()) + + // Define a stream handler for host number 2 + h2.SetStreamHandler(proto, func(stream inet.Stream) { + log.Printf("%s: Received a stream", h2.ID()) + wrappedStream := WrapStream(stream) + defer stream.Close() + handleStream(wrappedStream) + }) + + // Create new stream from h1 to h2 and start the conversation + stream, err := h1.NewStream(context.Background(), h2.ID(), proto) + if err != nil { + log.Fatal(err) + } + wrappedStream := WrapStream(stream) + // This sends the first message + sendMessage(0, wrappedStream) + // We keep the conversation on the created stream so we launch + // this to handle any responses + handleStream(wrappedStream) + // When we are done, close the stream on our side and exit. + stream.Close() +} + +// receiveMessage reads and decodes a message from the stream +func receiveMessage(ws *WrappedStream) (*Message, error) { + var msg Message + err := ws.dec.Decode(&msg) + if err != nil { + return nil, err + } + return &msg, nil +} + +// sendMessage encodes and writes a message to the stream +func sendMessage(index int, ws *WrappedStream) error { + msg := &Message{ + Msg: conversationMsgs[index], + Index: index, + HangUp: index >= len(conversationMsgs)-1, + } + + err := ws.enc.Encode(msg) + // Because output is buffered with bufio, we need to flush! + ws.w.Flush() + return err +} + +// handleStream is a for loop which receives and then sends a message +// an artificial delay of 500ms happens in-between. +// When Message.HangUp is true, it exists. This will close the stream +// on one of the sides. The other side's receiveMessage() will error +// with EOF, thus also breaking out from the loop. +func handleStream(ws *WrappedStream) { + for { + // Read + msg, err := receiveMessage(ws) + if err != nil { + break + } + pid := ws.stream.Conn().LocalPeer() + log.Printf("%s says: %s\n", pid, msg.Msg) + time.Sleep(500 * time.Millisecond) + if msg.HangUp { + break + } + // Send response + err = sendMessage(msg.Index+1, ws) + if err != nil { + break + } + } +}