2022-04-05
python模块
00
请注意,本文编写于 1179 天前,最后修改于 49 天前,其中某些信息可能已经过时。

目录

1. 安装 httpx
同步操作
导入 httpx 并发送请求
常用请求操作
GET 请求
POST 请求
其他 HTTP 方法
设置请求参数、头部、超时
上传文件
会话管理
异步操作
导入 httpx 和 asyncio
异步请求操作
异步 GET 请求
异步 POST 请求
异步上传文件
异步并发请求
异步会话管理
异步与同步的常见配置选项
超时设置
重试策略
设置代理

httpx 是一个功能强大的 HTTP 客户端库,支持同步和异步操作。

1. 安装 httpx

bash
pip install httpx

同步操作

导入 httpx 并发送请求

python
import httpx

常用请求操作

GET 请求

python
response = httpx.get("http://httpbin.org/get") response.encoding = response.charset_encoding #设置编码 print(response.status_code) print(response.json()) # 获取 JSON 响应 print(response.text) # 获取文本响应

POST 请求

python
payload = {"key": "value"} response = httpx.post("http://httpbin.org/post", json=payload) print(response.json())

其他 HTTP 方法

python
# PUT 请求 response = httpx.put("https://httpbin.org/put", json={"update": "value"}) # DELETE 请求 response = httpx.delete("https://httpbin.org/delete")

设置请求参数、头部、超时

python
response = httpx.get("https://httpbin.org/get", params={"query": "test"}, headers={"Authorization": "Bearer token"}, timeout=10.0) print(response.json())

上传文件

python
files = {"file": ("filename.txt", open("filename.txt", "rb"), "text/plain")} response = httpx.post("https://httpbin.org/post", files=files) print(response.status_code)

会话管理

使用 httpx.Client 可以维持一个会话(session),对多个请求使用同一配置。

python
with httpx.Client(base_url="https://httpbin.org") as client: response = client.get("/get") print(response.json())

异步操作

httpx 支持异步请求,适合高并发和网络密集型应用。

导入 httpxasyncio

python
import httpx import asyncio

异步请求操作

异步 GET 请求

python
async def fetch_data(): async with httpx.AsyncClient() as client: response = await client.get("http://httpbin.org/get") print(response.json()) # 运行异步任务 asyncio.run(fetch_data())

异步 POST 请求

python
async def send_data(): async with httpx.AsyncClient() as client: response = await client.post("https://httpbin.org/post", json={"key": "value"}) print(response.json()) asyncio.run(send_data())

异步上传文件

python
async def upload_file(): async with httpx.AsyncClient() as client: files = {"file": ("filename.txt", open("filename.txt", "rb"), "text/plain")} response = await client.post("https://httpbin.org/post", files=files) print(response.status_code) asyncio.run(upload_file())

异步并发请求

可以使用 asyncio.gather 实现多个异步任务并发执行。

python
async def fetch(url): async with httpx.AsyncClient() as client: response = await client.get(url) return response.json() async def main(): urls = ["https://httpbin.org/get?id=1", "https://httpbin.org/get?id=2"] results = await asyncio.gather(*[fetch(url) for url in urls]) print(results) asyncio.run(main())

异步会话管理

python
async def fetch_data_with_session(): async with httpx.AsyncClient(base_url="https://httpbin.org") as client: response = await client.get("/get") print(response.json()) asyncio.run(fetch_data_with_session())

异步与同步的常见配置选项

超时设置

可以为 httpx 请求设置不同类型的超时:

python
timeout = httpx.Timeout(5.0, connect=10.0) response = httpx.get("https://httpbin.org/get", timeout=timeout)

重试策略

httpx 不直接支持重试机制,但可以通过自定义来实现:

python
async def fetch_with_retries(url, retries=3): for i in range(retries): try: async with httpx.AsyncClient() as client: response = await client.get(url) return response except httpx.RequestError: if i == retries - 1: raise # 异步调用 asyncio.run(fetch_with_retries("https://httpbin.org"))

设置代理

同步和异步操作均支持代理配置:

python
# 同步代理 response = httpx.get("https://httpbin.org", proxies="http://proxyserver:port") # 异步代理 async with httpx.AsyncClient(proxies="http://proxyserver:port") as client: response = await client.get("https://httpbin.org")

本文作者:皓月归尘

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!