Merge remote-tracking branch 'origin/implement-spec' into master

* origin/implement-spec: (47 commits)
  replaced sync.Map with map + RW mutex. small refactors
  fixed compile error from previous commit + code refactoring
  RendezvousPoint and RendezvousClient now return the server's TTL on Registered records. Default TTL for discovery client increased discovery client now utilizes server cookie for added efficiency
  Add stateful discovery client
  Switched from gx to go mod and started using go-libp2p-core interfaces
  update tests
  expose counter in register interface
  update gx deps
  include ttl in registration response
  update protobuf
  two interfaces for client-side: RendezvousPoint and RendezvousClient
  don't leak database error details in internal errors
  refactor database interface and implementation into db subpackage
  client: add TODO for robust discovery error recovery
  use randomized exponential backoff in error retry for persistent client registrations
  test client specific functionality
  make db nonce 32 bytes
  test service errors
  basic service test
  test db functionality with multiple namespaces
  ...
This commit is contained in:
Guilhem Fanton 2020-10-16 10:21:07 +02:00
commit 90a910542f
14 changed files with 5051 additions and 0 deletions

311
client.go Normal file
View File

@ -0,0 +1,311 @@
package rendezvous
import (
"context"
"fmt"
"math/rand"
"time"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
var (
DiscoverAsyncInterval = 2 * time.Minute
)
type RendezvousPoint interface {
Register(ctx context.Context, ns string, ttl int) (time.Duration, error)
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
}
type Registration struct {
Peer peer.AddrInfo
Ns string
Ttl int
}
type RendezvousClient interface {
Register(ctx context.Context, ns string, ttl int) (time.Duration, error)
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
}
func NewRendezvousPoint(host host.Host, p peer.ID) RendezvousPoint {
return &rendezvousPoint{
host: host,
p: p,
}
}
type rendezvousPoint struct {
host host.Host
p peer.ID
}
func NewRendezvousClient(host host.Host, rp peer.ID) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp))
}
func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient {
return &rendezvousClient{rp: rp}
}
type rendezvousClient struct {
rp RendezvousPoint
}
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return 0, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
req := newRegisterMessage(ns, peer.AddrInfo{ID: rp.host.ID(), Addrs: rp.host.Addrs()}, ttl)
err = w.WriteMsg(req)
if err != nil {
return 0, err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return 0, err
}
if res.GetType() != pb.Message_REGISTER_RESPONSE {
return 0, fmt.Errorf("Unexpected response: %s", res.GetType().String())
}
response := res.GetRegisterResponse()
status := response.GetStatus()
if status != pb.Message_OK {
return 0, RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()}
}
return time.Duration(*response.Ttl) * time.Second, nil
}
func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
if ttl < 120 {
return 0, fmt.Errorf("registration TTL is too short")
}
returnedTTL, err := rc.rp.Register(ctx, ns, ttl)
if err != nil {
return 0, err
}
go registerRefresh(ctx, rc.rp, ns, ttl)
return returnedTTL, nil
}
func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int) {
var refresh time.Duration
errcount := 0
for {
if errcount > 0 {
// do randomized exponential backoff, up to ~4 hours
if errcount > 7 {
errcount = 7
}
backoff := 2 << uint(errcount)
refresh = 5*time.Minute + time.Duration(rand.Intn(backoff*60000))*time.Millisecond
} else {
refresh = time.Duration(ttl-30) * time.Second
}
select {
case <-time.After(refresh):
case <-ctx.Done():
return
}
_, err := rz.Register(ctx, ns, ttl)
if err != nil {
log.Errorf("Error registering [%s]: %s", ns, err.Error())
errcount++
} else {
errcount = 0
}
}
}
func (rp *rendezvousPoint) Unregister(ctx context.Context, ns string) error {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return err
}
defer s.Close()
w := ggio.NewDelimitedWriter(s)
req := newUnregisterMessage(ns, rp.host.ID())
return w.WriteMsg(req)
}
func (rc *rendezvousClient) Unregister(ctx context.Context, ns string) error {
return rc.rp.Unregister(ctx, ns)
}
func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
return discoverQuery(ns, limit, cookie, r, w)
}
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) {
req := newDiscoverMessage(ns, limit, cookie)
err := w.WriteMsg(req)
if err != nil {
return nil, nil, err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return nil, nil, err
}
if res.GetType() != pb.Message_DISCOVER_RESPONSE {
return nil, nil, fmt.Errorf("Unexpected response: %s", res.GetType().String())
}
status := res.GetDiscoverResponse().GetStatus()
if status != pb.Message_OK {
return nil, nil, RendezvousError{Status: status, Text: res.GetDiscoverResponse().GetStatusText()}
}
regs := res.GetDiscoverResponse().GetRegistrations()
result := make([]Registration, 0, len(regs))
for _, reg := range regs {
pi, err := pbToPeerInfo(reg.GetPeer())
if err != nil {
log.Errorf("Invalid peer info: %s", err.Error())
continue
}
result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())})
}
return result, res.GetDiscoverResponse().GetCookie(), nil
}
func (rp *rendezvousPoint) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, err
}
ch := make(chan Registration)
go discoverAsync(ctx, ns, s, ch)
return ch, nil
}
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) {
defer s.Close()
defer close(ch)
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
const batch = 200
var (
cookie []byte
regs []Registration
err error
)
for {
regs, cookie, err = discoverQuery(ns, batch, cookie, r, w)
if err != nil {
// TODO robust error recovery
// - handle closed streams with backoff + new stream, preserving the cookie
// - handle E_INVALID_COOKIE errors in that case to restart the discovery
log.Errorf("Error in discovery [%s]: %s", ns, err.Error())
return
}
for _, reg := range regs {
select {
case ch <- reg:
case <-ctx.Done():
return
}
}
if len(regs) < batch {
// TODO adaptive backoff for heavily loaded rendezvous points
select {
case <-time.After(DiscoverAsyncInterval):
case <-ctx.Done():
return
}
}
}
}
func (rc *rendezvousClient) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error) {
regs, cookie, err := rc.rp.Discover(ctx, ns, limit, cookie)
if err != nil {
return nil, nil, err
}
pinfos := make([]peer.AddrInfo, len(regs))
for i, reg := range regs {
pinfos[i] = reg.Peer
}
return pinfos, cookie, nil
}
func (rc *rendezvousClient) DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) {
rch, err := rc.rp.DiscoverAsync(ctx, ns)
if err != nil {
return nil, err
}
ch := make(chan peer.AddrInfo)
go discoverPeersAsync(ctx, rch, ch)
return ch, nil
}
func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan peer.AddrInfo) {
defer close(ch)
for {
select {
case reg, ok := <-rch:
if !ok {
return
}
select {
case ch <- reg.Peer:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}

137
client_test.go Normal file
View File

@ -0,0 +1,137 @@
package rendezvous
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
)
func getRendezvousClients(t *testing.T, hosts []host.Host) []RendezvousClient {
clients := make([]RendezvousClient, len(hosts)-1)
for i, host := range hosts[1:] {
clients[i] = NewRendezvousClient(host, hosts[0].ID())
}
return clients
}
func TestClientRegistrationAndDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getRendezvousHosts(t, ctx, 5)
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:")
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
clients := getRendezvousClients(t, hosts)
recordTTL, err := clients[0].Register(ctx, "foo1", DefaultTTL)
if err != nil {
t.Fatal(err)
}
if recordTTL != DefaultTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
pi, cookie, err := clients[0].Discover(ctx, "foo1", 0, nil)
if err != nil {
t.Fatal(err)
}
if len(pi) != 1 {
t.Fatal("Expected 1 peer")
}
checkPeerInfo(t, pi[0], hosts[1])
for i, client := range clients[1:] {
recordTTL, err = client.Register(ctx, "foo1", DefaultTTL)
if err != nil {
t.Fatal(err)
}
if recordTTL != DefaultTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
pi, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie)
if err != nil {
t.Fatal(err)
}
if len(pi) != 1 {
t.Fatal("Expected 1 peer")
}
checkPeerInfo(t, pi[0], hosts[2+i])
}
for _, client := range clients[1:] {
pi, _, err = client.Discover(ctx, "foo1", 10, nil)
if err != nil {
t.Fatal(err)
}
if len(pi) != 4 {
t.Fatal("Expected 4 registrations")
}
for j, p := range pi {
checkPeerInfo(t, p, hosts[1+j])
}
}
}
func TestClientRegistrationAndDiscoveryAsync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getRendezvousHosts(t, ctx, 5)
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:")
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
clients := getRendezvousClients(t, hosts)
DiscoverAsyncInterval = 1 * time.Second
ch, err := clients[0].DiscoverAsync(ctx, "foo1")
if err != nil {
t.Fatal(err)
}
for i, client := range clients[0:] {
recordTTL, err := client.Register(ctx, "foo1", DefaultTTL)
if err != nil {
t.Fatal(err)
}
if recordTTL != DefaultTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
pi := <-ch
checkPeerInfo(t, pi, hosts[1+i])
}
DiscoverAsyncInterval = 2 * time.Minute
}
func checkPeerInfo(t *testing.T, pi peer.AddrInfo, host host.Host) {
if pi.ID != host.ID() {
t.Fatal("bad registration: peer ID doesn't match host ID")
}
addrs := host.Addrs()
raddrs := pi.Addrs
if len(addrs) != len(raddrs) {
t.Fatal("bad registration: peer address length mismatch")
}
for i, addr := range addrs {
raddr := raddrs[i]
if !addr.Equal(raddr) {
t.Fatal("bad registration: peer address mismatch")
}
}
}

