From e124809ca5f17e608c09fc79423f9c357208a3c5 Mon Sep 17 00:00:00 2001 From: Gyorgy Sarvari Date: Sat, 3 Jan 2026 15:23:14 +0000 Subject: [PATCH 2/2] Limit number of chunks before pausing reading (#11894) (#11916) From: Sam Bull (cherry picked from commit 1e4120e87daec963c67f956111e6bca44d7c3dea) Co-authored-by: J. Nick Koston CVE: CVE-2025-69229 Upstream-Status: Backport [https://github.com/aio-libs/aiohttp/commit/4ed97a4e46eaf61bd0f05063245f613469700229] Signed-off-by: Gyorgy Sarvari --- aiohttp/streams.py | 25 ++++++- tests/test_streams.py | 170 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 1 deletion(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 108257e..9329534 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -116,6 +116,8 @@ class StreamReader(AsyncStreamReaderMixin): "_protocol", "_low_water", "_high_water", + "_low_water_chunks", + "_high_water_chunks", "_loop", "_size", "_cursor", @@ -145,6 +147,11 @@ class StreamReader(AsyncStreamReaderMixin): self._high_water = limit * 2 if loop is None: loop = asyncio.get_event_loop() + # Ensure high_water_chunks >= 3 so it's always > low_water_chunks. + self._high_water_chunks = max(3, limit // 4) + # Use max(2, ...) because there's always at least 1 chunk split remaining + # (the current position), so we need low_water >= 2 to allow resume. + self._low_water_chunks = max(2, self._high_water_chunks // 2) self._loop = loop self._size = 0 self._cursor = 0 @@ -321,6 +328,15 @@ class StreamReader(AsyncStreamReaderMixin): self._http_chunk_splits.append(self.total_bytes) + # If we get too many small chunks before self._high_water is reached, then any + # .read() call becomes computationally expensive, and could block the event loop + # for too long, hence an additional self._high_water_chunks here. + if ( + len(self._http_chunk_splits) > self._high_water_chunks + and not self._protocol._reading_paused + ): + self._protocol.pause_reading() + # wake up readchunk when end of http chunk received waiter = self._waiter if waiter is not None: @@ -529,7 +545,14 @@ class StreamReader(AsyncStreamReaderMixin): while chunk_splits and chunk_splits[0] < self._cursor: chunk_splits.popleft() - if self._size < self._low_water and self._protocol._reading_paused: + if ( + self._protocol._reading_paused + and self._size < self._low_water + and ( + self._http_chunk_splits is None + or len(self._http_chunk_splits) < self._low_water_chunks + ) + ): self._protocol.resume_reading() return data diff --git a/tests/test_streams.py b/tests/test_streams.py index 1b65f77..c5bc671 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1552,3 +1552,173 @@ async def test_stream_reader_iter_chunks_chunked_encoding(protocol) -> None: def test_isinstance_check() -> None: assert isinstance(streams.EMPTY_PAYLOAD, streams.StreamReader) + + +async def test_stream_reader_pause_on_high_water_chunks( + protocol: mock.Mock, +) -> None: + """Test that reading is paused when chunk count exceeds high water mark.""" + loop = asyncio.get_event_loop() + # Use small limit so high_water_chunks is small: limit // 4 = 10 + stream = streams.StreamReader(protocol, limit=40, loop=loop) + + assert stream._high_water_chunks == 10 + assert stream._low_water_chunks == 5 + + # Feed chunks until we exceed high_water_chunks + for i in range(12): + stream.begin_http_chunk_receiving() + stream.feed_data(b"x") # 1 byte per chunk + stream.end_http_chunk_receiving() + + # pause_reading should have been called when chunk count exceeded 10 + protocol.pause_reading.assert_called() + + +async def test_stream_reader_resume_on_low_water_chunks( + protocol: mock.Mock, +) -> None: + """Test that reading resumes when chunk count drops below low water mark.""" + loop = asyncio.get_event_loop() + # Use small limit so high_water_chunks is small: limit // 4 = 10 + stream = streams.StreamReader(protocol, limit=40, loop=loop) + + assert stream._high_water_chunks == 10 + assert stream._low_water_chunks == 5 + + # Feed chunks until we exceed high_water_chunks + for i in range(12): + stream.begin_http_chunk_receiving() + stream.feed_data(b"x") # 1 byte per chunk + stream.end_http_chunk_receiving() + + # Simulate that reading was paused + protocol._reading_paused = True + protocol.pause_reading.reset_mock() + + # Read data to reduce both size and chunk count + # Reading will consume chunks and reduce _http_chunk_splits + data = await stream.read(10) + assert data == b"xxxxxxxxxx" + + # resume_reading should have been called when both size and chunk count + # dropped below their respective low water marks + protocol.resume_reading.assert_called() + + +async def test_stream_reader_no_resume_when_chunks_still_high( + protocol: mock.Mock, +) -> None: + """Test that reading doesn't resume if chunk count is still above low water.""" + loop = asyncio.get_event_loop() + # Use small limit so high_water_chunks is small: limit // 4 = 10 + stream = streams.StreamReader(protocol, limit=40, loop=loop) + + # Feed many chunks + for i in range(12): + stream.begin_http_chunk_receiving() + stream.feed_data(b"x") + stream.end_http_chunk_receiving() + + # Simulate that reading was paused + protocol._reading_paused = True + + # Read only a few bytes - chunk count will still be high + data = await stream.read(2) + assert data == b"xx" + + # resume_reading should NOT be called because chunk count is still >= low_water_chunks + protocol.resume_reading.assert_not_called() + + +async def test_stream_reader_read_non_chunked_response( + protocol: mock.Mock, +) -> None: + """Test that non-chunked responses work correctly (no chunk tracking).""" + loop = asyncio.get_event_loop() + stream = streams.StreamReader(protocol, limit=40, loop=loop) + + # Non-chunked: just feed data without begin/end_http_chunk_receiving + stream.feed_data(b"Hello World") + + # _http_chunk_splits should be None for non-chunked responses + assert stream._http_chunk_splits is None + + # Reading should work without issues + data = await stream.read(5) + assert data == b"Hello" + + data = await stream.read(6) + assert data == b" World" + + +async def test_stream_reader_resume_non_chunked_when_paused( + protocol: mock.Mock, +) -> None: + """Test that resume works for non-chunked responses when paused due to size.""" + loop = asyncio.get_event_loop() + # Small limit so we can trigger pause via size + stream = streams.StreamReader(protocol, limit=10, loop=loop) + + # Feed data that exceeds high_water (limit * 2 = 20) + stream.feed_data(b"x" * 25) + + # Simulate that reading was paused due to size + protocol._reading_paused = True + protocol.pause_reading.assert_called() + + # Read enough to drop below low_water (limit = 10) + data = await stream.read(20) + assert data == b"x" * 20 + + # resume_reading should be called (size is now 5 < low_water 10) + protocol.resume_reading.assert_called() + + +@pytest.mark.parametrize("limit", [1, 2, 4]) +async def test_stream_reader_small_limit_resumes_reading( + protocol: mock.Mock, + limit: int, +) -> None: + """Test that small limits still allow resume_reading to be called. + + Even with very small limits, high_water_chunks should be at least 3 + and low_water_chunks should be at least 2, with high > low to ensure + proper flow control. + """ + loop = asyncio.get_event_loop() + stream = streams.StreamReader(protocol, limit=limit, loop=loop) + + # Verify minimum thresholds are enforced and high > low + assert stream._high_water_chunks >= 3 + assert stream._low_water_chunks >= 2 + assert stream._high_water_chunks > stream._low_water_chunks + + # Set up pause/resume side effects + def pause_reading() -> None: + protocol._reading_paused = True + + protocol.pause_reading.side_effect = pause_reading + + def resume_reading() -> None: + protocol._reading_paused = False + + protocol.resume_reading.side_effect = resume_reading + + # Feed 4 chunks (triggers pause at > high_water_chunks which is >= 3) + for char in b"abcd": + stream.begin_http_chunk_receiving() + stream.feed_data(bytes([char])) + stream.end_http_chunk_receiving() + + # Reading should now be paused + assert protocol._reading_paused is True + assert protocol.pause_reading.called + + # Read all data - should resume (chunk count drops below low_water_chunks) + data = stream.read_nowait() + assert data == b"abcd" + assert stream._size == 0 + + protocol.resume_reading.assert_called() + assert protocol._reading_paused is False