寫并發(fā)程序很難。程序員不得不處理線程,、鎖和競態(tài)條件等等,這個過程很容易出錯,,而且會導致程序代碼難以閱讀,、測試和維護。
所以,,很多人不傾向于使用多線程編程,。取而代之的是,他們使用單線程進程(譯者注:只含有一個線程的進程),,依賴外部服務(如數(shù)據(jù)庫,、隊列等)處理所需的并發(fā)或異步操作。雖然這種方法在有些情況下是可行的,,但還有很多其他情況不能奏效,。很多實時系統(tǒng)——例如交易或銀行業(yè)務應用,或?qū)崟r游戲——等待一個單線程進程完成就太奢侈了(他們需要立即應答?。F渌囊恍τ谟嬎慊蛸Y源要求非常高的系統(tǒng),如果在程序中不引入并行機制就會耗時很久(有些情況下可以達到幾個小時或數(shù)天),。
常用的一種單線程方法(例如,,在
Node.js里廣泛應用)是使用基于事件的、非阻塞模式(Event-Based, NON-Blocking Paradigm,,其中Paradigm也有譯作成例),。雖然這種方法可以避免上下文切換、鎖和阻塞,,的確能提高性能,,但還是不能解決并發(fā)使用多個處理器(需要啟動和協(xié)調(diào)多個獨立的處理器)的問題。
那么,,這是不是意味著為了構(gòu)建一個并發(fā)程序,,除了深入到線程、鎖和競態(tài)條件之外沒有別的選擇呢,?
感謝Akka框架,,它為我們提供了一種選擇。本教程介紹了Akka的示例,,并仔細研究它如何幫助并簡化分布式并發(fā)應用的實現(xiàn),。
Akka框架是什么
這篇文章介紹了Akka并仔細研究它如何幫助并簡化分布式并發(fā)應用的實現(xiàn)。
Akka是JVM(JAVA虛擬機,,下同)平臺上構(gòu)建高并發(fā),、分布式和容錯應用的工具包和運行時。Akka用
Scala語言寫成,,同時提供了Scala和JAVA的開發(fā)接口,。
Akka處理并發(fā)的方法基于
Actor(沒有慣用譯法,文中使用原詞)模型,。在基于Actor的系統(tǒng)里,,所有的事物都是Actor,就好像在面向?qū)ο笤O計里面所有的事物都是對象一樣,。但是有一個重要區(qū)別——特別是和我們的討論相關的——那就是Actor模型是作為一個并發(fā)模型設計和架構(gòu)的,,而面向?qū)ο竽J絼t不是。更具體一點,,在Scala的Actor系統(tǒng)里,,Actor互相交互并共享信息但并不對交互順序作出預設。Actor之間共享信息和發(fā)起任務的機制是消息傳遞,。
創(chuàng)建和調(diào)度線程,、接收和分發(fā)消息以及處理競態(tài)條件和同步的所有復雜性,都委托給框架,,框架的處理對應用來說是透明的,。
Akka在多個Actor和下面的系統(tǒng)之間建立了一個層次(Layer),,這樣一來,Actor只需要處理消息就可以了,。創(chuàng)建和調(diào)度線程,、接收和分發(fā)消息以及處理競態(tài)條件和同步的所有復雜性,都委托給框架,,框架的處理對應用來說是透明的,。
Actor嚴格遵守
響應式聲明。響應式應用的目標是通過滿足以下一個或多個條件來代替?zhèn)鹘y(tǒng)的多線程應用:
- 事件驅(qū)動,。使用Actor,,代碼可以異步處理請求并用獨占的方式執(zhí)行非阻塞操作。
- 可伸縮性,。在Akka里,,不修改代碼就增加節(jié)點是可能的,感謝消息傳遞和本地透明性(Location Transparency),。
- 高彈性,。任何應用都會碰到錯誤并在某個時間點失敗。Akka的“監(jiān)管”(容錯)策略為實現(xiàn)自愈系統(tǒng)提供了便利,。
- 響應式,。今天的高性能和快速響應應用需要對用戶快速反饋,因此對于事件的響應需要非常及時,。Akka的非阻塞,、基于消息的策略可以幫助達成這個目標。
Akka中的Actor是什么
Actor本質(zhì)上就是接收消息并采取行動處理消息的對象,。它從消息源中解耦出來,,只負責正確識別接收到的消息類型,并采取相應的行動,。
收到一條消息之后,,一個Actor可能會采取以下一個或多個行動:
- 執(zhí)行一些本身的操作(例如進行計算、持久化數(shù)據(jù),、調(diào)用外部的Web服務等)
- 把消息或衍生消息轉(zhuǎn)發(fā)給另外一個Actor
- 實例化一個新的Actor并把消息轉(zhuǎn)發(fā)給它
或者,,如果這個Actor認為合適的話,可能會完全忽略這條消息(也就是說,,它可能選擇不響應),。
為了實現(xiàn)一個Actor,需要繼承Akka.Actor.Actor這個Trait(一般譯為“特征”,,譯法有一定爭議,,文中保留原詞)并實現(xiàn)Receive方法。當一個消息發(fā)送給Actor時,,它的Receive方法會被(Akka)調(diào)用,。典型的實現(xiàn)包括使用模式匹配(Pattern
Matching)來識別消息類型并作出響應,,參見下面的Akka示例: - import akka.actor.Actor
- import akka.actor.Props
- import akka.event.Logging
- class MyActor extends Actor {
- def receive = {
- case value: String => doSomething(value)
- case _ => println("received unknown message")
- }
- }
模式匹配是一種相對優(yōu)雅的處理消息的技術(shù),相比基于回調(diào)的實現(xiàn),,更傾向于產(chǎn)生“更整潔”以及更容易瀏覽的代碼,。例如,考慮一個簡化版的HTTP請求/響應實現(xiàn),。
首先,我們使用JavaScript中基于回調(diào)的方式實現(xiàn): - route(url, function(request){
- var query = buildQuery(request);
- dbCall(query, function(dbResponse){
- var wsRequest = buildWebServiceRequest(dbResponse);
- wsCall(wsRequest, function(wsResponse) {
- sendReply(wsResponse);
- });
- });
- });
現(xiàn)在,,我們把它和基于模式匹配的實現(xiàn)做個比較:
- msg match {
- case HttpRequest(request) => {
- val query = buildQuery(request)
- dbCall(query)
- }
- case DbResponse(dbResponse) => {
- var wsRequest = buildWebServiceRequest(dbResponse);
- wsCall(dbResponse)
- }
- case WsResponse(wsResponse) => sendReply(wsResponse)
- }
雖然基于回調(diào)的JavaScript代碼更緊湊,,但確實更難以閱讀和瀏覽。相比而言,,基于模式匹配的代碼對于需要考慮哪些情況,、每種情況都是怎么處理的寫法更加清晰。
Actor系統(tǒng)
把一個復雜的問題不斷分解成更小規(guī)模的子問題通常是一種可靠的解決問題的技術(shù),。這個方法對于計算機科學特別有效(和
單一職責原則一致),,因為這樣容易產(chǎn)生整潔的、模塊化的代碼,,產(chǎn)生的冗余很少甚至沒有,,而且維護起來相對容易。
在基于Actor的設計里,,使用這種技術(shù)有助于把Actor的邏輯組織變成一個層級結(jié)構(gòu),,也就是所謂的
Actor系統(tǒng)。Actor系統(tǒng)提供了一個基礎框架,,通過這個系統(tǒng)Actor之間可以進行交互,。
在Akka里面,和Actor通信的唯一方式就是通過ActorRef ,。ActorRef 代表Actor的一個引用,,可以阻止其他對象直接訪問或操作這個Actor的內(nèi)部信息和狀態(tài)。消息可以通過一個ActorRef 以下面的語法協(xié)議中的一種發(fā)送到一個Actor:
-! (“告知”) —— 發(fā)送消息并立即返回
-? (“請求”) —— 發(fā)送消息并返回一個Future對象,,代表一個可能的應答
每個Actor都有一個收件箱,,用來接收發(fā)送過來的消息。收件箱有多種實現(xiàn)方式可以選擇,,缺省的實現(xiàn)是先進先出(FIFO)隊列,。
在處理多條消息時,一個Actor包含多個實例變量來保持狀態(tài),。Akka確保Actor的每個實例都運行在自己的輕量級線程里,,并保證每次只處理一條消息。這樣一來,,開發(fā)者不必擔心同步或競態(tài)條件,,而每個Actor的狀態(tài)都可以被可靠地保持,。
Akka的Actor API中提供了每個Actor執(zhí)行任務所需要的有用信息:
sender :當前處理消息的發(fā)送者的一個ActorRef 引用
context :Actor運行上下文相關的信息和方法(例如,包括實例化一個新Actor的方法ActorOf )
supervisionStrategy :定義用來從錯誤中恢復的策略
self :Actor本身的ActorRef 引用
Akka確保Actor的每個實例都運行在自己的輕量級線程里,,并保證每次只處理一條消息,。這樣一來,開發(fā)者不必擔心同步或競態(tài)條件,,而每個Actor的狀態(tài)都可以被可靠地保持,。
為了把這些教程組織起來,讓我們來考慮一個簡單的例子:統(tǒng)計一個文本文件中單詞的數(shù)量,。
為了達到演示Akka示例的目的,,我們把這個問題分解為兩個子任務;即(1)統(tǒng)計每行單詞數(shù)量的“孩子”任務和(2)匯總這些單行單詞數(shù)量,、得到文件里單詞總數(shù)的“父親”任務,。
父Actor會從文件中裝載每一行,然后委托一個子Actor來計算某一行的單詞數(shù)量,。當子Actor完成之后,,它會把結(jié)果用消息發(fā)回給父Actor。父Actor會收到(每一行的)單詞數(shù)量的消息并維持一個整個文件單詞總數(shù)的計數(shù)器,,這個計數(shù)器會在完成后返回給調(diào)用者,。
(注意以下提供的Akka教程的例子只是為了教學目的,所以沒有顧及所有的邊界條件,、性能優(yōu)化等,。同時,完整可編譯版本的代碼示例可以在這個GIST中找到)
讓我們首先看一個子類StringCounterActor 的示例實現(xiàn):
- case class ProcessStringMsg(string: String)
- case class StringProcessedMsg(words: Integer)
- class StringCounterActor extends Actor {
- def receive = {
- case ProcessStringMsg(string) => {
- val wordsInLine = string.split(" ").length
- sender ! StringProcessedMsg(wordsInLine)
- }
- case _ => println("Error: message not recognized")
- }
- }
這個Actor有一個非常簡單的任務:接收ProcessStringMsg 消息(包含一行文本),,計算這行文本中單詞的數(shù)量,,并把結(jié)果通過一個StringProcessedMsg 消息返回給發(fā)送者。請注意我們已經(jīng)實現(xiàn)了我們的類,,使用,! (“告知”)方法發(fā)出StringProcessedMsg 消息(發(fā)出消息并立即返回)。
好了,,現(xiàn)在我們來關注父WordCounterActor 類: - case class StartProcessFileMsg()
-
- class WordCounterActor(filename: String) extends Actor {
-
- private var running = false
- private var totalLines = 0
- private var linesProcessed = 0
- private var result = 0
- private var fileSender: Option[ActorRef] = None
-
- def receive = {
- case StartProcessFileMsg() => {
- if (running) {
- // println just used for example purposes;
- // Akka logger should be used instead
- println("Warning: duplicate start message received")
- } else {
- running = true
- fileSender = Some(sender) // save reference to process invoker
- import scala.io.Source._
- fromFile(filename).getLines.foreach { line =>
- context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
- totalLines += 1
- }
- }
- }
- case StringProcessedMsg(words) => {
- result += words
- linesProcessed += 1
- if (linesProcessed == totalLines) {
- fileSender.map(_ ! result) // provide result to process invoker
- }
- }
- case _ => println("message not recognized!")
- }
- }
這里面有很多細節(jié),,我們來逐一考察(注意討論中所引用的行號基于以上代碼示例)。
首先,,請注意要處理的文件名被傳給了WordCounterActor 的構(gòu)造方法(第3行),。這意味著這個Actor只會用來處理一個單獨的文件。這樣通過避免重置狀態(tài)變量(running ,,totalLines ,,linesProcessed 和result )也簡化了開發(fā)者的編碼工作,因為這個實例只使用一次(也就是說處理一個單獨的文件),然后就丟棄了,。
接下來,,我們看到WordCounterActor 處理了兩種類型的消息:
StartProcessFileMsg (第12行)
- 從最初啟動
WordCounterActor 的外部Actor接收到的消息
- 收到這個消息之后,
WordCounterActor 首先檢查它收到的是不是一個重復的請求
- 如果這個請求是重復的,,那么
WordCounterActor 生成一個警告,,然后就不做別的事了(第16行)
- 如果這不是一個重復的請求:
WordCounterActor 在FileSender 實例變量(注意這是一個Option[ActorRef] 而不是一個Option[Actor] )中保存發(fā)送者的一個引用。當處理最終的StringProcessedMsg (從一個StringCounterActor 子類中接收,,如下文所述)時,,為了以后的訪問和響應,這個ActorRef 是必需的,。
- 然后
WordCounterActor 讀取文件,,當文件中每行都裝載之后,就會創(chuàng)建一個StringCounterActor ,,需要處理的包含行文本的消息就會傳遞給它(第21-24行)。
StringProcessedMsg (第27行)
- 當處理完成分配給它的行之后,,從
StringCounterActor 處接收到的消息
- 收到此消息之后,,
WordCounterActor 會把文件的行計數(shù)器增加,如果所有的行都處理完畢(也就是說,,當totalLines 和linesProcessed 相等),,它會把最終結(jié)果發(fā)給原來的FileSender (第28-31行)。
再次需要注意的是,,在Akka里,,Actor之間通信的唯一機制就是消息傳遞。消息是Actor之間唯一共享的東西,,而且因為多個Actor可能會并發(fā)訪問同樣的消息,,所以為了避免競態(tài)條件和不可預期的行為,消息的不可變性非常重要,。
因為Case Class默認是不可變的并且可以和模式匹配無縫集成,,所以用Case Class的形式來傳遞消息是很常見的。(Scala中的Case
Class就是正常的類,,唯一不同的是通過模式匹配提供了可以遞歸分解的機制),。
讓我們通過運行整個應用的示例代碼來結(jié)束這個例子。 - object Sample extends App {
- import akka.util.Timeout
- import scala.concurrent.duration._
- import akka.pattern.ask
- import akka.dispatch.ExecutionContexts._
- implicit val ec = global
- override def main(args: Array[String]) {
- val system = ActorSystem("System")
- val actor = system.actorOf(Props(new WordCounterActor(args(0))))
- implicit val timeout = Timeout(25 seconds)
- val future = actor ? StartProcessFileMsg()
- future.map { result =>
- println("Total number of words " + result)
- system.shutdown
- }
- }
- }
請注意這里的? 方法是怎樣發(fā)送一條消息的,。用這種方法,,調(diào)用者可以使用返回的
Future對象,當完成之后可以打印出最后結(jié)果并最終通過停掉Actor系統(tǒng)退出程序,。
Akka的容錯和監(jiān)管者策略
在Actor系統(tǒng)里,,每個Actor都是其子孫的監(jiān)管者。如果Actor處理消息時失敗,,它就會暫停自己及其子孫并發(fā)送一個消息給它的監(jiān)管者,,通常是以異常的形式,。
在Akka里面,監(jiān)管者策略是定義你的系統(tǒng)容錯行為的主要并且直接的機制,。
在Akka里面,,一個監(jiān)管者對于從子孫傳遞上來的異常的響應和處理方式稱作監(jiān)管者策略。
監(jiān)管者策略是定義你的系統(tǒng)容錯行為的主要并且直接的機制,。
當一條消息指示有一個錯誤到達了一個監(jiān)管者,,它會采取如下行動之一:
- 恢復孩子(及其子孫),保持內(nèi)部狀態(tài),。 當孩子的狀態(tài)沒有被錯誤破壞,,還可以繼續(xù)正常工作的時候,可以使用這種策略,。
- 重啟孩子(及其子孫),,清除內(nèi)部狀態(tài)。 這種策略應用的場景和第一種正好相反,。如果孩子的狀態(tài)已經(jīng)被錯誤破壞,,在它可以被用到Future之前有必須要重置其內(nèi)部狀態(tài)。
- 永久地停掉孩子(及其子孫),。 這種策略可以用在下面的場景中:錯誤條件不能被修正,,但是并不影響后面執(zhí)行的操作,這些操作可以在失敗的孩子不存在的情況下完成,。
- 停掉自己并向上傳播錯誤,。 適用場景:當監(jiān)管者不知道如何處理錯誤,就把錯誤傳遞給自己的監(jiān)管者,。
而且,,一個Actor可以決定是否把行動應用在失敗的子孫上抑或是應用到它的兄弟上。有兩種預定義的策略:
OneForOneStrategy :只把指定行動應用到失敗的孩子上
AllForOneStrategy :把指定行動應用到所有子孫上
下面是一個使用OneForOneStrategy 的簡單例子: - import akka.actor.OneForOneStrategy
- import akka.actor.SupervisorStrategy._
- import scala.concurrent.duration._
- override val supervisorStrategy =
- OneForOneStrategy() {
- case _: ArithmeticException => Resume
- case _: NullPointerException => Restart
- case _: IllegalArgumentException => Stop
- case _: Exception => Escalate
- }
如果沒有指定策略,,那么就使用如下默認的策略:
- 如果在初始化Actor時出錯,,或者Actor被結(jié)束(Killed),那么Actor就會停止(Stopped)
- 如果有任何類型的異常出現(xiàn),,Actor就會重啟
Akka提供的默認策略的實現(xiàn)如下: - final val defaultStrategy: SupervisorStrategy = {
- def defaultDecider: Decider = {
- case _: ActorInitializationException ? Stop
- case _: ActorKilledException ? Stop
- case _: Exception ? Restart
- }
- OneForOneStrategy()(defaultDecider)
- }
Akka也考慮到對
定制化監(jiān)管者策略的實現(xiàn),,但正如Akka文檔也提出了警告,這么做要小心,,因為錯誤的實現(xiàn)會產(chǎn)生諸如Actor系統(tǒng)被阻塞的問題(也就是說,,其中的多個Actor被永久掛起了)。
本地透明性
Akka架構(gòu)支持
本地透明性,,使得Actor完全不知道他們接受的消息是從哪里發(fā)出來的,。消息的發(fā)送者可能駐留在同一個JVM,也有可能是存在于其他的JVM(或者運行在同一個節(jié)點,或者運行在不同的節(jié)點),。Akka處理這些情況對于Actor(也即對于開發(fā)者)來說是完全透明的,。唯一需要說明的是跨越節(jié)點的消息必須要被序列化。
Akka架構(gòu)支持本地透明性,,使得Actor完全不知道他們接受的消息是從哪里發(fā)出來的,。
Actor系統(tǒng)設計的初衷,就是不需要任何專門的代碼就可以運行在分布式環(huán)境中,。Akka只需要一個配置文件(Application.Conf),,用以說明發(fā)送消息到哪些節(jié)點。下面是配置文件的一個例子:
- akka {
- actor {
- provider = "akka.remote.RemoteActorRefProvider"
- }
- remote {
- transport = "akka.remote.netty.NettyRemoteTransport"
- netty {
- hostname = "127.0.0.1"
- port = 2552
- }
- }
- }
最后的一些提示
我們已經(jīng)了解了Akka框架幫助完成并發(fā)和高性能的方法,。然而,,正如這篇教程指出的,為了充分發(fā)揮Akka的能力,,在設計和實現(xiàn)系統(tǒng)時,,有些要點值得考慮:
- 我們應盡最大可能為每個Actor都分配最小的任務(如上面討論的,遵守單一職責原則)
- Actor應該異步處理事件(也就是處理消息),,不應該阻塞,,否則就會發(fā)生上下文切換,影響性能,。具體來說,最好是在一個Future對象里執(zhí)行阻塞操作(例如IO),,這樣就不會阻塞Actor,,如:
- case evt => blockingCall() // BAD
- case evt => Future {
- blockingCall() // GOOD
- }
- 要確認你的消息都是不可變的,因為互相傳遞消息的Actor都在它們自己的線程里并發(fā)運行,??勺兊南⒑苡锌赡軐е虏豢深A期的行為。
- 由于在節(jié)點之間發(fā)送的消息必須是可序列化的,,所以必須要記住消息體越大,,序列化、發(fā)送和反序列化所花費的時間就越多,,這也會降低性能,。
結(jié)論
Akka用Scala語言寫成,簡化并為開發(fā)高并發(fā),、分布式和容錯式應用提供了便利,,對開發(fā)者隱藏了很大程度的復雜性。把Akka用好肯定需要了解比這個教程更多的內(nèi)容,,但是希望這里的介紹和示例能夠引起你的注意并繼續(xù)了解Akka,。
Amazon、VMWare和CSC只是現(xiàn)在積極使用Akka的一部分領軍企業(yè)??梢栽L問
Akka的官方網(wǎng)站學到更多的知識,,并多花點時間研究Akka是否適合你的項目。
原文鏈接:Akka Tutorial with Code: Concurrency and Fault Tolerance(責編/劉亞瓊)
免費訂閱“CSDN云計算(左)和CSDN大數(shù)據(jù)(右)”微信公眾號,,實時掌握第一手云中消息,,了解最新的大數(shù)據(jù)進展!
CSDN發(fā)布虛擬化,、Docker,、OpenStack、CloudStack,、數(shù)據(jù)中心等相關云計算資訊,, 分享Hadoop、Spark,、NoSQL/NewSQL,、HBase、Impala,、內(nèi)存計算,、流計算、機器學習和智能算法等相關大數(shù)據(jù)觀點,,提供云計算和大數(shù)據(jù)技術(shù),、平臺、實踐和產(chǎn)業(yè)信息等服務,。
|