diff --git a/client.go b/client.go new file mode 100644 index 0000000..2d20bbc --- /dev/null +++ b/client.go @@ -0,0 +1,191 @@ +package rendezvous + +import ( + "context" + "fmt" + "time" + + pb "github.com/libp2p/go-libp2p-rendezvous/pb" + + ggio "github.com/gogo/protobuf/io" + logging "github.com/ipfs/go-log" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +var log = logging.Logger("rendezvous") + +type Rendezvous interface { + Register(ctx context.Context, ns string, ttl int) error + Unregister(ctx context.Context, ns string) error + Discover(ctx context.Context, ns string, limit int) ([]pstore.PeerInfo, error) + DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) +} + +func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { + return &client{ + host: host, + rp: rp, + } +} + +type client struct { + host host.Host + rp peer.ID +} + +func (cli *client) Register(ctx context.Context, ns string, ttl int) error { + s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) + if err != nil { + return err + } + defer s.Close() + + r := ggio.NewDelimitedReader(s, 1<<20) + w := ggio.NewDelimitedWriter(s) + + req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) + err = w.WriteMsg(req) + if err != nil { + return err + } + + var res pb.Message + err = r.ReadMsg(&res) + if err != nil { + return err + } + + if res.GetType() != pb.Message_REGISTER_RESPONSE { + return fmt.Errorf("Unexpected response: %s", res.GetType().String()) + } + + status := res.GetRegisterResponse().GetStatus() + if status != pb.Message_OK { + return fmt.Errorf("Registration failure: %s", status.String()) + } + + return nil +} + +func (cli *client) Unregister(ctx context.Context, ns string) error { + s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) + if err != nil { + return err + } + defer s.Close() + + w := ggio.NewDelimitedWriter(s) + req := newUnregisterMessage(ns, cli.host.ID()) + return w.WriteMsg(req) +} + +func (cli *client) Discover(ctx context.Context, ns string, limit int) ([]pstore.PeerInfo, error) { + s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) + if err != nil { + return nil, err + } + defer s.Close() + + r := ggio.NewDelimitedReader(s, 1<<20) + w := ggio.NewDelimitedWriter(s) + + req := newDiscoverMessage(ns, limit) + err = w.WriteMsg(req) + if err != nil { + return nil, err + } + + var res pb.Message + err = r.ReadMsg(&res) + if err != nil { + return nil, err + } + + if res.GetType() != pb.Message_DISCOVER_RESPONSE { + return nil, fmt.Errorf("Unexpected response: %s", res.GetType().String()) + } + + regs := res.GetDiscoverResponse().GetRegistrations() + pinfos := make([]pstore.PeerInfo, 0, len(regs)) + for _, reg := range regs { + pi, err := pbToPeerInfo(reg.GetPeer()) + if err != nil { + log.Errorf("Invalid peer info: %s", err.Error()) + continue + } + pinfos = append(pinfos, pi) + } + + return pinfos, nil +} + +func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) { + s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) + if err != nil { + return nil, err + } + + ch := make(chan pstore.PeerInfo) + go doDiscovery(ctx, ns, s, ch) + return ch, nil +} + +func doDiscovery(ctx context.Context, ns string, s inet.Stream, ch chan pstore.PeerInfo) { + defer s.Close() + defer close(ch) + + const batch = 100 + + r := ggio.NewDelimitedReader(s, 1<<20) + w := ggio.NewDelimitedWriter(s) + + req := newDiscoverMessage(ns, batch) + + for { + err := w.WriteMsg(req) + if err != nil { + log.Errorf("Error sending Discover request: %s", err.Error()) + return + } + + var res pb.Message + err = r.ReadMsg(&res) + if err != nil { + log.Errorf("Error reading discover response: %s", err.Error()) + return + } + + if res.GetType() != pb.Message_DISCOVER_RESPONSE { + log.Errorf("Unexpected response: %s", res.GetType().String()) + return + } + + regs := res.GetDiscoverResponse().GetRegistrations() + for _, reg := range regs { + pinfo, err := pbToPeerInfo(reg.GetPeer()) + if err != nil { + log.Errorf("Invalid peer info: %s", err.Error()) + continue + } + + select { + case ch <- pinfo: + case <-ctx.Done(): + return + } + } + + req.Discover.Cookie = res.GetDiscoverResponse().GetCookie() + + if len(regs) < batch { + select { + case <-time.After(1 * time.Minute): + case <-ctx.Done(): + return + } + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..cf8f9f1 --- /dev/null +++ b/package.json @@ -0,0 +1,63 @@ +{ + "author": "vyzo", + "bugs": {}, + "gx": { + "dvcsimport": "github.com/libp2p/go-libp2p-rendezvous" + }, + "gxDependencies": [ + { + "author": "whyrusleeping", + "hash": "QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D", + "name": "go-libp2p-host", + "version": "2.1.7" + }, + { + "author": "whyrusleeping", + "hash": "QmXoz9o2PT3tEzf7hicegwex5UgVP54n3k82K7jrWFyN86", + "name": "go-libp2p-net", + "version": "2.0.7" + }, + { + "author": "whyrusleeping", + "hash": "QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74", + "name": "go-libp2p-peer", + "version": "2.3.2" + }, + { + "author": "whyrusleeping", + "hash": "QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh", + "name": "go-libp2p-peerstore", + "version": "1.4.17" + }, + { + "author": "whyrusleeping", + "hash": "QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN", + "name": "go-libp2p-protocol", + "version": "1.0.0" + }, + { + "author": "multiformats", + "hash": "QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb", + "name": "go-multiaddr", + "version": "1.2.6" + }, + { + "hash": "QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7", + "name": "go-log", + "version": "1.4.1" + }, + { + "author": "whyrusleeping", + "hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV", + "name": "gogo-protobuf", + "version": "0.0.0" + } + ], + "gxVersion": "0.12.1", + "language": "go", + "license": "MIT", + "name": "go-libp2p-rendezvous", + "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", + "version": "0.0.0" +} + diff --git a/proto.go b/proto.go new file mode 100644 index 0000000..9021c5c --- /dev/null +++ b/proto.go @@ -0,0 +1,77 @@ +package rendezvous + +import ( + pb "github.com/libp2p/go-libp2p-rendezvous/pb" + + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + protocol "github.com/libp2p/go-libp2p-protocol" + ma "github.com/multiformats/go-multiaddr" +) + +const ( + RendezvousProto = protocol.ID("/rendezvous/1.0.0") +) + +func newRegisterMessage(ns string, pi pstore.PeerInfo, ttl int) *pb.Message { + msg := new(pb.Message) + msg.Type = pb.Message_REGISTER.Enum() + msg.Register = new(pb.Message_Register) + if ns != "" { + msg.Register.Ns = &ns + } + if ttl > 0 { + ttl64 := int64(ttl) + msg.Register.Ttl = &ttl64 + } + msg.Register.Peer = new(pb.Message_PeerInfo) + msg.Register.Peer.Id = []byte(pi.ID) + msg.Register.Peer.Addrs = make([][]byte, len(pi.Addrs)) + for i, addr := range pi.Addrs { + msg.Register.Peer.Addrs[i] = addr.Bytes() + } + return msg +} + +func newUnregisterMessage(ns string, pid peer.ID) *pb.Message { + msg := new(pb.Message) + msg.Type = pb.Message_UNREGISTER.Enum() + msg.Unregister = new(pb.Message_Unregister) + if ns != "" { + msg.Unregister.Ns = &ns + } + msg.Unregister.Id = []byte(pid) + return msg +} + +func newDiscoverMessage(ns string, limit int) *pb.Message { + msg := new(pb.Message) + msg.Type = pb.Message_DISCOVER.Enum() + msg.Discover = new(pb.Message_Discover) + if ns != "" { + msg.Discover.Ns = &ns + } + if limit > 0 { + limit64 := int64(limit) + msg.Discover.Limit = &limit64 + } + return msg +} + +func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) { + id, err := peer.IDFromBytes(p.Id) + if err != nil { + return pstore.PeerInfo{}, err + } + addrs := make([]ma.Multiaddr, 0, len(p.Addrs)) + for _, bs := range p.Addrs { + addr, err := ma.NewMultiaddrBytes(bs) + if err != nil { + log.Errorf("Error parsing multiaddr: %s", err.Error()) + continue + } + addrs = append(addrs, addr) + } + + return pstore.PeerInfo{ID: id, Addrs: addrs}, nil +}