久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Python “黑魔法” 之 Generator Coroutines

 dinghj 2016-06-07
原文出處: hsfzxjy   

寫在前面

學(xué)過 Python 的都知道,,Python 里有一個很厲害的概念叫做 生成器(Generators)。一個生成器就像是一個微小的線程,,可以隨處暫停,,也可以隨時恢復(fù)執(zhí)行,還可以和代碼塊外部進行數(shù)據(jù)交換,。恰當(dāng)使用生成器,,可以極大地簡化代碼邏輯。

也許,,你可以熟練地使用生成器完成一些看似不可能的任務(wù),,如“無窮斐波那契數(shù)列”,并引以為豪,,認(rèn)為所謂的生成器也不過如此——那我可要告訴你:這些都太小兒科了,下面我所要介紹的絕對會讓你大開眼界,。

生成器 可以實現(xiàn) 協(xié)程,,你相信嗎?

什么是協(xié)程

在異步編程盛行的今天,,也許你已經(jīng)對 協(xié)程(coroutines) 早有耳聞,,但卻不一定了解它。我們先來看看 Wikipedia 的定義:

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.

也就是說:協(xié)程是一種 允許在特定位置暫?;蚧謴?fù)的子程序——這一點和 生成器 相似,。但和 生成器 不同的是,協(xié)程 可以控制子程序暫停之后代碼的走向,,而 生成器 僅能被動地將控制權(quán)交還給調(diào)用者,。

協(xié)程 是一種很實用的技術(shù)。和 多進程 與 多線程 相比,協(xié)程 可以只利用一個線程更加輕便地實現(xiàn) 多任務(wù),,將任務(wù)切換的開銷降至最低,。和 回調(diào) 等其他異步技術(shù)相比,,協(xié)程 維持了正常的代碼流程,,在保證代碼可讀性的同時最大化地利用了 阻塞 IO 的空閑時間,。它的高效與簡潔贏得了開發(fā)者們的擁戴,。

Python 中的協(xié)程

早先 Python 是沒有原生協(xié)程支持的,因此在 協(xié)程 這個領(lǐng)域出現(xiàn)了百家爭鳴的現(xiàn)象,。主流的實現(xiàn)由以下兩種:

  • 用 C 實現(xiàn)協(xié)程調(diào)度,。這一派以 gevent 為代表,在底層實現(xiàn)了協(xié)程調(diào)度,,并將大部分的 阻塞 IO 重寫為異步,。
  • 用 生成器模擬。這一派以 Tornado 為代表,。Tornado 是一個老牌的異步 Web 框架,,涵蓋了五花八門的異步編程方式,,其中包括 協(xié)程。本文部分代碼借鑒于 Tornado,。

直至 Python 3.4,,Python 第一次將異步編程納入標(biāo)準(zhǔn)庫中(參見 PEP 3156),其中包括了用生成器模擬的 協(xié)程,。而在 Python 3.5 中,,Guido 總算在語法層面上實現(xiàn)了 協(xié)程(參見 PEP 0492)。比起 yield 關(guān)鍵字,新關(guān)鍵字 asyncawait 具有更好的可讀性,。在不久的將來,,新的實現(xiàn)將會慢慢統(tǒng)一混亂已久的協(xié)程領(lǐng)域。

盡管 生成器協(xié)程 已成為了過去時,,但它曾經(jīng)的輝煌卻不可磨滅,。下面,讓我們一起來探索其中的魔法,。

一個簡單的例子

假設(shè)有兩個子程序 mainprinter,。printer 是一個死循環(huán),等待輸入,、加工并輸出結(jié)果,。main 作為主程序,不時地向 printer 發(fā)送數(shù)據(jù),。

這應(yīng)該怎么實現(xiàn)呢,?

傳統(tǒng)方式中,這幾乎不可能在一個線程中實現(xiàn),,因為死循環(huán)會阻塞,。而協(xié)程卻能很好地解決這個問題:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def printer():
    counter = 0
    while True:
        string = (yield)
        print('[{0}] {1}'.format(counter, string))
        counter += 1
