client implementation

This commit is contained in:
vyzo 2018-04-19 20:18:42 +03:00
parent f946163e88
commit 0cbcbf6e24
3 changed files with 331 additions and 0 deletions

191
client.go Normal file
View File

@ -0,0 +1,191 @@
package rendezvous
import (
"context"
"fmt"
"time"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
var log = logging.Logger("rendezvous")
type Rendezvous interface {
Register(ctx context.Context, ns string, ttl int) error
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int) ([]pstore.PeerInfo, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error)
}
func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous {
return &client{
host: host,
rp: rp,
}
}
type client struct {
host host.Host
rp peer.ID
}
func (cli *client) Register(ctx context.Context, ns string, ttl int) error {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20)
w := ggio.NewDelimitedWriter(s)
req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl)
err = w.WriteMsg(req)
if err != nil {
return err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return err
}
if res.GetType() != pb.Message_REGISTER_RESPONSE {
return fmt.Errorf("Unexpected response: %s", res.GetType().String())
}
status := res.GetRegisterResponse().GetStatus()
if status != pb.Message_OK {
return fmt.Errorf("Registration failure: %s", status.String())
}
return nil
}
func (cli *client) Unregister(ctx context.Context, ns string) error {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return err
}
defer s.Close()
w := ggio.NewDelimitedWriter(s)
req := newUnregisterMessage(ns, cli.host.ID())
return w.WriteMsg(req)
}
func (cli *client) Discover(ctx context.Context, ns string, limit int) ([]pstore.PeerInfo, error) {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20)
w := ggio.NewDelimitedWriter(s)
req := newDiscoverMessage(ns, limit)
err = w.WriteMsg(req)
if err != nil {
return nil, err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return nil, err
}
if res.GetType() != pb.Message_DISCOVER_RESPONSE {
return nil, fmt.Errorf("Unexpected response: %s", res.GetType().String())
}
regs := res.GetDiscoverResponse().GetRegistrations()
pinfos := make([]pstore.PeerInfo, 0, len(regs))
for _, reg := range regs {
pi, err := pbToPeerInfo(reg.GetPeer())
if err != nil {
log.Errorf("Invalid peer info: %s", err.Error())
continue
}
pinfos = append(pinfos, pi)
}
return pinfos, nil
}
func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return nil, err
}
ch := make(chan pstore.PeerInfo)
go doDiscovery(ctx, ns, s, ch)
return ch, nil
}
func doDiscovery(ctx context.Context, ns string, s inet.Stream, ch chan pstore.PeerInfo) {
defer s.Close()
defer close(ch)
const batch = 100
r := ggio.NewDelimitedReader(s, 1<<20)
w := ggio.NewDelimitedWriter(s)
req := newDiscoverMessage(ns, batch)
for {
err := w.WriteMsg(req)
if err != nil {
log.Errorf("Error sending Discover request: %s", err.Error())
return
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
log.Errorf("Error reading discover response: %s", err.Error())
return
}
if res.GetType() != pb.Message_DISCOVER_RESPONSE {
log.Errorf("Unexpected response: %s", res.GetType().String())
return
}
regs := res.GetDiscoverResponse().GetRegistrations()
for _, reg := range regs {
pinfo, err := pbToPeerInfo(reg.GetPeer())
if err != nil {
log.Errorf("Invalid peer info: %s", err.Error())
continue
}
select {
case ch <- pinfo:
case <-ctx.Done():
return
}
}
req.Discover.Cookie = res.GetDiscoverResponse().GetCookie()
if len(regs) < batch {
select {
case <-time.After(1 * time.Minute):
case <-ctx.Done():
return
}
}
}
}

63
package.json Normal file
View File

@ -0,0 +1,63 @@
{
"author": "vyzo",
"bugs": {},
"gx": {
"dvcsimport": "github.com/libp2p/go-libp2p-rendezvous"
},
"gxDependencies": [
{
"author": "whyrusleeping",
"hash": "QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D",
"name": "go-libp2p-host",
"version": "2.1.7"
},
{
"author": "whyrusleeping",
"hash": "QmXoz9o2PT3tEzf7hicegwex5UgVP54n3k82K7jrWFyN86",
"name": "go-libp2p-net",
"version": "2.0.7"
},
{
"author": "whyrusleeping",
"hash": "QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74",
"name": "go-libp2p-peer",
"version": "2.3.2"
},
{
"author": "whyrusleeping",
"hash": "QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh",
"name": "go-libp2p-peerstore",
"version": "1.4.17"
},
{
"author": "whyrusleeping",
"hash": "QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN",
"name": "go-libp2p-protocol",
"version": "1.0.0"
},
{
"author": "multiformats",
"hash": "QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb",
"name": "go-multiaddr",
"version": "1.2.6"
},
{
"hash": "QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7",
"name": "go-log",
"version": "1.4.1"
},
{
"author": "whyrusleeping",
"hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV",
"name": "gogo-protobuf",
"version": "0.0.0"
}
],
"gxVersion": "0.12.1",
"language": "go",
"license": "MIT",
"name": "go-libp2p-rendezvous",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.0.0"
}

77
proto.go Normal file
View File

@ -0,0 +1,77 @@
package rendezvous
import (
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
ma "github.com/multiformats/go-multiaddr"
)
const (
RendezvousProto = protocol.ID("/rendezvous/1.0.0")
)
func newRegisterMessage(ns string, pi pstore.PeerInfo, ttl int) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_REGISTER.Enum()
msg.Register = new(pb.Message_Register)
if ns != "" {
msg.Register.Ns = &ns
}
if ttl > 0 {
ttl64 := int64(ttl)
msg.Register.Ttl = &ttl64
}
msg.Register.Peer = new(pb.Message_PeerInfo)
msg.Register.Peer.Id = []byte(pi.ID)
msg.Register.Peer.Addrs = make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
msg.Register.Peer.Addrs[i] = addr.Bytes()
}
return msg
}
func newUnregisterMessage(ns string, pid peer.ID) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_UNREGISTER.Enum()
msg.Unregister = new(pb.Message_Unregister)
if ns != "" {
msg.Unregister.Ns = &ns
}
msg.Unregister.Id = []byte(pid)
return msg
}
func newDiscoverMessage(ns string, limit int) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_DISCOVER.Enum()
msg.Discover = new(pb.Message_Discover)
if ns != "" {
msg.Discover.Ns = &ns
}
if limit > 0 {
limit64 := int64(limit)
msg.Discover.Limit = &limit64
}
return msg
}
func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) {
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return pstore.PeerInfo{}, err
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, bs := range p.Addrs {
addr, err := ma.NewMultiaddrBytes(bs)
if err != nil {
log.Errorf("Error parsing multiaddr: %s", err.Error())
continue
}
addrs = append(addrs, addr)
}
return pstore.PeerInfo{ID: id, Addrs: addrs}, nil
}