Streaming Guide¶
RequestX supports streaming for both request and response bodies, enabling efficient handling of large data transfers.
Streaming Responses¶
Synchronous Streaming¶
Use client.stream() for streaming responses:
import requestx
with requestx.Client() as client:
with client.stream("GET", "https://httpbin.org/bytes/10000") as response:
for chunk in response.iter_bytes(chunk_size=1024):
print(f"Received {len(chunk)} bytes")
Asynchronous Streaming¶
Use async streaming with AsyncClient:
import asyncio
import requestx
async def main():
async with requestx.AsyncClient() as client:
async with await client.stream("GET", "https://httpbin.org/bytes/10000") as response:
async for chunk in response.aiter_bytes(chunk_size=1024):
print(f"Received {len(chunk)} bytes")
asyncio.run(main())
Iteration Methods¶
iter_bytes / aiter_bytes¶
Iterate over raw bytes:
# Sync
with client.stream("GET", url) as response:
for chunk in response.iter_bytes(chunk_size=1024):
process_bytes(chunk)
# Async
async with await client.stream("GET", url) as response:
async for chunk in response.aiter_bytes(chunk_size=1024):
process_bytes(chunk)
iter_text / aiter_text¶
Iterate over decoded text:
# Sync
with client.stream("GET", url) as response:
for text in response.iter_text():
process_text(text)
# Async
async with await client.stream("GET", url) as response:
async for text in response.aiter_text():
process_text(text)
iter_lines / aiter_lines¶
Iterate over lines:
# Sync
with client.stream("GET", url) as response:
for line in response.iter_lines():
print(line)
# Async
async with await client.stream("GET", url) as response:
async for line in response.aiter_lines():
print(line)
Download Files¶
Basic File Download¶
import requestx
def download_file(url: str, filename: str):
with requestx.Client() as client:
with client.stream("GET", url) as response:
response.raise_for_status()
with open(filename, "wb") as f:
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
download_file("https://example.com/large-file.zip", "downloaded.zip")
Download with Progress¶
import requestx
def download_with_progress(url: str, filename: str):
with requestx.Client() as client:
with client.stream("GET", url) as response:
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(filename, "wb") as f:
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if total_size:
percent = (downloaded / total_size) * 100
print(f"\rProgress: {percent:.1f}%", end="")
print("\nDownload complete!")
download_with_progress("https://httpbin.org/bytes/100000", "file.bin")
Async File Download¶
import asyncio
import aiofiles
import requestx
async def download_file_async(url: str, filename: str):
async with requestx.AsyncClient() as client:
async with await client.stream("GET", url) as response:
response.raise_for_status()
async with aiofiles.open(filename, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
await f.write(chunk)
asyncio.run(download_file_async("https://example.com/file.zip", "downloaded.zip"))
Streaming Server-Sent Events (SSE)¶
Handle SSE streams:
import requestx
def handle_sse(url: str):
with requestx.Client() as client:
with client.stream("GET", url) as response:
for line in response.iter_lines():
if line.startswith("data: "):
data = line[6:]
print(f"Event: {data}")
# Async version
async def handle_sse_async(url: str):
async with requestx.AsyncClient() as client:
async with await client.stream("GET", url) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
print(f"Event: {data}")
Streaming JSON Lines (JSONL)¶
Process JSONL streams:
import json
import requestx
def process_jsonl(url: str):
with requestx.Client() as client:
with client.stream("GET", url) as response:
for line in response.iter_lines():
if line.strip():
data = json.loads(line)
process_record(data)
# Async version
async def process_jsonl_async(url: str):
async with requestx.AsyncClient() as client:
async with await client.stream("GET", url) as response:
async for line in response.aiter_lines():
if line.strip():
data = json.loads(line)
process_record(data)
Response Properties¶
Access response metadata before streaming:
import requestx
with requestx.Client() as client:
with client.stream("GET", url) as response:
# Check status before consuming
print(f"Status: {response.status_code}")
print(f"Headers: {response.headers}")
print(f"Content-Length: {response.headers.get('content-length')}")
# Raise for errors
response.raise_for_status()
# Then stream the content
for chunk in response.iter_bytes():
process(chunk)
Memory Efficiency¶
Streaming is essential for large responses to avoid memory issues:
import requestx
# Bad: Loads entire response into memory
response = client.get("https://example.com/huge-file.zip")
data = response.content # Potentially gigabytes in memory!
# Good: Stream to process without loading all into memory
with client.stream("GET", "https://example.com/huge-file.zip") as response:
for chunk in response.iter_bytes(chunk_size=8192):
# Process chunk by chunk
process_chunk(chunk)
Best Practices¶
- Always use context managers - Ensures streams are properly closed
- Set appropriate chunk sizes - Balance between memory usage and I/O overhead
- Check status before streaming - Verify the response is successful first
- Handle timeouts - Set read timeouts for long-running streams
- Use async for concurrent downloads - Better resource utilization
import asyncio
import requestx
async def download_multiple(urls: list[str], output_dir: str):
async with requestx.AsyncClient(
timeout=requestx.Timeout(timeout=300.0, connect=10.0)
) as client:
async def download_one(url: str):
filename = url.split("/")[-1]
filepath = f"{output_dir}/{filename}"
async with await client.stream("GET", url) as response:
response.raise_for_status()
with open(filepath, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=65536):
f.write(chunk)
return filepath
# Download all concurrently
tasks = [download_one(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"Failed: {url} - {result}")
else:
print(f"Downloaded: {result}")
asyncio.run(download_multiple([
"https://example.com/file1.zip",
"https://example.com/file2.zip",
], "./downloads"))