if __name__ == '__main__':
    p = printer()
    next(p)
    p.send('Hi')
    p.send('My name is hsfzxjy.')
    p.send('Bye!')

輸出:

1
2
3
[0] Hi
[1] My name is hsfzxjy.
[2] Bye!

這其實就是最簡單的協(xié)程。程序由兩個分支組成,。主程序通過 send 喚起子程序并傳入數(shù)據(jù),,子程序處理完后,用 yield 將自己掛起,,并返回主程序,,如此交替進行。

協(xié)程調(diào)度

有時,,你的手頭上會有多個任務(wù),,每個任務(wù)耗時很長,而你又不想同步處理,,而是希望能像多線程一樣交替執(zhí)行,。這時,你就需要一個調(diào)度器來協(xié)調(diào)流程了,。

作為例子,,我們假設(shè)有這么一個任務(wù):

1
2
3
4
def task(name, times):
    for i in range(times):
        print(name, i)

如果你直接執(zhí)行 task,那它會在遍歷 times 次之后才會返回,。為了實現(xiàn)我們的目的,,我們需要將 task 人為地切割成若干塊,以便并行處理:

1
2
3
4
5
def task(name, times):
    for i in range(times):
        yield
        print(name, i)

這里的 yield 沒有邏輯意義,,僅是作為暫停的標(biāo)志點,。程序流可以在此暫停,也可以在此恢復(fù)。而通過實現(xiàn)一個調(diào)度器,,我們可以完成多個任務(wù)的并行處理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from collections import deque
class Runner(object):
    def __init__(self, tasks):
        self.tasks = deque(tasks)
    def next(self):
        return self.tasks.pop()
    def run(self):
        while len(self.tasks):
            task = self.next()
            try:
                next(task)
            except StopIteration:
                pass
            else:
                self.tasks.appendleft(task)

這里我們用一個隊列(deque)儲存任務(wù)列表,。其中的 run 是一個重要的方法: 它通過輪轉(zhuǎn)隊列依次喚起任務(wù),并將已經(jīng)完成的任務(wù)清出隊列,,簡潔地模擬了任務(wù)調(diào)度的過程,。

而現(xiàn)在,我們只需調(diào)用:

1
2
3
4
5
Runner([
    task('hsfzxjy', 5),
    task('Jack', 4),
    task('Bob', 6)
]).run()

就可以得到預(yù)想中的效果了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Bob 0
Jack 0
hsfzxjy 0
Bob 1
Jack 1
hsfzxjy 1
Bob 2
Jack 2
hsfzxjy 2
Bob 3
Jack 3
hsfzxjy 3
Bob 4
hsfzxjy 4
Bob 5

簡直完美,!答案和丑陋的多線程別無二樣,,代碼卻簡單了不止一個數(shù)量級。

異步 IO 模擬

你絕對有過這樣的煩惱:程序常常被時滯嚴(yán)重的 IO 操作(數(shù)據(jù)庫查詢,、大文件讀取,、越過長城拿數(shù)據(jù))阻塞,在等待 IO 返回期間,,線程就像死了一樣,,空耗著時間。為此,,你不得不用多線程甚至是多進程來解決問題,。

而事實上,在等待 IO 的時候,,你完全可以做一些與數(shù)據(jù)無關(guān)的操作,,最大化地利用時間。Node.js 在這點做得不錯——它將一切異步化,,壓榨性能。只可惜它的異步是基于事件回調(diào)機制的,,稍有不慎,,你就有可能陷入 Callback Hell 的深淵。

而協(xié)程并不使用回調(diào),,相比之下可讀性會好很多,。其思路大致如下:

  • 維護一個消息隊列,用于儲存 IO 記錄,。
  • 協(xié)程函數(shù) IO 時,,自身掛起,同時向消息隊列插入一個記錄,。
  • 通過輪詢或是 epoll 等事件框架,,捕獲 IO 返回的事件。
  • 從消息隊列中取出記錄,,恢復(fù)協(xié)程函數(shù),。

現(xiàn)在假設(shè)有這么一個耗時任務(wù):

1
2
3
4
5
6
def task(name):
    print(name, 1)
    sleep(1)
    print(name, 2)
    sleep(2)
    print(name, 3)

正常情況下,這個任務(wù)執(zhí)行完需要 3 秒,倘若多個同步任務(wù)同步執(zhí)行,,執(zhí)行時間會成倍增長,。而如果利用協(xié)程,我們就可以在接近 3 秒的時間內(nèi)完成多個任務(wù),。

首先我們要實現(xiàn)消息隊列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
events_list = []
class Event(object):
    def __init__(self, *args, **kwargs):
        self.callback = lambda: None
        events_list.append(self)
    def set_callback(self, callback):
        self.callback = callback
    def is_ready(self):
        result = self._is_ready()
        if result:
            self.callback()
        return result

Event 是消息的基類,,其在初始化時會將自己放入消息隊列 events_list 中。Event 和 調(diào)度器 使用回調(diào)進行交互,。

接著我們要 hack 掉 sleep 函數(shù),,這是因為原生的 time.sleep() 會阻塞線程。通過自定義 sleep 我們可以模擬異步延時操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# sleep.py
from event import Event
from time import time
class SleepEvent(Event):
    def __init__(self, timeout):
        super(SleepEvent, self).__init__(timeout)
        self.timeout = timeout
        self.start_time = time()
    def _is_ready(self):
        return time() - self.start_time >= self.timeout
def sleep(timeout):
    return SleepEvent(timeout)

可以看出:sleep 在調(diào)用后就會立即返回,,同時一個 SleepEvent 對象會被放入消息隊列,,經(jīng)過timeout 秒后執(zhí)行回調(diào)。

再接下來便是協(xié)程調(diào)度了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# runner.py
from event import events_list
def run(tasks):
    for task in tasks:
        _next(task)
    while len(events_list):
        for event in events_list:
            if event.is_ready():
                events_list.remove(event)
                break
def _next(task):
    try:
        event = next(task)
        event.set_callback(lambda: _next(task)) # 1
    except StopIteration:
        pass

run 啟動了所有的子程序,,并開始消息循環(huán),。每遇到一處掛起,調(diào)度器自動設(shè)置回調(diào),,并在回調(diào)中重新恢復(fù)代碼流,。“1” 處巧妙地利用閉包保存狀態(tài),。

最后是主代碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
from sleep import sleep
import runner
def task(name):
    print(name, 1)
    yield sleep(1)
    print(name, 2)
    yield sleep(2)
    print(name, 3)
if __name__ == '__main__':
    runner.run((task('hsfzxjy'), task('Jack')))

輸出:

1
2
3
4
5
6
7
hsfzxjy 1
Jack 1
hsfzxjy 2
Jack 2
hsfzxjy 3
Jack 3
# [Finished in 3.0s]

協(xié)程函數(shù)的層級調(diào)用

上面的代碼有一個不足之處,,即協(xié)程函數(shù)返回的是一個 Event 對象。然而事實上只有直接操縱 IO 的協(xié)程函數(shù)才有可能接觸到這個對象,。那么,,對于調(diào)用了 IO 的函數(shù)的調(diào)用者,它們應(yīng)該如何實現(xiàn)呢,?

設(shè)想如下任務(wù):

1
2
3
4
5
6
7
8
9
def long_add(x, y, duration=1):
    yield sleep(duration)
    return x + y
def task(duration):
    print('start:', time())
    print((yield long_add(1, 2, duration)))
    print((yield long_add(3, 4, duration)))

long_add 是 IO 的一級調(diào)用者,,task 調(diào)用 long_add,并利用其返回值進行后續(xù)操作,。

簡而言之,,我們遇到的問題是:一個被喚起的協(xié)程函數(shù)如何喚起它的調(diào)用者?

正如在上個例子中,,協(xié)程函數(shù)通過 Event 的回調(diào)與調(diào)度器交互,。同理,我們也可以使用一個類似的對象,,在這里我們稱其為 Future,。

Future 保存在被調(diào)用者的閉包中,并由被調(diào)用者返回,。而調(diào)用者通過在其上面設(shè)置回調(diào)函數(shù),,實現(xiàn)兩個協(xié)程函數(shù)之間的交互,。

