diff --git a/client.go b/client.go index dc850a9e..bb0f4f4c 100644 --- a/client.go +++ b/client.go @@ -975,7 +975,7 @@ const localClientReqq = 1 << 5 // See the order given in Transmission's tr_peerMsgsNew. func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() { - conn.post(pp.Message{ + conn.write(pp.Message{ Type: pp.Extended, ExtendedID: pp.HandshakeExtendedID, ExtendedPayload: func() []byte { @@ -1004,11 +1004,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { func() { if conn.fastEnabled() { if torrent.haveAllPieces() { - conn.post(pp.Message{Type: pp.HaveAll}) + conn.write(pp.Message{Type: pp.HaveAll}) conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces())) return } else if !torrent.haveAnyPieces() { - conn.post(pp.Message{Type: pp.HaveNone}) + conn.write(pp.Message{Type: pp.HaveNone}) conn.sentHaves.Clear() return } @@ -1016,7 +1016,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { conn.postBitfield() }() if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() { - conn.post(pp.Message{ + conn.write(pp.Message{ Type: pp.Port, Port: cl.dhtPort(), }) @@ -1074,12 +1074,12 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon return err case pp.RequestMetadataExtensionMsgType: if !t.haveMetadataPiece(piece) { - c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil)) + c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil)) return nil } start := (1 << 14) * piece c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece) - c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)])) + c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)])) return nil case pp.RejectMetadataExtensionMsgType: return nil diff --git a/peerconn.go b/peerconn.go index 48b305a2..7fd819be 100644 --- a/peerconn.go +++ b/peerconn.go @@ -404,27 +404,17 @@ func (cn *Peer) peerHasPiece(piece pieceIndex) bool { // https://github.com/pion/datachannel/issues/59 is fixed. const writeBufferHighWaterLen = 1 << 15 -// Writes a message into the write buffer. Returns whether it's okay to keep writing. Posting is -// done asynchronously, so it may be that we're not able to honour backpressure from this method. It -// might be possible to merge this with PeerConn.write down the track? They seem to be very similar. -func (cn *PeerConn) post(msg pp.Message) bool { - torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1) +// Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is +// done asynchronously, so it may be that we're not able to honour backpressure from this method. +func (cn *PeerConn) write(msg pp.Message) bool { + torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1) // We don't need to track bytes here because a connection.w Writer wrapper takes care of that // (although there's some delay between us recording the message, and the connection writer // flushing it out.). cn.writeBuffer.Write(msg.MustMarshalBinary()) - // Last I checked only Piece messages affect stats, and we don't post those. + // Last I checked only Piece messages affect stats, and we don't write those. cn.wroteMsg(&msg) cn.tickleWriter() - return cn.writeBuffer.Len() < writeBufferHighWaterLen -} - -// Returns true if there's room to write more. -func (cn *PeerConn) write(msg pp.Message) bool { - cn.wroteMsg(&msg) - cn.writeBuffer.Write(msg.MustMarshalBinary()) - torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1) - cn.tickleWriter() return !cn.writeBufferFull() } @@ -441,7 +431,7 @@ func (cn *PeerConn) requestMetadataPiece(index int) { return } cn.logger.WithDefaultLevel(log.Debug).Printf("requesting metadata piece %d", index) - cn.post(pp.Message{ + cn.write(pp.Message{ Type: pp.Extended, ExtendedID: eID, ExtendedPayload: func() []byte { @@ -689,7 +679,7 @@ func (cn *PeerConn) have(piece pieceIndex) { if cn.sentHaves.Get(bitmap.BitIndex(piece)) { return } - cn.post(pp.Message{ + cn.write(pp.Message{ Type: pp.Have, Index: pp.Integer(piece), }) @@ -703,7 +693,7 @@ func (cn *PeerConn) postBitfield() { if !cn.t.haveAnyPieces() { return } - cn.post(pp.Message{ + cn.write(pp.Message{ Type: pp.Bitfield, Bitfield: cn.t.bitfield(), }) @@ -946,7 +936,7 @@ func (c *PeerConn) reject(r Request) { if !c.fastEnabled() { panic("fast not enabled") } - c.post(r.ToMsg(pp.Reject)) + c.write(r.ToMsg(pp.Reject)) delete(c.peerRequests, r) } @@ -1028,7 +1018,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { if c.choking { c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly") } - c.choke(c.post) + c.choke(c.write) } func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {