pywebio.session.coroutinebased 源代码

import asyncio
import logging
import threading
from contextlib import contextmanager
from functools import partial

from .base import Session
from ..exceptions import SessionNotFoundException, SessionClosedException, SessionException
from ..utils import random_str, isgeneratorfunction, iscoroutinefunction

logger = logging.getLogger(__name__)


class WebIOFuture:
    def __init__(self, coro=None):
        self.coro = coro

    def __iter__(self):
        result = yield self
        return result

    __await__ = __iter__  # make compatible with 'await' expression


class _context:
    current_session = None  # type:"CoroutineBasedSession"
    current_task_id = None


class CoroutineBasedSession(Session):
    """
    基于协程的任务会话

    当主协程任务和会话内所有通过 `run_async` 注册的协程都退出后,会话关闭。
    当用户浏览器主动关闭会话,CoroutineBasedSession.close 被调用, 协程任务和会话内所有通过 `run_async` 注册的协程都被关闭。

    """

    # 运行事件循环的线程id
    # 用于在 CoroutineBasedSession.get_current_session() 判断调用方是否合法
    # Tornado backend时,在创建第一个CoroutineBasedSession时初始化
    # Flask backend时,在platform.flaskrun_event_loop()时初始化
    event_loop_thread_id = None

    @classmethod
    def get_current_session(cls) -> "CoroutineBasedSession":
        if _context.current_session is None or cls.event_loop_thread_id != threading.current_thread().ident:
            raise SessionNotFoundException("No session found in current context!")

        if _context.current_session.closed():
            raise SessionClosedException

        return _context.current_session

    @staticmethod
    def get_current_task_id():
        if _context.current_task_id is None:
            raise RuntimeError("No current task found in context!")
        return _context.current_task_id

    def __init__(self, target, session_info, on_task_command=None, on_session_close=None):
        """
        :param target: 协程函数
        :param on_task_command: 由协程内发给session的消息的处理函数
        :param on_session_close: 会话结束的处理函数。后端Backend在相应on_session_close时关闭连接时,需要保证会话内的所有消息都传送到了客户端
        """
        assert iscoroutinefunction(target) or isgeneratorfunction(target), ValueError(
            "CoroutineBasedSession accept coroutine function or generator function as task function")

        super().__init__(session_info)

        cls = type(self)

        self._on_task_command = on_task_command or (lambda _: None)
        self._on_session_close = on_session_close or (lambda: None)

        # 当前会话未被Backend处理的消息
        self.unhandled_task_msgs = []

        # 在创建第一个CoroutineBasedSession时 event_loop_thread_id 还未被初始化
        # 则当前线程即为运行 event loop 的线程
        if cls.event_loop_thread_id is None:
            cls.event_loop_thread_id = threading.current_thread().ident

        # 会话内的协程任务
        self.coros = {}  # coro_task_id -> Task()

        self._closed = False

        self._need_keep_alive = False

        # 当前会话未结束运行(已创建和正在运行的)的协程数量。当 _alive_coro_cnt 变为 0 时,会话结束。
        self._alive_coro_cnt = 1

        main_task = Task(self._start_main_task(target), session=self, on_coro_stop=self._on_task_finish)
        self.coros[main_task.coro_id] = main_task

        self._step_task(main_task)

    async def _start_main_task(self, target):
        await target()
        if self.need_keep_alive():
            from ..session import hold
            await hold()

    def _step_task(self, task, result=None):
        asyncio.get_event_loop().call_soon_threadsafe(partial(task.step, result))

    def _on_task_finish(self, task: "Task"):
        self._alive_coro_cnt -= 1

        if task.coro_id in self.coros:
            logger.debug('del self.coros[%s]', task.coro_id)
            del self.coros[task.coro_id]

        if self._alive_coro_cnt <= 0 and not self.closed():
            self.send_task_command(dict(command='close_session'))
            self._on_session_close()
            self.close()

    def send_task_command(self, command):
        """向会话发送来自协程内的消息

        :param dict command: 消息
        """
        if self.closed():
            raise SessionClosedException()
        self.unhandled_task_msgs.append(command)
        self._on_task_command(self)

    async def next_client_event(self):
        # 函数开始不需要判断 self.closed()
        # 如果会话关闭,对 get_current_session().next_client_event() 的调用会抛出SessionClosedException
        return await WebIOFuture()

    def send_client_event(self, event):
        """向会话发送来自用户浏览器的事件️

        :param dict event: 事件️消息
        """
        coro_id = event['task_id']
        coro = self.coros.get(coro_id)
        if not coro:
            logger.error('coro not found, coro_id:%s', coro_id)
            return
        self._step_task(coro, event)

    def get_task_commands(self):
        msgs = self.unhandled_task_msgs
        self.unhandled_task_msgs = []
        return msgs

    def _cleanup(self):
        for t in list(self.coros.values()):  # t.close() may cause self.coros changed size
            t.step(SessionClosedException, throw_exp=True)
            # in case that the task catch the SessionClosedException, we need to close it manually
            t.close()
        self.coros = {}  # delete session tasks

        # reset the reference, to avoid circular reference
        self._on_session_close = None
        self._on_task_command = None

    def close(self, nonblock=False):
        """关闭当前Session。由Backend调用"""
        if self.closed():
            return

        super().close()

        self._cleanup()

    def register_callback(self, callback, mutex_mode=False):
        """ 向Session注册一个回调函数,返回回调id

        :type callback: Callable or Coroutine
        :param callback: 回调函数. 函数签名为 ``callback(data)``. ``data`` 参数为回调事件的值
        :param bool mutex_mode: 互斥模式。若为 ``True`` ,则在运行回调函数过程中,无法响应同一组件(callback_id相同)的新点击事件,仅当 ``callback`` 为协程函数时有效
        :return str: 回调id.
        """

        async def callback_coro():
            while True:
                try:
                    event = await self.next_client_event()
                except SessionClosedException:
                    return

                assert event['event'] == 'callback'
                coro = None
                if iscoroutinefunction(callback):
                    coro = callback(event['data'])
                elif isgeneratorfunction(callback):
                    coro = asyncio.coroutine(callback)(event['data'])
                else:
                    try:
                        res = callback(event['data'])
                        if asyncio.iscoroutine(res):
                            coro = res
                        else:
                            del res  # `res` maybe pywebio.io_ctrl.Output, so need release `res`
                    except Exception:
                        self.on_task_exception()

                if coro is not None:
                    if mutex_mode:
                        await coro
                    else:
                        self.run_async(coro)

        cls = type(self)
        callback_task = Task(callback_coro(), cls.get_current_session())
        # Activate task
        # Don't callback.step(), it will result in recursive calls to step()
        # todo: integrate with inactive_coro_instances
        callback_task.coro.send(None)
        cls.get_current_session().coros[callback_task.coro_id] = callback_task

        self._need_keep_alive = True

        return callback_task.coro_id

    def run_async(self, coro_obj):
        """异步运行协程对象。可以在协程内调用 PyWebIO 交互函数

        :param coro_obj: 协程对象
        :return: An instance of  `TaskHandler` is returned, which can be used later to close the task.
        """
        assert asyncio.iscoroutine(coro_obj), '`run_async()` only accept coroutine object'

        self._alive_coro_cnt += 1

        task = Task(coro_obj, session=self, on_coro_stop=self._on_task_finish)
        self.coros[task.coro_id] = task
        asyncio.get_event_loop().call_soon_threadsafe(task.step)
        return task.task_handle()

    async def run_asyncio_coroutine(self, coro_obj):
        """若会话线程和运行事件的线程不是同一个线程,需要用 asyncio_coroutine 来运行asyncio中的协程"""
        assert asyncio.iscoroutine(coro_obj), '`run_asyncio_coroutine()` only accept coroutine object'

        res = await WebIOFuture(coro=coro_obj)
        return res

    def need_keep_alive(self) -> bool:
        return self._need_keep_alive


