You can also yield a list or dict of Futures, which will be started at the same time and run in parallel; a list or dict of results will be returned when they are all finished:
def_make_coroutine_wrapper(func, replace_callback): """The inner workings of ``@gen.coroutine`` and ``@gen.engine``. The two decorators differ in their treatment of the ``callback`` argument, so we cannot simply implement ``@engine`` in terms of ``@coroutine``. """ # On Python 3.5, set the coroutine flag on our generator, to allow it # to be used with 'await'. ifhasattr(types, 'coroutine'): func = types.coroutine(func)
try: result = func(*args, **kwargs) # 如果这个函数是一个普通的函数,将返回一个值,否者会返回一个生成器。 except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: future.set_exc_info(sys.exc_info()) return future else: ifisinstance(result, GeneratorType):# 当它是生成器的时候。 # Inline the first iteration of Runner.run. This lets us # avoid the cost of creating a Runner when the coroutine # never actually yields, which in turn allows us to # use "optional" coroutines in critical path code without # performance penalty for the synchronous case. try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) # result作为一个生成器将会执行到yield处,并返回一个Future对象。 if stack_context._state.contexts isnot orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e)) except Exception: future.set_exc_info(sys.exc_info()) else: Runner(result, future, yielded) try: return future finally: # Subtle memory optimization: if next() raised an exception, # the future's exc_info contains a traceback which # includes this stack frame. This creates a cycle, # which will be collected at the next full GC but has # been shown to greatly increase memory usage of # benchmarks (relative to the refcount-based scheme # used in the absence of cycles). We can avoid the # cycle by clearing the local variable after we return it. future = None future.set_result(result) return future return wrapper
可以看到关键的这几句:
1 2 3
# The three statements at the heart of the coroutine wrapper:
# 1. Call the decorated function; it returns either a plain value or a
#    generator.
result = func(*args, **kwargs)
# 2. Advance the generator to its first ``yield`` and capture the object
#    it yielded.
yielded = next(result)
# 3. Hand the generator, the future the wrapper returns, and that first
#    yielded object over to Runner.
Runner(result, future, yielded)
defhandle_yield(self, yielded): # Lists containing YieldPoints require stack contexts; # other lists are handled in convert_yielded. if _contains_yieldpoint(yielded): yielded = multi(yielded)
ifisinstance(yielded, YieldPoint): # YieldPoints are too closely coupled to the Runner to go # through the generic convert_yielded mechanism. self.future = TracebackFuture()
if self.stack_context_deactivate isNone: # Start a stack context if this is the first # YieldPoint we've seen. with stack_context.ExceptionStackContext( self.handle_exception) as deactivate: self.stack_context_deactivate = deactivate
def convert_yielded(yielded):
    """Turn an object yielded by a coroutine into a `.Future`.

    Out of the box this understands lists, dictionaries, Futures and
    awaitables. When the `~functools.singledispatch` library is
    available, support for further types can be registered, e.g.::

        @convert_yielded.register(asyncio.Future)
        def _(asyncio_future):
            return tornado.platform.asyncio.to_tornado_future(asyncio_future)

    Raises `BadYieldError` when ``yielded`` is none of the supported kinds.

    .. versionadded:: 4.1
    """
    # Lists and dicts that contain YieldPoints never get this far; they
    # were dealt with before this function is called.
    if isinstance(yielded, (list, dict)):
        return multi(yielded)
    if is_future(yielded):
        # Already a Future: pass it through untouched.
        return yielded
    if not isawaitable(yielded):
        raise BadYieldError("yielded unknown object %r" % (yielded,))
    return _wrap_awaitable(yielded)
def multi(children, quiet_exceptions=()):
    """Run several asynchronous operations concurrently.

    ``children`` is either a list, or a dict whose values are yieldable
    objects. The return value is a new yieldable object that resolves
    to a structure mirroring the input: a list of results in the same
    order for a list, or a dict with the same keys for a dict. In other
    words, ``results = yield multi(list_of_futures)`` is equivalent to::

        results = []
        for future in list_of_futures:
            results.append(yield future)

    When children raise exceptions, ``multi()`` raises the first one.
    The remaining ones are logged, except those whose type is listed in
    the ``quiet_exceptions`` argument.

    The returned yieldable is a `YieldPoint` when any of the inputs is
    a `YieldPoint <YieldPoint>`, and a `.Future` otherwise. As a
    consequence, the result of `multi` is usable in a native coroutine
    if and only if every one of its children is.

    ``yield``-based coroutines rarely need to call this directly,
    because the coroutine runner does so automatically whenever a list
    or dict is yielded. Calling it explicitly is required in
    ``await``-based coroutines, or to supply ``quiet_exceptions``.

    For historical reasons this function is exposed under both the
    names ``multi()`` and ``Multi()``.

    .. versionchanged:: 4.2
       If multiple yieldables fail, any exceptions after the first
       (which is raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.

    .. versionchanged:: 4.3
       Replaced the class ``Multi`` and the function ``multi_future``
       with a unified function ``multi``. Added support for yieldables
       other than `YieldPoint` and `.Future`.
    """
    # Any YieldPoint among the inputs forces the YieldPoint-based
    # implementation; otherwise the Future-based one is used.
    combine = (MultiYieldPoint if _contains_yieldpoint(children)
               else multi_future)
    return combine(children, quiet_exceptions=quiet_exceptions)
def multi_future(children, quiet_exceptions=()):
    """Wait for several asynchronous futures concurrently.

    Behaves like `multi`, except that `YieldPoints <YieldPoint>` are
    not supported. ``children`` is a list or a dict of yieldables; the
    returned `.Future` resolves to a list (same order) or a dict (same
    keys) of their results.

    .. versionadded:: 4.0

    .. versionchanged:: 4.2
       If multiple ``Futures`` fail, any exceptions after the first
       (which is raised) will be logged. Added the ``quiet_exceptions``
       argument to suppress this logging for selected exception types.

    .. deprecated:: 4.3
       Use `multi` instead.
    """
    if isinstance(children, dict):
        keys = list(children.keys())
        inputs = children.values()
    else:
        keys = None
        inputs = children
    futures = [convert_yielded(item) for item in inputs]
    assert all(map(is_future, futures))
    remaining = set(futures)

    combined = Future()
    if not futures:
        # Nothing to wait for: resolve right away with an empty
        # container of the matching type.
        combined.set_result([] if keys is None else {})

    def on_child_done(finished):
        remaining.remove(finished)
        if remaining:
            return
        # Every child is done: gather the results in input order.
        results = []
        for child in futures:
            try:
                results.append(child.result())
            except Exception as exc:
                if not combined.done():
                    # The first failure becomes the outcome of the
                    # combined future.
                    combined.set_exc_info(sys.exc_info())
                elif not isinstance(exc, quiet_exceptions):
                    # Later failures can only be logged.
                    app_log.error("Multiple exceptions in yield list",
                                  exc_info=True)
        if combined.done():
            return
        if keys is None:
            combined.set_result(results)
        else:
            combined.set_result(dict(zip(keys, results)))

    # ``remaining`` is a set, so a future that appears more than once in
    # ``futures`` must have the callback attached only once.
    registered = set()
    for child in futures:
        if child in registered:
            continue
        registered.add(child)
        child.add_done_callback(on_child_done)
    return combined