Expose DialFirst

This commit is contained in:
Matt Joiner 2021-06-21 12:54:57 +10:00
parent 9f5ce7e90e
commit 765edfa7cb
3 changed files with 33 additions and 35 deletions

View File

@ -553,9 +553,9 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
return cl.torrents[ih] return cl.torrents[ih]
} }
type dialResult struct { type DialResult struct {
Conn net.Conn Conn net.Conn
Network string Dialer Dialer
} }
func countDialResult(err error) { func countDialResult(err error) {
@ -581,14 +581,19 @@ func (cl *Client) dopplegangerAddr(addr string) bool {
} }
// Returns a connection over UTP or TCP, whichever is first to connect. // Returns a connection over UTP or TCP, whichever is first to connect.
func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) { func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
return DialFirst(ctx, addr, cl.dialers)
}
// Returns a connection over UTP or TCP, whichever is first to connect.
func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
{ {
t := perf.NewTimer(perf.CallerName(0)) t := perf.NewTimer(perf.CallerName(0))
defer func() { defer func() {
if res.Conn == nil { if res.Conn == nil {
t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err())) t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
} else { } else {
t.Mark("returned conn over " + res.Network) t.Mark("returned conn over " + res.Dialer.DialerNetwork())
} }
}() }()
} }
@ -596,24 +601,17 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
// As soon as we return one connection, cancel the others. // As soon as we return one connection, cancel the others.
defer cancel() defer cancel()
left := 0 left := 0
resCh := make(chan dialResult, left) resCh := make(chan DialResult, left)
func() { for _, _s := range dialers {
cl.lock() left++
defer cl.unlock() s := _s
cl.eachDialer(func(s Dialer) bool { go func() {
func() { resCh <- DialResult{
left++ dialFromSocket(ctx, s, addr),
//cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr()) s,
go func() { }
resCh <- dialResult{ }()
cl.dialFromSocket(ctx, s, addr), }
s.LocalAddr().Network(),
}
}()
}()
return true
})
}()
// Wait for a successful connection. // Wait for a successful connection.
func() { func() {
defer perf.ScopeTimer()() defer perf.ScopeTimer()()
@ -633,15 +631,10 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
if res.Conn != nil { if res.Conn != nil {
go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
} }
//if res.Conn != nil {
// cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
//} else {
// cl.logger.Printf("failed to dial %s", addr)
//}
return res return res
} }
func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
c, err := s.Dial(ctx, addr) c, err := s.Dial(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
// it now in case we close the connection forthwith. // it now in case we close the connection forthwith.
@ -711,7 +704,7 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
} }
return nil, errors.New("dial failed") return nil, errors.New("dial failed")
} }
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc)) c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
if err != nil { if err != nil {
nc.Close() nc.Close()
} }

View File

@ -8,12 +8,8 @@ import (
) )
type Dialer interface { type Dialer interface {
// The network is implied by the instance.
Dial(_ context.Context, addr string) (net.Conn, error) Dial(_ context.Context, addr string) (net.Conn, error)
// This is required for registering with the connection tracker (router connection table DialerNetwork() string
// emulating rate-limiter) before dialing. TODO: What about connections that wouldn't infringe
// on routers, like localhost or unix sockets.
LocalAddr() net.Addr
} }
type NetDialer struct { type NetDialer struct {
@ -21,6 +17,10 @@ type NetDialer struct {
Dialer net.Dialer Dialer net.Dialer
} }
func (me NetDialer) DialerNetwork() string {
return me.Network
}
func (me NetDialer) Dial(ctx context.Context, addr string) (_ net.Conn, err error) { func (me NetDialer) Dial(ctx context.Context, addr string) (_ net.Conn, err error) {
defer perf.ScopeTimerErr(&err)() defer perf.ScopeTimerErr(&err)()
return me.Dialer.DialContext(ctx, me.Network, addr) return me.Dialer.DialContext(ctx, me.Network, addr)

View File

@ -106,11 +106,16 @@ func listenUtp(network, addr string, fc firewallCallback) (socket, error) {
return utpSocketSocket{us, network}, err return utpSocketSocket{us, network}, err
} }
// utpSocket wrapper, additionally wrapped for the torrent package's socket interface.
type utpSocketSocket struct { type utpSocketSocket struct {
utpSocket utpSocket
network string network string
} }
func (me utpSocketSocket) DialerNetwork() string {
return me.network
}
func (me utpSocketSocket) Dial(ctx context.Context, addr string) (conn net.Conn, err error) { func (me utpSocketSocket) Dial(ctx context.Context, addr string) (conn net.Conn, err error) {
defer perf.ScopeTimerErr(&err)() defer perf.ScopeTimerErr(&err)()
return me.utpSocket.DialContext(ctx, me.network, addr) return me.utpSocket.DialContext(ctx, me.network, addr)