service implementation

This commit is contained in:
vyzo 2018-04-23 11:54:25 +03:00
parent e5a72b9bea
commit 7d72fc79b3
4 changed files with 312 additions and 9 deletions

View File

@ -8,17 +8,12 @@ import (
pb "github.com/libp2p/go-libp2p-rendezvous/pb" pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
) )
var log = logging.Logger("rendezvous")
const DefaultTTL = 2 * 3600 // 2hr
type Rendezvous interface { type Rendezvous interface {
Register(ctx context.Context, ns string, ttl int) error Register(ctx context.Context, ns string, ttl int) error
Unregister(ctx context.Context, ns string) error Unregister(ctx context.Context, ns string) error
@ -51,7 +46,7 @@ func (cli *client) Register(ctx context.Context, ns string, ttl int) error {
} }
defer s.Close() defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20) r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s) w := ggio.NewDelimitedWriter(s)
req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl)
@ -124,7 +119,7 @@ func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []
} }
defer s.Close() defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20) r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s) w := ggio.NewDelimitedWriter(s)
return discoverQuery(ns, limit, cookie, r, w) return discoverQuery(ns, limit, cookie, r, w)
@ -182,7 +177,7 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist
defer s.Close() defer s.Close()
defer close(ch) defer close(ch)
r := ggio.NewDelimitedReader(s, 1<<20) r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s) w := ggio.NewDelimitedWriter(s)
const batch = 100 const batch = 100
@ -208,7 +203,7 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist
} }
} }
if len(regs) < batch { if len(regs) < batch/2 {
select { select {
case <-time.After(2 * time.Minute): case <-time.After(2 * time.Minute):
case <-ctx.Done(): case <-ctx.Done():

35
db.go Normal file
View File

@ -0,0 +1,35 @@
package rendezvous
import (
"context"
"errors"
peer "github.com/libp2p/go-libp2p-peer"
)
type DB struct {
}
func OpenDB(ctx context.Context, path string) (*DB, error) {
return nil, errors.New("IMPLEMENTME: OpenDB")
}
func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) error {
return errors.New("IMPLEMENTME: DB.Register")
}
func (db *DB) CountRegistrations(p peer.ID) (int, error) {
return 0, errors.New("IMPLEMENTME: DB.CountRegistrations")
}
func (db *DB) Unregister(p peer.ID, ns string) error {
return errors.New("IMPLEMENTME: DB.Unregister")
}
func (db *DB) ValidCookie(ns string, cookie []byte) bool {
return false
}
func (db *DB) Discover(ns string, cookie []byte, limit int) ([]RegistrationRecord, []byte, error) {
return nil, nil, errors.New("IMPLEMENTME: DB.Discover")
}

View File

@ -1,20 +1,33 @@
package rendezvous package rendezvous
import ( import (
"errors"
"fmt" "fmt"
pb "github.com/libp2p/go-libp2p-rendezvous/pb" pb "github.com/libp2p/go-libp2p-rendezvous/pb"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol" protocol "github.com/libp2p/go-libp2p-protocol"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
var log = logging.Logger("rendezvous")
const ( const (
RendezvousProto = protocol.ID("/rendezvous/1.0.0") RendezvousProto = protocol.ID("/rendezvous/1.0.0")
DefaultTTL = 2 * 3600 // 2hr
) )
type RegistrationRecord struct {
Id []byte
Addrs [][]byte
Ns string
Ttl int
}
type RendezvousError struct { type RendezvousError struct {
Status pb.Message_ResponseStatus Status pb.Message_ResponseStatus
Text string Text string
@ -73,6 +86,10 @@ func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
} }
func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) { func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) {
if p == nil {
return pstore.PeerInfo{}, errors.New("missing peer info")
}
id, err := peer.IDFromBytes(p.Id) id, err := peer.IDFromBytes(p.Id)
if err != nil { if err != nil {
return pstore.PeerInfo{}, err return pstore.PeerInfo{}, err
@ -89,3 +106,46 @@ func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) {
return pstore.PeerInfo{ID: id, Addrs: addrs}, nil return pstore.PeerInfo{ID: id, Addrs: addrs}, nil
} }
func newRegisterResponse() *pb.Message_RegisterResponse {
r := new(pb.Message_RegisterResponse)
r.Status = pb.Message_OK.Enum()
return r
}
func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse {
r := new(pb.Message_RegisterResponse)
r.Status = status.Enum()
r.StatusText = &text
return r
}
func newDiscoverResponse(regs []RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = pb.Message_OK.Enum()
rregs := make([]*pb.Message_Register, len(regs))
for i, reg := range regs {
rreg := new(pb.Message_Register)
rns := reg.Ns
rreg.Ns = &rns
rreg.Peer = new(pb.Message_PeerInfo)
rreg.Peer.Id = reg.Id
rreg.Peer.Addrs = reg.Addrs
rttl := int64(reg.Ttl)
rreg.Ttl = &rttl
rregs[i] = rreg
}
r.Registrations = rregs
r.Cookie = cookie
return r
}
func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = status.Enum()
r.StatusText = &text
return r
}

213
svc.go Normal file
View File

@ -0,0 +1,213 @@
package rendezvous
import (
"context"
"fmt"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
)
const (
MaxTTL = 72 * 3600 // 72hr
MaxNamespaceLength = 256
MaxPeerAddressLength = 2048
MaxRegistrations = 100
MaxDiscoverLimit = 1000
)
type RendezvousService struct {
DB *DB
}
func NewRendezvousService(ctx context.Context, host host.Host, dbpath string) (*RendezvousService, error) {
db, err := OpenDB(ctx, dbpath)
if err != nil {
return nil, err
}
rz := &RendezvousService{DB: db}
host.SetStreamHandler(RendezvousProto, rz.handleStream)
return rz, nil
}
func (rz *RendezvousService) handleStream(s inet.Stream) {
pid := s.Conn().RemotePeer()
log.Debugf("New stream from %s", pid.Pretty())
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
for {
var req pb.Message
var res pb.Message
err := r.ReadMsg(&req)
if err != nil {
s.Reset()
return
}
t := req.GetType()
switch t {
case pb.Message_REGISTER:
r := rz.handleRegister(pid, req.GetRegister())
res.Type = pb.Message_REGISTER_RESPONSE.Enum()
res.RegisterResponse = r
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
s.Reset()
return
}
case pb.Message_UNREGISTER:
err := rz.handleUnregister(pid, req.GetUnregister())
if err != nil {
log.Debugf("Error unregistering peer: %s", err.Error())
}
case pb.Message_DISCOVER:
r := rz.handleDiscover(pid, req.GetDiscover())
res.Type = pb.Message_DISCOVER_RESPONSE.Enum()
res.DiscoverResponse = r
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
s.Reset()
return
}
default:
log.Debugf("Unexpected message: %s", t.String())
s.Reset()
return
}
}
}
func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *pb.Message_RegisterResponse {
ns := m.GetNs()
if ns == "" {
return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "unspecified namespace")
}
if len(ns) > MaxNamespaceLength {
return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long")
}
mpi := m.GetPeer()
if mpi == nil {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer info")
}
mpid := mpi.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "bad peer id")
}
if mp != p {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer id mismatch")
}
}
maddrs := mpi.GetAddrs()
if len(maddrs) == 0 {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer addresses")
}
mlen := 0
for _, maddr := range maddrs {
mlen += len(maddr)
}
if mlen > MaxPeerAddressLength {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer info too long")
}
// Note:
// We don't validate the addresses, because they could include protocols we don't understand
// Perhaps we should though.
mttl := m.GetTtl()
if mttl < 0 || mttl > MaxTTL {
return newRegisterResponseError(pb.Message_E_INVALID_TTL, "bad ttl")
}
ttl := DefaultTTL
if mttl > 0 {
ttl = int(mttl)
}
// now check how many registrations we have for this peer -- simple limit to defend
// against trivial DoS attacks (eg a peer connects and keeps registering until it
// fills our db)
rcount, err := rz.DB.CountRegistrations(p)
if err != nil {
log.Errorf("Error counting registrations: %s", err.Error())
return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, err.Error())
}
if rcount > MaxRegistrations {
return newRegisterResponseError(pb.Message_E_NOT_AUTHORIZED, "too many registrations")
}
// ok, seems like we can register
err = rz.DB.Register(p, ns, maddrs, ttl)
if err != nil {
log.Errorf("Error registering: %s", err.Error())
return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, err.Error())
}
return newRegisterResponse()
}
func (rz *RendezvousService) handleUnregister(p peer.ID, m *pb.Message_Unregister) error {
ns := m.GetNs()
mpid := m.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return err
}
if mp != p {
return fmt.Errorf("peer id mismatch: %s asked to unregister %s", p.Pretty(), mp.Pretty())
}
}
return rz.DB.Unregister(p, ns)
}
func (rz *RendezvousService) handleDiscover(p peer.ID, m *pb.Message_Discover) *pb.Message_DiscoverResponse {
ns := m.GetNs()
if len(ns) > MaxNamespaceLength {
return newDiscoverResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long")
}
limit := MaxDiscoverLimit
mlimit := m.GetLimit()
if mlimit > 0 && mlimit < int64(limit) {
limit = int(mlimit)
}
cookie := m.GetCookie()
if cookie != nil && !rz.DB.ValidCookie(ns, cookie) {
return newDiscoverResponseError(pb.Message_E_INVALID_COOKIE, "bad cookie")
}
regs, cookie, err := rz.DB.Discover(ns, cookie, limit)
if err != nil {
log.Errorf("Error in query: %s", err.Error())
return newDiscoverResponseError(pb.Message_E_INTERNAL_ERROR, err.Error())
}
return newDiscoverResponse(regs, cookie)
}