使用提示
1.基於aiohttp的代碼樣例支持訪問http,https網頁
2.aiohttp不是python原生庫,需要安裝才能使用: pip install aiohttp
3.aiohttp只支持Python3.5及以上
4.如Windows系統使用aiohttp訪問https網站拋出異常,在import asyncio後調用 asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())即可解決。
-
#!/usr/bin/env python
-
# -*- coding: utf-8 -*-
"""
-
使用aiohttp請求代理服務器
-
請求http和https網頁均適用
-
"""
-
import random
-
import asyncio
-
# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) windows系統請求https網站報錯時調用此方法
-
import aiohttp
-
import requests
-
page_url = "http://icanhazip.com/" # 要訪問的目標網頁
-
# API接口,返回格式為json
-
api_url = "http://dps.kdlapi.com/api/getdps/?orderid=9266892014xxxxx&signature=xxxxx&num=5&pt=1&format=json&sep=1" # API接口
-
# API接口返回的proxy_list
-
proxy_list = requests.get(api_url).json().get("data").get("proxy_list")
# 用戶名密碼認證(私密代理/獨享代理)
-
username = "username"
-
password = "password"
-
proxy_auth = aiohttp.BasicAuth(username, password)
-
async def fetch(url):
-
async with aiohttp.ClientSession() as session:
-
async with session.get(url, proxy="http://" + random.choice(proxy_list), proxy_auth=proxy_auth) as resp:
-
content = await resp.read()
-
print(f"status_code: {resp.status}, content: {content}")
-
def run():
-
loop = asyncio.get_event_loop()
-
# 異步發出5次請求
-
tasks = [fetch(page_url) for _ in range(5)]
loop.run_until_complete(asyncio.wait(tasks))
-
if __name__ == "__main__":
-
run()