首頁>科技>

使用提示

1.基於aiohttp的代碼樣例支持訪問http,https網頁

2.aiohttp不是python原生庫,需要安裝才能使用: pip install aiohttp

3.aiohttp只支持Python3.5及以上

4.如Windows系統使用aiohttp訪問https網站拋出異常,在import asyncio後調用 asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())即可解決。

  • #!/usr/bin/env python

  • # -*- coding: utf-8 -*-

  • """

  • 使用aiohttp請求代理服務器

  • 請求http和https網頁均適用

  • """

  • import random

  • import asyncio

  • # asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) windows系統請求https網站報錯時調用此方法

  • import aiohttp

  • import requests

  • page_url = "http://icanhazip.com/" # 要訪問的目標網頁

  • # API接口,返回格式為json

  • api_url = "http://dps.kdlapi.com/api/getdps/?orderid=9266892014xxxxx&signature=xxxxx&num=5&pt=1&format=json&sep=1" # API接口

  • # API接口返回的proxy_list

  • proxy_list = requests.get(api_url).json().get("data").get("proxy_list")

  • # 用戶名密碼認證(私密代理/獨享代理)

  • username = "username"

  • password = "password"

  • proxy_auth = aiohttp.BasicAuth(username, password)

  • async def fetch(url):

  • async with aiohttp.ClientSession() as session:

  • async with session.get(url, proxy="http://" + random.choice(proxy_list), proxy_auth=proxy_auth) as resp:

  • content = await resp.read()

  • print(f"status_code: {resp.status}, content: {content}")

  • def run():

  • loop = asyncio.get_event_loop()

  • # 異步發出5次請求

  • tasks = [fetch(page_url) for _ in range(5)]

  • loop.run_until_complete(asyncio.wait(tasks))

  • if __name__ == "__main__":

  • run()

1
最新評論
  • 整治雙十一購物亂象,國家再次出手!該跟這些套路說再見了
  • 日見新生,風啟山嵐 瑪莎拉蒂全新Grecale開啟江南詩意之旅