21
db/dbi.go Normal file
View File

@ -0,0 +1,21 @@
package dbi
import (
"github.com/libp2p/go-libp2p-core/peer"
)
type RegistrationRecord struct {
Id peer.ID
Addrs [][]byte
Ns string
Ttl int
}
type DB interface {
Close() error
Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, error)
Unregister(p peer.ID, ns string) error
CountRegistrations(p peer.ID) (int, error)
Discover(ns string, cookie []byte, limit int) ([]RegistrationRecord, []byte, error)
ValidCookie(ns string, cookie []byte) bool
}

474
db/sqlite/db.go Normal file
View File

@ -0,0 +1,474 @@
package db
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"database/sql"
"encoding/binary"
"fmt"
"os"
"time"
dbi "github.com/libp2p/go-libp2p-rendezvous/db"
_ "github.com/mattn/go-sqlite3"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
)
var log = logging.Logger("rendezvous/db")
type DB struct {
db *sql.DB
insertPeerRegistration *sql.Stmt
deletePeerRegistrations *sql.Stmt
deletePeerRegistrationsNs *sql.Stmt
countPeerRegistrations *sql.Stmt
selectPeerRegistrations *sql.Stmt
selectPeerRegistrationsNS *sql.Stmt
selectPeerRegistrationsC *sql.Stmt
selectPeerRegistrationsNSC *sql.Stmt
deleteExpiredRegistrations *sql.Stmt
getCounter *sql.Stmt
nonce []byte
cancel func()
}
func OpenDB(ctx context.Context, path string) (*DB, error) {
var create bool
if path == ":memory:" {
create = true
} else {
_, err := os.Stat(path)
switch {
case os.IsNotExist(err):
create = true
case err != nil:
return nil, err
}
}
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, err
}
if path == ":memory:" {
// this is necessary to avoid creating a new database on each connection
db.SetMaxOpenConns(1)
}
rdb := &DB{db: db}
if create {
err = rdb.prepareDB()
if err != nil {
db.Close()
return nil, err
}
} else {
err = rdb.loadNonce()
if err != nil {
db.Close()
return nil, err
}
}
err = rdb.prepareStmts()
if err != nil {
db.Close()
return nil, err
}
bgctx, cancel := context.WithCancel(ctx)
rdb.cancel = cancel
go rdb.background(bgctx)
return rdb, nil
}
func (db *DB) Close() error {
db.cancel()
return db.db.Close()
}
func (db *DB) prepareDB() error {
_, err := db.db.Exec("CREATE TABLE Registrations (counter INTEGER PRIMARY KEY AUTOINCREMENT, peer VARCHAR(64), ns VARCHAR, expire INTEGER, addrs VARBINARY)")
if err != nil {
return err
}
_, err = db.db.Exec("CREATE TABLE Nonce (nonce VARBINARY)")
if err != nil {
return err
}
nonce := make([]byte, 32)
_, err = rand.Read(nonce)
if err != nil {
return err
}
_, err = db.db.Exec("INSERT INTO Nonce VALUES (?)", nonce)
if err != nil {
return err
}
db.nonce = nonce
return nil
}
func (db *DB) loadNonce() error {
var nonce []byte
row := db.db.QueryRow("SELECT nonce FROM Nonce")
err := row.Scan(&nonce)
if err != nil {
return err
}
db.nonce = nonce
return nil
}
func (db *DB) prepareStmts() error {
stmt, err := db.db.Prepare("INSERT INTO Registrations VALUES (NULL, ?, ?, ?, ?)")
if err != nil {
return err
}
db.insertPeerRegistration = stmt
stmt, err = db.db.Prepare("DELETE FROM Registrations WHERE peer = ?")
if err != nil {
return err
}
db.deletePeerRegistrations = stmt
stmt, err = db.db.Prepare("DELETE FROM Registrations WHERE peer = ? AND ns = ?")
if err != nil {
return err
}
db.deletePeerRegistrationsNs = stmt
stmt, err = db.db.Prepare("SELECT COUNT(*) FROM Registrations WHERE peer = ?")
if err != nil {
return err
}
db.countPeerRegistrations = stmt
stmt, err = db.db.Prepare("SELECT * FROM Registrations WHERE expire > ? LIMIT ?")
if err != nil {
return err
}
db.selectPeerRegistrations = stmt
stmt, err = db.db.Prepare("SELECT * FROM Registrations WHERE ns = ? AND expire > ? LIMIT ?")
if err != nil {
return err
}
db.selectPeerRegistrationsNS = stmt
stmt, err = db.db.Prepare("SELECT * FROM Registrations WHERE counter > ? AND expire > ? LIMIT ?")
if err != nil {
return err
}
db.selectPeerRegistrationsC = stmt
stmt, err = db.db.Prepare("SELECT * FROM Registrations WHERE counter > ? AND ns = ? AND expire > ? LIMIT ?")
if err != nil {
return err
}
db.selectPeerRegistrationsNSC = stmt
stmt, err = db.db.Prepare("DELETE FROM Registrations WHERE expire < ?")
if err != nil {
return err
}
db.deleteExpiredRegistrations = stmt
stmt, err = db.db.Prepare("SELECT MAX(counter) FROM Registrations")
if err != nil {
return err
}
db.getCounter = stmt
return nil
}
func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, error) {
pid := p.Pretty()
maddrs := packAddrs(addrs)
expire := time.Now().Unix() + int64(ttl)
tx, err := db.db.Begin()
if err != nil {
return 0, err
}
delOld := tx.Stmt(db.deletePeerRegistrationsNs)
insertNew := tx.Stmt(db.insertPeerRegistration)
getCounter := tx.Stmt(db.getCounter)
_, err = delOld.Exec(pid, ns)
if err != nil {
tx.Rollback()
return 0, err
}
_, err = insertNew.Exec(pid, ns, expire, maddrs)
if err != nil {
tx.Rollback()
return 0, err
}
var counter uint64
row := getCounter.QueryRow()
err = row.Scan(&counter)
if err != nil {
tx.Rollback()
return 0, err
}
err = tx.Commit()
return counter, err
}
func (db *DB) CountRegistrations(p peer.ID) (int, error) {
pid := p.Pretty()
row := db.countPeerRegistrations.QueryRow(pid)
var count int
err := row.Scan(&count)
return count, err
}
func (db *DB) Unregister(p peer.ID, ns string) error {
pid := p.Pretty()
var err error
if ns == "" {
_, err = db.deletePeerRegistrations.Exec(pid)
} else {
_, err = db.deletePeerRegistrationsNs.Exec(pid, ns)
}
return err
}
func (db *DB) Discover(ns string, cookie []byte, limit int) ([]dbi.RegistrationRecord, []byte, error) {
now := time.Now().Unix()
var (
counter int64
rows *sql.Rows
err error
)
if cookie != nil {
counter, err = unpackCookie(cookie)
if err != nil {
log.Errorf("error unpacking cookie: %s", err.Error())
return nil, nil, err
}
}
if counter > 0 {
if ns == "" {
rows, err = db.selectPeerRegistrationsC.Query(counter, now, limit)
} else {
rows, err = db.selectPeerRegistrationsNSC.Query(counter, ns, now, limit)
}
} else {
if ns == "" {
rows, err = db.selectPeerRegistrations.Query(now, limit)
} else {
rows, err = db.selectPeerRegistrationsNS.Query(ns, now, limit)
}
}
if err != nil {
log.Errorf("query error: %s", err.Error())
return nil, nil, err
}
defer rows.Close()
regs := make([]dbi.RegistrationRecord, 0, limit)
for rows.Next() {
var (
reg dbi.RegistrationRecord
rid string
rns string
expire int64
raddrs []byte
addrs [][]byte
p peer.ID
)
err = rows.Scan(&counter, &rid, &rns, &expire, &raddrs)
if err != nil {
log.Errorf("row scan error: %s", err.Error())
return nil, nil, err
}
p, err = peer.IDB58Decode(rid)
if err != nil {
log.Errorf("error decoding peer id: %s", err.Error())
continue
}
addrs, err := unpackAddrs(raddrs)
if err != nil {
log.Errorf("error unpacking address: %s", err.Error())
continue
}
reg.Id = p
reg.Addrs = addrs
reg.Ttl = int(expire - now)
if ns == "" {
reg.Ns = rns
}
regs = append(regs, reg)
}
err = rows.Err()
if err != nil {
return nil, nil, err
}
if counter > 0 {
cookie = packCookie(counter, ns, db.nonce)
}
return regs, cookie, nil
}
func (db *DB) ValidCookie(ns string, cookie []byte) bool {
return validCookie(cookie, ns, db.nonce)
}
func (db *DB) background(ctx context.Context) {
for {
db.cleanupExpired()
select {
case <-time.After(15 * time.Minute):
case <-ctx.Done():
return
}
}
}
func (db *DB) cleanupExpired() {
now := time.Now().Unix()
_, err := db.deleteExpiredRegistrations.Exec(now)
if err != nil {
log.Errorf("error deleting expired registrations: %s", err.Error())
}
}
func packAddrs(addrs [][]byte) []byte {
packlen := 0
for _, addr := range addrs {
packlen = packlen + 2 + len(addr)
}
packed := make([]byte, packlen)
buf := packed
for _, addr := range addrs {
binary.BigEndian.PutUint16(buf, uint16(len(addr)))
buf = buf[2:]
copy(buf, addr)
buf = buf[len(addr):]
}
return packed
}
func unpackAddrs(packed []byte) ([][]byte, error) {
var addrs [][]byte
buf := packed
for len(buf) > 1 {
l := binary.BigEndian.Uint16(buf)
buf = buf[2:]
if len(buf) < int(l) {
return nil, fmt.Errorf("bad packed address: not enough bytes %v %v", packed, buf)
}
addr := make([]byte, l)
copy(addr, buf[:l])
buf = buf[l:]
addrs = append(addrs, addr)
}
if len(buf) > 0 {
return nil, fmt.Errorf("bad packed address: unprocessed bytes: %v %v", packed, buf)
}
return addrs, nil
}
// cookie: counter:SHA256(nonce + ns + counter)
func packCookie(counter int64, ns string, nonce []byte) []byte {
cbits := make([]byte, 8)
binary.BigEndian.PutUint64(cbits, uint64(counter))
hash := sha256.New()
_, err := hash.Write(nonce)
if err != nil {
panic(err)
}
_, err = hash.Write([]byte(ns))
if err != nil {
panic(err)
}
_, err = hash.Write(cbits)
if err != nil {
panic(err)
}
return hash.Sum(cbits)
}
func unpackCookie(cookie []byte) (int64, error) {
if len(cookie) < 8 {
return 0, fmt.Errorf("bad packed cookie: not enough bytes: %v", cookie)
}
counter := binary.BigEndian.Uint64(cookie[:8])
return int64(counter), nil
}
func validCookie(cookie []byte, ns string, nonce []byte) bool {
if len(cookie) != 40 {
return false
}
cbits := cookie[:8]
hash := sha256.New()
_, err := hash.Write(nonce)
if err != nil {
panic(err)
}
_, err = hash.Write([]byte(ns))
if err != nil {
panic(err)
}
_, err = hash.Write(cbits)
if err != nil {
panic(err)
}
hbits := hash.Sum(nil)
return bytes.Equal(cookie[8:], hbits)
}

