diff --git a/congestion/live/receive.go b/congestion/live/receive.go index 50776d9d..20bfbeb6 100644 --- a/congestion/live/receive.go +++ b/congestion/live/receive.go @@ -194,11 +194,24 @@ func (r *receiver) Push(pkt packet.Packet) { } if pkt.Header().PacketSequenceNumber.Lt(r.lastACKSequenceNumber) { - // Already acknowledged, ignoring - r.statistics.PktDrop++ - r.statistics.ByteDrop += pktLen + if !pkt.Header().RetransmittedPacketFlag { + // Already acknowledged, ignoring + r.statistics.PktDrop++ + r.statistics.ByteDrop += pktLen - return + return + } + // Retransmission for a sequence number that was ACKed past but + // not yet delivered. periodicACK and the delivery loop are + // separate critical sections within Tick: lastACKSequenceNumber + // can advance past a gap (e.g. when packets after the gap have + // ripe PktTsbpdTime) before lastDeliveredSequenceNumber catches + // up. A retransmission may legitimately arrive in that window. + // Allow it to flow into the out-of-order branch below, where + // it will either fill the gap or be detected as a duplicate. + // TLPKTDROP semantics are still enforced by the earlier + // Lte(lastDeliveredSequenceNumber) check, which drops anything + // already delivered or skipped past. } if pkt.Header().PacketSequenceNumber.Equals(r.maxSeenSequenceNumber.Inc()) { diff --git a/congestion/live/receive_test.go b/congestion/live/receive_test.go index b416c295..7589618b 100644 --- a/congestion/live/receive_test.go +++ b/congestion/live/receive_test.go @@ -550,6 +550,108 @@ func TestSkipTooLate(t *testing.T) { require.Equal(t, []uint32{0, 1, 2, 3, 4, 8, 9}, numbers) } +// TestRecvRetransmitPastACK verifies that a retransmission whose sequence +// number satisfies lastDeliveredSequenceNumber < seq < lastACKSequenceNumber +// is accepted and reinserted into packetList rather than dropped as +// "already acknowledged". +// +// The bug lives in a lock-free window inside Tick(): +// +// func (r *receiver) Tick(now uint64) { +// if ok, sequenceNumber, lite := r.periodicACK(now); ok { +// r.sendACK(sequenceNumber, lite) // r.lock is free here +// } +// if list := r.periodicNAK(now); len(list) != 0 { +// r.sendNAK(list) // r.lock is free here +// } +// r.lock.Lock() // delivery loop runs only now +// ... +// } +// +// periodicACK takes and releases r.lock through its own defer, so by the +// time sendACK is invoked the receiver lock is free. In production sendACK +// performs network I/O (c.pop -> c.onSend), and the network reader +// goroutine is free to call Push() during that window. If a retransmit +// arrives there, Push observes lastACKSequenceNumber already advanced +// past the gap but lastDeliveredSequenceNumber still at its pre-tick +// value, and the pre-fix Lt(lastACKSequenceNumber) branch drops it. +// +// This test reproduces that interleaving deterministically -- with +// strictly monotonic PktTsbpdTime on every packet, matching what a real +// SRT stream produces -- by Push()ing the retransmission from inside the +// OnSendACK callback. +func TestRecvRetransmitPastACK(t *testing.T) { + deliveredSeq := []uint32{} + addr, _ := net.ResolveIPAddr("ip", "127.0.0.1") + + var recv *receiver + injected := false + + recv = mockLiveRecv( + func(seq circular.Number, light bool) { + if injected { + return + } + injected = true + + // At this point periodicACK has already advanced + // lastACKSequenceNumber past the gap at seq=5, but the + // delivery loop has not yet run, so + // lastDeliveredSequenceNumber is still its pre-tick value. + // This is the exact state observed in the production trace + // (lastACK=691158858, lastDelivered=691158759, + // retrans seq=691158851). + p := packet.NewPacket(addr) + p.Header().PacketSequenceNumber = circular.New(5, packet.MAX_SEQUENCENUMBER) + p.Header().PktTsbpdTime = uint64(105) + p.Header().RetransmittedPacketFlag = true + recv.Push(p) + }, + nil, + func(p packet.Packet) { + deliveredSeq = append(deliveredSeq, p.Header().PacketSequenceNumber.Val()) + }, + ) + + // Push 0..4 and 6..8 with strictly monotonic PktTsbpdTime; seq=5 is + // missing (lost in transit, pending retransmission). The eventual + // retransmission injected above carries ts=105, preserving the + // monotonic ordering ts(4)=104 < ts(5)=105 < ts(6)=106. + for i := range 5 { + p := packet.NewPacket(addr) + p.Header().PacketSequenceNumber = circular.New(uint32(i), packet.MAX_SEQUENCENUMBER) + p.Header().PktTsbpdTime = uint64(100 + i) + recv.Push(p) + } + for _, i := range []uint32{6, 7, 8} { + p := packet.NewPacket(addr) + p.Header().PacketSequenceNumber = circular.New(i, packet.MAX_SEQUENCENUMBER) + p.Header().PktTsbpdTime = uint64(100 + i) + recv.Push(p) + } + + // Drive Tick. With now=200 every PktTsbpdTime is ripe, so periodicACK + // walks the whole list and advances lastACKSequenceNumber to 8 via the + // "PktTsbpdTime <= now" branch -- crossing the gap at 5. OnSendACK + // then fires (lock free) and Push()es the retransmission for seq=5. + // Only after that does the delivery loop run, which must pick up the + // retransmission and deliver 0..8 in order. + recv.Tick(200) + + require.True(t, injected, "OnSendACK callback should have been invoked") + require.Equal(t, uint32(8), recv.lastACKSequenceNumber.Val(), + "periodicACK should advance lastACK past the gap to 8") + + stats := recv.Stats() + require.Equal(t, uint64(0), stats.PktDrop, + "retransmission for an ACKed-but-not-yet-delivered sequence must not be dropped") + require.Equal(t, uint64(1), stats.PktRetrans, + "retransmission counter must increment") + + require.Equal(t, []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8}, deliveredSeq, + "the late-arriving retransmission must be delivered in order") +} + func TestIssue67(t *testing.T) { ackNumbers := []uint32{} nakNumbers := [][2]uint32{}