From 843c954c098e86c02ac0e27f45df103295cbdd13 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 7 Nov 2013 01:55:29 +1100 Subject: [PATCH] Implementing UDP tracker in its own module --- tracker/tracker.go | 50 ++++++++ tracker/udp.go | 27 ----- tracker/udp/udp_tracker.go | 208 ++++++++++++++++++++++++++++++++ tracker/udp/udp_tracker_test.go | 80 ++++++++++++ tracker/udp_test.go | 20 --- 5 files changed, 338 insertions(+), 47 deletions(-) create mode 100644 tracker/tracker.go delete mode 100644 tracker/udp.go create mode 100644 tracker/udp/udp_tracker.go create mode 100644 tracker/udp/udp_tracker_test.go delete mode 100644 tracker/udp_test.go diff --git a/tracker/tracker.go b/tracker/tracker.go new file mode 100644 index 00000000..576353b5 --- /dev/null +++ b/tracker/tracker.go @@ -0,0 +1,50 @@ +package tracker + +import ( + "net" + "net/url" +) + +type AnnounceRequest struct { + InfoHash [20]byte + PeerId [20]byte + Downloaded int64 + Left int64 + Uploaded int64 + Event AnnounceEvent + IPAddress int32 + Key int32 + NumWant int32 + Port int16 +} + +type AnnounceResponse struct { + Interval int32 + Leechers int32 + Seeders int32 + Peers []Peer +} + +type AnnounceEvent int32 + +type Peer struct { + IP net.IP + Port int +} + +const ( + None AnnounceEvent = iota +) + +type Client interface { + Announce(*AnnounceRequest) (AnnounceResponse, error) +} + +var schemes = make(map[string]func(*url.URL) Client) + +func RegisterClientScheme(scheme string, newFunc func(*url.URL) Client) { +} + +func New(url *url.URL) Client { + return schemes[url.Scheme](url) +} diff --git a/tracker/udp.go b/tracker/udp.go deleted file mode 100644 index 55e9ad37..00000000 --- a/tracker/udp.go +++ /dev/null @@ -1,27 +0,0 @@ -package tracker - -type UDPConnectionRequest struct { - ConnectionId int64 - Action int32 - TransctionId int32 -} - -type UDPAnnounceResponseHeader struct { - Action int32 - TransactionId int32 - Interval int32 - Leechers int32 - Seeders int32 -} - -type UDPAnnounceResponse struct { - UDPAnnounceResponseHeader - PeerAddrSlice -} - -type PeerAddr struct { - IP int32 - Port int16 -} - -type PeerAddrSlice []PeerAddr diff --git a/tracker/udp/udp_tracker.go b/tracker/udp/udp_tracker.go new file mode 100644 index 00000000..13800d13 --- /dev/null +++ b/tracker/udp/udp_tracker.go @@ -0,0 +1,208 @@ +package udp_tracker + +import ( + "bitbucket.org/anacrolix/go.torrent/tracker" + "bytes" + "encoding/binary" + "io" + "math/rand" + "net" + "net/url" + "time" +) + +type Action int32 + +const ( + Connect Action = iota + Announce + Scrape + Error +) + +type ConnectionRequest struct { + ConnectionId int64 + Action int32 + TransctionId int32 +} + +type ConnectionResponse struct { + ConnectionId int64 +} + +type ResponseHeader struct { + Action Action + TransactionId int32 +} + +type RequestHeader struct { + ConnectionId int64 + Action Action + TransactionId int32 +} + +type AnnounceResponseHeader struct { + Interval int32 + Leechers int32 + Seeders int32 +} + +type Peer struct { + IP [4]byte + Port uint16 +} + +func init() { + tracker.RegisterClientScheme("udp", newClient) +} + +func newClient(url *url.URL) tracker.Client { + return &client{} +} + +func newTransactionId() int32 { + return int32(rand.Uint32()) +} + +func timeout(contiguousTimeouts int) (d time.Duration) { + if contiguousTimeouts > 8 { + contiguousTimeouts = 8 + } + d = 15 * time.Second + for ; contiguousTimeouts > 0; contiguousTimeouts-- { + d *= 2 + } + return +} + +type client struct { + contiguousTimeouts int + connectionIdReceived time.Time + connectionId int64 + socket net.Conn +} + +func (c *client) Announce(req *tracker.AnnounceRequest) (res tracker.AnnounceResponse, err error) { + err = c.connect() + if err != nil { + return + } + b, err := c.request(Announce, req) + if err != nil { + return + } + var ( + h AnnounceResponseHeader + ps []Peer + ) + err = readBody(b, &h, &ps) + if err != nil { + return + } + res.Interval = h.Interval + res.Leechers = h.Leechers + res.Seeders = h.Seeders + for _, p := range ps { + res.Peers = append(res.Peers, tracker.Peer{ + IP: p.IP[:], + Port: int(p.Port), + }) + } + return +} + +func (c *client) write(h *RequestHeader, body interface{}) (err error) { + buf := &bytes.Buffer{} + err = binary.Write(buf, binary.BigEndian, h) + if err != nil { + panic(err) + } + err = binary.Write(buf, binary.BigEndian, body) + if err != nil { + panic(err) + } + n, err := c.socket.Write(buf.Bytes()) + if err != nil { + return + } + if n != buf.Len() { + panic("write should send all or error") + } + return +} + +func (c *client) request(action Action, args interface{}) (responseBody []byte, err error) { + tid := newTransactionId() + err = c.write(&RequestHeader{ + ConnectionId: c.connectionId, + Action: action, + TransactionId: tid, + }, args) + if err != nil { + return + } + c.socket.SetDeadline(time.Now().Add(timeout(c.contiguousTimeouts))) + b := make([]byte, 0x10000) // IP limits packet size to 64KB + for { + var n int + n, err = c.socket.Read(b) + if opE, ok := err.(*net.OpError); ok { + if opE.Timeout() { + c.contiguousTimeouts++ + return + } + } + if err != nil { + return + } + buf := bytes.NewBuffer(b[:n]) + var h ResponseHeader + err = binary.Read(buf, binary.BigEndian, &h) + switch err { + case io.ErrUnexpectedEOF: + continue + case nil: + default: + return + } + if h.Action != action { + continue + } + if h.TransactionId != tid { + continue + } + c.contiguousTimeouts = 0 + responseBody = buf.Bytes() + return + } +} + +func readBody(b []byte, data ...interface{}) (err error) { + r := bytes.NewReader(b) + for _, datum := range data { + err = binary.Read(r, binary.BigEndian, datum) + if err != nil { + break + } + } + return +} + +func (c *client) connect() (err error) { + if !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute)) { + return nil + } + c.connectionId = 0x41727101980 + b, err := c.request(Connect, nil) + if err != nil { + return + } + var res ConnectionResponse + err = readBody(b, &res) + if err != nil { + return + } + c.connectionId = res.ConnectionId + c.connectionIdReceived = time.Now() + return +} diff --git a/tracker/udp/udp_tracker_test.go b/tracker/udp/udp_tracker_test.go new file mode 100644 index 00000000..4a451ddb --- /dev/null +++ b/tracker/udp/udp_tracker_test.go @@ -0,0 +1,80 @@ +package udp_tracker + +import ( + "bytes" + "encoding/binary" + "io" + "net" + "syscall" + "testing" +) + +func TestNetIPv4Bytes(t *testing.T) { + ip := net.IP([]byte{127, 0, 0, 1}) + if ip.String() != "127.0.0.1" { + t.FailNow() + } + if string(ip) != "\x7f\x00\x00\x01" { + t.Fatal([]byte(ip)) + } +} + +func TestMarshalAnnounceResponse(t *testing.T) { + w := bytes.NewBuffer(nil) + if err := binary.Write(w, binary.BigEndian, []Peer{{[4]byte{127, 0, 0, 1}, 2}, {[4]byte{255, 0, 0, 3}, 4}}); err != nil { + t.Fatalf("error writing udp announce response addrs: %s", err) + } + if w.String() != "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04" { + t.FailNow() + } + if binary.Size(AnnounceResponseHeader{}) != 12 { + t.FailNow() + } +} + +// Failure to write an entire packet to UDP is expected to given an error. +func TestLongWriteUDP(t *testing.T) { + l, err := net.ListenUDP("udp", nil) + defer l.Close() + if err != nil { + t.Fatal(err) + } + c, err := net.DialUDP("udp", nil, l.LocalAddr().(*net.UDPAddr)) + if err != nil { + t.Fatal(err) + } + defer c.Close() + for msgLen := 1; ; msgLen *= 2 { + n, err := c.Write(make([]byte, msgLen)) + if err != nil { + err := err.(*net.OpError).Err + if err != syscall.EMSGSIZE { + t.Fatalf("write error isn't EMSGSIZE: %s", err) + } + return + } + if n < msgLen { + t.FailNow() + } + } +} + +func TestShortBinaryRead(t *testing.T) { + var data ResponseHeader + err := binary.Read(bytes.NewBufferString("\x00\x00\x00\x01"), binary.BigEndian, &data) + if data.Action != 0 { + t.Log("optimistic binary read now works?!") + } + switch err { + case io.ErrUnexpectedEOF: + default: + // TODO + } +} + +func TestConvertInt16ToInt(t *testing.T) { + i := 50000 + if int(uint16(int16(i))) != 50000 { + t.FailNow() + } +} diff --git a/tracker/udp_test.go b/tracker/udp_test.go deleted file mode 100644 index cf4c0e01..00000000 --- a/tracker/udp_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package tracker - -import ( - "bytes" - "encoding/binary" - "testing" -) - -func TestMarshalUDPAnnounceResponse(t *testing.T) { - w := bytes.NewBuffer(nil) - if err := binary.Write(w, binary.BigEndian, &PeerAddrSlice{{1, 2}, {3, 4}}); err != nil { - t.Fatalf("error writing udp announce response addrs: %s", err) - } - if w.String() != "\x00\x00\x00\x01\x00\x02\x00\x00\x00\x03\x00\x04" { - t.FailNow() - } - if binary.Size(UDPAnnounceResponseHeader{}) != 20 { - t.FailNow() - } -}