# Source code of pywebio.platform.aiohttp

import asyncio
import fnmatch
import json
import logging
from functools import partial
from os import path, listdir
from urllib.parse import urlparse

from aiohttp import web

from .tornado import open_webbrowser_on_server_started
from .utils import make_applications
from ..session import CoroutineBasedSession, ThreadBasedSession, register_session_implement_for_target, Session
from ..session.base import get_session_info_from_headers
from ..utils import get_free_port, STATIC_PATH, iscoroutinefunction, isgeneratorfunction

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


def _check_origin(origin, allowed_origins, host):
    """Decide whether a request coming from ``origin`` may be served.

    :param str origin: value of the ``Origin`` request header
    :param allowed_origins: iterable of shell-style patterns (``fnmatch`` syntax)
        naming the additional origins that are accepted
    :param str host: value of the ``Host`` request header
    :return: ``True`` if ``origin`` is the same site as ``host`` or matches at
        least one pattern in ``allowed_origins``, otherwise ``False``
    """
    # Same-site requests are always accepted, whatever the pattern list says.
    if _is_same_site(origin, host):
        return True

    for pattern in allowed_origins:
        if fnmatch.fnmatch(origin, pattern):
            return True
    return False


def _is_same_site(origin, host):
    """判断 origin 和 host 是否一致。origin 和 host 都为http协议请求头"""
    parsed_origin = urlparse(origin)
    origin = parsed_origin.netloc
    origin = origin.lower()

    # Check to see that origin matches host directly, including ports
    return origin == host


def _webio_handler(applications, websocket_settings, check_origin_func=_is_same_site):
    """Build the aiohttp request handler coroutine that serves PyWebIO over WebSocket.

    :param dict applications: mapping of application name -> task function;
        must contain an ``'index'`` entry, which is used as the fallback
    :param dict websocket_settings: keyword arguments passed to
        ``web.WebSocketResponse``
    :param callable check_origin_func: ``check_origin_func(origin, host) -> bool``;
        invoked with keyword arguments when the request has an ``Origin`` header
    :return: aiohttp request handler coroutine function
    """
    ioloop = asyncio.get_event_loop()

    async def wshandle(request: web.Request):
        origin = request.headers.get('origin')
        if origin and not check_origin_func(origin=origin, host=request.host):
            return web.Response(status=403, text="Cross origin websockets not allowed")

        ws = web.WebSocketResponse(**websocket_settings)
        await ws.prepare(request)

        close_from_session_tag = False  # whether the connection was closed by the session

        def send_msg_to_client(session: Session):
            # Forward every pending command of the session to the browser.
            for msg in session.get_task_commands():
                msg_str = json.dumps(msg)
                ioloop.create_task(ws.send_str(msg_str))

        def close_from_session():
            nonlocal close_from_session_tag
            close_from_session_tag = True
            ioloop.create_task(ws.close())
            logger.debug("WebSocket closed from session")

        session_info = get_session_info_from_headers(request.headers)
        session_info['user_ip'] = request.remote
        session_info['request'] = request
        session_info['backend'] = 'aiohttp'

        # Unknown or missing application names fall back to the 'index' application.
        app_name = request.query.getone('app', 'index')
        application = applications.get(app_name) or applications['index']

        if iscoroutinefunction(application) or isgeneratorfunction(application):
            session = CoroutineBasedSession(application, session_info=session_info,
                                            on_task_command=send_msg_to_client,
                                            on_session_close=close_from_session)
        else:
            session = ThreadBasedSession(application, session_info=session_info,
                                         on_task_command=send_msg_to_client,
                                         on_session_close=close_from_session, loop=ioloop)

        # aiohttp ends the ``async for`` iteration when the socket is closing
        # instead of yielding a close message, so the session has to be closed
        # after the loop. ``finally`` also covers errors raised while handling
        # a message (e.g. invalid JSON), which would otherwise leak the session.
        try:
            async for msg in ws:
                if msg.type == web.WSMsgType.text:
                    data = msg.json()
                    if data is not None:
                        session.send_client_event(data)
                elif msg.type == web.WSMsgType.close:
                    break
                # Binary frames are ignored.
        finally:
            if not close_from_session_tag:
                session.close()
                logger.debug("WebSocket closed from client")

        return ws

    return wshandle