512
db/sqlite/db_test.go Normal file
View File

@ -0,0 +1,512 @@
package db
import (
"bytes"
"context"
"math/rand"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
func TestPackAddrs(t *testing.T) {
addrs := make([][]byte, 5)
for i := 0; i < 5; i++ {
addrs[i] = make([]byte, rand.Intn(256))
}
packed := packAddrs(addrs)
unpacked, err := unpackAddrs(packed)
if err != nil {
t.Fatal(err)
}
if !equalAddrs(addrs, unpacked) {
t.Fatal("unpacked addr not equal to original")
}
}
func equalAddrs(addrs1, addrs2 [][]byte) bool {
if len(addrs1) != len(addrs2) {
return false
}
for i, addr1 := range addrs1 {
addr2 := addrs2[i]
if !bytes.Equal(addr1, addr2) {
return false
}
}
return true
}
func TestPackCookie(t *testing.T) {
nonce := make([]byte, 16)
_, err := rand.Read(nonce)
if err != nil {
t.Fatal(err)
}
counter := rand.Int63()
ns := "blah"
cookie := packCookie(counter, ns, nonce)
if !validCookie(cookie, ns, nonce) {
t.Fatal("packed an invalid cookie")
}
xcounter, err := unpackCookie(cookie)
if err != nil {
t.Fatal(err)
}
if counter != xcounter {
t.Fatal("unpacked cookie counter not equal to original")
}
}
func TestOpenCloseMemDB(t *testing.T) {
db, err := OpenDB(context.Background(), ":memory:")
if err != nil {
t.Fatal(err)
}
// let the flush goroutine run its cleanup act
time.Sleep(1 * time.Second)
err = db.Close()
if err != nil {
t.Fatal(err)
}
}
func TestOpenCloseFSDB(t *testing.T) {
db, err := OpenDB(context.Background(), "/tmp/rendezvous-test.db")
if err != nil {
t.Fatal(err)
}
nonce1 := db.nonce
// let the flush goroutine run its cleanup act
time.Sleep(1 * time.Second)
err = db.Close()
if err != nil {
t.Fatal(err)
}
db, err = OpenDB(context.Background(), "/tmp/rendezvous-test.db")
if err != nil {
t.Fatal(err)
}
nonce2 := db.nonce
// let the flush goroutine run its cleanup act
time.Sleep(1 * time.Second)
err = db.Close()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(nonce1, nonce2) {
t.Fatal("persistent db nonces are not equal")
}
}
func TestDBRegistrationAndDiscovery(t *testing.T) {
db, err := OpenDB(context.Background(), ":memory:")
if err != nil {
t.Fatal(err)
}
p1, err := peer.IDB58Decode("QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH")
if err != nil {
t.Fatal(err)
}
p2, err := peer.IDB58Decode("QmUkUQgxXeggyaD5Ckv8ZqfW8wHBX6cYyeiyqvVZYzq5Bi")
if err != nil {
t.Fatal(err)
}
addr1, err := ma.NewMultiaddr("/ip4/1.1.1.1/tcp/9999")
if err != nil {
t.Fatal(err)
}
addrs1 := [][]byte{addr1.Bytes()}
addr2, err := ma.NewMultiaddr("/ip4/2.2.2.2/tcp/9999")
if err != nil {
t.Fatal(err)
}
addrs2 := [][]byte{addr2.Bytes()}
// register p1 and do discovery
_, err = db.Register(p1, "foo1", addrs1, 60)
if err != nil {
t.Fatal(err)
}
count, err := db.CountRegistrations(p1)
if err != nil {
t.Fatal(err)
}
if count != 1 {
t.Fatal("registrations for p1 should be 1")
}
rrs, cookie, err := db.Discover("foo1", nil, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 1 {
t.Fatal("should have got 1 registration")
}
rr := rrs[0]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
// register p2 and do progressive discovery
_, err = db.Register(p2, "foo1", addrs2, 60)
if err != nil {
t.Fatal(err)
}
count, err = db.CountRegistrations(p2)
if err != nil {
t.Fatal(err)
}
if count != 1 {
t.Fatal("registrations for p2 should be 1")
}
rrs, cookie, err = db.Discover("foo1", cookie, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 1 {
t.Fatal("should have got 1 registration")
}
rr = rrs[0]
if rr.Id != p2 {
t.Fatal("expected p2 ID in registration")
}
if !equalAddrs(rr.Addrs, addrs2) {
t.Fatal("expected p2's addrs in registration")
}
// reregister p1 and do progressive discovery
_, err = db.Register(p1, "foo1", addrs1, 60)
if err != nil {
t.Fatal(err)
}
count, err = db.CountRegistrations(p1)
if err != nil {
t.Fatal(err)
}
if count != 1 {
t.Fatal("registrations for p1 should be 1")
}
rrs, cookie, err = db.Discover("foo1", cookie, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 1 {
t.Fatal("should have got 1 registration")
}
rr = rrs[0]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
// do a full discovery
rrs, _, err = db.Discover("foo1", nil, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 2 {
t.Fatal("should have got 2 registration")
}
rr = rrs[0]
if rr.Id != p2 {
t.Fatal("expected p2 ID in registration")
}
if !equalAddrs(rr.Addrs, addrs2) {
t.Fatal("expected p2's addrs in registration")
}
rr = rrs[1]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
// unregister p2 and redo discovery
err = db.Unregister(p2, "foo1")
if err != nil {
t.Fatal(err)
}
count, err = db.CountRegistrations(p2)
if err != nil {
t.Fatal(err)
}
if count != 0 {
t.Fatal("registrations for p2 should be 0")
}
rrs, _, err = db.Discover("foo1", nil, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 1 {
t.Fatal("should have got 1 registration")
}
rr = rrs[0]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
db.Close()
}
func TestDBRegistrationAndDiscoveryMultipleNS(t *testing.T) {
db, err := OpenDB(context.Background(), ":memory:")
if err != nil {
t.Fatal(err)
}
p1, err := peer.IDB58Decode("QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH")
if err != nil {
t.Fatal(err)
}
p2, err := peer.IDB58Decode("QmUkUQgxXeggyaD5Ckv8ZqfW8wHBX6cYyeiyqvVZYzq5Bi")
if err != nil {
t.Fatal(err)
}
addr1, err := ma.NewMultiaddr("/ip4/1.1.1.1/tcp/9999")
if err != nil {
t.Fatal(err)
}
addrs1 := [][]byte{addr1.Bytes()}
addr2, err := ma.NewMultiaddr("/ip4/2.2.2.2/tcp/9999")
if err != nil {
t.Fatal(err)
}
addrs2 := [][]byte{addr2.Bytes()}
_, err = db.Register(p1, "foo1", addrs1, 60)
if err != nil {
t.Fatal(err)
}
_, err = db.Register(p1, "foo2", addrs1, 60)
if err != nil {
t.Fatal(err)
}
count, err := db.CountRegistrations(p1)
if err != nil {
t.Fatal(err)
}
if count != 2 {
t.Fatal("registrations for p1 should be 2")
}
rrs, cookie, err := db.Discover("", nil, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 2 {
t.Fatal("should have got 2 registrations")
}
rr := rrs[0]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if rr.Ns != "foo1" {
t.Fatal("expected namespace foo1 in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
rr = rrs[1]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if rr.Ns != "foo2" {
t.Fatal("expected namespace foo1 in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
_, err = db.Register(p2, "foo1", addrs2, 60)
if err != nil {
t.Fatal(err)
}
_, err = db.Register(p2, "foo2", addrs2, 60)
if err != nil {
t.Fatal(err)
}
count, err = db.CountRegistrations(p2)
if err != nil {
t.Fatal(err)
}
if count != 2 {
t.Fatal("registrations for p2 should be 2")
}
rrs, cookie, err = db.Discover("", cookie, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 2 {
t.Fatal("should have got 2 registrations")
}
rr = rrs[0]
if rr.Id != p2 {
t.Fatal("expected p2 ID in registration")
}
if rr.Ns != "foo1" {
t.Fatal("expected namespace foo1 in registration")
}
if !equalAddrs(rr.Addrs, addrs2) {
t.Fatal("expected p2's addrs in registration")
}
rr = rrs[1]
if rr.Id != p2 {
t.Fatal("expected p2 ID in registration")
}
if rr.Ns != "foo2" {
t.Fatal("expected namespace foo1 in registration")
}
if !equalAddrs(rr.Addrs, addrs2) {
t.Fatal("expected p2's addrs in registration")
}
err = db.Unregister(p2, "")
if err != nil {
t.Fatal(err)
}
count, err = db.CountRegistrations(p2)
if err != nil {
t.Fatal(err)
}
if count != 0 {
t.Fatal("registrations for p2 should be 0")
}
rrs, _, err = db.Discover("", nil, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 2 {
t.Fatal("should have got 2 registrations")
}
rr = rrs[0]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if rr.Ns != "foo1" {
t.Fatal("expected namespace foo1 in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
rr = rrs[1]
if rr.Id != p1 {
t.Fatal("expected p1 ID in registration")
}
if rr.Ns != "foo2" {
t.Fatal("expected namespace foo1 in registration")
}
if !equalAddrs(rr.Addrs, addrs1) {
t.Fatal("expected p1's addrs in registration")
}
db.Close()
}
func TestDBCleanup(t *testing.T) {
db, err := OpenDB(context.Background(), ":memory:")
if err != nil {
t.Fatal(err)
}
p1, err := peer.IDB58Decode("QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH")
if err != nil {
t.Fatal(err)
}
addr1, err := ma.NewMultiaddr("/ip4/1.1.1.1/tcp/9999")
if err != nil {
t.Fatal(err)
}
addrs1 := [][]byte{addr1.Bytes()}
_, err = db.Register(p1, "foo1", addrs1, 1)
if err != nil {
t.Fatal(err)
}
count, err := db.CountRegistrations(p1)
if err != nil {
t.Fatal(err)
}
if count != 1 {
t.Fatal("registrations for p1 should be 1")
}
time.Sleep(2 * time.Second)
db.cleanupExpired()
count, err = db.CountRegistrations(p1)
if err != nil {
t.Fatal(err)
}
if count != 0 {
t.Fatal("registrations for p1 should be 0")
}
rrs, _, err := db.Discover("foo1", nil, 100)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 0 {
t.Fatal("should have got 0 registrations")
}
db.Close()
}

155
discovery.go Normal file
View File

@ -0,0 +1,155 @@
package rendezvous
import (
"context"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"math"
"math/rand"
"sync"
"time"
)
type rendezvousDiscovery struct {
rp RendezvousPoint
peerCache map[string]*discoveryCache
peerCacheMux sync.RWMutex
rng *rand.Rand
rngMux sync.Mutex
}
type discoveryCache struct {
recs map[peer.ID]*record
cookie []byte
mux sync.Mutex
}
type record struct {
peer peer.AddrInfo
expire int64
}
func NewRendezvousDiscovery(host host.Host, rendezvousPeer peer.ID) discovery.Discovery {
rp := NewRendezvousPoint(host, rendezvousPeer)
return &rendezvousDiscovery{rp: rp, peerCache: make(map[string]*discoveryCache), rng: rand.New(rand.NewSource(rand.Int63()))}
}
func (c *rendezvousDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
}
ttl := options.Ttl
var ttlSeconds int
if ttl == 0 {
ttlSeconds = 7200
} else {
ttlSeconds = int(math.Round(ttl.Seconds()))
}
if rttl, err := c.rp.Register(ctx, ns, ttlSeconds); err != nil {
return 0, err
} else {
return rttl, nil
}
}
func (c *rendezvousDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}
const maxLimit = 1000
limit := options.Limit
if limit == 0 || limit > maxLimit {
limit = maxLimit
}
// Get cached peers
var cache *discoveryCache
c.peerCacheMux.RLock()
cache, ok := c.peerCache[ns]
c.peerCacheMux.RUnlock()
if !ok {
c.peerCacheMux.Lock()
cache, ok = c.peerCache[ns]
if !ok{
cache = &discoveryCache{recs: make(map[peer.ID]*record)}
c.peerCache[ns] = cache
}
c.peerCacheMux.Unlock()
}
cache.mux.Lock()
defer cache.mux.Unlock()
// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(cache.recs)
for p := range cache.recs {
rec := cache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(cache.recs, p)
}
}
cookie := cache.cookie
// Discover new records if we don't have enough
if newCacheSize < limit {
// TODO: Should we return error even if we have valid cached results?
var regs []Registration
var newCookie []byte
if regs, newCookie, err = c.rp.Discover(ctx, ns, limit, cookie); err == nil {
for _, reg := range regs {
rec := &record{peer: reg.Peer, expire: int64(reg.Ttl) + currentTime}
cache.recs[rec.peer.ID] = rec
}
cache.cookie = newCookie
}
}
// Randomize and fill channel with available records
count := len(cache.recs)
if limit < count {
count = limit
}
chPeer := make(chan peer.AddrInfo, count)
c.rngMux.Lock()
perm := c.rng.Perm(len(cache.recs))[0:count]
c.rngMux.Unlock()
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
sendLst := make([]*peer.AddrInfo, count)
iter := 0
for k := range cache.recs {
if sendIndex, ok := permSet[iter]; ok {
sendLst[sendIndex] = &cache.recs[k].peer
}
iter++
}
for _, send := range sendLst {
chPeer <- *send
}
close(chPeer)
return chPeer, err
}

162
discovery_test.go Normal file
View File

@ -0,0 +1,162 @@
package rendezvous
import (
"context"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"math/rand"
"testing"
"time"
)
func getRendezvousDiscovery(hosts []host.Host) []discovery.Discovery {
clients := make([]discovery.Discovery, len(hosts)-1)
rendezvousPeer := hosts[0].ID()
for i, h := range hosts[1:] {
rp := NewRendezvousPoint(h, rendezvousPeer)
rng := rand.New(rand.NewSource(int64(i)))
clients[i] = &rendezvousDiscovery{rp: rp, peerCache: make(map[string]*discoveryCache), rng: rng}
}
return clients
}
func peerChannelToArray(pch <-chan peer.AddrInfo) []peer.AddrInfo {
pi := make([]peer.AddrInfo, len(pch))
peerIndex := 0
for p := range pch {
pi[peerIndex] = p
peerIndex++
}
return pi
}
func checkAvailablePeers(t *testing.T, ctx context.Context, client discovery.Discovery, namespace string, expectedNumPeers int) {
pch, err := client.FindPeers(ctx, namespace)
if err != nil {
t.Fatal(err)
}
pi := peerChannelToArray(pch)
if len(pi) != expectedNumPeers {
t.Fatalf("Expected %d peers", expectedNumPeers)
}
}
func TestDiscoveryClientAdvertiseAndFindPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Define parameters
const namespace = "foo1"
const numClients = 4
const ttl = DefaultTTL * time.Second
// Instantiate server and clients
hosts := getRendezvousHosts(t, ctx, numClients+1)
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:")
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
clients := getRendezvousDiscovery(hosts)
// Advertise and check one peer
_, err = clients[0].Advertise(ctx, namespace, discovery.TTL(ttl))
if err != nil {
t.Fatal(err)
}
checkAvailablePeers(t, ctx, clients[0], namespace, 1)
// Advertise and check the rest of the peers incrementally
for i, client := range clients[1:] {
if _, err = client.Advertise(ctx, namespace, discovery.TTL(ttl)); err != nil {
t.Fatal(err)
}
checkAvailablePeers(t, ctx, client, namespace, i+2)
}
// Check that the first peer can get all the new records
checkAvailablePeers(t, ctx, clients[0], namespace, numClients)
}
func TestDiscoveryClientExpiredCachedRecords(t *testing.T) {
BaseDiscoveryClientCacheExpirationTest(t, true)
}
func TestDiscoveryClientExpiredManyCachedRecords(t *testing.T) {
BaseDiscoveryClientCacheExpirationTest(t, false)
}
func BaseDiscoveryClientCacheExpirationTest(t *testing.T, onlyRequestFromCache bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Define parameters
const numShortLivedRegs = 5
const everyIthRegIsLongTTL = 2
const numBaseRegs = numShortLivedRegs * everyIthRegIsLongTTL
const namespace = "foo1"
const longTTL = DefaultTTL * time.Second
const shortTTL = 2 * time.Second
// Instantiate server and clients
hosts := getRendezvousHosts(t, ctx, numBaseRegs+3)
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:")
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
clients := getRendezvousDiscovery(hosts)
// Advertise most clients
for i, client := range clients[2:] {
ttl := shortTTL
if i%everyIthRegIsLongTTL == 0 {
ttl = longTTL
}
if _, err = client.Advertise(ctx, namespace, discovery.TTL(ttl)); err != nil {
t.Fatal(err)
}
}
// Find peers from an unrelated client (results should be cached)
pch, err := clients[0].FindPeers(ctx, namespace)
if err != nil {
t.Fatal(err)
}
pi := peerChannelToArray(pch)
if len(pi) != numBaseRegs {
t.Fatalf("expected %d registrations", numBaseRegs)
}
// Advertise from a new unrelated peer
if _, err := clients[1].Advertise(ctx, namespace, discovery.TTL(longTTL)); err != nil {
t.Fatal(err)
}
// Wait for cache expiration
time.Sleep(shortTTL + time.Second)
// Check if number of retrieved records matches caching expectations after expiration
expectedNumClients := numShortLivedRegs
if !onlyRequestFromCache {
expectedNumClients++
}
pch, err = clients[0].FindPeers(ctx, namespace, discovery.Limit(expectedNumClients))
if err != nil {
t.Fatal(err)
}
pi = peerChannelToArray(pch)
if len(pi) != expectedNumClients {
t.Fatalf("received an incorrect number of records: %d", len(pi))
}
}

11
go.mod Normal file
View File

@ -0,0 +1,11 @@
module github.com/libp2p/go-libp2p-rendezvous
require (
github.com/gogo/protobuf v1.2.1
github.com/ipfs/go-log v0.0.1
github.com/libp2p/go-libp2p-blankhost v0.1.1
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/mattn/go-sqlite3 v1.10.0
github.com/multiformats/go-multiaddr v0.0.4
)

228
go.sum Normal file
View File

@ -0,0 +1,228 @@
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-addr-util v0.0.1 h1:TpTQm9cXVRVSKsYbgQ7GKc3KbbHVTnbostgGaDEP+88=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
github.com/libp2p/go-buffer-pool v0.0.1 h1:9Rrn/H46cXjaA2HQ5Y8lyhOS1NhTkZ4yuEs2r3Eechg=
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-conn-security-multistream v0.1.0 h1:aqGmto+ttL/uJgX0JtQI0tD21CIEy5eYd1Hlp0juHY0=
github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc=
github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-blankhost v0.1.1 h1:X919sCh+KLqJcNRApj43xCSiQRYqOSI88Fdf55ngf78=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
github.com/libp2p/go-libp2p-mplex v0.2.1 h1:E1xaJBQnbSiTHGI1gaBKmKhu1TUKkErKJnE8iGvirYI=
github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE=
github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY=
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ=
github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
github.com/libp2p/go-libp2p-swarm v0.1.0 h1:HrFk2p0awrGEgch9JXK/qp/hfjqQfgNxpLWnCiWPg5s=
github.com/libp2p/go-libp2p-swarm v0.1.0/go.mod h1:wQVsCdjsuZoc730CgOvh5ox6K8evllckjebkdiY5ta4=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3 h1:bdij4bKaaND7tCsaXVjRfYkMpvoOeKj9AVQGJllA6jM=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 h1:PZMS9lhjK9VytzMCW3tWHAXtKXmlURSc3ZdvwEcKCzw=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA=
github.com/libp2p/go-libp2p-yamux v0.2.0 h1:TSPZ5cMMz/wdoYsye/wU1TE4G3LDGMoeEN0xgnCKU/I=
github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8=
github.com/libp2p/go-maddr-filter v0.0.4 h1:hx8HIuuwk34KePddrp2mM5ivgPkZ09JH4AvsALRbFUs=
github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q=
github.com/libp2p/go-mplex v0.0.3/go.mod h1:pK5yMLmOoBR1pNCqDlA2GQrdAVTMkqFalaTWe7l4Yd0=
github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0=
github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU=
github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0=
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw=
github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA=
github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4=
github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs=
github.com/libp2p/go-stream-muxer v0.0.1 h1:Ce6e2Pyu+b5MC1k3eeFtAax0pW4gc6MosYSLV05UeLw=
github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14=
github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg=
github.com/libp2p/go-stream-muxer-multistream v0.2.0/go.mod h1:j9eyPol/LLRqT+GPLSxvimPhNph4sfYfMoDPd7HkzIc=
github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQtiRQD60/8o=
github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc=
github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg=
github.com/libp2p/go-yamux v1.2.2/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.1.0 h1:U41/2erhAKcmSI14xh/ZTUdBPOzDOIfS93ibzUSl8KM=
github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4=
github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr-dns v0.0.1 h1:jQt9c6tDSdQLIlBo4tXYx7QUHCPjxsB1zXcag/2S7zc=
github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
github.com/multiformats/go-multiaddr-dns v0.0.2 h1:/Bbsgsy3R6e3jf2qBahzNHzww6usYaZ0NhNH3sqdFS8=
github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
github.com/multiformats/go-multiaddr-fmt v0.0.1 h1:5YjeOIzbX8OTKVaN72aOzGIYW7PnrZrnkDyOfAWRSMA=
github.com/multiformats/go-multiaddr-fmt v0.0.1/go.mod h1:aBYjqL4T/7j4Qx+R73XSv/8JsgnRFlf0w2KGLCmXl3Q=
github.com/multiformats/go-multiaddr-net v0.0.1 h1:76O59E3FavvHqNg7jvzWzsPSW5JSi/ek0E4eiDVbg9g=
github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d1tnvM7ihcFwHMSstNAVUU=
github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ=
github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e h1:ZytStCyV048ZqDsWHiYDdoI2Vd4msMcrDECFxS+tL9c=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

2250
pb/rendezvous.pb.go Normal file

File diff suppressed because it is too large Load Diff

64
pb/rendezvous.proto Normal file
View File

@ -0,0 +1,64 @@
package rendezvous.pb;
message Message {
enum MessageType {
REGISTER = 0;
REGISTER_RESPONSE = 1;
UNREGISTER = 2;
DISCOVER = 3;
DISCOVER_RESPONSE = 4;
}
enum ResponseStatus {
OK = 0;
E_INVALID_NAMESPACE = 100;
E_INVALID_PEER_INFO = 101;
E_INVALID_TTL = 102;
E_INVALID_COOKIE = 103;
E_NOT_AUTHORIZED = 200;
E_INTERNAL_ERROR = 300;
E_UNAVAILABLE = 400;
}
message PeerInfo {
optional bytes id = 1;
repeated bytes addrs = 2;
}
message Register {
optional string ns = 1;
optional PeerInfo peer = 2;
optional int64 ttl = 3; // in seconds
}
message RegisterResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional int64 ttl = 3;
}
message Unregister {
optional string ns = 1;
optional bytes id = 2;
}
message Discover {
optional string ns = 1;
optional int64 limit = 2;
optional bytes cookie = 3;
}
message DiscoverResponse {
repeated Register registrations = 1;
optional bytes cookie = 2;
optional ResponseStatus status = 3;
optional string statusText = 4;
}
optional MessageType type = 1;
optional Register register = 2;
optional RegisterResponse registerResponse = 3;
optional Unregister unregister = 4;
optional Discover discover = 5;
optional DiscoverResponse discoverResponse = 6;
}

146
proto.go Normal file
View File

@ -0,0 +1,146 @@
package rendezvous
import (
"errors"
"fmt"
db "github.com/libp2p/go-libp2p-rendezvous/db"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("rendezvous")
const (
RendezvousProto = protocol.ID("/rendezvous/1.0.0")
DefaultTTL = 2 * 3600 // 2hr
)
type RendezvousError struct {
Status pb.Message_ResponseStatus
Text string
}
func (e RendezvousError) Error() string {
return fmt.Sprintf("Rendezvous error: %s (%s)", e.Text, pb.Message_ResponseStatus(e.Status).String())
}
func newRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_REGISTER.Enum()
msg.Register = new(pb.Message_Register)
if ns != "" {
msg.Register.Ns = &ns
}
if ttl > 0 {
ttl64 := int64(ttl)
msg.Register.Ttl = &ttl64
}
msg.Register.Peer = new(pb.Message_PeerInfo)
msg.Register.Peer.Id = []byte(pi.ID)
msg.Register.Peer.Addrs = make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
msg.Register.Peer.Addrs[i] = addr.Bytes()
}
return msg
}
func newUnregisterMessage(ns string, pid peer.ID) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_UNREGISTER.Enum()
msg.Unregister = new(pb.Message_Unregister)
if ns != "" {
msg.Unregister.Ns = &ns
}
msg.Unregister.Id = []byte(pid)
return msg
}
func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_DISCOVER.Enum()
msg.Discover = new(pb.Message_Discover)
if ns != "" {
msg.Discover.Ns = &ns
}
if limit > 0 {
limit64 := int64(limit)
msg.Discover.Limit = &limit64
}
if cookie != nil {
msg.Discover.Cookie = cookie
}
return msg
}
func pbToPeerInfo(p *pb.Message_PeerInfo) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("missing peer info")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, bs := range p.Addrs {
addr, err := ma.NewMultiaddrBytes(bs)
if err != nil {
log.Errorf("Error parsing multiaddr: %s", err.Error())
continue
}
addrs = append(addrs, addr)
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func newRegisterResponse(ttl int) *pb.Message_RegisterResponse {
ttl64 := int64(ttl)
r := new(pb.Message_RegisterResponse)
r.Status = pb.Message_OK.Enum()
r.Ttl = &ttl64
return r
}
func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse {
r := new(pb.Message_RegisterResponse)
r.Status = status.Enum()
r.StatusText = &text
return r
}
func newDiscoverResponse(regs []db.RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = pb.Message_OK.Enum()
rregs := make([]*pb.Message_Register, len(regs))
for i, reg := range regs {
rreg := new(pb.Message_Register)
rns := reg.Ns
rreg.Ns = &rns
rreg.Peer = new(pb.Message_PeerInfo)
rreg.Peer.Id = []byte(reg.Id)
rreg.Peer.Addrs = reg.Addrs
rttl := int64(reg.Ttl)
rreg.Ttl = &rttl
rregs[i] = rreg
}
r.Registrations = rregs
r.Cookie = cookie
return r
}
func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = status.Enum()
r.StatusText = &text
return r
}

235
svc.go Normal file
View File

@ -0,0 +1,235 @@
package rendezvous
import (
"fmt"
db "github.com/libp2p/go-libp2p-rendezvous/db"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
const (
MaxTTL = 72 * 3600 // 72hr
MaxNamespaceLength = 256
MaxPeerAddressLength = 2048
MaxRegistrations = 1000
MaxDiscoverLimit = 1000
)
type RendezvousService struct {
DB db.DB
rzs []RendezvousSync
}
type RendezvousSync interface {
Register(p peer.ID, ns string, addrs [][]byte, ttl int, counter uint64)
Unregister(p peer.ID, ns string)
}
func NewRendezvousService(host host.Host, db db.DB, rzs ...RendezvousSync) *RendezvousService {
rz := &RendezvousService{DB: db, rzs: rzs}
host.SetStreamHandler(RendezvousProto, rz.handleStream)
return rz
}
func (rz *RendezvousService) handleStream(s inet.Stream) {
pid := s.Conn().RemotePeer()
log.Debugf("New stream from %s", pid.Pretty())
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
for {
var req pb.Message
var res pb.Message
err := r.ReadMsg(&req)
if err != nil {
s.Reset()
return
}
t := req.GetType()
switch t {
case pb.Message_REGISTER:
r := rz.handleRegister(pid, req.GetRegister())
res.Type = pb.Message_REGISTER_RESPONSE.Enum()
res.RegisterResponse = r
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
s.Reset()
return
}
case pb.Message_UNREGISTER:
err := rz.handleUnregister(pid, req.GetUnregister())
if err != nil {
log.Debugf("Error unregistering peer: %s", err.Error())
}
case pb.Message_DISCOVER:
r := rz.handleDiscover(pid, req.GetDiscover())
res.Type = pb.Message_DISCOVER_RESPONSE.Enum()
res.DiscoverResponse = r
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
s.Reset()
return
}
default:
log.Debugf("Unexpected message: %s", t.String())
s.Reset()
return
}
}
}
func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *pb.Message_RegisterResponse {
ns := m.GetNs()
if ns == "" {
return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "unspecified namespace")
}
if len(ns) > MaxNamespaceLength {
return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long")
}
mpi := m.GetPeer()
if mpi == nil {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer info")
}
mpid := mpi.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "bad peer id")
}
if mp != p {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer id mismatch")
}
}
maddrs := mpi.GetAddrs()
if len(maddrs) == 0 {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer addresses")
}
mlen := 0
for _, maddr := range maddrs {
mlen += len(maddr)
}
if mlen > MaxPeerAddressLength {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer info too long")
}
// Note:
// We don't validate the addresses, because they could include protocols we don't understand
// Perhaps we should though.
mttl := m.GetTtl()
if mttl < 0 || mttl > MaxTTL {
return newRegisterResponseError(pb.Message_E_INVALID_TTL, "bad ttl")
}
ttl := DefaultTTL
if mttl > 0 {
ttl = int(mttl)
}
// now check how many registrations we have for this peer -- simple limit to defend
// against trivial DoS attacks (eg a peer connects and keeps registering until it
// fills our db)
rcount, err := rz.DB.CountRegistrations(p)
if err != nil {
log.Errorf("Error counting registrations: %s", err.Error())
return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, "database error")
}
if rcount > MaxRegistrations {
log.Warningf("Too many registrations for %s", p)
return newRegisterResponseError(pb.Message_E_NOT_AUTHORIZED, "too many registrations")
}
// ok, seems like we can register
counter, err := rz.DB.Register(p, ns, maddrs, ttl)
if err != nil {
log.Errorf("Error registering: %s", err.Error())
return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, "database error")
}
log.Infof("registered peer %s %s (%d)", p, ns, ttl)
for _, rzs := range rz.rzs {
rzs.Register(p, ns, maddrs, ttl, counter)
}
return newRegisterResponse(ttl)
}
func (rz *RendezvousService) handleUnregister(p peer.ID, m *pb.Message_Unregister) error {
ns := m.GetNs()
mpid := m.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return err
}
if mp != p {
return fmt.Errorf("peer id mismatch: %s asked to unregister %s", p.Pretty(), mp.Pretty())
}
}
err := rz.DB.Unregister(p, ns)
if err != nil {
return err
}
log.Infof("unregistered peer %s %s", p, ns)
for _, rzs := range rz.rzs {
rzs.Unregister(p, ns)
}
return nil
}
func (rz *RendezvousService) handleDiscover(p peer.ID, m *pb.Message_Discover) *pb.Message_DiscoverResponse {
ns := m.GetNs()
if len(ns) > MaxNamespaceLength {
return newDiscoverResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long")
}
limit := MaxDiscoverLimit
mlimit := m.GetLimit()
if mlimit > 0 && mlimit < int64(limit) {
limit = int(mlimit)
}
cookie := m.GetCookie()
if cookie != nil && !rz.DB.ValidCookie(ns, cookie) {
return newDiscoverResponseError(pb.Message_E_INVALID_COOKIE, "bad cookie")
}
regs, cookie, err := rz.DB.Discover(ns, cookie, limit)
if err != nil {
log.Errorf("Error in query: %s", err.Error())
return newDiscoverResponseError(pb.Message_E_INTERNAL_ERROR, "database error")
}
log.Infof("discover query: %s %s -> %d", p, ns, len(regs))
return newDiscoverResponse(regs, cookie)
}

