2
0
mirror of synced 2025-02-24 14:48:27 +00:00

Fix keep-alive timer use in connection writer, and make connection a public type

I'm not sure if making this public is the right approach yet, but for now it's a good way to conditionally invoke methods on Client from external packages.
This commit is contained in:
Matt Joiner 2013-10-22 18:00:35 +11:00
parent d039436f55
commit 8df567e822

View File

@ -85,7 +85,7 @@ type Request struct {
ChunkSpec
}
type connection struct {
type Connection struct {
Socket net.Conn
post chan encoding.BinaryMarshaler
write chan []byte
@ -102,23 +102,23 @@ type connection struct {
PeerPieces []bool
}
func (c *connection) Close() {
func (c *Connection) Close() {
c.Socket.Close()
close(c.post)
}
func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
if c.PeerPieces == nil {
return false
}
return c.PeerPieces[index]
}
func (c *connection) Post(msg encoding.BinaryMarshaler) {
func (c *Connection) Post(msg encoding.BinaryMarshaler) {
c.post <- msg
}
func (c *connection) Request(chunk Request) bool {
func (c *Connection) Request(chunk Request) bool {
if len(c.Requests) >= maxRequests {
return false
}
@ -141,7 +141,7 @@ func (c *connection) Request(chunk Request) bool {
return true
}
func (c *connection) SetInterested(interested bool) {
func (c *Connection) SetInterested(interested bool) {
if c.Interested == interested {
return
}
@ -161,15 +161,17 @@ var (
keepAliveBytes [4]byte
)
func (conn *connection) writer() {
func (conn *Connection) writer() {
timer := time.NewTimer(0)
for {
timer := time.NewTimer(time.Minute)
if !timer.Reset(time.Minute) {
<-timer.C
}
var b []byte
select {
case <-timer.C:
b = keepAliveBytes[:]
case b = <-conn.write:
timer.Stop()
if b == nil {
return
}
@ -186,7 +188,7 @@ func (conn *connection) writer() {
}
}
func (conn *connection) writeOptimizer() {
func (conn *Connection) writeOptimizer() {
pending := list.New()
var nextWrite []byte
defer close(conn.write)
@ -539,7 +541,7 @@ func (me *Torrent) haveAnyPieces() bool {
}
func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
conn := &connection{
conn := &Connection{
Socket: sock,
Choked: true,
PeerChoked: true,
@ -596,14 +598,14 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte
}
err = me.connectionLoop(torrent, conn)
if err != nil {
err = fmt.Errorf("during connection loop: %s", err)
err = fmt.Errorf("during Connection loop: %s", err)
}
me.dropConnection(torrent, conn)
me.mu.Unlock()
return
}
func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
if conn.PeerPieces == nil {
conn.PeerPieces = make([]bool, len(torrent.Pieces))
}
@ -617,11 +619,11 @@ func (t *Torrent) wantPiece(index int) bool {
return !t.Pieces[index].Complete()
}
func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
me.replenishConnRequests(torrent, conn)
}
func (me *Client) connectionLoop(torrent *Torrent, conn *connection) error {
func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
decoder := peer_protocol.Decoder{
R: bufio.NewReader(conn.Socket),
MaxLength: 256 * 1024,
@ -689,7 +691,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *connection) error {
}
}
func (me *Client) dropConnection(torrent *Torrent, conn *connection) {
func (me *Client) dropConnection(torrent *Torrent, conn *Connection) {
conn.Socket.Close()
for i0, c := range torrent.Conns {
if c != conn {
@ -702,10 +704,10 @@ func (me *Client) dropConnection(torrent *Torrent, conn *connection) {
torrent.Conns = torrent.Conns[:i1]
return
}
panic("no such connection")
panic("no such Connection")
}
func (me *Client) addConnection(t *Torrent, c *connection) bool {
func (me *Client) addConnection(t *Torrent, c *Connection) bool {
for _, c := range t.Conns {
if c.PeerId == c.PeerId {
return false
@ -779,7 +781,7 @@ func (me *Client) WaitAll() {
func (me *Client) Stop() {
}
func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
requestHeatMap := torrent.requestHeat()
addRequest := func(req Request) (again bool) {
piece := torrent.Pieces[req.Index]