2
0
mirror of synced 2025-02-24 14:48:27 +00:00
torrent/webtorrent/tracker_client.go

205 lines
5.2 KiB
Go
Raw Normal View History

2020-04-06 16:45:47 +10:00
package webtorrent
import (
"crypto/rand"
"encoding/json"
"fmt"
"sync"
2020-04-06 16:45:47 +10:00
"github.com/anacrolix/log"
"github.com/anacrolix/torrent/tracker"
"github.com/gorilla/websocket"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v2"
)
// Client represents the webtorrent client
type TrackerClient struct {
2020-04-06 16:45:47 +10:00
lock sync.Mutex
peerIDBinary string
infoHashBinary string
2020-04-07 14:30:27 +10:00
outboundOffers map[string]outboundOffer // OfferID to outboundOffer
2020-04-06 16:45:47 +10:00
tracker *websocket.Conn
2020-04-07 14:30:27 +10:00
onConn onDataChannelOpen
logger log.Logger
}
2020-04-07 14:30:27 +10:00
// outboundOffer represents an outstanding offer.
type outboundOffer struct {
originalOffer webrtc.SessionDescription
peerConnection wrappedPeerConnection
dataChannel *webrtc.DataChannel
// Whether we've received an answer for this offer, and closing its PeerConnection has been
// handed off.
answered bool
}
2020-04-07 14:30:27 +10:00
type DataChannelContext struct {
Local, Remote webrtc.SessionDescription
2020-04-13 14:04:34 +10:00
OfferId string
2020-04-07 14:30:27 +10:00
LocalOffered bool
}
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
func NewTrackerClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *TrackerClient {
return &TrackerClient{
2020-04-07 14:30:27 +10:00
outboundOffers: make(map[string]outboundOffer),
2020-04-06 16:45:47 +10:00
peerIDBinary: binaryToJsonString(peerId[:]),
infoHashBinary: binaryToJsonString(infoHash[:]),
onConn: onConn,
2020-04-07 14:30:27 +10:00
logger: logger,
}
}
func (c *TrackerClient) Run(ar tracker.AnnounceRequest, url string) error {
2020-04-07 14:30:27 +10:00
t, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
2020-04-07 14:30:27 +10:00
return fmt.Errorf("failed to dial tracker: %w", err)
}
defer t.Close()
2020-04-16 17:20:58 +10:00
c.logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", url)
c.tracker = t
2020-04-07 14:30:27 +10:00
go func() {
err := c.announce(ar)
if err != nil {
2020-04-16 17:20:58 +10:00
c.logger.WithDefaultLevel(log.Error).Printf("error announcing: %v", err)
2020-04-07 14:30:27 +10:00
}
}()
err = c.trackerReadLoop(c.tracker)
c.lock.Lock()
c.lock.Unlock()
c.closeUnusedOffers()
return err
}
func (c *TrackerClient) closeUnusedOffers() {
for _, offer := range c.outboundOffers {
if offer.answered {
continue
}
offer.peerConnection.Close()
}
}
func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
var randOfferId [20]byte
_, err := rand.Read(randOfferId[:])
if err != nil {
return fmt.Errorf("generating offer_id bytes: %w", err)
}
offerIDBinary := binaryToJsonString(randOfferId[:])
pc, dc, offer, err := newOffer()
if err != nil {
return fmt.Errorf("creating offer: %w", err)
2020-04-07 14:30:27 +10:00
}
req := AnnounceRequest{
Numwant: 1, // If higher we need to create equal amount of offers.
Uploaded: request.Uploaded,
Downloaded: request.Downloaded,
2020-04-06 16:45:47 +10:00
Left: request.Left,
Event: "started",
Action: "announce",
InfoHash: c.infoHashBinary,
PeerID: c.peerIDBinary,
2020-04-06 16:45:47 +10:00
Offers: []Offer{{
OfferID: offerIDBinary,
Offer: offer,
}},
}
data, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshalling request: %w", err)
}
c.lock.Lock()
defer c.lock.Unlock()
err = c.tracker.WriteMessage(websocket.TextMessage, data)
if err != nil {
pc.Close()
2020-04-06 16:45:47 +10:00
return fmt.Errorf("write AnnounceRequest: %w", err)
}
c.outboundOffers[offerIDBinary] = outboundOffer{
peerConnection: pc,
dataChannel: dc,
originalOffer: offer,
}
2020-04-06 16:45:47 +10:00
return nil
}
func (c *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
for {
_, message, err := tracker.ReadMessage()
if err != nil {
return fmt.Errorf("read message error: %w", err)
}
2020-04-16 17:20:58 +10:00
c.logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
var ar AnnounceResponse
if err := json.Unmarshal(message, &ar); err != nil {
c.logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
continue
}
if ar.InfoHash != c.infoHashBinary {
2020-04-13 19:13:23 +10:00
c.logger.Printf("announce response for different hash: expected %q got %q", c.infoHashBinary, ar.InfoHash)
continue
}
switch {
case ar.Offer != nil:
answer, err := getAnswerForOffer(*ar.Offer, c.onConn, ar.OfferID)
if err != nil {
2020-04-06 16:45:47 +10:00
return fmt.Errorf("write AnnounceResponse: %w", err)
}
req := AnnounceResponse{
Action: "announce",
InfoHash: c.infoHashBinary,
PeerID: c.peerIDBinary,
ToPeerID: ar.PeerID,
Answer: &answer,
OfferID: ar.OfferID,
}
data, err := json.Marshal(req)
if err != nil {
2020-04-06 16:45:47 +10:00
return fmt.Errorf("failed to marshal request: %w", err)
}
c.lock.Lock()
err = tracker.WriteMessage(websocket.TextMessage, data)
if err != nil {
2020-04-06 16:45:47 +10:00
return fmt.Errorf("write AnnounceResponse: %w", err)
c.lock.Unlock()
}
c.lock.Unlock()
case ar.Answer != nil:
c.lock.Lock()
2020-04-07 14:30:27 +10:00
offer, ok := c.outboundOffers[ar.OfferID]
c.lock.Unlock()
if !ok {
2020-04-16 17:20:58 +10:00
c.logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", ar.OfferID)
continue
}
c.logger.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
err = offer.setAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
2020-04-07 14:30:27 +10:00
c.onConn(dc, DataChannelContext{
Local: offer.originalOffer,
Remote: *ar.Answer,
2020-04-13 14:04:34 +10:00
OfferId: ar.OfferID,
2020-04-07 14:30:27 +10:00
LocalOffered: true,
})
})
if err != nil {
return fmt.Errorf("failed to sent answer: %w", err)
}
offer.answered = true
}
}
}