Advanced Examples¶
This page contains advanced usage patterns for RequestX.
Concurrent Async Requests¶
import asyncio
import requestx
async def fetch_url(client: requestx.AsyncClient, url: str) -> dict:
    """GET ``url`` through the shared client and summarize the outcome.

    Returns a dict with the requested URL under ``"url"`` and the
    response's status code under ``"status"``.
    """
    reply = await client.get(url)
    summary = {"url": url}
    summary["status"] = reply.status_code
    return summary
async def main():
    """Fetch a handful of httpbin URLs concurrently and print each status."""
    targets = (
        "https://httpbin.org/get",
        "https://httpbin.org/uuid",
        "https://httpbin.org/json",
        "https://httpbin.org/headers",
    )

    # One client is shared by every request issued below.
    async with requestx.AsyncClient() as client:
        outcomes = await asyncio.gather(
            *(fetch_url(client, target) for target in targets)
        )

    # gather() returns results in the same order as the awaitables it got.
    for outcome in outcomes:
        print(f"{outcome['url']}: {outcome['status']}")


asyncio.run(main())
Rate-Limited API Client¶
import asyncio
import requestx
class RateLimitedClient:
    """Async client wrapper that paces GET requests to a target rate.

    Request *start times* are spaced at least ``1 / requests_per_second``
    seconds apart, no matter how many coroutines call :meth:`get` at once.
    The semaphore additionally caps how many requests are in flight.

    Use it as an async context manager, or call :meth:`close` when done.
    """

    def __init__(self, base_url: str, requests_per_second: float):
        """Create the underlying client and the pacing state.

        Raises:
            ValueError: if ``requests_per_second`` is not greater than 0.
        """
        # Validate before anything else: a rate of 0 would divide by zero
        # when computing the delay below.
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be greater than 0")

        self.client = requestx.AsyncClient(base_url=base_url)
        # int() truncates rates below 1 to 0, and Semaphore(0) would block
        # every caller forever, so always keep at least one slot.
        self.semaphore = asyncio.Semaphore(max(1, int(requests_per_second)))
        self.delay = 1.0 / requests_per_second
        # Serializes slot hand-out so concurrent callers cannot start
        # requests closer together than ``self.delay``.
        self._pace_lock = asyncio.Lock()
        # Event-loop time before which the next request must not start.
        self._next_start = 0.0

    async def _wait_for_slot(self) -> None:
        """Sleep until the next request is allowed to start, then claim it."""
        async with self._pace_lock:
            now = asyncio.get_running_loop().time()
            if self._next_start > now:
                await asyncio.sleep(self._next_start - now)
                now = self._next_start
            self._next_start = now + self.delay

    async def get(self, path: str, **kwargs) -> requestx.Response:
        """Issue a paced GET for ``path``; ``kwargs`` go to the client."""
        await self._wait_for_slot()
        async with self.semaphore:
            return await self.client.get(path, **kwargs)

    async def close(self):
        """Close the underlying client."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()
async def main():
    """Request items 0-19 one at a time through the rate-limited client."""
    limiter = RateLimitedClient("https://api.example.com", requests_per_second=5)
    async with limiter as client:
        for item_id in range(20):
            reply = await client.get(f"/item/{item_id}")
            print(f"Item {item_id}: {reply.status_code}")


asyncio.run(main())
Retry with Exponential Backoff¶
import asyncio
import random
import requestx
from requestx import ConnectError, TimeoutException, HTTPStatusError
async def fetch_with_retry(
    client: requestx.AsyncClient,
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> requestx.Response:
    """GET ``url``, retrying transient failures with exponential backoff.

    Connection errors, timeouts and 5xx responses are retried; any other
    HTTP error status is re-raised immediately.

    Args:
        client: Client used to issue the request.
        url: URL to fetch.
        max_retries: Total number of attempts (not retries *after* the
            first attempt). Must be at least 1.
        base_delay: Delay in seconds before the second attempt; it doubles
            after every further failure.

    Returns:
        The first response that passes ``raise_for_status()``.

    Raises:
        ValueError: if ``max_retries`` is less than 1.
        ConnectError, TimeoutException, HTTPStatusError: the error from the
            final attempt once all attempts are exhausted.
    """
    # Without this guard the loop never runs and the function would end in
    # ``raise None``, which is a confusing TypeError.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    last_error = None
    for attempt in range(max_retries):
        final_attempt = attempt == max_retries - 1
        try:
            response = await client.get(url)
            response.raise_for_status()
            return response
        except (ConnectError, TimeoutException) as e:
            last_error = e
            if final_attempt:
                # Nothing left to retry: fail now instead of sleeping first.
                break
            # Random jitter keeps many clients from retrying in lock-step.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
        except HTTPStatusError as e:
            if e.response.status_code < 500:
                # Client errors will not fix themselves; surface them as-is.
                raise
            last_error = e
            if final_attempt:
                break
            delay = base_delay * (2 ** attempt)
            print(f"Server error. Retrying in {delay:.1f}s")
        # Only reached after a handled, retryable failure.
        await asyncio.sleep(delay)

    raise last_error
async def main():
    """Fetch httpbin's /get endpoint with retries and print its JSON body."""
    ten_seconds = requestx.Timeout(timeout=10.0)
    async with requestx.AsyncClient(timeout=ten_seconds) as client:
        reply = await fetch_with_retry(client, "https://httpbin.org/get")
        print(reply.json())