[文档] class TaskHandler: """The handler of coroutine task See also: `run_async() <pywebio.session.run_async>` """ def __init__(self, close, closed): self._close = close self._closed = closed
[文档] def close(self): """Close the coroutine task.""" return self._close()
[文档] def closed(self) -> bool: """Returns a bool stating whether the coroutine task is closed. """ return self._closed()
class Task: @contextmanager def session_context(self): """ >>> with session_context(): ... res = self.coros[-1].send(data) """ # todo issue: with 语句可能发生嵌套,导致内层with退出时,将属性置空 _context.current_session = self.session _context.current_task_id = self.coro_id try: yield finally: _context.current_session = None _context.current_task_id = None @staticmethod def gen_coro_id(coro=None): """生成协程id""" name = 'coro' if hasattr(coro, '__name__'): name = coro.__name__ return '%s-%s' % (name, random_str(10)) def __init__(self, coro, session: CoroutineBasedSession, on_coro_stop=None): """ :param coro: 协程对象 :param session: 创建该Task的会话实例 :param on_coro_stop: 任务结束(正常结束或外部调用Task.close)时运行的回调 """ self.session = session self.coro = coro self.result = None self.task_closed = False # 任务完毕/取消 self.on_coro_stop = on_coro_stop or (lambda _: None) self.coro_id = self.gen_coro_id(self.coro) self.pending_futures = {} # id(future) -> future logger.debug('Task[%s] created ', self.coro_id) def step(self, result=None, throw_exp=False): """激活协程 :param any result: 向协程传入的数据 :param bool throw_exp: 是否向协程引发异常,为 True 时, result 参数为相应的异常对象 """ coro_yield = None with self.session_context(): try: if throw_exp: coro_yield = self.coro.throw(result) else: coro_yield = self.coro.send(result) except StopIteration as e: if len(e.args) == 1: self.result = e.args[0] self.close() logger.debug('Task[%s] finished', self.coro_id) except Exception as e: if not isinstance(e, SessionException): self.session.on_task_exception() self.close() if coro_yield is None: return future = None if isinstance(coro_yield, WebIOFuture): if coro_yield.coro: future = asyncio.run_coroutine_threadsafe(coro_yield.coro, asyncio.get_event_loop()) else: future = coro_yield if not self.session.closed() and hasattr(future, 'add_done_callback'): future.add_done_callback(self._wakeup) self.pending_futures[id(future)] = future def _wakeup(self, future): if not future.cancelled(): del self.pending_futures[id(future)] self.step(future.result()) def close(self): if self.task_closed: return self.task_closed = True self.coro.close() while self.pending_futures: _, f = self.pending_futures.popitem() f.cancel() self.on_coro_stop(self) self.on_coro_stop = None # avoid circular reference self.session = None logger.debug('Task[%s] closed', self.coro_id) def __del__(self): if not self.task_closed: logger.warning('Task[%s] was destroyed but it is pending!', self.coro_id) def task_handle(self): handle = TaskHandler(close=self.close, closed=lambda: self.task_closed) return handle