久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

讓并發(fā)和容錯更容易:Akka示例教程

 richsky 2014-12-18

寫并發(fā)程序很難。程序員不得不處理線程,、鎖和競態(tài)條件等等,這個過程很容易出錯,,而且會導致程序代碼難以閱讀,、測試和維護。

所以,,很多人不傾向于使用多線程編程,。取而代之的是,他們使用單線程進程(譯者注:只含有一個線程的進程),,依賴外部服務(如數(shù)據(jù)庫,、隊列等)處理所需的并發(fā)或異步操作。雖然這種方法在有些情況下是可行的,,但還有很多其他情況不能奏效,。很多實時系統(tǒng)——例如交易或銀行業(yè)務應用,或?qū)崟r游戲——等待一個單線程進程完成就太奢侈了(他們需要立即應答?。F渌囊恍τ谟嬎慊蛸Y源要求非常高的系統(tǒng),如果在程序中不引入并行機制就會耗時很久(有些情況下可以達到幾個小時或數(shù)天),。

常用的一種單線程方法(例如,,在 Node.js里廣泛應用)是使用基于事件的、非阻塞模式(Event-Based, NON-Blocking Paradigm,,其中Paradigm也有譯作成例),。雖然這種方法可以避免上下文切換、鎖和阻塞,,的確能提高性能,,但還是不能解決并發(fā)使用多個處理器(需要啟動和協(xié)調(diào)多個獨立的處理器)的問題。

那么,,這是不是意味著為了構(gòu)建一個并發(fā)程序,,除了深入到線程、鎖和競態(tài)條件之外沒有別的選擇呢,?

感謝Akka框架,,它為我們提供了一種選擇。本教程介紹了Akka的示例,,并仔細研究它如何幫助并簡化分布式并發(fā)應用的實現(xiàn),。

Akka框架是什么

這篇文章介紹了Akka并仔細研究它如何幫助并簡化分布式并發(fā)應用的實現(xiàn)。

Akka是JVM(JAVA虛擬機,,下同)平臺上構(gòu)建高并發(fā),、分布式和容錯應用的工具包和運行時。Akka用 Scala語言寫成,,同時提供了Scala和JAVA的開發(fā)接口,。

Akka處理并發(fā)的方法基于 Actor(沒有慣用譯法,文中使用原詞)模型,。在基于Actor的系統(tǒng)里,,所有的事物都是Actor,就好像在面向?qū)ο笤O計里面所有的事物都是對象一樣,。但是有一個重要區(qū)別——特別是和我們的討論相關的——那就是Actor模型是作為一個并發(fā)模型設計和架構(gòu)的,,而面向?qū)ο竽J絼t不是。更具體一點,,在Scala的Actor系統(tǒng)里,,Actor互相交互并共享信息但并不對交互順序作出預設。Actor之間共享信息和發(fā)起任務的機制是消息傳遞,。

創(chuàng)建和調(diào)度線程,、接收和分發(fā)消息以及處理競態(tài)條件和同步的所有復雜性,都委托給框架,,框架的處理對應用來說是透明的,。

Akka在多個Actor和下面的系統(tǒng)之間建立了一個層次(Layer),,這樣一來,Actor只需要處理消息就可以了,。創(chuàng)建和調(diào)度線程,、接收和分發(fā)消息以及處理競態(tài)條件和同步的所有復雜性,都委托給框架,,框架的處理對應用來說是透明的,。

Actor嚴格遵守 響應式聲明。響應式應用的目標是通過滿足以下一個或多個條件來代替?zhèn)鹘y(tǒng)的多線程應用:

  • 事件驅(qū)動,。使用Actor,,代碼可以異步處理請求并用獨占的方式執(zhí)行非阻塞操作。
  • 可伸縮性,。在Akka里,,不修改代碼就增加節(jié)點是可能的,感謝消息傳遞和本地透明性(Location Transparency),。
  • 高彈性,。任何應用都會碰到錯誤并在某個時間點失敗。Akka的“監(jiān)管”(容錯)策略為實現(xiàn)自愈系統(tǒng)提供了便利,。
  • 響應式,。今天的高性能和快速響應應用需要對用戶快速反饋,因此對于事件的響應需要非常及時,。Akka的非阻塞,、基于消息的策略可以幫助達成這個目標。

