From 7d72fc79b3c0adffd71de5602648622e8a1e6ca2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 23 Apr 2018 11:54:25 +0300 Subject: [PATCH] service implementation --- client.go | 13 +--- db.go | 35 +++++++++ proto.go | 60 +++++++++++++++ svc.go | 213 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 312 insertions(+), 9 deletions(-) create mode 100644 db.go create mode 100644 svc.go diff --git a/client.go b/client.go index 6e31404..99995be 100644 --- a/client.go +++ b/client.go @@ -8,17 +8,12 @@ import ( pb "github.com/libp2p/go-libp2p-rendezvous/pb" ggio "github.com/gogo/protobuf/io" - logging "github.com/ipfs/go-log" host "github.com/libp2p/go-libp2p-host" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" ) -var log = logging.Logger("rendezvous") - -const DefaultTTL = 2 * 3600 // 2hr - type Rendezvous interface { Register(ctx context.Context, ns string, ttl int) error Unregister(ctx context.Context, ns string) error @@ -51,7 +46,7 @@ func (cli *client) Register(ctx context.Context, ns string, ttl int) error { } defer s.Close() - r := ggio.NewDelimitedReader(s, 1<<20) + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) w := ggio.NewDelimitedWriter(s) req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) @@ -124,7 +119,7 @@ func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie [] } defer s.Close() - r := ggio.NewDelimitedReader(s, 1<<20) + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) w := ggio.NewDelimitedWriter(s) return discoverQuery(ns, limit, cookie, r, w) @@ -182,7 +177,7 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist defer s.Close() defer close(ch) - r := ggio.NewDelimitedReader(s, 1<<20) + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) w := ggio.NewDelimitedWriter(s) const batch = 100 @@ -208,7 +203,7 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist } } - if len(regs) < batch { + if len(regs) < batch/2 { select { case <-time.After(2 * time.Minute): case <-ctx.Done(): diff --git a/db.go b/db.go new file mode 100644 index 0000000..cfee9cb --- /dev/null +++ b/db.go @@ -0,0 +1,35 @@ +package rendezvous + +import ( + "context" + "errors" + + peer "github.com/libp2p/go-libp2p-peer" +) + +type DB struct { +} + +func OpenDB(ctx context.Context, path string) (*DB, error) { + return nil, errors.New("IMPLEMENTME: OpenDB") +} + +func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) error { + return errors.New("IMPLEMENTME: DB.Register") +} + +func (db *DB) CountRegistrations(p peer.ID) (int, error) { + return 0, errors.New("IMPLEMENTME: DB.CountRegistrations") +} + +func (db *DB) Unregister(p peer.ID, ns string) error { + return errors.New("IMPLEMENTME: DB.Unregister") +} + +func (db *DB) ValidCookie(ns string, cookie []byte) bool { + return false +} + +func (db *DB) Discover(ns string, cookie []byte, limit int) ([]RegistrationRecord, []byte, error) { + return nil, nil, errors.New("IMPLEMENTME: DB.Discover") +} diff --git a/proto.go b/proto.go index ae3941d..f1c48d6 100644 --- a/proto.go +++ b/proto.go @@ -1,20 +1,33 @@ package rendezvous import ( + "errors" "fmt" pb "github.com/libp2p/go-libp2p-rendezvous/pb" + logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" protocol "github.com/libp2p/go-libp2p-protocol" ma "github.com/multiformats/go-multiaddr" ) +var log = logging.Logger("rendezvous") + const ( RendezvousProto = protocol.ID("/rendezvous/1.0.0") + + DefaultTTL = 2 * 3600 // 2hr ) +type RegistrationRecord struct { + Id []byte + Addrs [][]byte + Ns string + Ttl int +} + type RendezvousError struct { Status pb.Message_ResponseStatus Text string @@ -73,6 +86,10 @@ func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message { } func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) { + if p == nil { + return pstore.PeerInfo{}, errors.New("missing peer info") + } + id, err := peer.IDFromBytes(p.Id) if err != nil { return pstore.PeerInfo{}, err @@ -89,3 +106,46 @@ func pbToPeerInfo(p *pb.Message_PeerInfo) (pstore.PeerInfo, error) { return pstore.PeerInfo{ID: id, Addrs: addrs}, nil } + +func newRegisterResponse() *pb.Message_RegisterResponse { + r := new(pb.Message_RegisterResponse) + r.Status = pb.Message_OK.Enum() + return r +} + +func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse { + r := new(pb.Message_RegisterResponse) + r.Status = status.Enum() + r.StatusText = &text + return r +} + +func newDiscoverResponse(regs []RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse { + r := new(pb.Message_DiscoverResponse) + r.Status = pb.Message_OK.Enum() + + rregs := make([]*pb.Message_Register, len(regs)) + for i, reg := range regs { + rreg := new(pb.Message_Register) + rns := reg.Ns + rreg.Ns = &rns + rreg.Peer = new(pb.Message_PeerInfo) + rreg.Peer.Id = reg.Id + rreg.Peer.Addrs = reg.Addrs + rttl := int64(reg.Ttl) + rreg.Ttl = &rttl + rregs[i] = rreg + } + + r.Registrations = rregs + r.Cookie = cookie + + return r +} + +func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse { + r := new(pb.Message_DiscoverResponse) + r.Status = status.Enum() + r.StatusText = &text + return r +} diff --git a/svc.go b/svc.go new file mode 100644 index 0000000..71b609a --- /dev/null +++ b/svc.go @@ -0,0 +1,213 @@ +package rendezvous + +import ( + "context" + "fmt" + + pb "github.com/libp2p/go-libp2p-rendezvous/pb" + + ggio "github.com/gogo/protobuf/io" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" +) + +const ( + MaxTTL = 72 * 3600 // 72hr + MaxNamespaceLength = 256 + MaxPeerAddressLength = 2048 + MaxRegistrations = 100 + MaxDiscoverLimit = 1000 +) + +type RendezvousService struct { + DB *DB +} + +func NewRendezvousService(ctx context.Context, host host.Host, dbpath string) (*RendezvousService, error) { + db, err := OpenDB(ctx, dbpath) + if err != nil { + return nil, err + } + + rz := &RendezvousService{DB: db} + host.SetStreamHandler(RendezvousProto, rz.handleStream) + return rz, nil +} + +func (rz *RendezvousService) handleStream(s inet.Stream) { + pid := s.Conn().RemotePeer() + log.Debugf("New stream from %s", pid.Pretty()) + + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(s) + + for { + var req pb.Message + var res pb.Message + + err := r.ReadMsg(&req) + if err != nil { + s.Reset() + return + } + + t := req.GetType() + switch t { + case pb.Message_REGISTER: + r := rz.handleRegister(pid, req.GetRegister()) + res.Type = pb.Message_REGISTER_RESPONSE.Enum() + res.RegisterResponse = r + err = w.WriteMsg(&res) + if err != nil { + log.Debugf("Error writing response: %s", err.Error()) + s.Reset() + return + } + + case pb.Message_UNREGISTER: + err := rz.handleUnregister(pid, req.GetUnregister()) + if err != nil { + log.Debugf("Error unregistering peer: %s", err.Error()) + } + + case pb.Message_DISCOVER: + r := rz.handleDiscover(pid, req.GetDiscover()) + res.Type = pb.Message_DISCOVER_RESPONSE.Enum() + res.DiscoverResponse = r + err = w.WriteMsg(&res) + if err != nil { + log.Debugf("Error writing response: %s", err.Error()) + s.Reset() + return + } + + default: + log.Debugf("Unexpected message: %s", t.String()) + s.Reset() + return + } + } +} + +func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *pb.Message_RegisterResponse { + ns := m.GetNs() + if ns == "" { + return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "unspecified namespace") + } + + if len(ns) > MaxNamespaceLength { + return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long") + } + + mpi := m.GetPeer() + if mpi == nil { + return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer info") + } + + mpid := mpi.GetId() + if mpid != nil { + mp, err := peer.IDFromBytes(mpid) + if err != nil { + return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "bad peer id") + } + + if mp != p { + return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer id mismatch") + } + } + + maddrs := mpi.GetAddrs() + if len(maddrs) == 0 { + return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer addresses") + } + + mlen := 0 + for _, maddr := range maddrs { + mlen += len(maddr) + } + if mlen > MaxPeerAddressLength { + return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer info too long") + } + + // Note: + // We don't validate the addresses, because they could include protocols we don't understand + // Perhaps we should though. + + mttl := m.GetTtl() + if mttl < 0 || mttl > MaxTTL { + return newRegisterResponseError(pb.Message_E_INVALID_TTL, "bad ttl") + } + + ttl := DefaultTTL + if mttl > 0 { + ttl = int(mttl) + } + + // now check how many registrations we have for this peer -- simple limit to defend + // against trivial DoS attacks (eg a peer connects and keeps registering until it + // fills our db) + rcount, err := rz.DB.CountRegistrations(p) + if err != nil { + log.Errorf("Error counting registrations: %s", err.Error()) + return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, err.Error()) + } + + if rcount > MaxRegistrations { + return newRegisterResponseError(pb.Message_E_NOT_AUTHORIZED, "too many registrations") + } + + // ok, seems like we can register + err = rz.DB.Register(p, ns, maddrs, ttl) + if err != nil { + log.Errorf("Error registering: %s", err.Error()) + return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, err.Error()) + } + + return newRegisterResponse() +} + +func (rz *RendezvousService) handleUnregister(p peer.ID, m *pb.Message_Unregister) error { + ns := m.GetNs() + + mpid := m.GetId() + if mpid != nil { + mp, err := peer.IDFromBytes(mpid) + if err != nil { + return err + } + + if mp != p { + return fmt.Errorf("peer id mismatch: %s asked to unregister %s", p.Pretty(), mp.Pretty()) + } + } + + return rz.DB.Unregister(p, ns) +} + +func (rz *RendezvousService) handleDiscover(p peer.ID, m *pb.Message_Discover) *pb.Message_DiscoverResponse { + ns := m.GetNs() + + if len(ns) > MaxNamespaceLength { + return newDiscoverResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long") + } + + limit := MaxDiscoverLimit + mlimit := m.GetLimit() + if mlimit > 0 && mlimit < int64(limit) { + limit = int(mlimit) + } + + cookie := m.GetCookie() + if cookie != nil && !rz.DB.ValidCookie(ns, cookie) { + return newDiscoverResponseError(pb.Message_E_INVALID_COOKIE, "bad cookie") + } + + regs, cookie, err := rz.DB.Discover(ns, cookie, limit) + if err != nil { + log.Errorf("Error in query: %s", err.Error()) + return newDiscoverResponseError(pb.Message_E_INTERNAL_ERROR, err.Error()) + } + + return newDiscoverResponse(regs, cookie) +}