asyncio.run(main())
API Pagination¶
import requestx
def paginated_fetch(base_url: str, endpoint: str, per_page: int = 100):
    """Collect every item from a page-numbered API endpoint.

    Pages are requested in order starting at 1, with ``page`` and
    ``per_page`` sent as query parameters. Fetching stops at the first page
    whose JSON body is empty. ``raise_for_status()`` is called on every
    response, so an HTTP error status aborts the fetch.

    Returns the items from all pages as a single list.
    """
    collected = []
    with requestx.Client(base_url=base_url) as client:
        page_number = 0
        while True:
            page_number += 1
            query = {"page": page_number, "per_page": per_page}
            reply = client.get(endpoint, params=query)
            reply.raise_for_status()

            batch = reply.json()
            if not batch:
                # An empty page marks the end of the data.
                return collected

            collected.extend(batch)
            print(f"Fetched page {page_number}: {len(batch)} items")
# Usage: pull every item from the example API and report how many came back.
items = paginated_fetch("https://api.example.com", "/items")
print(f"Total items: {len(items)}")
Async Pagination¶
import asyncio
import requestx
async def async_paginated_fetch(
    base_url: str,
    endpoint: str,
    per_page: int = 100
) -> list:
    """Fetch all pages concurrently.

    A one-item probe request reads the ``x-total-count`` response header to
    work out how many pages exist; every page is then requested at once and
    the results are flattened in page order.

    Args:
        base_url: Base URL for the client.
        endpoint: Path of the paginated endpoint.
        per_page: Items requested per page. Must be at least 1.

    Returns:
        A flat list of the items from every page.

    Raises:
        ValueError: if ``per_page`` is less than 1.
        Whatever ``raise_for_status()`` raises when the probe or any page
        request comes back with an HTTP error status.
    """
    # Reject bad page sizes up front; 0 would otherwise divide by zero in
    # the page-count calculation after a request has already been made.
    if per_page < 1:
        raise ValueError("per_page must be at least 1")

    async with requestx.AsyncClient(base_url=base_url) as client:
        # First, get total count
        probe = await client.get(endpoint, params={"per_page": 1})
        probe.raise_for_status()
        # NOTE: when the header is absent the total falls back to 100
        # items, so larger collections would be truncated.
        total = int(probe.headers.get("x-total-count", 100))
        # Ceiling division: a partially filled last page still counts.
        total_pages = (total + per_page - 1) // per_page

        async def fetch_page(page: int) -> list:
            response = await client.get(
                endpoint,
                params={"page": page, "per_page": per_page}
            )
            # Without this check an error body would be flattened into the
            # results as if it were a page of items.
            response.raise_for_status()
            return response.json()

        # Fetch all pages concurrently
        pages = await asyncio.gather(
            *(fetch_page(page) for page in range(1, total_pages + 1))
        )

    # Flatten results
    return [item for page in pages for item in page]