Akka中的Actor是什么

Actor本質(zhì)上就是接收消息并采取行動處理消息的對象,。它從消息源中解耦出來,,只負責正確識別接收到的消息類型,并采取相應的行動,。

收到一條消息之后,,一個Actor可能會采取以下一個或多個行動:

  • 執(zhí)行一些本身的操作(例如進行計算、持久化數(shù)據(jù),、調(diào)用外部的Web服務等)
  • 把消息或衍生消息轉(zhuǎn)發(fā)給另外一個Actor
  • 實例化一個新的Actor并把消息轉(zhuǎn)發(fā)給它

或者,,如果這個Actor認為合適的話,可能會完全忽略這條消息(也就是說,,它可能選擇不響應),。

為了實現(xiàn)一個Actor,需要繼承Akka.Actor.Actor這個Trait(一般譯為“特征”,,譯法有一定爭議,,文中保留原詞)并實現(xiàn)Receive方法。當一個消息發(fā)送給Actor時,,它的Receive方法會被(Akka)調(diào)用,。典型的實現(xiàn)包括使用模式匹配(Pattern Matching)來識別消息類型并作出響應,,參見下面的Akka示例:

  1. import akka.actor.Actor  
  2. import akka.actor.Props  
  3. import akka.event.Logging  
  4. class MyActor extends Actor {  
  5.   def receive = {  
  6.     case value: String => doSomething(value)  
  7.     case _ => println("received unknown message")  
  8.   }  
  9. }  

模式匹配是一種相對優(yōu)雅的處理消息的技術(shù),相比基于回調(diào)的實現(xiàn),,更傾向于產(chǎn)生“更整潔”以及更容易瀏覽的代碼,。例如,考慮一個簡化版的HTTP請求/響應實現(xiàn),。

首先,我們使用JavaScript中基于回調(diào)的方式實現(xiàn):

  1. route(url, function(request){  
  2.   var query = buildQuery(request);  
  3.   dbCall(query, function(dbResponse){  
  4.     var wsRequest = buildWebServiceRequest(dbResponse);  
  5.     wsCall(wsRequest, function(wsResponse) {  
  6.       sendReply(wsResponse);  
  7.     });  
  8.   });  
  9. });  
現(xiàn)在,,我們把它和基于模式匹配的實現(xiàn)做個比較:

  1. msg match {  
  2.   case HttpRequest(request) => {  
  3.     val query = buildQuery(request)  
  4.     dbCall(query)  
  5.   }  
  6.   case DbResponse(dbResponse) => {  
  7.     var wsRequest = buildWebServiceRequest(dbResponse);  
  8.     wsCall(dbResponse)  
  9.   }  
  10.   case WsResponse(wsResponse) => sendReply(wsResponse)  
  11. }  
雖然基于回調(diào)的JavaScript代碼更緊湊,,但確實更難以閱讀和瀏覽。相比而言,,基于模式匹配的代碼對于需要考慮哪些情況,、每種情況都是怎么處理的寫法更加清晰。

Actor系統(tǒng)

把一個復雜的問題不斷分解成更小規(guī)模的子問題通常是一種可靠的解決問題的技術(shù),。這個方法對于計算機科學特別有效(和 單一職責原則一致),,因為這樣容易產(chǎn)生整潔的、模塊化的代碼,,產(chǎn)生的冗余很少甚至沒有,,而且維護起來相對容易。

在基于Actor的設計里,,使用這種技術(shù)有助于把Actor的邏輯組織變成一個層級結(jié)構(gòu),,也就是所謂的 Actor系統(tǒng)。Actor系統(tǒng)提供了一個基礎框架,,通過這個系統(tǒng)Actor之間可以進行交互,。

              Actor系統(tǒng)

