Client session
:class:`ClientSession` is the heart and the main entry point for all
client API operations.
Client tracing
The execution flow of a specific request can be followed attaching
listeners coroutines to the signals provided by the
:class:`TraceConfig` instance, this instance will be used as a
parameter for the :class:
`ClientSession` constructor having as a
result a client that triggers the different signals supported by the
:class:`TraceConfig`. By default any instance of
:class:
Custom cookies
To send your own cookies to the server, you can use the cookies
parameter of :class:`ClientSession` constructor:
Database
Launch these sql scripts to init database and fill it with sample data:
psql template1 < demo/sql/init_db.sql
and
psql template1 < demo/sql/sample_data.sql
Now you have two tables:
- for storing their permissions
License
The library is licensed under a MIT license.
Limiting connection pool size
To limit amount of simultaneously opened connections you can pass limit
parameter to connector:
Python examples of aio
def __init__(self, cfg_file, callback, loop=None, username=None, password=None): """ cfg_file: dictionary or filename or yaml text """ self.config = StreamConfigReader().read(cfg_file) self.callback = callback self._loop = loop or asyncio.get_event_loop() self._queue = asyncio.Queue(maxsize=20, loop=self._loop) auth = None if username != None and password != None: auth = aiohttp.BasicAuth(login=username, password=password, encoding='utf-8') self.session = aiohttp.ClientSession( read_timeout=self.config.timeout, conn_timeout=self.config.timeout, raise_for_status=True, loop=self._loop, auth=auth)
def __init__(self, api, host, port, username, password, iter_cnt=-1, iter_delay=600, task_timeout=120, worker_cnt=4, post_timeout=60, no_verify_ssl=False): '''[summary] ''' self._api = api self._workers = [] self._iter_cnt = iter_cnt self._iter_delay = iter_delay self._worker_cnt = worker_cnt self._task_queue = Queue() self._task_timeout = task_timeout self._output_lock = Lock() self._url = f'https://{host}:{port}/mkctf-api/healthcheck' self._ssl = False if no_verify_ssl else None self._auth = BasicAuth(username, password) self._post_timeout = ClientTimeout(total=post_timeout)
def push(self, host, port=443, tags=[], categories=[], username='', password='', no_verify_ssl=False): '''Push challenge configuration to a scoreboard ''' self.__assert_valid_repo() challenges = [] for challenge in self._repo.scan(tags, categories): challenges.append(challenge.conf.raw) url = f'https://{host}:{port}/mkctf-api/push' ssl = False if no_verify_ssl else None auth = BasicAuth(username, password) timeout = ClientTimeout(total=2*60) async with ClientSession(auth=auth, timeout=timeout) as session: async with session.post(url, ssl=ssl, json={'challenges': challenges}) as resp: if resp.status < 400: app_log.info("push succeeded.") return {'pushed': True} app_log.error("push failed.") return {'pushed': False}
def update_proxy(self, proxy, proxy_auth, proxy_headers): if proxy and proxy.scheme not in ['http', 'socks4', 'socks5']: raise ValueError( "Only http, socks4 and socks5 proxies are supported") if proxy and proxy_auth: if proxy.scheme == 'http' and not isinstance(proxy_auth, aiohttp.BasicAuth): raise ValueError("proxy_auth must be None or " "BasicAuth() tuple for http proxy") if proxy.scheme == 'socks4' and not isinstance(proxy_auth, Socks4Auth): raise ValueError("proxy_auth must be None or Socks4Auth() " "tuple for socks4 proxy") if proxy.scheme == 'socks5' and not isinstance(proxy_auth, Socks5Auth): raise ValueError("proxy_auth must be None or Socks5Auth() " "tuple for socks5 proxy") self.proxy = proxy self.proxy_auth = proxy_auth self.proxy_headers = proxy_headers
def __init__( self, *, url: Union[str, URL] = BASE, use_user_agent: bool = False, forwarded_for: Optional[str] = None, proxy: Optional[str] = None, proxy_auth: Optional[aiohttp.BasicAuth] = None, timeout: Union[float, int] = 150, max_requests: int = 250, debug: bool = False, **kwargs, ) -> None: self.semaphore = asyncio.Semaphore(max_requests) self.url = URL(url) self.use_agent = use_user_agent self.forwarded_for = forwarded_for self.proxy = proxy self.proxy_auth = proxy_auth self.timeout = timeout self.debug = debug self.last_result = None # for testing
def post_request(self, uri, payload, headers): try: async with self.semaphore: async with aiohttp.ClientSession() as session: async with session.post( uri, data=json.dumps(payload), headers=headers, auth=aiohttp.BasicAuth(self.username, self.password), verify_ssl=False, ) as _response: if _response.status != 204: await _response.read() else: return _response except (Exception, TimeoutError): self.logger.exception("Failed to communicate with server.") raise BadfishException return _response
def patch_request(self, uri, payload, headers, _continue=False): try: async with self.semaphore: async with aiohttp.ClientSession() as session: async with session.patch( uri, data=json.dumps(payload), headers=headers, auth=aiohttp.BasicAuth(self.username, self.password), verify_ssl=False, ) as _response: await _response.read() except Exception as ex: if _continue: return else: self.logger.debug(ex) self.logger.error("Failed to communicate with server.") raise BadfishException return _response
def put_host_parameter(self, host_id, parameter_id, value): logger.debug("PUT param: {%s:%s}" % (parameter_id, value)) endpoint = "/hosts/%s/parameters/%s" % (host_id, parameter_id) data = {'parameter': {"value": value}} try: async with self.semaphore: async with aiohttp.ClientSession( loop=self.loop ) as session: async with session.put( self.url endpoint, json=data, auth=BasicAuth(self.username, self.password), verify_ssl=False, ) as response: await response.json(content_type="application/json") except Exception as ex: logger.debug(ex) logger.error("There was something wrong with your request.") return False if response.status in [200, 204]: logger.info("Host parameter updated successfully.") return True return False
def post_host_parameter(self, host_id, name, value): logger.debug("PUT param: {%s:%s}" % (name, value)) endpoint = "/hosts/%s/parameters" % host_id data = {"parameter": {"name": name, "value": value}} try: async with self.semaphore: async with aiohttp.ClientSession( loop=self.loop ) as session: async with session.post( self.url endpoint, json=data, auth=BasicAuth(self.username, self.password), verify_ssl=False, ) as response: await response.json(content_type="application/json") except Exception as ex: logger.debug(ex) logger.error("There was something wrong with your request.") return False if response.status in [200, 201, 204]: logger.info("Host parameter updated successfully.") return True return False
def update_user_password(self, login, password): logger.debug("PUT login pass: {%s}" % login) _host_id = await self.get_user_id(login) endpoint = "/users/%s" % _host_id data = {"user": {"login": login, "password": password}} try: async with self.semaphore: async with aiohttp.ClientSession( loop=self.loop ) as session: async with session.put( self.url endpoint, json=data, auth=BasicAuth(self.username, self.password), verify_ssl=False, ) as response: await response.json(content_type="application/json") except Exception as ex: logger.debug(ex) logger.error("There was something wrong with your request.") return False if response.status in [200, 204]: logger.info("User password updated successfully.") return True return False
def get_request(self, uri, _continue=False): try: async with self.semaphore: async with aiohttp.ClientSession(loop=self.loop) as session: async with session.get( uri, auth=BasicAuth(self.username, self.password), verify_ssl=False, timeout=60, ) as _response: await _response.text("utf-8", "ignore") except (Exception, TimeoutError) as ex: if _continue: return else: logger.debug(ex) logger.error("Failed to communicate with server.") raise BadfishException return _response
def post_request(self, uri, payload, headers): try: async with self.semaphore: async with aiohttp.ClientSession(loop=self.loop) as session: async with session.post( uri, data=json.dumps(payload), headers=headers, auth=BasicAuth(self.username, self.password), verify_ssl=False, ) as _response: if _response.status != 204: await _response.text("utf-8", "ignore") else: return _response except (Exception, TimeoutError): logger.exception("Failed to communicate with server.") raise BadfishException return _response
def patch_request(self, uri, payload, headers, _continue=False): try: async with self.semaphore: async with aiohttp.ClientSession(loop=self.loop) as session: async with session.patch( uri, data=json.dumps(payload), headers=headers, auth=BasicAuth(self.username, self.password), verify_ssl=False, ) as _response: await _response.text("utf-8", "ignore") except Exception as ex: if _continue: return else: logger.debug(ex) logger.error("Failed to communicate with server.") raise BadfishException return _response
def get_tracking_pairs(self) -> Dict[str, OrderBookTrackerEntry]: auth: aiohttp.BasicAuth = aiohttp.BasicAuth(login=conf.coinalpha_order_book_api_username, password=conf.coinalpha_order_book_api_password) client_session: aiohttp.ClientSession = await self.get_client_session() response: aiohttp.ClientResponse = await client_session.get(self.SNAPSHOT_REST_URL, auth=auth) timestamp: float = time.time() if response.status != 200: raise EnvironmentError(f"Error fetching order book tracker snapshot from {self.SNAPSHOT_REST_URL}.") binary_data: bytes = await response.read() order_book_tracker_data: Dict[str, Tuple[pd.DataFrame, pd.DataFrame]] = pickle.loads(binary_data) retval: Dict[str, OrderBookTrackerEntry] = {} for trading_pair, (bids_df, asks_df) in order_book_tracker_data.items(): order_book: BinanceOrderBook = BinanceOrderBook() order_book.apply_numpy_snapshot(bids_df.values, asks_df.values) retval[trading_pair] = OrderBookTrackerEntry(trading_pair, timestamp, order_book) return retval
def _sync_send(self, api_url, req_args): params = req_args["params"] if "params" in req_args else None data = req_args["data"] if "data" in req_args else None files = req_args["files"] if "files" in req_args else None _json = req_args["json"] if "json" in req_args else None headers = req_args["headers"] if "headers" in req_args else None token = params.get("token") if params and "token" in params else None auth = ( req_args["auth"] if "auth" in req_args else None ) # Basic Auth for oauth.v2.access / oauth.access if auth is not None: if isinstance(auth, BasicAuth): headers["Authorization"] = auth.encode() elif isinstance(auth, str): headers["Authorization"] = auth else: self._logger.warning( f"As the auth: {auth}: {type(auth)} is unsupported, skipped" ) body_params = {} if params: body_params.update(params) if data: body_params.update(data) return self._urllib_api_call( token=token, url=api_url, query_params={}, body_params=body_params, files=files, json_body=_json, additional_headers=headers, )
def test_proxy_client_request_invalid(loop): with pytest.raises(ValueError) as cm: ProxyClientRequest( 'GET', URL('http://python.org'), proxy=URL('socks6://proxy.org'), proxy_auth=None, loop=loop) assert 'Only http, socks4 and socks5 proxies are supported' in str(cm.value) with pytest.raises(ValueError) as cm: ProxyClientRequest( 'GET', URL('http://python.org'), loop=loop, proxy=URL('http://proxy.org'), proxy_auth=Socks4Auth('l')) assert 'proxy_auth must be None or BasicAuth() ' 'tuple for http proxy' in str(cm.value) with pytest.raises(ValueError) as cm: ProxyClientRequest( 'GET', URL('http://python.org'), loop=loop, proxy=URL('socks4://proxy.org'), proxy_auth=BasicAuth('l')) assert 'proxy_auth must be None or Socks4Auth() ' 'tuple for socks4 proxy' in str(cm.value) with pytest.raises(ValueError) as cm: ProxyClientRequest( 'GET', URL('http://python.org'), loop=loop, proxy=URL('socks5://proxy.org'), proxy_auth=Socks4Auth('l')) assert 'proxy_auth must be None or Socks5Auth() ' 'tuple for socks5 proxy' in str(cm.value)
Redirection history
If a request was redirected, it is possible to view previous responses using
the :attr:`~ClientResponse.history` attribute:
Resolving using custom nameservers
In order to specify the nameservers to when resolving the hostnames,
:term:`aiodns` is required:
Response headers and cookies
We can view the server’s response :attr:`ClientResponse.headers` using
a :class:`~multidict.CIMultiDictProxy`:
assert resp.headers == { 'ACCESS-CONTROL-ALLOW-ORIGIN': '*', 'CONTENT-TYPE': 'application/json', 'DATE': 'Tue, 15 Jul 2022 16:49:51 GMT', 'SERVER': 'gunicorn/18.0', 'CONTENT-LENGTH': '331', 'CONNECTION': 'keep-alive'}
Run tests
pip install -r requirements-dev.txt py.test
Setup
Once we have all the code in place we can install it for our application:
Usage — aio 0.4.0- documentation
First of all, what is aiohttp_security about?
aiohttp-security is a set of public API functions as well as a
reference standard for implementation details for securing access to
assets served by a wsgi server.
Assets are secured using authentication and authorization as explained
below. aiohttp-security is part of the
aio-libs project which takes advantage
of asynchronous processing using Python’s asyncio library.