# Usage: fetch every page of /items from the example API.
asyncio.run(
    async_paginated_fetch(base_url="https://api.example.com", endpoint="/items")
)
File Download with Progress¶
import requestx
import sys
def _format_progress(downloaded: int, total: int, bar_len: int = 50) -> str:
    """Render one ``\\r[====----] 42.0%`` progress line for the byte counts."""
    # Clamp both values: if more bytes arrive than Content-Length announced
    # the bar must not grow past ``bar_len`` or report more than 100%.
    # (Presumably this can happen when the body is decoded on the fly;
    # verify for requestx.)
    percent = min(downloaded / total * 100, 100.0)
    filled = min(int(bar_len * downloaded / total), bar_len)
    bar = "=" * filled + "-" * (bar_len - filled)
    return f"\r[{bar}] {percent:.1f}%"


def download_with_progress(url: str, filename: str):
    """Stream ``url`` into ``filename`` while drawing a progress bar.

    The body is written in chunks of up to 8192 bytes. The bar is drawn
    only when the response carries a non-zero ``content-length`` header;
    otherwise the download proceeds silently. ``raise_for_status()`` is
    called before anything is written.
    """
    with requestx.Client() as client, client.stream("GET", url) as response:
        response.raise_for_status()
        total = int(response.headers.get("content-length", 0))
        downloaded = 0
        with open(filename, "wb") as f:
            for chunk in response.iter_bytes(chunk_size=8192):
                f.write(chunk)
                downloaded += len(chunk)
                if total:
                    sys.stdout.write(_format_progress(downloaded, total))
                    sys.stdout.flush()
    print(f"\nDownloaded {filename}")
# Usage: download 1,000,000 bytes from httpbin into a local file.
download_with_progress("https://httpbin.org/bytes/1000000", "downloaded_file.bin")
Multipart File Upload¶
import requestx
def upload_file(url: str, file_path: str):
    """Upload the file at ``file_path`` to ``url`` as multipart form data.

    The file is sent under the form field ``"file"``, using its base name
    as the upload filename. The whole file is read into memory first.

    Returns the decoded JSON body of the response. ``raise_for_status()``
    is called first, so an HTTP error status raises instead of returning.
    """
    # Imported here so the example stays self-contained.
    from pathlib import Path

    path = Path(file_path)
    # Path.name follows the platform's separator rules; splitting on "/"
    # would keep the directory part of a backslash-separated path.
    files = {"file": (path.name, path.read_bytes())}
    response = requestx.post(url, files=files)
    response.raise_for_status()
    return response.json()
# Usage: post a local PDF to httpbin and keep the echoed JSON.
result = upload_file("https://httpbin.org/post", "document.pdf")
Webhook Sender¶
import asyncio
import requestx
from typing import Callable, Any
class WebhookSender:
    """Posts JSON events to one webhook URL, with a shared secret header.

    Use it as an async context manager, or call :meth:`close` when done, so
    the underlying client is released.
    """

    def __init__(self, webhook_url: str, secret: str):
        """Store the target URL and secret and create a 30-second-timeout client."""
        self.webhook_url = webhook_url
        self.secret = secret
        self.client = requestx.AsyncClient(
            timeout=requestx.Timeout(timeout=30.0)
        )

    async def send(self, event: str, data: dict) -> bool:
        """POST ``{"event": event, "data": data}`` to the webhook URL.

        Returns True when the request succeeds with a non-error status and
        False when the request fails or the server answers with an error
        status; the failure is printed rather than raised.
        """
        try:
            response = await self.client.post(
                self.webhook_url,
                json={"event": event, "data": data},
                headers={
                    "X-Webhook-Secret": self.secret,
                    "Content-Type": "application/json"
                }
            )
            response.raise_for_status()
            return True
        # HTTPStatusError is listed explicitly because raise_for_status()
        # raises it and it may not be a RequestError subclass (presumably
        # the hierarchy mirrors httpx, where it is not; verify).
        except (requestx.RequestError, requestx.HTTPStatusError) as e:
            print(f"Webhook failed: {e}")
            return False

    async def close(self):
        """Close the underlying client."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()
# Usage
async def main():
    """Send one ``user.created`` event, closing the sender afterwards."""
    webhook = WebhookSender("https://example.com/webhook", "secret-key")
    payload = {"id": 123, "name": "John"}
    try:
        await webhook.send("user.created", payload)
    finally:
        # Runs even if send() raises, so the client is always closed.
        await webhook.close()


asyncio.run(main())
API Client with Automatic Token Refresh¶
import asyncio
from datetime import datetime, timedelta
import requestx
class APIClient:
    """Async API client that obtains and refreshes a bearer token itself.

    A token is requested from ``token_url`` with the client-credentials
    grant before the first request and again once it is about to expire.
    Every request carries it in an ``Authorization: Bearer ...`` header.

    Use it as an async context manager, or call :meth:`close` when done.
    """

    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str
    ):
        """Store the credentials and create the underlying client."""
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        # No token yet; the first request triggers a refresh.
        self.access_token = None
        self.token_expires = None
        self.client = requestx.AsyncClient(base_url=base_url)
        # Ensures concurrent requests trigger only one refresh at a time.
        self._lock = asyncio.Lock()

    async def _refresh_token(self):
        """Request a new access token and record when it should be renewed."""
        response = await self.client.post(
            self.token_url,
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            }
        )
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        # Fall back to one hour when the server omits ``expires_in``.
        expires_in = data.get("expires_in", 3600)
        # Renew 60 seconds early so a token never expires mid-request.
        self.token_expires = datetime.now() + timedelta(seconds=expires_in - 60)

    async def _ensure_token(self):
        """Refresh the token if there is none or it has reached its renewal time."""
        async with self._lock:
            if not self.access_token or datetime.now() >= self.token_expires:
                await self._refresh_token()

    async def request(self, method: str, path: str, **kwargs) -> requestx.Response:
        """Send an authenticated request; ``kwargs`` go to the client.

        Caller-supplied ``headers`` are merged with the ``Authorization``
        header, which always takes precedence.
        """
        await self._ensure_token()
        # Copy so the caller's dict is never mutated (it may be reused
        # across calls); ``or {}`` also tolerates an explicit headers=None.
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"Bearer {self.access_token}"
        return await self.client.request(
            method, path, headers=headers, **kwargs
        )

    async def get(self, path: str, **kwargs) -> requestx.Response:
        """Send an authenticated GET request."""
        return await self.request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs) -> requestx.Response:
        """Send an authenticated POST request."""
        return await self.request("POST", path, **kwargs)

    async def close(self):
        """Close the underlying client."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()