在Akka里面,和Actor通信的唯一方式就是通過ActorRef,。ActorRef代表Actor的一個引用,,可以阻止其他對象直接訪問或操作這個Actor的內(nèi)部信息和狀態(tài)。消息可以通過一個ActorRef以下面的語法協(xié)議中的一種發(fā)送到一個Actor:
-!(“告知”) —— 發(fā)送消息并立即返回
-?(“請求”) —— 發(fā)送消息并返回一個Future對象,,代表一個可能的應答

每個Actor都有一個收件箱,,用來接收發(fā)送過來的消息。收件箱有多種實現(xiàn)方式可以選擇,,缺省的實現(xiàn)是先進先出(FIFO)隊列,。

在處理多條消息時,一個Actor包含多個實例變量來保持狀態(tài),。Akka確保Actor的每個實例都運行在自己的輕量級線程里,,并保證每次只處理一條消息。這樣一來,,開發(fā)者不必擔心同步或競態(tài)條件,,而每個Actor的狀態(tài)都可以被可靠地保持,。

Akka的Actor API中提供了每個Actor執(zhí)行任務所需要的有用信息:

  • sender:當前處理消息的發(fā)送者的一個ActorRef引用
  • context:Actor運行上下文相關的信息和方法(例如,包括實例化一個新Actor的方法ActorOf
  • supervisionStrategy:定義用來從錯誤中恢復的策略
  • self:Actor本身的ActorRef引用
Akka確保Actor的每個實例都運行在自己的輕量級線程里,,并保證每次只處理一條消息,。這樣一來,開發(fā)者不必擔心同步或競態(tài)條件,,而每個Actor的狀態(tài)都可以被可靠地保持,。

為了把這些教程組織起來,讓我們來考慮一個簡單的例子:統(tǒng)計一個文本文件中單詞的數(shù)量,。

為了達到演示Akka示例的目的,,我們把這個問題分解為兩個子任務;即(1)統(tǒng)計每行單詞數(shù)量的“孩子”任務和(2)匯總這些單行單詞數(shù)量,、得到文件里單詞總數(shù)的“父親”任務,。

父Actor會從文件中裝載每一行,然后委托一個子Actor來計算某一行的單詞數(shù)量,。當子Actor完成之后,,它會把結(jié)果用消息發(fā)回給父Actor。父Actor會收到(每一行的)單詞數(shù)量的消息并維持一個整個文件單詞總數(shù)的計數(shù)器,,這個計數(shù)器會在完成后返回給調(diào)用者,。

(注意以下提供的Akka教程的例子只是為了教學目的,所以沒有顧及所有的邊界條件,、性能優(yōu)化等,。同時,完整可編譯版本的代碼示例可以在這個GIST中找到)

讓我們首先看一個子類StringCounterActor的示例實現(xiàn):

  1. case class ProcessStringMsg(string: String)  
  2. case class StringProcessedMsg(words: Integer)  
  3. class StringCounterActor extends Actor {  
  4.   def receive = {  
  5.     case ProcessStringMsg(string) => {  
  6.       val wordsInLine = string.split(" ").length  
  7.       sender ! StringProcessedMsg(wordsInLine)  
  8.     }  
  9.     case _ => println("Error: message not recognized")  
  10.   }  
  11. }  

這個Actor有一個非常簡單的任務:接收ProcessStringMsg消息(包含一行文本),,計算這行文本中單詞的數(shù)量,,并把結(jié)果通過一個StringProcessedMsg消息返回給發(fā)送者。請注意我們已經(jīng)實現(xiàn)了我們的類,,使用,!(“告知”)方法發(fā)出StringProcessedMsg消息(發(fā)出消息并立即返回)。

好了,,現(xiàn)在我們來關注父WordCounterActor類:

  1.  case class StartProcessFileMsg()  
  2.   
  3.  class WordCounterActor(filename: String) extends Actor {  
  4.   
  5.    private var running = false  
  6.    private var totalLines = 0  
  7.    private var linesProcessed = 0  
  8.    private var result = 0  
  9.    private var fileSender: Option[ActorRef] = None  
  10.   
  11.   def receive = {  
  12.     case StartProcessFileMsg() => {  
  13.       if (running) {  
  14.         // println just used for example purposes;  
  15.         // Akka logger should be used instead  
  16.         println("Warning: duplicate start message received")  
  17.       } else {  
  18.         running = true  
  19.         fileSender = Some(sender) // save reference to process invoker  
  20.         import scala.io.Source._  
  21.         fromFile(filename).getLines.foreach { line =>  
  22.           context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)  
  23.           totalLines += 1  
  24.         }  
  25.       }  
  26.     }  
  27.     case StringProcessedMsg(words) => {  
  28.       result += words  
  29.       linesProcessed += 1  
  30.       if (linesProcessed == totalLines) {  
  31.         fileSender.map(_ ! result)  // provide result to process invoker  
  32.       }  
  33.     }  
  34.     case _ => println("message not recognized!")  
  35.   }  
  36. }  