def webio_handler(applications, allowed_origins=None, check_origin=None, websocket_settings=None):
    """Get the `Request Handle <https://docs.aiohttp.org/en/stable/web_quickstart.html#aiohttp-web-handler>`_
    coroutine for running PyWebIO task functions in aiohttp.

    The request handler communicates with the browser over the WebSocket protocol.

    :param list/dict/callable applications: PyWebIO application(s).
    :param list allowed_origins: origins accepted by the server in addition to the current domain.
    :param callable check_origin: origin check function. The same-site check is still applied
        first; ``check_origin(origin)`` is consulted only when it fails.
    :param dict websocket_settings: arguments used when creating the aiohttp WebSocketResponse.
        See https://docs.aiohttp.org/en/stable/web_reference.html#websocketresponse

    For details of the ``applications``, ``allowed_origins`` and ``check_origin`` arguments, see the
    parameters of the same name in :func:`pywebio.platform.aiohttp.start_server`.

    :return: aiohttp Request Handler
    """
    applications = make_applications(applications)
    for target in applications.values():
        register_session_implement_for_target(target)

    websocket_settings = websocket_settings or {}

    if check_origin is None:
        check_origin_func = partial(_check_origin, allowed_origins=allowed_origins or [])
    else:
        # The handler built by _webio_handler calls this with the keyword
        # arguments ``origin=`` and ``host=``, so the parameter names must match.
        def check_origin_func(origin, host):
            return _is_same_site(origin, host) or check_origin(origin)

    return _webio_handler(applications=applications,
                          check_origin_func=check_origin_func,
                          websocket_settings=websocket_settings)


def static_routes(prefix='/'):
    """Get the list of aiohttp routes that serve the PyWebIO static files.

    One static route is created for every directory found directly inside
    ``STATIC_PATH``, followed by a GET route at ``prefix`` that returns
    ``index.html``.

    :param str prefix: URL path under which the static files are hosted,
        the root path ``/`` by default
    :return: list of aiohttp routes
    """

    async def index(request):
        return web.FileResponse(path.join(STATIC_PATH, 'index.html'))

    routes = []
    for entry in listdir(STATIC_PATH):
        full_path = path.join(STATIC_PATH, entry)
        if not path.isdir(full_path):
            continue
        routes.append(web.static(prefix + path.basename(full_path), full_path))

    routes.append(web.get(prefix, index))
    return routes


def start_server(applications, port=0, host='', debug=False,
                 allowed_origins=None, check_origin=None,
                 auto_open_webbrowser=False,
                 websocket_settings=None,
                 **aiohttp_settings):
    """Start an aiohttp server that provides the PyWebIO application(s) as a web service.

    :param list/dict/callable applications: PyWebIO application(s). Same format as the
        ``applications`` parameter of :func:`pywebio.platform.tornado.start_server`
    :param int port: port the server listens on. ``0`` means an available port is chosen automatically.
    :param str host: address the server binds to. ``host`` may be an IP address or a hostname.
        If it is a hostname, the server listens on all IP addresses associated with that hostname.
        An empty string or ``None`` binds the server to all available addresses.
    :param bool debug: when true, the ``asyncio`` logger level is set to ``DEBUG``
    :param list allowed_origins: origins accepted by the server in addition to the current domain.
        An origin consists of the protocol, domain and port; Unix shell-style patterns are allowed
        (see the `Python documentation <https://docs.python.org/3/library/fnmatch.html>`_ for all rules):

        - ``*`` matches everything
        - ``?`` matches any single character
        - ``[seq]`` matches any character in seq
        - ``[!seq]`` matches any character not in seq

        For example ``https://*.example.com`` or ``*://*.example.com``
    :param callable check_origin: origin check function. It receives the request origin string
        (protocol, domain and port) and returns ``True/False``. When ``check_origin`` is set,
        the ``allowed_origins`` parameter is ignored
    :param bool auto_open_webbrowser: whether to open the web browser automatically once the server
        has started (requires support from the operating system)
    :param dict websocket_settings: arguments used when creating the aiohttp WebSocketResponse.
        See https://docs.aiohttp.org/en/stable/web_reference.html#websocketresponse
    :param aiohttp_settings: arguments passed to the aiohttp Application.
        See https://docs.aiohttp.org/en/stable/web_reference.html#application
    """
    if not host:
        host = '0.0.0.0'

    if port == 0:
        port = get_free_port()

    handler = webio_handler(applications, allowed_origins=allowed_origins, check_origin=check_origin,
                            websocket_settings=websocket_settings)

    app = web.Application(**aiohttp_settings)
    # '/io' is the WebSocket endpoint; the remaining routes serve the static front-end files.
    app.router.add_routes([web.get('/io', handler)])
    app.router.add_routes(static_routes())

    if auto_open_webbrowser:
        asyncio.get_event_loop().create_task(open_webbrowser_on_server_started('localhost', port))

    if debug:
        logging.getLogger("asyncio").setLevel(logging.DEBUG)

    print('Listen on %s:%s' % (host, port))
    web.run_app(app, host=host, port=port)