# Usage
async def main():
    """List users from the example API using client-credentials auth."""
    settings = dict(
        base_url="https://api.example.com",
        client_id="my-client",
        client_secret="my-secret",
        token_url="https://auth.example.com/oauth/token",
    )
    async with APIClient(**settings) as api:
        reply = await api.get("/users")
        users = reply.json()
        print(f"Users: {users}")


asyncio.run(main())
Endpoint Health Checks¶
import asyncio
import requestx
async def check_health(urls: list[str]) -> dict:
    """Check health of multiple endpoints.

    Every URL is requested concurrently with a 5-second timeout.

    Args:
        urls: URLs to probe.

    Returns:
        A dict mapping each URL to a report dict. ``"status"`` is one of
        ``"healthy"`` (status code below 400), ``"unhealthy"`` (400 or
        above), ``"timeout"``, ``"unreachable"`` or ``"error"``. Reports
        for answered requests also carry ``"code"`` and ``"latency"``;
        ``"error"`` reports carry ``"message"``.
    """
    # Nothing to probe: skip creating a client altogether.
    if not urls:
        return {}

    async with requestx.AsyncClient(
        timeout=requestx.Timeout(timeout=5.0)
    ) as client:
        async def check_one(url: str) -> tuple[str, dict]:
            try:
                response = await client.get(url)
                # An endpoint that answers with an error status is up but
                # not healthy, so don't report it as "healthy".
                state = "healthy" if response.status_code < 400 else "unhealthy"
                return url, {
                    "status": state,
                    "code": response.status_code,
                    "latency": response.elapsed
                }
            except requestx.TimeoutException:
                return url, {"status": "timeout"}
            except requestx.ConnectError:
                return url, {"status": "unreachable"}
            # Deliberately broad: one failing probe must not abort the
            # others, so any other failure is reported, not raised.
            except Exception as e:
                return url, {"status": "error", "message": str(e)}

        results_list = await asyncio.gather(*(check_one(url) for url in urls))

    return dict(results_list)
# Usage
async def main():
    """Probe three sample URLs and print the report for each."""
    report = await check_health([
        "https://httpbin.org/get",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://invalid.example.com",
    ])
    for url in report:
        print(f"{url}: {report[url]}")


asyncio.run(main())