這里面有很多細節(jié),,我們來逐一考察(注意討論中所引用的行號基于以上代碼示例)。

首先,,請注意要處理的文件名被傳給了WordCounterActor的構(gòu)造方法(第3行),。這意味著這個Actor只會用來處理一個單獨的文件。這樣通過避免重置狀態(tài)變量(running,,totalLines,,linesProcessedresult)也簡化了開發(fā)者的編碼工作,因為這個實例只使用一次(也就是說處理一個單獨的文件),然后就丟棄了,。

接下來,,我們看到WordCounterActor處理了兩種類型的消息:

  • StartProcessFileMsg(第12行)
    • 從最初啟動WordCounterActor的外部Actor接收到的消息
    • 收到這個消息之后,WordCounterActor首先檢查它收到的是不是一個重復的請求
    • 如果這個請求是重復的,,那么WordCounterActor生成一個警告,,然后就不做別的事了(第16行)
    • 如果這不是一個重復的請求:
      • WordCounterActorFileSender實例變量(注意這是一個Option[ActorRef]而不是一個Option[Actor])中保存發(fā)送者的一個引用。當處理最終的StringProcessedMsg(從一個StringCounterActor子類中接收,,如下文所述)時,,為了以后的訪問和響應,這個ActorRef是必需的,。
      • 然后WordCounterActor讀取文件,,當文件中每行都裝載之后,就會創(chuàng)建一個StringCounterActor,,需要處理的包含行文本的消息就會傳遞給它(第21-24行)。
  • StringProcessedMsg(第27行)
    • 當處理完成分配給它的行之后,,從StringCounterActor處接收到的消息
    • 收到此消息之后,,WordCounterActor會把文件的行計數(shù)器增加,如果所有的行都處理完畢(也就是說,,當totalLineslinesProcessed相等),,它會把最終結(jié)果發(fā)給原來的FileSender(第28-31行)。

再次需要注意的是,,在Akka里,,Actor之間通信的唯一機制就是消息傳遞。消息是Actor之間唯一共享的東西,,而且因為多個Actor可能會并發(fā)訪問同樣的消息,,所以為了避免競態(tài)條件和不可預期的行為,消息的不可變性非常重要,。

因為Case Class默認是不可變的并且可以和模式匹配無縫集成,,所以用Case Class的形式來傳遞消息是很常見的。(Scala中的Case Class就是正常的類,,唯一不同的是通過模式匹配提供了可以遞歸分解的機制),。

讓我們通過運行整個應用的示例代碼來結(jié)束這個例子。

  1. object Sample extends App {  
  2.   import akka.util.Timeout  
  3.   import scala.concurrent.duration._  
  4.   import akka.pattern.ask  
  5.   import akka.dispatch.ExecutionContexts._  
  6.   implicit val ec = global  
  7.   override def main(args: Array[String]) {  
  8.     val system = ActorSystem("System")  
  9.     val actor = system.actorOf(Props(new WordCounterActor(args(0))))  
  10.     implicit val timeout = Timeout(25 seconds)  
  11.     val future = actor ? StartProcessFileMsg()  
  12.     future.map { result =>  
  13.       println("Total number of words " + result)  
  14.       system.shutdown  
  15.     }  
  16.   }  
  17. }  
