PeerConn.post becomes PeerConn.write

Matt Joiner 2021-05-10 12:29:47 +10:00
parent 56e2a8a3a6
commit 9c9ba1aeac
2 changed files with 16 additions and 26 deletions
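In short: PeerConn.post and the old PeerConn.write both marshalled a message into the connection's write buffer and tickled the writer, so this commit merges them under the write name. The bool result reports whether the buffer is still below writeBufferHighWaterLen, giving callers soft backpressure. A minimal caller-side sketch (illustrative only; the pending queue and drain loop are hypothetical, not code from this commit):

// Hypothetical drain loop honouring write's soft backpressure.
for len(pending) > 0 {
	if !cn.write(pending[0]) {
		break // buffer has reached the high-water mark; let the writer flush first
	}
	pending = pending[1:]
}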

@@ -975,7 +975,7 @@ const localClientReqq = 1 << 5
// See the order given in Transmission's tr_peerMsgsNew.
func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
-conn.post(pp.Message{
+conn.write(pp.Message{
Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID,
ExtendedPayload: func() []byte {
@@ -1004,11 +1004,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
func() {
if conn.fastEnabled() {
if torrent.haveAllPieces() {
-conn.post(pp.Message{Type: pp.HaveAll})
+conn.write(pp.Message{Type: pp.HaveAll})
conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
return
} else if !torrent.haveAnyPieces() {
-conn.post(pp.Message{Type: pp.HaveNone})
+conn.write(pp.Message{Type: pp.HaveNone})
conn.sentHaves.Clear()
return
}
@@ -1016,7 +1016,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
conn.postBitfield()
}()
if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
-conn.post(pp.Message{
+conn.write(pp.Message{
Type: pp.Port,
Port: cl.dhtPort(),
})
@@ -1074,12 +1074,12 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
return err
case pp.RequestMetadataExtensionMsgType:
if !t.haveMetadataPiece(piece) {
-c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
+c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
return nil
}
start := (1 << 14) * piece
c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
-c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
+c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
return nil
case pp.RejectMetadataExtensionMsgType:
return nil

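An aside on the slice arithmetic in the hunk above: extension-protocol metadata pieces are 16 KiB (1 << 14), with only the final piece allowed to be shorter, which is presumably what t.metadataPieceSize accounts for. A hedged sketch of the bounds calculation (the helper name and signature are assumptions, not part of this commit):

const metadataPieceLen = 1 << 14 // 16 KiB per BEP 9

// metadataPieceBounds is a hypothetical helper mirroring the arithmetic above.
func metadataPieceBounds(totalLen, piece int) (start, end int) {
	start = metadataPieceLen * piece
	end = start + metadataPieceLen
	if end > totalLen {
		end = totalLen // the final piece may be short
	}
	return
}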
@@ -404,27 +404,17 @@ func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
// https://github.com/pion/datachannel/issues/59 is fixed.
const writeBufferHighWaterLen = 1 << 15
-// Writes a message into the write buffer. Returns whether it's okay to keep writing. Posting is
-// done asynchronously, so it may be that we're not able to honour backpressure from this method. It
-// might be possible to merge this with PeerConn.write down the track? They seem to be very similar.
-func (cn *PeerConn) post(msg pp.Message) bool {
-torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
+// Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
+// done asynchronously, so it may be that we're not able to honour backpressure from this method.
+func (cn *PeerConn) write(msg pp.Message) bool {
+torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
// We don't need to track bytes here because a connection.w Writer wrapper takes care of that
// (although there's some delay between us recording the message, and the connection writer
// flushing it out.).
cn.writeBuffer.Write(msg.MustMarshalBinary())
-// Last I checked only Piece messages affect stats, and we don't post those.
+// Last I checked only Piece messages affect stats, and we don't write those.
cn.wroteMsg(&msg)
cn.tickleWriter()
return cn.writeBuffer.Len() < writeBufferHighWaterLen
}
-// Returns true if there's room to write more.
-func (cn *PeerConn) write(msg pp.Message) bool {
-cn.wroteMsg(&msg)
-cn.writeBuffer.Write(msg.MustMarshalBinary())
-torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
-cn.tickleWriter()
-return !cn.writeBufferFull()
-}
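The torrent.Add call in the new write is an expvar-style counter keyed by message type. A self-contained sketch of the same pattern (the map name here is invented; the real package maintains its own):

import (
	"expvar"
	"fmt"
)

// Sketch: count written messages by type, as the merged write does above.
var messageStats = expvar.NewMap("messageStats")

func countWritten(typ fmt.Stringer) {
	messageStats.Add(fmt.Sprintf("messages written of type %s", typ.String()), 1)
}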
@@ -441,7 +431,7 @@ func (cn *PeerConn) requestMetadataPiece(index int) {
return
}
cn.logger.WithDefaultLevel(log.Debug).Printf("requesting metadata piece %d", index)
-cn.post(pp.Message{
+cn.write(pp.Message{
Type: pp.Extended,
ExtendedID: eID,
ExtendedPayload: func() []byte {
@@ -689,7 +679,7 @@ func (cn *PeerConn) have(piece pieceIndex) {
if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
return
}
-cn.post(pp.Message{
+cn.write(pp.Message{
Type: pp.Have,
Index: pp.Integer(piece),
})
@@ -703,7 +693,7 @@ func (cn *PeerConn) postBitfield() {
if !cn.t.haveAnyPieces() {
return
}
-cn.post(pp.Message{
+cn.write(pp.Message{
Type: pp.Bitfield,
Bitfield: cn.t.bitfield(),
})
@@ -946,7 +936,7 @@ func (c *PeerConn) reject(r Request) {
if !c.fastEnabled() {
panic("fast not enabled")
}
-c.post(r.ToMsg(pp.Reject))
+c.write(r.ToMsg(pp.Reject))
delete(c.peerRequests, r)
}
@@ -1028,7 +1018,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
if c.choking {
c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
}
-c.choke(c.post)
+c.choke(c.write)
}
func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
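Note that c.choke(c.write) passes the new write method in as the message sink, so choke stays agnostic about how the Choke message reaches the buffer. A hedged sketch of what that shape implies (the signature is inferred from this call site, not shown in the diff):

// Assumed shape: emit a Choke through the supplied sink and propagate its
// keep-writing result.
func (cn *PeerConn) choke(msg func(pp.Message) bool) bool {
	if cn.choking {
		return true
	}
	cn.choking = true
	return msg(pp.Message{Type: pp.Choke})
}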