345
svc_test.go Normal file
View File

@ -0,0 +1,345 @@
package rendezvous
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
db "github.com/libp2p/go-libp2p-rendezvous/db/sqlite"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
testutil "github.com/libp2p/go-libp2p-swarm/testing"
)
func getRendezvousHosts(t *testing.T, ctx context.Context, n int) []host.Host {
hosts := getNetHosts(t, ctx, n)
for i := 1; i < len(hosts); i++ {
connect(t, hosts[0], hosts[i])
}
return hosts
}
func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := testutil.GenSwarm(t, ctx)
h := bhost.NewBlankHost(netw)
out = append(out, h)
}
return out
}
func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
func getRendezvousPoints(t *testing.T, hosts []host.Host) []RendezvousPoint {
clients := make([]RendezvousPoint, len(hosts)-1)
for i, host := range hosts[1:] {
clients[i] = NewRendezvousPoint(host, hosts[0].ID())
}
return clients
}
func makeRendezvousService(ctx context.Context, host host.Host, path string) (*RendezvousService, error) {
dbi, err := db.OpenDB(ctx, path)
if err != nil {
return nil, err
}
return NewRendezvousService(host, dbi), nil
}
func TestSVCRegistrationAndDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getRendezvousHosts(t, ctx, 5)
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:")
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
clients := getRendezvousPoints(t, hosts)
const registerTTL = 60
recordTTL, err := clients[0].Register(ctx, "foo1", registerTTL)
if err != nil {
t.Fatal(err)
}
if recordTTL != registerTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
rrs, cookie, err := clients[0].Discover(ctx, "foo1", 10, nil)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 1 {
t.Fatal("Expected 1 registration")
}
checkHostRegistration(t, rrs[0], hosts[1])
for i, client := range clients[1:] {
recordTTL, err = client.Register(ctx, "foo1", registerTTL)
if err != nil {
t.Fatal(err)
}
if recordTTL != registerTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
rrs, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 1 {
t.Fatal("Expected 1 registration")
}
checkHostRegistration(t, rrs[0], hosts[2+i])
}
for _, client := range clients[1:] {
rrs, _, err = client.Discover(ctx, "foo1", 10, nil)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 4 {
t.Fatal("Expected 4 registrations")
}
for j, rr := range rrs {
checkHostRegistration(t, rr, hosts[1+j])
}
}
err = clients[0].Unregister(ctx, "foo1")
if err != nil {
t.Fatal(err)
}
for _, client := range clients[0:] {
rrs, _, err = client.Discover(ctx, "foo1", 10, nil)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 3 {
t.Fatal("Expected 3 registrations")
}
for j, rr := range rrs {
checkHostRegistration(t, rr, hosts[2+j])
}
}
err = clients[1].Unregister(ctx, "")
for _, client := range clients[0:] {
rrs, _, err = client.Discover(ctx, "foo1", 10, nil)
if err != nil {
t.Fatal(err)
}
if len(rrs) != 2 {
t.Fatal("Expected 2 registrations")
}
for j, rr := range rrs {
checkHostRegistration(t, rr, hosts[3+j])
}
}
}
func checkHostRegistration(t *testing.T, rr Registration, host host.Host) {
if rr.Peer.ID != host.ID() {
t.Fatal("bad registration: peer ID doesn't match host ID")
}
addrs := host.Addrs()
raddrs := rr.Peer.Addrs
if len(addrs) != len(raddrs) {
t.Fatal("bad registration: peer address length mismatch")
}
for i, addr := range addrs {
raddr := raddrs[i]
if !addr.Equal(raddr) {
t.Fatal("bad registration: peer address mismatch")
}
}
}
func TestSVCErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getRendezvousHosts(t, ctx, 2)
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:")
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
// testable registration errors
res, err := doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("", peer.AddrInfo{}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_NAMESPACE {
t.Fatal("expected E_INVALID_NAMESPACE")
}
badns := make([]byte, 2*MaxNamespaceLength)
rand.Read(badns)
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage(string(badns), peer.AddrInfo{}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_NAMESPACE {
t.Fatal("expected E_INVALID_NAMESPACE")
}
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("foo", peer.AddrInfo{}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_PEER_INFO {
t.Fatal("expected E_INVALID_PEER_INFO")
}
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("foo", peer.AddrInfo{ID: peer.ID("blah")}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_PEER_INFO {
t.Fatal("expected E_INVALID_PEER_INFO")
}
p, err := peer.IDB58Decode("QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH")
if err != nil {
t.Fatal(err)
}
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("foo", peer.AddrInfo{ID: p}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_PEER_INFO {
t.Fatal("expected E_INVALID_PEER_INFO")
}
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("foo", peer.AddrInfo{ID: hosts[1].ID()}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_PEER_INFO {
t.Fatal("expected E_INVALID_PEER_INFO")
}
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("foo", peer.AddrInfo{ID: hosts[1].ID(), Addrs: hosts[1].Addrs()}, 2*MaxTTL))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_INVALID_TTL {
t.Fatal("expected E_INVALID_TTL")
}
// do MaxRegistrations
for i := 0; i < MaxRegistrations+1; i++ {
ns := fmt.Sprintf("foo%d", i)
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage(ns, peer.AddrInfo{ID: hosts[1].ID(), Addrs: hosts[1].Addrs()}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_OK {
t.Fatal("expected OK")
}
}
// and now fail
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newRegisterMessage("foo", peer.AddrInfo{ID: hosts[1].ID(), Addrs: hosts[1].Addrs()}, 0))
if err != nil {
t.Fatal(err)
}
if res.GetRegisterResponse().GetStatus() != pb.Message_E_NOT_AUTHORIZED {
t.Fatal("expected E_NOT_AUTHORIZED")
}
// testable discovery errors
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newDiscoverMessage(string(badns), 0, nil))
if err != nil {
t.Fatal(err)
}
if res.GetDiscoverResponse().GetStatus() != pb.Message_E_INVALID_NAMESPACE {
t.Fatal("expected E_INVALID_NAMESPACE")
}
badcookie := make([]byte, 10)
rand.Read(badcookie)
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newDiscoverMessage("foo", 0, badcookie))
if err != nil {
t.Fatal(err)
}
if res.GetDiscoverResponse().GetStatus() != pb.Message_E_INVALID_COOKIE {
t.Fatal("expected E_INVALID_COOKIE")
}
badcookie = make([]byte, 40)
rand.Read(badcookie)
res, err = doTestRequest(ctx, hosts[1], hosts[0].ID(),
newDiscoverMessage("foo", 0, badcookie))
if err != nil {
t.Fatal(err)
}
if res.GetDiscoverResponse().GetStatus() != pb.Message_E_INVALID_COOKIE {
t.Fatal("expected E_INVALID_COOKIE")
}
}
func doTestRequest(ctx context.Context, host host.Host, rp peer.ID, m *pb.Message) (*pb.Message, error) {
s, err := host.NewStream(ctx, rp, RendezvousProto)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
err = w.WriteMsg(m)
if err != nil {
return nil, err
}
res := new(pb.Message)
err = r.ReadMsg(res)
if err != nil {
return nil, err
}
return res, nil
}