請注意這里的?方法是怎樣發(fā)送一條消息的,。用這種方法,,調(diào)用者可以使用返回的 Future對象,當完成之后可以打印出最后結(jié)果并最終通過停掉Actor系統(tǒng)退出程序,。

Akka的容錯和監(jiān)管者策略

在Actor系統(tǒng)里,,每個Actor都是其子孫的監(jiān)管者。如果Actor處理消息時失敗,,它就會暫停自己及其子孫并發(fā)送一個消息給它的監(jiān)管者,,通常是以異常的形式,。

在Akka里面,監(jiān)管者策略是定義你的系統(tǒng)容錯行為的主要并且直接的機制,。

在Akka里面,,一個監(jiān)管者對于從子孫傳遞上來的異常的響應和處理方式稱作監(jiān)管者策略。 監(jiān)管者策略是定義你的系統(tǒng)容錯行為的主要并且直接的機制,。

當一條消息指示有一個錯誤到達了一個監(jiān)管者,,它會采取如下行動之一:

  • 恢復孩子(及其子孫),保持內(nèi)部狀態(tài),。 當孩子的狀態(tài)沒有被錯誤破壞,,還可以繼續(xù)正常工作的時候,可以使用這種策略,。
  • 重啟孩子(及其子孫),,清除內(nèi)部狀態(tài)。 這種策略應用的場景和第一種正好相反,。如果孩子的狀態(tài)已經(jīng)被錯誤破壞,,在它可以被用到Future之前有必須要重置其內(nèi)部狀態(tài)。
  • 永久地停掉孩子(及其子孫),。 這種策略可以用在下面的場景中:錯誤條件不能被修正,,但是并不影響后面執(zhí)行的操作,這些操作可以在失敗的孩子不存在的情況下完成,。
  • 停掉自己并向上傳播錯誤,。 適用場景:當監(jiān)管者不知道如何處理錯誤,就把錯誤傳遞給自己的監(jiān)管者,。

而且,,一個Actor可以決定是否把行動應用在失敗的子孫上抑或是應用到它的兄弟上。有兩種預定義的策略:

  • OneForOneStrategy:只把指定行動應用到失敗的孩子上
  • AllForOneStrategy:把指定行動應用到所有子孫上

下面是一個使用OneForOneStrategy的簡單例子:

  1. import akka.actor.OneForOneStrategy  
  2. import akka.actor.SupervisorStrategy._  
  3. import scala.concurrent.duration._  
  4. override val supervisorStrategy =  
  5.  OneForOneStrategy() {  
  6.    case _: ArithmeticException      => Resume  
  7.    case _: NullPointerException     => Restart  
  8.    case _: IllegalArgumentException => Stop  
  9.    case _: Exception                => Escalate  
  10.  }  

如果沒有指定策略,,那么就使用如下默認的策略:

  • 如果在初始化Actor時出錯,,或者Actor被結(jié)束(Killed),那么Actor就會停止(Stopped)
  • 如果有任何類型的異常出現(xiàn),,Actor就會重啟

Akka提供的默認策略的實現(xiàn)如下:

  1. final val defaultStrategy: SupervisorStrategy = {  
  2.   def defaultDecider: Decider = {  
  3.     case _: ActorInitializationException ? Stop  
  4.     case _: ActorKilledException         ? Stop  
  5.     case _: Exception                    ? Restart  
  6.   }  
  7.   OneForOneStrategy()(defaultDecider)  
  8. }  
Akka也考慮到對 定制化監(jiān)管者策略的實現(xiàn),,但正如Akka文檔也提出了警告,這么做要小心,,因為錯誤的實現(xiàn)會產(chǎn)生諸如Actor系統(tǒng)被阻塞的問題(也就是說,,其中的多個Actor被永久掛起了)。

