# _*_ coding: utf-8 _*_
"""
python_aiohttp.py by xianhu
"""
import asyncio
import aiohttp
# ç®åå®ä¾
async def aiohttp_test01(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.status)
print(await resp.text())
loop = asyncio.get_event_loop()
tasks = [aiohttp_test01("https://api.github.com/events")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# å
¶ä»Httpæ¹æ³
# session.post('http://httpbin.org/post', data=b'data')
# session.put('http://httpbin.org/put', data=b'data')
# session.delete('http://httpbin.org/delete')
# session.head('http://httpbin.org/get')
# session.options('http://httpbin.org/get')
# session.patch('http://httpbin.org/patch', data=b'data')
# èªå®ä¹Headers
# payload = {'some': 'data'}
# headers = {'content-type': 'application/json'}
# await session.post(url, data=json.dumps(payload), headers=headers)
# èªå®ä¹Cookie
# cookies = {'cookies_are': 'working'}
# async with ClientSession(cookies=cookies) as session:
# 访é®Cookie: session.cookie_jar
# å¨URLsä¸ä¼ éåæ°
# 1. params = {'key1': 'value1', 'key2': 'value2'}
# 2. params = [('key', 'value1'), ('key', 'value2')]
# async with session.get('http://httpbin.org/get', params=params) as resp:
# assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'
# åéæ°æ®
# payload = {'key1': 'value1', 'key2': 'value2'}
# async with session.post('http://httpbin.org/post', data=payload) as resp:
# async with session.post(url, data=json.dumps(payload)) as resp:
# print(await resp.text())
# åéæä»¶(1)
# files = {'file': open('report.xls', 'rb')}
# await session.post(url, data=files)
# åéæ°æ®(2)
# data = FormData()
# data.add_field('file',
# open('report.xls', 'rb'),
# filename='report.xls',
# content_type='application/vnd.ms-excel')
# await session.post(url, data=data)
# è¶
æ¶è®¾ç½®
# aync with session.get('https://github.com', timeout=60) as r:
# ä»£çæ¯æ
# async with aiohttp.ClientSession() as session:
# async with session.get("http://python.org", proxy="http://some.proxy.com") as resp:
# print(resp.status)
# async with aiohttp.ClientSession() as session:
# proxy_auth = aiohttp.BasicAuth('user', 'pass')
# async with session.get("http://python.org", proxy="http://some.proxy.com", proxy_auth=proxy_auth) as resp:
# print(resp.status)
# session.get("http://python.org", proxy="http://user:[email protected]")
# è¿åçå
容
# async with session.get('https://api.github.com/events') as resp:
# print(await resp.text())
# print(await resp.text(encoding='gbk'))
# print(await resp.read())
# print(await resp.json())
# è¿åå
容è¾å¤§
# with open(filename, 'wb') as fd:
# while True:
# chunk = await resp.content.read(chunk_size)
# if not chunk:
# break
# fd.write(chunk)
# è¿åçå
¶ä»åé
# async with session.get('http://httpbin.org/get') as resp:
# print(resp.status) # ç¶æç
# print(resp.headers) # Headers
# print(resp.raw_headers) # åå§Headers
# print(resp.cookies) # è¿åçCookie
# 访é®åå²History
# resp = await session.get('http://example.com/some/redirect/')
# resp: