Fluent Python -- Chapter 16: Coroutines

16. Coroutines

How coroutines evolved from generators

The basic behavior of a generator used as a coroutine

Coroutine demonstration

In [51]: def s_c():
    ...:     print('c start')
    ...:     x = yield
    ...:     print('c received:',x)
    ...:

In [52]: c = s_c()

In [53]: c
Out[53]: <generator object s_c at 0x00000221EF5DB5C8>

In [54]: c.send(111)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-54-ffa2730868ca> in <module>
----> 1 c.send(111)

TypeError: can't send non-None value to a just-started generator

In [55]: next(c)  # Call next first: the generator has not started yet, so it is not paused at a yield and cannot receive a value via send.
c start

In [56]: next(c)  # next sends None; the coroutine resumes, prints the received value, runs off the end of its body, and raises StopIteration.
c received: None
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-56-e846efec376d> in <module>
----> 1 next(c)

StopIteration:

In [57]: c.send(111)  # The coroutine has already terminated, so send raises StopIteration.
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-57-ffa2730868ca> in <module>
----> 1 c.send(111)

StopIteration:

A coroutine can be in one of four states. The current state can be determined with the inspect.getgeneratorstate(...) function, which returns one of the following strings.

'GEN_CREATED'

Waiting to start execution.

'GEN_RUNNING'

Currently being executed by the interpreter.

You'll only see this state in a multithreaded application, or if the generator object calls getgeneratorstate on itself, which is not very useful.

'GEN_SUSPENDED'

Currently suspended at a yield expression.

'GEN_CLOSED'

Execution has completed.

In [59]: import inspect

In [60]: inspect.getgeneratorstate(c)
Out[60]: 'GEN_CLOSED'
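The GEN_RUNNING state mentioned above is hard to observe. A small sketch (not part of the original transcript): in a single-threaded program, the only way to see it is to call getgeneratorstate from inside the running generator itself.

from inspect import getgeneratorstate

def self_check():
    # reports the state of *this* generator while its body is executing
    yield getgeneratorstate(me)

me = self_check()
print(next(me))               # 'GEN_RUNNING'
print(getgeneratorstate(me))  # 'GEN_SUSPENDED'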

Because the argument to the send method becomes the value of the pending yield expression, send can be called only when the coroutine is currently suspended, e.g. my_coro.send(42). That is not possible if the coroutine has never been activated (i.e., its state is 'GEN_CREATED'). That's why the first activation is always done with next(my_coro); calling my_coro.send(None) has exactly the same effect.

This initial call to next(my_coro) is often described as "priming" the coroutine (i.e., advancing it to the first yield expression so that it is ready to be used as a live coroutine).

In [62]: def cc(a):
    ...:     print('c start')
    ...:     b = yield a
    ...:     print('rsv:',b)
    ...:     c = yield a + b
    ...:     print('rsv:' ,c)
    ...:

In [63]: c2 = cc(1)

In [64]: inspect.getgeneratorstate(c2)
Out[64]: 'GEN_CREATED'

In [65]: c2.send(None)
c start
Out[65]: 1

In [66]: inspect.getgeneratorstate(c2)
Out[66]: 'GEN_SUSPENDED'

In [67]: c2.send(2)
rsv: 2
Out[67]: 3

In [68]: c2.send(3)
rsv: 3
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-68-e3c5dc9e41ab> in <module>
----> 1 c2.send(3)

StopIteration:

In [69]: inspect.getgeneratorstate(c2)
Out[69]: 'GEN_CLOSED'

In [70]: c3 = cc(2)

In [71]: next(c3)
c start
Out[71]: 2

In [72]: dd = c3.send(3)
rsv: 3

In [73]: dd
Out[73]: 5

The key point is that execution of the coroutine is suspended exactly at the yield keyword. In an assignment statement, the code to the right of the = is evaluated before the assignment happens. Therefore, in the line b = yield a, the value of b is only set when the coroutine is activated again by the client code.

The execution of the cc coroutine is split into three stages:

1. Calling next(c2) (or c2.send(None)) prints the first message, then runs to yield a and yields the value of a.

2. Calling send(2) assigns 2 to b, prints the second message, then runs to yield a + b and yields a + 2.

3. Calling send(3) assigns 3 to c, prints the third message, and the coroutine terminates.

Note that each stage ends at a yield expression, and the next stage starts on that very line, when the value of the yield expression is assigned to the variable.

Computing a running average with a coroutine

In [80]: def avg():
    ...:     total = 0
    ...:     c = 0
    ...:     avg = None
    ...:     while True:
    ...:         term = yield avg
    ...:         total+= term
    ...:         c += 1
    ...:         avg =total / c
    ...:

# The infinite loop means that as long as the caller keeps sending values, the coroutine keeps accepting them and yielding results. It only terminates when the caller calls its close() method, or when it is garbage-collected because no references to it remain.
# The advantage of using a coroutine is that total and count can be simple local variables: no instance attributes or closures are needed to keep the context between calls.

In [81]: a = avg()

In [82]: a.send(None)

In [83]: a.send(1)
Out[83]: 1.0

In [84]: a.send(2)
Out[84]: 1.5
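For comparison, a sketch (not from the transcript above) of keeping the same running state with a closure instead of a coroutine; nonlocal is needed to rebind total and count between calls:

def make_averager():
    total = 0
    count = 0
    def averager(term):
        nonlocal total, count  # rebind the enclosing variables on each call
        total += term
        count += 1
        return total / count
    return averager

avg_fn = make_averager()
avg_fn(1)  # 1.0
avg_fn(2)  # 1.5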

Decorators for coroutine priming

Calling the decorated function creates the generator, primes it, and returns the primed generator, so it is immediately ready to receive values.

In [87]: def pre_next(gen):
    ...:     def inner(*args,**kwargs):  # inner returns the primed generator
    ...:         g = gen(*args,**kwargs)
    ...:         next(g)
    ...:         return g
    ...:     return inner
    ...:

In [88]: @pre_next
    ...: def avg():
    ...:     total = 0
    ...:     c = 0
    ...:     avg = None
    ...:     while True:
    ...:         term = yield avg
    ...:         total+= term
    ...:         c += 1
    ...:         avg =total / c
    ...:

In [89]: a = avg()

In [90]: a.send(1)
Out[90]: 1.0

In [91]: b = avg()

In [92]: inspect.getgeneratorstate(b)
Out[92]: 'GEN_SUSPENDED'

When a coroutine is called with the yield from syntax (see Section 16.7), it is primed automatically, which makes it incompatible with priming decorators such as @coroutine in Example 16-5.
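A minimal sketch (not from the book) of that incompatibility: yield from primes the subgenerator by calling next() on it, so a coroutine that was already primed by the decorator is resumed once with None.

def delegator():
    yield from avg()  # avg is the @pre_next-decorated coroutine above

d = delegator()
# The extra next() performed by yield from resumes the already-primed avg
# with None, so the coroutine body tries total += None and blows up:
next(d)  # TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'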

Termination and exception handling

Unhandled exceptions inside a coroutine propagate to the caller of the next function or send method that triggered the coroutine.

An unhandled exception causes the coroutine to terminate:
In [93]: b.send(1)
Out[93]: 1.0

In [94]: b.send('a')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-94-5d63b0dfa469> in <module>
----> 1 b.send('a')

<ipython-input-88-028ea1232b5b> in avg()
      6     while True:
      7         term = yield avg
----> 8         total+= term
      9         c += 1
     10         avg =total / c

TypeError: unsupported operand type(s) for +=: 'int' and 'str'

In [95]: b.send(2)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-95-19972eea9127> in <module>
----> 1 b.send(2)

StopIteration:

Example 16-7 suggests one way to terminate a coroutine: send it a sentinel value that makes it exit. Built-in constants such as None and Ellipsis are often used as sentinel values. Ellipsis has the advantage of being uncommon in data streams. I've also seen people use StopIteration (the class itself, not an instance, and not raised) as the sentinel; in other words, it is used like this: my_coro.send(StopIteration).
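A small sketch (an illustration, not code from the book) of the sentinel idea, using Ellipsis because it rarely appears in real data streams:

def collector():
    items = []
    while True:
        x = yield
        if x is ...:    # Ellipsis as the sentinel that asks the coroutine to exit
            break
        items.append(x)
    return items

c = collector()
next(c)        # prime
c.send(10)
c.send(20)
c.send(...)    # raises StopIteration: [10, 20]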

Starting with Python 2.5, the client code can call two methods on the generator object to explicitly send exceptions to the coroutine.

The two methods are throw and close.

generator.throw(exc_type[, exc_value[, traceback]])

Causes the generator to raise the specified exception at the yield expression where it is paused. If the generator handles the exception, execution advances to the next yield expression, and the value produced becomes the return value of the generator.throw call. If the generator does not handle the exception, it propagates to the caller's context.

generator.close()

Causes the generator to raise a GeneratorExit exception at the yield expression where it is paused. If the generator does not handle this exception, or if it raises StopIteration (usually by running to completion), no error is reported to the caller. On receiving a GeneratorExit exception, the generator must not yield a value, otherwise the interpreter raises RuntimeError. Any other exception raised by the generator propagates to the caller.
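A sketch (not from the book's examples) of the RuntimeError mentioned above: a generator that yields a value while handling GeneratorExit.

def stubborn():
    try:
        yield 1
    except GeneratorExit:
        yield 2   # forbidden: must not yield after receiving GeneratorExit

s = stubborn()
next(s)     # 1
s.close()   # RuntimeError: generator ignored GeneratorExit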

In [1]: class DE(Exception):
   ...:     """Custom exception"""
   ...:

# The last line will never run: only an unhandled exception can break the infinite loop, and such an exception terminates the coroutine immediately.
In [2]: def demo_exc():
   ...:     print('c start')
   ...:     while True:
   ...:         try:
   ...:             x = yield
   ...:         except DE:
   ...:             print('DE raise')
   ...:         else:
   ...:             print('c received:{}'.format(x))
   ...:     raise RuntimeError('never run')
   ...:

In [3]: e = demo_exc()

In [4]: next(e)
c start

In [5]: next(e)
c received:None

In [6]: e.send(111)
c received:111

In [7]: e.close()  # Close generator

In [8]: from inspect import getgeneratorstate

In [9]: getgeneratorstate(e)
Out[9]: 'GEN_CLOSED'

In [10]: e1 = demo_exc()

In [11]: getgeneratorstate(e1)
Out[11]: 'GEN_CREATED'

In [12]: next(e1)
c start

# If a DE exception is thrown into the demo_exc coroutine, it handles it and continues running
In [13]: e1.throw(DE)
DE raise

In [14]: getgeneratorstate(e1)
Out[14]: 'GEN_SUSPENDED'

In [15]: e1.send(22)
c received:22

# However, if an exception thrown into the coroutine is not handled, the coroutine stops and its state becomes 'GEN_CLOSED'.
In [16]: e1.throw(ZeroDivisionError)
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-16-175ec079c766> in <module>
----> 1 e1.throw(ZeroDivisionError)

<ipython-input-2-981b1ab8dc67> in demo_exc()
      3     while True:
      4         try:
----> 5             x = yield
      6         except DE:
      7             print('DE raise')

ZeroDivisionError:

In [17]: getgeneratorstate(e1)
Out[17]: 'GEN_CLOSED'
# To run cleanup code no matter how the coroutine ends, wrap the relevant part of its body in a try/finally block
In [20]: def demo_exc():
    ...:     try:
    ...:         print('c start')
    ...:         while True:
    ...:             try:
    ...:                 x = yield
    ...:             except DE:
    ...:                 print('DE raise')
    ...:             else:
    ...:                 print('c received:{}'.format(x))
    ...:     finally:
    ...:         print('c end')
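A quick check (not in the original transcript) that close() triggers the finally block of this version:

e2 = demo_exc()
next(e2)      # c start
e2.send(5)    # c received:5
e2.close()    # GeneratorExit stops the loop and the finally clause prints: c end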

Returning a value from a coroutine

In [21]: from collections import namedtuple

In [22]: Res = namedtuple('Res','c avg')

In [24]: def averager():
    ...:     t = 0
    ...:     c = 0
    ...:     while True:
    ...:         term = yield
    ...:         if term is None:
    ...:             break
    ...:         t += term
    ...:         c += 1
    ...:         avg = t/c
    ...:     return Res(c,avg)

In [25]: a = averager()

In [26]: next(a)

In [27]: a.send(1)

In [28]: a.send(2)

# As usual, the generator raises StopIteration; the value attribute of the exception holds the returned value.
In [29]: next(a)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-29-15841f3f11d4> in <module>
----> 1 next(a)

StopIteration: Res(c=2, avg=1.5)



In [54]: a = averager()

In [55]: a.send(None)

In [56]: a.send(1)

In [57]: a.send(2)

# Catch StopIteration exception and get the value returned by averager
In [59]: try:
    ...:     a.send(None)
    ...: except StopIteration as exc:
    ...:     res = exc.value
    ...:

In [60]: res
Out[60]: Res(c=2, avg=1.5)

Getting the return value of a coroutine this way is roundabout, but that is how PEP 380 defines it, and it makes sense once we realize that the yield from construct handles StopIteration internally. This is similar to the way the for loop deals with StopIteration: the loop machinery handles the exception so the user does not have to. In the case of yield from, the interpreter not only consumes the StopIteration, it also makes the exception's value attribute become the value of the yield from expression itself. Unfortunately, we cannot demonstrate this interactively in the console, because using yield from (or yield) outside a function is a syntax error.[4]

4. There is an IPython extension, ipython-yf (https://github.com/tecki/ipython-yf), that makes it possible to execute yield from directly in the IPython console. It is intended for testing asynchronous code and works with the asyncio module. It was submitted as a patch for Python 3.5 but was not accepted; see issue 22412 in the Python bug tracker: Towards an asyncio-enabled command line (http://bugs.python.org/issue22412).
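Although yield from cannot be typed at the console prompt, a tiny delegating generator (a sketch, not part of the original transcript) shows the behavior: the subgenerator's return value becomes the value of the yield from expression.

def delegate():
    res = yield from averager()   # the averager defined above
    print('captured:', res)

d = delegate()
next(d)        # prime; runs averager up to its first yield
d.send(1)
d.send(2)
try:
    d.send(None)   # averager breaks out of its loop and returns
except StopIteration:
    pass
# prints: captured: Res(c=2, avg=1.5)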

Using yield from

When the generator gen contains yield from subgen(), subgen takes over and yields values directly to the caller of gen; in other words, the caller drives subgen directly. Meanwhile, gen is blocked, waiting until subgen terminates.

In [61]: def g():
    ...:     for c in 'AB':
    ...:         yield c
    ...:     for i in range(3):
    ...:         yield i
    ...:

In [65]: list(g())
Out[65]: ['A', 'B', 0, 1, 2]

# Can be rewritten as:

In [66]: def gen():
    ...:     yield from 'AB'
    ...:     yield from range(3)
    ...:

In [67]: list(gen())
Out[67]: ['A', 'B', 0, 1, 2]

# The itertools module provides chain, an optimized version written in C; here is a simple implementation of the same behavior
In [68]: def ch(*iterables):
    ...:     for i in iterables:
    ...:         yield from i
    ...:

In [69]: list(ch('ABC',range(3)))
Out[69]: ['A', 'B', 'C', 0, 1, 2]

The first thing the yield from x expression does with x is call iter(x), so x can be any iterable object.

However, the essential feature of yield from is not merely replacing nested for loops that yield values.

Syntax for Delegating to a Subgenerator

The main feature of yield from is to open a bidirectional channel between the outermost caller and the innermost subgenerator, so that values can be sent and yielded between them directly, and exceptions can be thrown all the way in, without adding a lot of exception-handling boilerplate to the intermediate coroutines. With this construct, coroutines can delegate responsibility in ways that were not possible before.

Delegating generator

The generator function that contains the yield from <iterable> expression.

Subgenerator

The generator obtained from the <iterable> part of the yield from expression.

Caller

The client code that calls the delegating generator.

The subgenerator may be a simple iterator that only implements __next__; yield from handles that too. However, the yield from construct was introduced to support generators that implement __next__, send, close, and throw.

    def averager():
        total = 0
        count = 0
        while True:
            x = yield
            if x is None:
                break
            total += x
            count += 1
            return Res(count, total/count)  # note: return is inside the loop

    a = averager()
    next(a)
    b = a.send(111)  # StopIteration escapes from send, so b is never assigned
    """The generator ends by raising StopIteration; the return value is stored
    in the value attribute of the exception:

    Traceback (most recent call last):
      File ".\g.py", line 52, in <module>
        b = a.send(111)
    StopIteration: Res(count=1, average=111.0)
    """
    print(b)
from collections import namedtuple

Res = namedtuple('Res','count average')

def averager():
    total = 0
    count = 0
    while True:
        x = yield
        if x is None:
            break
        total += x
        count += 1
        return Res(count, total/count)

def report(res):
    # With the mis-indented averager above, the final send(None) makes it fall out
    # of the loop and return None implicitly, so every entry ends up as None:
    # {'girls;kg': None, 'girls;m': None, 'boys;kg': None, 'boys;m': None}
    print(res)
    for k,v in sorted(res.items()):
        group, unit = k.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
            v.count, group , v.average, unit
        ))

Correct averager:

from collections import namedtuple

Res = namedtuple('Res','count average')

# Sub generator
def averager():
    total = 0
    count = 0
    while True:
        x = yield
        if x is None:
            break
        total += x
        count += 1
    # The returned Res becomes the value of the yield from expression in the grouper function.
    # yield from handles the StopIteration exception and extracts the value it carries.
    return Res(count, total/count)

# Delegate generator
def grouper(res, key):
    # Each iteration of this loop creates a new averager instance; each instance is a generator object used as a coroutine.
    while True:
        res[key] = yield from averager()

# Client code, i.e. caller
def main(data):
    res = {}
    for k,v in data.items():
        group = grouper(res, k)
        next(group)
        for i in v:
            # Send each value to the grouper. It is forwarded straight to the x = yield line in averager; grouper never sees it.
            group.send(i)
        # Send None into the grouper: this terminates the current averager instance and lets grouper resume, which then creates a new averager for the next group of values.
        group.send(None)
    report(res)

# Output report
def report(res):
    print(res)
    for k,v in sorted(res.items()):
        group, unit = k.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
            v.count, group , v.average, unit
        ))

data = {
 'girls;kg':
 [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
 'girls;m':
 [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
 'boys;kg':
 [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
 'boys;m':
 [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

if __name__ == '__main__':
    main(data)



"""
(el_app) PS C:\Users\WangLin\Desktop\version8> python .\g.py
{'girls;kg': Res(count=10, average=42.040000000000006), 'girls;m': Res(count=10, average=1.4279999999999997), 'boys;kg': Res(count=9, average=40.422222222222224), 'boys;m': Res(count=9, average=1.3888888888888888)}
 9 boys  averaging 40.42kg
 9 boys  averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m
"""
In [76]: def a():
    ...:     yield 1
    ...:     return 1
    ...:

In [80]: aa = a()

In [81]: getgeneratorstate(aa)
Out[81]: 'GEN_CREATED'

In [82]: next(aa)
Out[82]: 1

In [83]: getgeneratorstate(aa)
Out[83]: 'GEN_SUSPENDED'

In [84]: b = aa.send(222)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-84-b5a7980ee106> in <module>
----> 1 b = aa.send(222)

StopIteration: 1

In [85]: getgeneratorstate(aa)
Out[85]: 'GEN_CLOSED'

In [86]: b

In [87]: c = aa.send(33)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-87-d2f9f10a5b00> in <module>
----> 1 c = aa.send(33)

StopIteration:

In [88]: c
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-88-2b66fd261ee5> in <module>
----> 1 c

NameError: name 'c' is not defined

In [97]: def b():
    ...:     while 1:
    ...:         yield from a()
    ...:

In [98]: c = b()

In [99]: next(c)
Out[99]: 1

In [100]: c.send(1)
Out[100]: 1

In [101]: c.send(1)
Out[101]: 1

In [102]: c.send(1)
Out[102]: 1

In [103]: c.send(1)
Out[103]: 1

In [104]: c = b()

In [105]: c.send(111)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-105-ffa2730868ca> in <module>
----> 1 c.send(111)

TypeError: can't send non-None value to a just-started generator

Why? Each time the current a() subgenerator finishes (after its single yield), yield from completes and b's while loop immediately starts a fresh a(), which yields 1 again, so every send appears to produce 1 forever. And a brand-new c = b() is a just-started generator, so, as with any generator, it cannot receive a non-None value before it is primed.

In [127]: def a():
     ...:     print('yield do')
     ...:     yield 1  # yielded values make up the iteration output
     ...:     print('return do')
     ...:     return 2  # the return value travels in the StopIteration exception

# a is still a generator function: calling it returns a generator that yields 1 and then returns 2.
# list(a()) iterates that generator and collects only the yielded values, so the result is [1];
# the returned 2 is carried by StopIteration, which list() consumes silently.
# That is also why the delegating generator b above keeps producing 1: each pass of its loop
# runs a fresh a(), yields its single 1, and discards the returned 2.
In [128]: list(a())
yield do
return do
Out[128]: [1]

In [129]: b = list(a())
yield do
return do

In [130]: b
Out[130]: [1]
In [106]: def a():
     ...:     print('yield do')
     ...:     yield 1
     ...:     print('return do')
     ...:     return 2

In [118]: d = a()

In [119]: next(a)  # mistake: next must be called on the generator object d, not on the function a
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-119-15841f3f11d4> in <module>
----> 1 next(a)

TypeError: 'function' object is not an iterator

In [120]: def a():
     ...:     print('yield do')
     ...:     yield 1
     ...:     print('return do')
     ...:     #return 2

In [121]: d = a()

In [122]: next(d)
yield do
Out[122]: 1

In [123]: d.send(2)
return do
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-123-aca2617ea1f4> in <module>
----> 1 d.send(2)

StopIteration:

In [124]: def a():
     ...:     print('yield do')
     ...:     x = yield 1
     ...:     print('return do')
     ...:     if x is None:
     ...:         return 2

In [125]: d = a()

In [126]: next(d)
yield do
Out[126]: 1

Each iteration of the outer for loop creates a new grouper instance and binds it to the group variable; group is the delegating generator. Calling next(group) primes the delegating generator grouper, which enters its while True loop and, after calling the subgenerator averager, suspends at the yield from expression.

The inner for loop calls group.send(value), which feeds each value directly to the subgenerator averager. Meanwhile, the current grouper instance (group) remains suspended at the yield from expression.

When the inner loop ends, the group instance is still suspended at the yield from expression, so the assignment to results[key] in the body of grouper has not happened yet. Without the final group.send(None) in the outer for loop, the averager subgenerator would never terminate, the delegating generator group would never be resumed, and results[key] would never be assigned.
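A sketch (not in the book) of that failure mode, using the grouper and averager defined above: without the final send(None), the assignment inside grouper never runs.

res = {}
g = grouper(res, 'boys;kg')
next(g)                    # prime; grouper is now parked at yield from
for value in [39.0, 40.8]:
    g.send(value)          # values go straight to the averager subgenerator
print(res)                 # {} -- res['boys;kg'] was never assigned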

When the outer for loop iterates again, a new grouper instance is created and bound to group; the previous grouper instance (together with the unfinished averager subgenerator it created) is garbage-collected.

The key point of this experiment is: if the subgenerator never terminates, the delegating generator is suspended forever at the yield from expression. The program does not hang, because yield from (like yield) transfers control to the client code (the caller of the delegating generator), but some task will obviously be left unfinished.

Because a delegating generator works like a pipe, any number of them can be chained: a delegating generator uses yield from to call a subgenerator, which may itself be a delegating generator calling another subgenerator with yield from, and so on. Eventually the chain must end in a simple generator that uses only yield, or in any iterable object.

Every yield from chain must be driven by a client that calls next(...) or send(...) on the outermost delegating generator, possibly implicitly, for example with a for loop.
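A minimal sketch (not from the book) of such a chain: outer delegates to middle, which delegates to inner; the chain ends in a plain string, and driving the outermost generator with a for loop (via list) pulls values through every level.

def inner():
    yield from 'ab'               # the chain ends with a simple iterable
    return 'inner done'

def middle():
    result = yield from inner()   # captures inner's return value
    yield result

def outer():
    yield from middle()

list(outer())   # ['a', 'b', 'inner done']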

The meaning of yield from

"Using an iterator as a generator is equivalent to inline the definition of a sub generator in a yield from expression. In addition, the sub generator can execute a return statement to return a value, and the returned value will become the value of the yield from expression."

The approved version of PEP 380 does not contain that reassuring sentence, because it does not cover every corner case; but it is a good first approximation.

The approved version of PEP 380 explains the behavior of yield from in six points in the "Proposal" section (https://www.python.org/dev/peps/pep-0380/#proposal). Here I quote them almost verbatim, except that I replaced the ambiguous word "iterator" with "subgenerator" and added a few clarifications. Example 16-17 illustrates the first four points.

1. Any values that the subgenerator yields are passed directly to the caller of the delegating generator (i.e., the client code).

2. Any values sent to the delegating generator using send() are passed directly to the subgenerator. If the sent value is None, the subgenerator's __next__() method is called. If the sent value is not None, the subgenerator's send() method is called. If the call raises StopIteration, the delegating generator is resumed. Any other exception propagates to the delegating generator.

3. return expr in a generator (or subgenerator) causes StopIteration(expr) to be raised upon exit from the generator.

4. The value of the yield from expression is the first argument of the StopIteration exception raised when the subgenerator terminates.

The other two features of yield from have to do with exceptions and termination.

1. Exceptions other than GeneratorExit thrown into the delegating generator are passed to the throw() method of the subgenerator. If the call raises StopIteration, the delegating generator is resumed. Any other exception propagates to the delegating generator.

2. If a GeneratorExit exception is thrown into the delegating generator, or its close() method is called, then the close() method of the subgenerator is called, if it has one. If this call raises an exception, it propagates to the delegating generator; otherwise, GeneratorExit is raised in the delegating generator.

RESULT = yield from EXPR

# simplified expansion (handles only the basic case: no throw/close from the client)
_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        _s = yield _y
        try:
            _y = _i.send(_s)
        except StopIteration as _e:
            _r = _e.value
            break
RESULT = _r

'''
_i (iterator)
  The subgenerator
_y (yielded value)
  A value yielded by the subgenerator
_r (result)
  The eventual result (i.e., the value of the yield from expression when the subgenerator ends)
_s (sent value)
  A value sent by the caller to the delegating generator, forwarded to the subgenerator
_e (exception)
  An exception (always an instance of StopIteration in this simplified pseudocode)
'''
_i = iter(EXPR)  ➊
try:
    _y = next(_i)  ➋
except StopIteration as _e:
    _r = _e.value  ➌
else:
    while 1:  ➍
        try:
            _s = yield _y  ➎
        except GeneratorExit as _e:  ➏
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:  ➐
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:  ➑
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:  ➒
            try:  ➓
                if _s is None:  ⓫
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:  ⓬
                _r = _e.value
                break
RESULT = _r  ⓭

Use case: coroutines for discrete event simulation

Discrete event simulation is a kind of simulation where a system is modeled as a sequence of events.

In [1]: from collections import namedtuple

In [2]: E = namedtuple('E','time proc action')

In [3]: def taxi_process(i,n,start_time):  # Taxi number, number of passengers, departure time
   ...:     time = yield E(start_time, i, 'leave garage')  # Taxi departure
   ...:     for k in range(n):
   ...:         time = yield E(time,i,'pick up passenger')  # Passenger boarding
   ...:         time = yield E(time,i,'drop off passenger')  # Passengers get off
   ...:     yield E(time,i,'going home')
   ...:

In [4]: t = taxi_process('xmen',3,8)

In [5]: next(t)
Out[5]: E(time=8, proc='xmen', action='leave garage')

In [6]: _
Out[6]: E(time=8, proc='xmen', action='leave garage')

In [7]: t.send(_.time+10)
Out[7]: E(time=18, proc='xmen', action='pick up passenger')

In [8]: t.send(_.time+1)
Out[8]: E(time=19, proc='xmen', action='drop off passenger')

In [9]: t.send(_.time+10)
Out[9]: E(time=29, proc='xmen', action='pick up passenger')

In [10]: t.send(_.time+1)
Out[10]: E(time=30, proc='xmen', action='drop off passenger')

In [11]: t.send(_.time+10)
Out[11]: E(time=40, proc='xmen', action='pick up passenger')

In [12]: t.send(_.time+1)
Out[12]: E(time=41, proc='xmen', action='drop off passenger')

In [13]: t.send(_.time+10)
Out[13]: E(time=51, proc='xmen', action='going home')

In [14]: t.send(_.time+1)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-14-f9e5703fbef3> in <module>
----> 1 t.send(_.time+1)

StopIteration:

In this simulation, the coroutines representing the taxis are driven by the main loop in the Simulator.run method. The simulation "clock" is kept in the sim_time variable and is updated each time an event is yielded.

Constructing the taxis dictionary

taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL)
             for i in range(num_taxis)}
sim = Simulator(taxis)

The resulting structure is equivalent to:

taxis = {0: taxi_process(ident=0, trips=2, start_time=0),
         1: taxi_process(ident=1, trips=4, start_time=5),
         2: taxi_process(ident=2, trips=6, start_time=10)}
sim = Simulator(taxis)

Main code

sim = Simulator(taxis)
sim.run(end_time)

Taxi simulation

import queue

class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)

    def run(self, end_time):
        # schedule the first event for each taxi
        for _, proc in sorted(self.procs.items()):
            first_event = next(proc)
            self.events.put(first_event)
        # The main loop of this simulation system
        sim_time = 0
        while sim_time < end_time:
            if self.events.empty():
                print('*** end of events ***')
                break
            current_event = self.events.get()
            sim_time, proc_id, previous_action = current_event
            print('taxi:', proc_id, proc_id * ' ', current_event)
            active_proc = self.procs[proc_id]
            next_time = sim_time + compute_duration(previous_action)
            try:
                next_event = active_proc.send(next_time)
            except StopIteration:
                del self.procs[proc_id]
            else:
                self.events.put(next_event)
        else:
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))
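The run method calls compute_duration, which is not shown in this excerpt. A minimal sketch of a compatible implementation (the constants and the exponential distribution are assumptions, not code quoted above):

import random

SEARCH_DURATION = 5   # assumed mean time spent prowling for a passenger
TRIP_DURATION = 20    # assumed mean trip duration

def compute_duration(previous_action):
    """Pick a pseudo-random duration for the next state, based on the
    action the taxi just reported."""
    if previous_action in ('leave garage', 'drop off passenger'):
        interval = SEARCH_DURATION       # the taxi is now prowling
    elif previous_action == 'pick up passenger':
        interval = TRIP_DURATION         # the taxi is now on a trip
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1 / interval)) + 1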

The point of this example is to show how to handle events in a main loop and how to drive coroutines by sending them data. This is the basic idea underlying the asyncio package.