本地透明性

Akka架構(gòu)支持 本地透明性,,使得Actor完全不知道他們接受的消息是從哪里發(fā)出來的,。消息的發(fā)送者可能駐留在同一個JVM,也有可能是存在于其他的JVM(或者運行在同一個節(jié)點,或者運行在不同的節(jié)點),。Akka處理這些情況對于Actor(也即對于開發(fā)者)來說是完全透明的,。唯一需要說明的是跨越節(jié)點的消息必須要被序列化。

Akka架構(gòu)支持本地透明性,,使得Actor完全不知道他們接受的消息是從哪里發(fā)出來的,。

Actor系統(tǒng)設計的初衷,就是不需要任何專門的代碼就可以運行在分布式環(huán)境中,。Akka只需要一個配置文件(Application.Conf),,用以說明發(fā)送消息到哪些節(jié)點。下面是配置文件的一個例子:

  1. akka {  
  2.   actor {  
  3.     provider = "akka.remote.RemoteActorRefProvider"  
  4.   }  
  5.   remote {  
  6.     transport = "akka.remote.netty.NettyRemoteTransport"  
  7.     netty {  
  8.       hostname = "127.0.0.1"  
  9.       port = 2552  
  10.     }  
  11.   }  
  12. }  

最后的一些提示

我們已經(jīng)了解了Akka框架幫助完成并發(fā)和高性能的方法,。然而,,正如這篇教程指出的,為了充分發(fā)揮Akka的能力,,在設計和實現(xiàn)系統(tǒng)時,,有些要點值得考慮:

  • 我們應盡最大可能為每個Actor都分配最小的任務(如上面討論的,遵守單一職責原則)
  • Actor應該異步處理事件(也就是處理消息),,不應該阻塞,,否則就會發(fā)生上下文切換,影響性能,。具體來說,最好是在一個Future對象里執(zhí)行阻塞操作(例如IO),,這樣就不會阻塞Actor,,如:
  1. case evt => blockingCall() // BAD  
  2. case evt => Future {  
  3.     blockingCall()           // GOOD  
  4. }  
  • 要確認你的消息都是不可變的,因為互相傳遞消息的Actor都在它們自己的線程里并發(fā)運行,??勺兊南⒑苡锌赡軐е虏豢深A期的行為。
  • 由于在節(jié)點之間發(fā)送的消息必須是可序列化的,,所以必須要記住消息體越大,,序列化、發(fā)送和反序列化所花費的時間就越多,,這也會降低性能,。

結(jié)論

Akka用Scala語言寫成,簡化并為開發(fā)高并發(fā),、分布式和容錯式應用提供了便利,,對開發(fā)者隱藏了很大程度的復雜性。把Akka用好肯定需要了解比這個教程更多的內(nèi)容,,但是希望這里的介紹和示例能夠引起你的注意并繼續(xù)了解Akka,。

Amazon、VMWare和CSC只是現(xiàn)在積極使用Akka的一部分領軍企業(yè)??梢栽L問 Akka的官方網(wǎng)站學到更多的知識,,并多花點時間研究Akka是否適合你的項目。

原文鏈接:Akka Tutorial with Code: Concurrency and Fault Tolerance(責編/劉亞瓊)


免費訂閱“CSDN云計算(左)CSDN大數(shù)據(jù)(右)”微信公眾號,,實時掌握第一手云中消息,,了解最新的大數(shù)據(jù)進展!

CSDN發(fā)布虛擬化,、Docker,、OpenStack、CloudStack,、數(shù)據(jù)中心等相關云計算資訊,,     分享Hadoop、Spark,、NoSQL/NewSQL,、HBase、Impala,、內(nèi)存計算,、流計算、機器學習和智能算法等相關大數(shù)據(jù)觀點,,提供云計算和大數(shù)據(jù)技術(shù),、平臺、實踐和產(chǎn)業(yè)信息等服務,。        

    本站是提供個人知識管理的網(wǎng)絡存儲空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點,。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導購買等信息,謹防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多