Extract protocol agnostic tracker Client

This commit is contained in:
Matt Joiner 2021-06-24 10:39:56 +10:00
parent 101a269873
commit 98a1423732
7 changed files with 185 additions and 137 deletions

49
tracker/client.go Normal file
View File

@ -0,0 +1,49 @@
package tracker
import (
"context"
"net/url"
trHttp "github.com/anacrolix/torrent/tracker/http"
"github.com/anacrolix/torrent/tracker/udp"
)
type Client interface {
Announce(context.Context, AnnounceRequest, trHttp.AnnounceOpt) (AnnounceResponse, error)
Close() error
}
type NewClientOpts struct {
Http trHttp.NewClientOpts
// Overrides the network in the scheme. Probably a legacy thing.
UdpNetwork string
}
func NewClient(urlStr string, opts NewClientOpts) (Client, error) {
_url, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
switch _url.Scheme {
case "http", "https":
return trHttp.NewClient(_url, opts.Http), nil
case "udp", "udp4", "udp6":
network := _url.Scheme
if opts.UdpNetwork != "" {
network = opts.UdpNetwork
}
cc, err := udp.NewConnClient(udp.NewConnClientOpts{
Network: network,
Host: _url.Host,
})
if err != nil {
return nil, err
}
return &udpClient{
cl: cc,
requestUri: _url.RequestURI(),
}, nil
default:
return nil, ErrBadScheme
}
}

41
tracker/http/client.go Normal file
View File

@ -0,0 +1,41 @@
package http
import (
"crypto/tls"
"net/http"
"net/url"
)
type Client struct {
hc *http.Client
url_ *url.URL
}
type ProxyFunc func(*http.Request) (*url.URL, error)
type NewClientOpts struct {
Proxy ProxyFunc
ServerName string
}
func NewClient(url_ *url.URL, opts NewClientOpts) Client {
return Client{
url_: url_,
hc: &http.Client{
Transport: &http.Transport{
Proxy: opts.Proxy,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
ServerName: opts.ServerName,
},
// This is for S3 trackers that hold connections open.
DisableKeepAlives: true,
},
},
}
}
func (cl Client) Close() error {
cl.hc.CloseIdleConnections()
return nil
}

View File

