2022-04-05
python模块
00
请注意,本文编写于 1110 天前,最后修改于 40 天前,其中某些信息可能已经过时。

目录

1. 安装 httpx
同步操作
导入 httpx 并发送请求
常用请求操作
GET 请求
POST 请求
其他 HTTP 方法
设置请求参数、头部、超时
上传文件
会话管理
异步操作
导入 httpx 和 asyncio
异步请求操作
异步 GET 请求
异步 POST 请求
异步上传文件
异步并发请求
异步会话管理
异步与同步的常见配置选项
超时设置
重试策略
设置代理

httpx 是一个功能强大的 HTTP 客户端库,支持同步和异步操作。以下是 httpx 的常用操作:

1. 安装 httpx

bash
pip install httpx

同步操作

导入 httpx 并发送请求

python
import httpx

常用请求操作

GET 请求

python
response = httpx.get("https://api.example.com/data") response.encoding = response.charset_encoding #设置编码 print(response.status_code) print(response.json()) # 获取 JSON 响应 print(response.text) # 获取文本响应

POST 请求

python
payload = {"key": "value"} response = httpx.post("https://api.example.com/data", json=payload) print(response.json())

其他 HTTP 方法

python
# PUT 请求 response = httpx.put("https://api.example.com/data", json={"update": "value"}) # DELETE 请求 response = httpx.delete("https://api.example.com/data")

设置请求参数、头部、超时

python
response = httpx.get("https://api.example.com/data", params={"query": "test"}, headers={"Authorization": "Bearer token"}, timeout=10.0) print(response.json())

上传文件

python
files = {"file": ("filename.txt", open("filename.txt", "rb"), "text/plain")} response = httpx.post("https://api.example.com/upload", files=files) print(response.status_code)

会话管理

使用 httpx.Client 可以维持一个会话(session),对多个请求使用同一配置。

python
with httpx.Client(base_url="https://api.example.com") as client: response = client.get("/data") print(response.json())

异步操作

httpx 支持异步请求,适合高并发和网络密集型应用。

导入 httpxasyncio

python
import httpx import asyncio

异步请求操作

异步 GET 请求

python
async def fetch_data(): async with httpx.AsyncClient() as client: response = await client.get("https://api.example.com/data") print(response.json()) # 运行异步任务 asyncio.run(fetch_data())

异步 POST 请求

python
async def send_data(): async with httpx.AsyncClient() as client: response = await client.post("https://api.example.com/data", json={"key": "value"}) print(response.json()) asyncio.run(send_data())

异步上传文件

python
async def upload_file(): async with httpx.AsyncClient() as client: files = {"file": ("filename.txt", open("filename.txt", "rb"), "text/plain")} response = await client.post("https://api.example.com/upload", files=files) print(response.status_code) asyncio.run(upload_file())

异步并发请求

可以使用 asyncio.gather 实现多个异步任务并发执行。

python
async def fetch(url): async with httpx.AsyncClient() as client: response = await client.get(url) return response.json() async def main(): urls = ["https://api.example.com/data1", "https://api.example.com/data2"] results = await asyncio.gather(*[fetch(url) for url in urls]) print(results) asyncio.run(main())

异步会话管理

python
async def fetch_data_with_session(): async with httpx.AsyncClient(base_url="https://api.example.com") as client: response = await client.get("/data") print(response.json()) asyncio.run(fetch_data_with_session())

异步与同步的常见配置选项

超时设置

可以为 httpx 请求设置不同类型的超时:

python
timeout = httpx.Timeout(5.0, connect=10.0) response = httpx.get("https://api.example.com/data", timeout=timeout)

重试策略

httpx 不直接支持重试机制,但可以通过自定义来实现:

python
async def fetch_with_retries(url, retries=3): for i in range(retries): try: async with httpx.AsyncClient() as client: response = await client.get(url) return response except httpx.RequestError: if i == retries - 1: raise # 异步调用 asyncio.run(fetch_with_retries("https://api.example.com"))

设置代理

同步和异步操作均支持代理配置:

python
# 同步代理 response = httpx.get("https://api.example.com", proxies="http://proxyserver:port") # 异步代理 async with httpx.AsyncClient(proxies="http://proxyserver:port") as client: response = await client.get("https://api.example.com")

本文作者:皓月归尘

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!