Added multi-protocol using protobufs example

This commit is contained in:
Aviv Eyal 2017-11-27 20:04:10 +02:00 committed by Steven Allen
parent 784f1b4a71
commit 380ae70ddc
7 changed files with 672 additions and 0 deletions

View File

@ -0,0 +1,38 @@
# Protocol Multiplexing using rpc-style multicodecs, protobufs with libp2p
This examples shows how to use multicodecs (i.e. protobufs) to encode and transmit information between LibP2P hosts using LibP2P Streams.
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed.
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo).
## Build
Compile the .proto files with the protobufs go compiler:
```
protoc --go_out=. ./p2p.proto
```
From `multipro` base source folder:
```
> go build
```
## Usage
```
> ./multipro
```
## Details
The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example.
Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example:
The stream lasts until one of the sides closes it when the HangUp field is `true`.

117
examples/multipro/echo.go Normal file
View File

@ -0,0 +1,117 @@
package main
import (
"bufio"
"context"
"fmt"
"log"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"
uuid "github.com/google/uuid"
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
"github.com/ipfs/go-ipfs/thirdparty/assert"
)
// pattern: /protocol-name/request-or-response-message/version
const echoRequest = "/echo/echoreq/0.0.1"
const echoResponse = "/echo/echoresp/0.0.1"
type EchoProtocol struct {
host host.Host // local host
requests map[string] *p2p.EchoRequest // used to access request data from response handlers
done chan bool // only for demo purposes to hold main from terminating
}
func NewEchoProtocol(host host.Host, done chan bool) *EchoProtocol {
e := EchoProtocol{host: host, requests: make(map[string]*p2p.EchoRequest), done: done}
host.SetStreamHandler(echoRequest, e.onEchoRequest)
host.SetStreamHandler(echoResponse, e.onEchoResponse)
return &e
}
// remote peer requests handler
func (e *EchoProtocol) onEchoRequest(s inet.Stream) {
// get request data
data := &p2p.EchoRequest{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data)
if err != nil {
log.Fatal(err)
return
}
log.Printf("%s: Received echo request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message)
// send response to sender
log.Printf("%s: Sending echo response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id)
resp := &p2p.EchoResponse{
MessageData: NewMessageData(e.host.ID().String(), data.MessageData.Id, false),
Message: data.Message}
s, respErr := e.host.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse)
if respErr != nil {
log.Fatal(respErr)
return
}
ok := sendDataObject(resp, s)
if ok {
log.Printf("%s: Echo response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())
}
}
// remote echo response handler
func (e *EchoProtocol) onEchoResponse(s inet.Stream) {
data := &p2p.EchoResponse{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data)
if err != nil {
return
}
// locate request data and remove it if found
req, ok := e.requests[data.MessageData.Id]
if ok {
// remove request from map as we have processed it here
delete(e.requests, data.MessageData.Id)
} else {
log.Print("Failed to locate request data boject for response")
return
}
assert.True(req.Message == data.Message, nil, "Expected echo to respond with request message")
log.Printf("%s: Received Echo response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message)
e.done <- true
}
func (e *EchoProtocol) Echo(node *Node) bool {
log.Printf("%s: Sending echo to: %s....", e.host.ID(), node.host.ID())
// create message data
req := &p2p.EchoRequest{
MessageData: NewMessageData(e.host.ID().String(), uuid.New().String(), false),
Message: fmt.Sprintf("Echo from %s", e.host.ID())}
s, err := e.host.NewStream(context.Background(), node.host.ID(), echoRequest)
if err != nil {
log.Fatal(err)
return false
}
ok := sendDataObject(req, s)
if !ok {
return false
}
// store request so response handler has access to it
e.requests[req.MessageData.Id] = req
log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.host.ID(), node.host.ID(), req.MessageData.Id, req.Message)
return true
}

68
examples/multipro/main.go Normal file
View File

@ -0,0 +1,68 @@
package main
import (
"context"
"fmt"
"log"
"math/rand"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ps "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
swarm "gx/ipfs/QmU219N3jn7QadVCeBUqGnAkwoXoUomrCwDuVQVuL7PB5W/go-libp2p-swarm"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
crypto "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
)
// helper method - create a lib-p2p host to listen on a port
func makeRandomNode(port int, done chan bool) *Node {
// Ignoring most errors for brevity
// See echo example for more details and better implementation
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
pid, _ := peer.IDFromPublicKey(pub)
listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
peerStore := ps.NewPeerstore()
peerStore.AddPrivKey(pid, priv)
peerStore.AddPubKey(pid, pub)
n, _ := swarm.NewNetwork(context.Background(), []ma.Multiaddr{listen}, pid, peerStore, nil)
host := bhost.New(n)
return NewNode(host, done)
}
func main() {
// Choose random ports between 10000-10100
rand.Seed(666)
port1 := rand.Intn(100) + 10000
port2 := port1 + 1
done := make(chan bool, 1)
// Make 2 hosts
h1 := makeRandomNode(port1, done)
h2 := makeRandomNode(port2, done)
h1.host.Peerstore().AddAddrs(h2.host.ID(), h2.host.Addrs(), ps.PermanentAddrTTL)
h2.host.Peerstore().AddAddrs(h1.host.ID(), h1.host.Addrs(), ps.PermanentAddrTTL)
log.Printf("This is a conversation between %s and %s\n", h1.host.ID(), h2.host.ID())
// send a ping from h1 to h2
h1.pingProtocol.Ping(h2)
// pause main until pong was received and processed by h2
<-done
// send a ping from h1 to h2
h2.pingProtocol.Ping(h1)
<-done
h1.echoProtocol.Echo(h2)
<-done
h2.echoProtocol.Echo(h1)
<-done
}

58
examples/multipro/node.go Normal file
View File

@ -0,0 +1,58 @@
package main
import (
"bufio"
"log"
"time"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
)
// node version
const clientVersion = "go-p2p-node/0.0.1"
// helper method - writes a protobuf go data object to a network stream
// data - address of protobuf go data object to send
// s - network stream to write the data to
func sendDataObject(data interface{}, s inet.Stream) bool {
writer := bufio.NewWriter(s)
enc := protobufCodec.Multicodec(nil).Encoder(writer)
err := enc.Encode(data)
if err != nil {
log.Fatal(err)
return false
}
writer.Flush()
return true
}
// helper method - generate message data shared between all node's p2p protocols
// nodeId - message author id
// mesageId - unique for requests, copied from request for responses
func NewMessageData(nodeId string, messageId string, gossip bool) *p2p.MessageData {
return &p2p.MessageData{
ClientVersion: clientVersion,
NodeId: nodeId,
Timestamp: time.Now().Unix(),
Id: messageId,
Gossip: gossip}
}
// Node type - implements one or more p2p protocols
type Node struct {
host host.Host // lib-p2p host
pingProtocol *PingProtocol // ping protocol impl
echoProtocol *EchoProtocol // echp protocl imp
}
// create a new node with its supported protocols
func NewNode(host host.Host, done chan bool) *Node {
return &Node{host: host,
pingProtocol: NewPingProtocol(host, done),
echoProtocol: NewEchoProtocol(host, done)}
}

View File

@ -0,0 +1,222 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: p2p.proto
/*
Package protocols_p2p is a generated protocol buffer package.
It is generated from these files:
p2p.proto
It has these top-level messages:
MessageData
PingRequest
PingResponse
EchoRequest
EchoResponse
*/
package protocols_p2p
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// designed to be shared between all app protocols
type MessageData struct {
// shared between all requests
ClientVersion string `protobuf:"bytes,1,opt,name=clientVersion" json:"clientVersion,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp" json:"timestamp,omitempty"`
Id string `protobuf:"bytes,3,opt,name=id" json:"id,omitempty"`
Gossip bool `protobuf:"varint,4,opt,name=gossip" json:"gossip,omitempty"`
NodeId string `protobuf:"bytes,5,opt,name=nodeId" json:"nodeId,omitempty"`
Sign string `protobuf:"bytes,6,opt,name=sign" json:"sign,omitempty"`
}
func (m *MessageData) Reset() { *m = MessageData{} }
func (m *MessageData) String() string { return proto.CompactTextString(m) }
func (*MessageData) ProtoMessage() {}
func (*MessageData) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *MessageData) GetClientVersion() string {
if m != nil {
return m.ClientVersion
}
return ""
}
func (m *MessageData) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *MessageData) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *MessageData) GetGossip() bool {
if m != nil {
return m.Gossip
}
return false
}
func (m *MessageData) GetNodeId() string {
if m != nil {
return m.NodeId
}
return ""
}
func (m *MessageData) GetSign() string {
if m != nil {
return m.Sign
}
return ""
}
// a protocol define a set of reuqest and responses
type PingRequest struct {
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"`
// method specific data
Message string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
}
func (m *PingRequest) Reset() { *m = PingRequest{} }
func (m *PingRequest) String() string { return proto.CompactTextString(m) }
func (*PingRequest) ProtoMessage() {}
func (*PingRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *PingRequest) GetMessageData() *MessageData {
if m != nil {
return m.MessageData
}
return nil
}
func (m *PingRequest) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
type PingResponse struct {
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"`
// response specific data
Message string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
}
func (m *PingResponse) Reset() { *m = PingResponse{} }
func (m *PingResponse) String() string { return proto.CompactTextString(m) }
func (*PingResponse) ProtoMessage() {}
func (*PingResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *PingResponse) GetMessageData() *MessageData {
if m != nil {
return m.MessageData
}
return nil
}
func (m *PingResponse) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
// a protocol define a set of reuqest and responses
type EchoRequest struct {
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"`
// method specific data
Message string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
}
func (m *EchoRequest) Reset() { *m = EchoRequest{} }
func (m *EchoRequest) String() string { return proto.CompactTextString(m) }
func (*EchoRequest) ProtoMessage() {}
func (*EchoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *EchoRequest) GetMessageData() *MessageData {
if m != nil {
return m.MessageData
}
return nil
}
func (m *EchoRequest) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
type EchoResponse struct {
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"`
// response specific data
Message string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
}
func (m *EchoResponse) Reset() { *m = EchoResponse{} }
func (m *EchoResponse) String() string { return proto.CompactTextString(m) }
func (*EchoResponse) ProtoMessage() {}
func (*EchoResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *EchoResponse) GetMessageData() *MessageData {
if m != nil {
return m.MessageData
}
return nil
}
func (m *EchoResponse) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
func init() {
proto.RegisterType((*MessageData)(nil), "protocols.p2p.MessageData")
proto.RegisterType((*PingRequest)(nil), "protocols.p2p.PingRequest")
proto.RegisterType((*PingResponse)(nil), "protocols.p2p.PingResponse")
proto.RegisterType((*EchoRequest)(nil), "protocols.p2p.EchoRequest")
proto.RegisterType((*EchoResponse)(nil), "protocols.p2p.EchoResponse")
}
func init() { proto.RegisterFile("p2p.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 244 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x8f, 0x31, 0x4f, 0xc3, 0x30,
0x10, 0x85, 0xe5, 0xb4, 0x04, 0x72, 0xa6, 0x0c, 0x37, 0x20, 0x0b, 0x31, 0x44, 0x11, 0x43, 0xa6,
0x0c, 0x61, 0x65, 0x84, 0x81, 0x01, 0x09, 0x79, 0x60, 0x0f, 0xc9, 0x11, 0x2c, 0x35, 0xb6, 0xe9,
0x99, 0x9f, 0xc4, 0xff, 0x44, 0x5c, 0x83, 0xda, 0xfe, 0x80, 0x76, 0xb2, 0xdf, 0xd3, 0xb3, 0xdf,
0xfb, 0xa0, 0x88, 0x6d, 0x6c, 0xe2, 0x26, 0xa4, 0x80, 0x2b, 0x39, 0xfa, 0xb0, 0xe6, 0x26, 0xb6,
0xb1, 0xfa, 0x51, 0xa0, 0x5f, 0x88, 0xb9, 0x1b, 0xe9, 0xb1, 0x4b, 0x1d, 0xde, 0xc1, 0xaa, 0x5f,
0x3b, 0xf2, 0xe9, 0x8d, 0x36, 0xec, 0x82, 0x37, 0xaa, 0x54, 0x75, 0x61, 0x0f, 0x4d, 0xbc, 0x85,
0x22, 0xb9, 0x89, 0x38, 0x75, 0x53, 0x34, 0x59, 0xa9, 0xea, 0x85, 0xdd, 0x19, 0x78, 0x05, 0x99,
0x1b, 0xcc, 0x42, 0x1e, 0x66, 0x6e, 0xc0, 0x6b, 0xc8, 0xc7, 0xc0, 0xec, 0xa2, 0x59, 0x96, 0xaa,
0xbe, 0xb0, 0xb3, 0xfa, 0xf3, 0x7d, 0x18, 0xe8, 0x79, 0x30, 0x67, 0x92, 0x9d, 0x15, 0x22, 0x2c,
0xd9, 0x8d, 0xde, 0xe4, 0xe2, 0xca, 0xbd, 0x22, 0xd0, 0xaf, 0xce, 0x8f, 0x96, 0xbe, 0xbe, 0x89,
0x13, 0x3e, 0x80, 0x9e, 0x76, 0xab, 0x65, 0xa4, 0x6e, 0x6f, 0x9a, 0x03, 0xb6, 0x66, 0x8f, 0xcb,
0xee, 0xc7, 0xd1, 0xc0, 0xf9, 0x2c, 0x65, 0x7c, 0x61, 0xff, 0x65, 0xf5, 0x01, 0x97, 0xdb, 0x1a,
0x8e, 0xc1, 0x33, 0x1d, 0xad, 0x87, 0x40, 0x3f, 0xf5, 0x9f, 0xe1, 0x04, 0x38, 0xdb, 0x9a, 0xe3,
0xe2, 0xbc, 0xe7, 0xf2, 0xc3, 0xfd, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb9, 0xf8, 0x88, 0xca,
0x68, 0x02, 0x00, 0x00,
}

View File

@ -0,0 +1,52 @@
syntax = "proto3";
package protocols.p2p;
// designed to be shared between all app protocols
message MessageData {
// shared between all requests
string clientVersion = 1; // client version
int64 timestamp = 2; // unix time
string id = 3; // allows requesters to match response with a request
bool gossip = 4; // true to have receiver peer gossip the message to neighbors
string nodeId = 5; // id of node that created the message (not the one that may have relayed it)
string sign = 6; // signature of message data + method specific data by message authoring node
}
//// ping protocol
// a protocol define a set of reuqest and responses
message PingRequest {
MessageData messageData = 1;
// method specific data
string message = 2;
// add any data here....
}
message PingResponse {
MessageData messageData = 1;
// response specific data
string message = 2;
// ... add any data here
}
//// echo protocol
// a protocol define a set of reuqest and responses
message EchoRequest {
MessageData messageData = 1;
// method specific data
string message = 2;
// add any data here....
}
message EchoResponse {
MessageData messageData = 1;
// response specific data
string message = 2;
// ... add any data here
}

117
examples/multipro/ping.go Normal file
View File

@ -0,0 +1,117 @@
package main
import (
"bufio"
"context"
"fmt"
"log"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"
uuid "github.com/google/uuid"
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
)
// pattern: /protocol-name/request-or-response-message/version
const pingRequest = "/ping/pingreq/0.0.1"
const pingResponse = "/ping/pingresp/0.0.1"
// PingProtocol type
type PingProtocol struct {
host host.Host // local host
requests map[string]*p2p.PingRequest // used to access request data from response handlers
done chan bool // only for demo purposes to hold main from terminating
}
func NewPingProtocol(host host.Host, done chan bool) *PingProtocol {
p := PingProtocol{host: host, requests: make(map[string]*p2p.PingRequest), done: done}
host.SetStreamHandler(pingRequest, p.onPingRequest)
host.SetStreamHandler(pingResponse, p.onPingResponse)
return &p
}
// remote peer requests handler
func (p *PingProtocol) onPingRequest(s inet.Stream) {
// get request data
data := &p2p.PingRequest{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data)
if err != nil {
log.Fatal(err)
return
}
log.Printf("%s: Received ping request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message)
// send response to sender
log.Printf("%s: Sending ping response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id)
resp := &p2p.PingResponse{
MessageData: NewMessageData(p.host.ID().String(), data.MessageData.Id, false),
Message: fmt.Sprintf("Ping response from %s", p.host.ID()),
}
s, respErr := p.host.NewStream(context.Background(), s.Conn().RemotePeer(), pingResponse)
if respErr != nil {
log.Fatal(respErr)
return
}
ok := sendDataObject(resp, s)
if ok {
log.Printf("%s: Ping response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())
}
}
// remote ping response handler
func (p *PingProtocol) onPingResponse(s inet.Stream) {
data := &p2p.PingResponse{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data)
if err != nil {
return
}
// locate request data and remove it if found
_, ok := p.requests[data.MessageData.Id]
if ok {
// remove request from map as we have processed it here
delete(p.requests, data.MessageData.Id)
} else {
log.Print("Failed to locate request data boject for response")
return
}
log.Printf("%s: Received ping response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message)
p.done <- true
}
func (p *PingProtocol) Ping(node *Node) bool {
log.Printf("%s: Sending ping to: %s....", p.host.ID(), node.host.ID())
// create message data
req := &p2p.PingRequest{
MessageData: NewMessageData(p.host.ID().String(), uuid.New().String(), false),
Message: fmt.Sprintf("Ping from %s", p.host.ID()),
}
s, err := p.host.NewStream(context.Background(), node.host.ID(), pingRequest)
if err != nil {
log.Fatal(err)
return false
}
ok := sendDataObject(req, s)
if !ok {
return false
}
// store request so response handler has access to it
p.requests[req.MessageData.Id] = req
log.Printf("%s: Ping to: %s was sent. Message Id: %s, Message: %s", p.host.ID(), node.host.ID(), req.MessageData.Id, req.Message)
return true
}