@ -3,7 +3,6 @@ package http
import (
"bytes"
"context"
"crypto/tls"
"expvar"
"fmt"
"io"
@ -13,7 +12,6 @@ import (
"net/url"
"strconv"
"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/missinggo/httptoo"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/tracker/shared"
@ -22,80 +20,6 @@ import (
var vars = expvar.NewMap("tracker/http")
type Client struct {
hc *http.Client
}
type NewClientOpts struct {
Proxy func(*http.Request) (*url.URL, error)
ServerName string
}
func NewClient(opts NewClientOpts) Client {
return Client{
hc: &http.Client{
Transport: &http.Transport{
Proxy: opts.Proxy,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
ServerName: opts.ServerName,
},
// This is for S3 trackers that hold connections open.
DisableKeepAlives: true,
},
},
}
}
type HttpResponse struct {
FailureReason string `bencode:"failure reason"`
Interval int32 `bencode:"interval"`
TrackerId string `bencode:"tracker id"`
Complete int32 `bencode:"complete"`
Incomplete int32 `bencode:"incomplete"`
Peers Peers `bencode:"peers"`
// BEP 7
Peers6 krpc.CompactIPv6NodeAddrs `bencode:"peers6"`
}
type Peers []Peer
func (me *Peers) UnmarshalBencode(b []byte) (err error) {
var _v interface{}
err = bencode.Unmarshal(b, &_v)
if err != nil {
return
}
switch v := _v.(type) {
case string:
vars.Add("http responses with string peers", 1)
var cnas krpc.CompactIPv4NodeAddrs
err = cnas.UnmarshalBinary([]byte(v))
if err != nil {
return
}
for _, cp := range cnas {
*me = append(*me, Peer{
IP: cp.IP[:],
Port: int(cp.Port),
})
}
return
case []interface{}:
vars.Add("http responses with list peers", 1)
for _, i := range v {
var p Peer
p.FromDictInterface(i.(map[string]interface{}))
*me = append(*me, p)
}
return
default:
vars.Add("http responses with unhandled peers type", 1)
err = fmt.Errorf("unsupported type: %T", _v)
return
}
}
func setAnnounceParams(_url *url.URL, ar *AnnounceRequest, opts AnnounceOpt) {
q := _url.Query()
@ -148,8 +72,8 @@ type AnnounceOpt struct {
type AnnounceRequest = udp.AnnounceRequest
func (cl Client) Announce(ctx context.Context, ar AnnounceRequest, opt AnnounceOpt, _url *url.URL) (ret AnnounceResponse, err error) {
_url = httptoo.CopyURL(_url)
func (cl Client) Announce(ctx context.Context, ar AnnounceRequest, opt AnnounceOpt) (ret AnnounceResponse, err error) {
_url := httptoo.CopyURL(cl.url_)
setAnnounceParams(_url, &ar, opt)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, _url.String(), nil)
req.Header.Set("User-Agent", opt.UserAgent)

57
tracker/http/protocol.go Normal file
View File

@ -0,0 +1,57 @@
package http
import (
"fmt"
"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/torrent/bencode"
)
type HttpResponse struct {
FailureReason string `bencode:"failure reason"`
Interval int32 `bencode:"interval"`
TrackerId string `bencode:"tracker id"`
Complete int32 `bencode:"complete"`
Incomplete int32 `bencode:"incomplete"`
Peers Peers `bencode:"peers"`
// BEP 7
Peers6 krpc.CompactIPv6NodeAddrs `bencode:"peers6"`
}
type Peers []Peer
func (me *Peers) UnmarshalBencode(b []byte) (err error) {
var _v interface{}
err = bencode.Unmarshal(b, &_v)
if err != nil {
return
}
switch v := _v.(type) {
case string:
vars.Add("http responses with string peers", 1)
var cnas krpc.CompactIPv4NodeAddrs
err = cnas.UnmarshalBinary([]byte(v))
if err != nil {
return
}
for _, cp := range cnas {
*me = append(*me, Peer{
IP: cp.IP[:],
Port: int(cp.Port),
})
}
return
case []interface{}:
vars.Add("http responses with list peers", 1)
for _, i := range v {
var p Peer
p.FromDictInterface(i.(map[string]interface{}))
*me = append(*me, p)
}
return
default:
vars.Add("http responses with unhandled peers type", 1)
err = fmt.Errorf("unsupported type: %T", _v)
return
}
}

View File

@ -51,10 +51,17 @@ type Announce struct {
const DefaultTrackerAnnounceTimeout = 15 * time.Second
func (me Announce) Do() (res AnnounceResponse, err error) {
_url, err := url.Parse(me.TrackerUrl)
cl, err := NewClient(me.TrackerUrl, NewClientOpts{
Http: trHttp.NewClientOpts{
Proxy: me.HTTPProxy,
ServerName: me.ServerName,
},
UdpNetwork: me.UdpNetwork,
})
if err != nil {
return
}
defer cl.Close()
if me.Context == nil {
// This is just to maintain the old behaviour that should be a timeout of 15s. Users can
// override it by providing their own Context. See comments elsewhere about longer timeouts
@ -63,22 +70,10 @@ func (me Announce) Do() (res AnnounceResponse, err error) {
defer cancel()
me.Context = ctx
}
switch _url.Scheme {
case "http", "https":
cl := trHttp.NewClient(trHttp.NewClientOpts{
Proxy: me.HTTPProxy,
ServerName: me.ServerName,
})
return cl.Announce(me.Context, me.Request, trHttp.AnnounceOpt{
UserAgent: me.UserAgent,
HostHeader: me.HostHeader,
ClientIp4: me.ClientIp4.IP,
ClientIp6: me.ClientIp6.IP,
}, _url)
case "udp", "udp4", "udp6":
return announceUDP(me, _url)
default:
err = ErrBadScheme
return
}
return cl.Announce(me.Context, me.Request, trHttp.AnnounceOpt{
UserAgent: me.UserAgent,
HostHeader: me.HostHeader,
ClientIp4: me.ClientIp4.IP,
ClientIp6: me.ClientIp6.IP,
})
}

View File

@ -1,36 +1,31 @@
package tracker
import (
"context"
"encoding/binary"
"net/url"
trHttp "github.com/anacrolix/torrent/tracker/http"
"github.com/anacrolix/torrent/tracker/udp"
)
type udpAnnounce struct {
url url.URL
a *Announce
type udpClient struct {
cl *udp.ConnClient
requestUri string
}
func (c *udpAnnounce) Do(req AnnounceRequest) (res AnnounceResponse, err error) {
cl, err := udp.NewConnClient(udp.NewConnClientOpts{
Network: c.dialNetwork(),
Host: c.url.Host,
Ipv6: nil,
})
if err != nil {
return
}
defer cl.Close()
if req.IPAddress == 0 && c.a.ClientIp4.IP != nil {
func (c *udpClient) Close() error {
return c.cl.Close()
}
func (c *udpClient) Announce(ctx context.Context, req AnnounceRequest, opts trHttp.AnnounceOpt) (res AnnounceResponse, err error) {
if req.IPAddress == 0 && opts.ClientIp4 != nil {
// I think we're taking bytes in big-endian order (all IPs), and writing it to a natively
// ordered uint32. This will be correctly ordered when written back out by the UDP client
// later. I'm ignoring the fact that IPv6 announces shouldn't have an IP address, we have a
// perfectly good IPv4 address.
req.IPAddress = binary.BigEndian.Uint32(c.a.ClientIp4.IP.To4())
req.IPAddress = binary.BigEndian.Uint32(opts.ClientIp4.To4())
}
h, nas, err := cl.Announce(c.a.Context, req, udp.Options{RequestUri: c.url.RequestURI()})
h, nas, err := c.cl.Announce(ctx, req, udp.Options{RequestUri: c.requestUri})
if err != nil {
return
}
@ -42,19 +37,3 @@ func (c *udpAnnounce) Do(req AnnounceRequest) (res AnnounceResponse, err error)
}
return
}
func (c *udpAnnounce) dialNetwork() string {
if c.a.UdpNetwork != "" {
return c.a.UdpNetwork
}
return "udp"
}
// TODO: Split on IPv6, as BEP 15 says response peer decoding depends on network in use.
func announceUDP(opt Announce, _url *url.URL) (AnnounceResponse, error) {
ua := udpAnnounce{
url: *_url,
a: &opt,
}
return ua.Do(opt.Request)
}

View File

@ -54,17 +54,20 @@ func ipv6(opt *bool, network string, conn net.Conn) bool {
return rip.To16() != nil && rip.To4() == nil
}
func NewConnClient(opts NewConnClientOpts) (cc ConnClient, err error) {
cc.conn, err = net.Dial(opts.Network, opts.Host)
func NewConnClient(opts NewConnClientOpts) (cc *ConnClient, err error) {
conn, err := net.Dial(opts.Network, opts.Host)
if err != nil {
return
}
cc.ipv6 = ipv6(opts.Ipv6, opts.Network, cc.conn)
go cc.reader()
cc.cl = Client{
Dispatcher: &cc.d,
Writer: cc.conn,
cc = &ConnClient{
cl: Client{
Writer: conn,
},
conn: conn,
ipv6: ipv6(opts.Ipv6, opts.Network, conn),
}
cc.cl.Dispatcher = &cc.d
go cc.reader()
return
}