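`httpx` is a full-featured HTTP client library for Python that supports both synchronous and asynchronous requests. Its most common operations are shown below.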
Install `httpx`:

```bash
pip install httpx
```
Import `httpx` and send requests:

```python
import httpx
```
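```python
# GET request
response = httpx.get("https://api.example.com/data")
# Optional: httpx already decodes text using the charset from the Content-Type header
if response.charset_encoding:
    response.encoding = response.charset_encoding  # set the encoding explicitly
print(response.status_code)
print(response.json())  # parse the body as JSON
print(response.text)    # read the body as text
```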
```python
# POST request with a JSON body
payload = {"key": "value"}
response = httpx.post("https://api.example.com/data", json=payload)
print(response.json())
```
```python
# PUT request
response = httpx.put("https://api.example.com/data", json={"update": "value"})

# DELETE request
response = httpx.delete("https://api.example.com/data")
```
```python
# GET request with query parameters, headers, and a timeout
response = httpx.get(
    "https://api.example.com/data",
    params={"query": "test"},
    headers={"Authorization": "Bearer token"},
    timeout=10.0,
)
print(response.json())
```
```python
# File upload; the with block closes the file once the request has been sent
with open("filename.txt", "rb") as f:
    files = {"file": ("filename.txt", f, "text/plain")}
    response = httpx.post("https://api.example.com/upload", files=files)
print(response.status_code)
```
Using `httpx.Client` keeps a session open, so multiple requests share the same configuration and reuse the underlying connections.

```python
with httpx.Client(base_url="https://api.example.com") as client:
    response = client.get("/data")
    print(response.json())
```
`httpx` also supports asynchronous requests, which suits high-concurrency, network-bound applications.
Import `httpx` and `asyncio`:

```python
import httpx
import asyncio
```
```python
# Asynchronous GET request
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        print(response.json())

# Run the asynchronous task
asyncio.run(fetch_data())
```
```python
# Asynchronous POST request
async def send_data():
    async with httpx.AsyncClient() as client:
        response = await client.post("https://api.example.com/data", json={"key": "value"})
        print(response.json())

asyncio.run(send_data())
```
```python
# Asynchronous file upload; the with block closes the file afterwards
async def upload_file():
    async with httpx.AsyncClient() as client:
        with open("filename.txt", "rb") as f:
            files = {"file": ("filename.txt", f, "text/plain")}
            response = await client.post("https://api.example.com/upload", files=files)
        print(response.status_code)

asyncio.run(upload_file())
```
`asyncio.gather` runs multiple asynchronous tasks concurrently.

```python
async def fetch(client, url):
    response = await client.get(url)
    return response.json()

async def main():
    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
    # Share one client so all requests reuse the same connection pool
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(*[fetch(client, url) for url in urls])
    print(results)

asyncio.run(main())
```
```python
# Asynchronous client with a shared base URL
async def fetch_data_with_session():
    async with httpx.AsyncClient(base_url="https://api.example.com") as client:
        response = await client.get("/data")
        print(response.json())

asyncio.run(fetch_data_with_session())
```
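`httpx` requests can be given different timeouts for different phases of the request:

```python
# 5 seconds by default (read, write, pool), but 10 seconds to establish a connection
timeout = httpx.Timeout(5.0, connect=10.0)
response = httpx.get("https://api.example.com/data", timeout=timeout)
```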
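`httpx` has no built-in mechanism for retrying failed requests (its transports can only retry failed connection attempts), but a retry loop is easy to write yourself:

```python
async def fetch_with_retries(url, retries=3):
    async with httpx.AsyncClient() as client:
        for i in range(retries):
            try:
                return await client.get(url)
            except httpx.RequestError:
                # Re-raise once the last attempt has failed
                if i == retries - 1:
                    raise

# Call it from synchronous code
asyncio.run(fetch_with_retries("https://api.example.com"))
```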
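Both synchronous and asynchronous requests support proxies. Recent `httpx` releases take a `proxy` argument; older releases used `proxies`, which has since been deprecated and removed.

```python
# Synchronous proxy (replace proxyserver:port with a real host and port)
response = httpx.get("https://api.example.com", proxy="http://proxyserver:port")

# Asynchronous proxy
async def fetch_via_proxy():
    async with httpx.AsyncClient(proxy="http://proxyserver:port") as client:
        return await client.get("https://api.example.com")

response = asyncio.run(fetch_via_proxy())
```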
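Author: 皓月归尘
Article link:
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when reposting!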