Future 的代碼如下,看起來有點像 Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# future.py
class Future(object):
    def __init__(self):
        super(Future, self).__init__()
        self.callback = lambda *args: None
        self._done = False
    def set_callback(self, callback):
        self.callback = callback
    def done(self, value=None):
        self._done = True
        self.callback(value)

Future 的回調(diào)函數(shù)允許接受一個參數(shù)作為返回值,,以盡可能地模擬一般函數(shù),。

但這樣一來,協(xié)程函數(shù)就會有些復(fù)雜了,。它們不僅要負(fù)責(zé)喚醒被調(diào)用者,,還要負(fù)責(zé)與調(diào)用者之間的交互。這會產(chǎn)生許多重復(fù)代碼,。為了 D.R.Y,,我們用裝飾器封裝這一邏輯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# co.py
from functools import wraps
from future import Future
def _next(gen, future, value=None):
    try:
        try:
            yielded_future = gen.send(value)
        except TypeError:
            yielded_future = next(gen)
        yielded_future.set_callback(lambda value: _next(gen, future, value))
    except StopIteration as e:
        future.done(e.value)
def coroutine(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        gen = func(*args, **kwargs)
        _next(gen, future)
        return future
    return wrapper

coroutine 包裝過的生成器成為了一個普通函數(shù),返回一個 Future 對象,。_next 為喚醒的核心邏輯,,通過一個類似遞歸的回調(diào)設(shè)置簡潔地實現(xiàn)自我喚醒。當(dāng)自己執(zhí)行完時,,會將自己閉包內(nèi)的Future對象標(biāo)記為done,,從而喚醒調(diào)用者。

為了適應(yīng)新變化,,sleep 也要做相應(yīng)的更改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from event import Event
from future import Future
from time import time
class SleepEvent(Event):
    def __init__(self, timeout):
        super(SleepEvent, self).__init__()
        self.start_time = time()
        self.timeout = timeout
    def _is_ready(self):
        return time() - self.start_time >= self.timeout
def sleep(timeout):
    future = Future()
    event = SleepEvent(timeout)
    event.set_callback(lambda: future.done())
    return future

sleep 不再返回 Event 對象,,而是一致地返回 Future,并作為 EventFuture 之間的代理者,。

基于以上更改,,調(diào)度器可以更加簡潔——這是因為協(xié)程函數(shù)能夠自我喚醒:

1
2
3
4
5
6
7
8
9
10
# runner.py
from event import events_list
def run():
    while len(events_list):
        for event in events_list:
            if event.is_ready():
                events_list.remove(event)
                break

主程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from co import coroutine
from sleep import sleep
import runner
from time import time
@coroutine
def long_add(x, y, duration=1):
    yield sleep(duration)
    return x + y
@coroutine
def task(duration):
    print('start:', time())
    print((yield long_add(1, 2, duration)), time())
    print((yield long_add(3, 4, duration)), time())
task(2)
task(1)
runner.run()

由于我們使用了一個糟糕的事件輪詢機制,密集的計算會阻塞通往 stdout 的輸出,,因而看起來所有的結(jié)果都是一起打印出來的,。為此,我在打印時特地加上了時間戳,,以演示協(xié)程的效果,。輸出如下:

1
2
3
4
5
6
start: 1459609512.263156
start: 1459609512.263212
3 1459609513.2632613
3 1459609514.2632234
7 1459609514.263319
7 1459609516.2633028

這事實上是 tornado.gen.coroutine 的簡化版本,為了敘述方便我略去了許多細節(jié),,如異常處理以及調(diào)度優(yōu)化,目的是讓大家能較清晰地了解 生成器協(xié)程 背后的機制,。因此,,這段代碼并不能用于實際生產(chǎn)中

小結(jié)

  • 這,,才叫精通生成器,。
  • 學(xué)習(xí)編程,不僅要知其然,,亦要知其所以然,。
  • Python 是有魔法的,,只有想不到,沒有做不到,。

References

加入伯樂在線專欄作者,。擴大知名度,還能得贊賞,!詳見《招募專欄作者
1 贊 6 收藏 8 評論

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點,。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多