Count expected received chunks instead of flagging them

This can be racy. In the TestReceiveChunkStorageFailure, when we have a storage write failure, we request the chunk again, but the peer has sometimes already sent it, and we return from the connection read loop with unexpected chunk after receiving it twice.
This commit is contained in:
Matt Joiner 2020-04-23 12:34:43 +10:00
parent 03887dbc80
commit 12e377e3fe
2 changed files with 27 additions and 7 deletions

View File

@ -81,7 +81,7 @@ type PeerConn struct {
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
validReceiveChunks map[request]struct{}
validReceiveChunks map[request]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
@ -501,9 +501,9 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool {
}
cn.requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[request]struct{})
cn.validReceiveChunks = make(map[request]int)
}
cn.validReceiveChunks[r] = struct{}{}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
@ -987,6 +987,14 @@ func (c *PeerConn) onReadRequest(r request) error {
return nil
}
func runSafeExtraneous(f func()) {
if true {
go f()
} else {
f()
}
}
// Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
// exit. Returning will end the connection.
func (c *PeerConn) mainReadLoop() (err error) {
@ -1026,6 +1034,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
}
messageTypesReceived.Add(msg.Type.String(), 1)
if msg.Type.FastExtension() && !c.fastEnabled() {
runSafeExtraneous(func() { torrent.Add("fast messages received when extension is disabled", 1) })
return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
}
switch msg.Type {
@ -1090,7 +1099,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
err = c.peerSentHaveNone()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
delete(c.validReceiveChunks, newRequestFromMessage(&msg))
c.decExpectedChunkReceive(newRequestFromMessage(&msg))
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@ -1107,6 +1116,17 @@ func (c *PeerConn) mainReadLoop() (err error) {
}
}
func (c *PeerConn) decExpectedChunkReceive(r request) {
count := c.validReceiveChunks[r]
if count == 1 {
delete(c.validReceiveChunks, r)
} else if count > 1 {
c.validReceiveChunks[r] = count - 1
} else {
panic(r)
}
}
func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
defer func() {
// TODO: Should we still do this?
@ -1196,11 +1216,11 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
torrent.Add("chunks received while choking", 1)
}
if _, ok := c.validReceiveChunks[req]; !ok {
if c.validReceiveChunks[req] <= 0 {
torrent.Add("chunks received unexpected", 1)
return errors.New("received unexpected chunk")
}
delete(c.validReceiveChunks, req)
c.decExpectedChunkReceive(req)
if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) {
torrent.Add("chunks received due to allowed fast", 1)

View File

@ -132,7 +132,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pieces[0]._dirtyChunks.Clear()
cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}}
cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1}
cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)