久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

elasticJob

 liang1234_ 2021-06-06
                     elastic-job是當當開源的一款非常好用的作業(yè)框架,,在這之前,,我們開發(fā)定時任務一般都是使用quartz或者spring-task(ScheduledExecutorService),無論是使用quartz還是spring-task,,我們都會至少遇到兩個痛點:
1.不敢輕易跟著應用服務多節(jié)點部署,,可能會重復多次執(zhí)行而引發(fā)系統(tǒng)邏輯的錯誤,。
2.quartz的集群僅僅只是用來HA,節(jié)點數(shù)量的增加并不能給我們的每次執(zhí)行效率帶來提升,,即不能實現(xiàn)水平擴展,。

本篇博文將會自頂向下地介紹elastic-job,讓大家認識了解并且快速搭建起環(huán)境,。

elastic-job產(chǎn)品線說明

elastic-job2.x之后,,出了兩個產(chǎn)品線:Elastic-Job-LiteElastic-Job-Cloud。我們一般使用Elastic-Job-Lite就能夠滿足需求,,本文也是以Elastic-Job-Lite為主,。1.x系列對應的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心類名,,差別雖大,,原理類似,建議使用2.x系列,。寫此博文,,最新release版本為2.0.5


elastic-job-lite原理

舉個典型的job場景,,比如余額寶里的昨日收益,,系統(tǒng)需要job在每天某個時間點開始,給所有余額寶用戶計算收益,。如果用戶數(shù)量不多,,我們可以輕易使用quartz來完成,我們讓計息job在某個時間點開始執(zhí)行,,循環(huán)遍歷所有用戶計算利息,,這沒問題??墒?,如果用戶體量特別大,我們可能會面臨著在第二天之前處理不完這么多用戶,。另外,,我們部署job的時候也得注意,我們可能會把job直接放在我們的webapp里,,webapp通常是多節(jié)點部署的,,這樣,我們的job也就是多節(jié)點,,多個job同時執(zhí)行,,很容易造成重復執(zhí)行,比如用戶重復計息,為了避免這種情況,,我們可能會對job的執(zhí)行加鎖,,保證始終只有一個節(jié)點能執(zhí)行,或者干脆讓job從webapp里剝離出來,,獨自部署一個節(jié)點,。
elastic-job就可以幫助我們解決上面的問題,,elastic底層的任務調(diào)度還是使用的quartz,,通過zookeeper來動態(tài)給job節(jié)點分片。
我們來看:
很大體量的用戶需要在特定的時間段內(nèi)計息完成
我們肯定是希望我們的任務可以通過集群達到水平擴展,,集群里的每個節(jié)點都處理部分用戶,,不管用戶數(shù)量有多龐大,我們只要增加機器就可以了,,比如單臺機器特定時間能處理n個用戶,,2臺機器處理2n個用戶,3臺3n,,4臺4n...,,再多的用戶也不怕了。
使用elastic-job開發(fā)的作業(yè)都是zookeeper的客戶端,,比如我希望3臺機器跑job,,我們將任務分成3片,框架通過zk的協(xié)調(diào),,最終會讓3臺機器分別分配到0,1,2的任務片,,比如server0-->0,server1-->1,,server2-->2,,當server0執(zhí)行時,可以只查詢id%3==0的用戶,,server1執(zhí)行時,,只查詢id%3==1的用戶,server2執(zhí)行時,,只查詢id%3==2的用戶,。
任務部署多節(jié)點引發(fā)重復執(zhí)行
在上面的基礎(chǔ)上,我們再增加server3,,此時,,server3分不到任務分片,因為只有3片,,已經(jīng)分完了,。沒有分到任務分片的作業(yè)程序?qū)⒉粓?zhí)行。
如果此時server2掛了,那么server2的分片項會分配給server3,,server3有了分片,,就會替代server2執(zhí)行。
如果此時server3也掛了,,只剩下server0和server1了,,框架也會自動把server3的分片隨機分配給server0或者server1,可能會這樣,,server0-->0,,server1-->1,2。
這種特性稱之為彈性擴容,,即elastic-job名稱的由來,。

代碼演示

我們搭建環(huán)境通過示例代碼來演示上面的例子,elastic-job是不支持單機多實例的,,通過zk的協(xié)調(diào)分片是以ip為單元的,。很多同學上來可能就是通過單機多實例來學習,結(jié)果導致分片和預期不一致,。這里沒辦法,,只能通過多機器或者虛擬機,我們這里使用虛擬機,,另外,,由于資源有限,我們這里僅僅只模擬兩臺機器,。

節(jié)點說明:
本地宿主機器
zookeeper,、job
192.168.241.1

虛擬機
job
192.168.241.128

環(huán)境說明:
Java
請使用JDK1.7及其以上版本。
Zookeeper
請使用Zookeeper3.4.6及其以上版本
Elastic-Job-Lite
2.0.5(2.x系列即可,,最好是2.0.4及其以上,,因為2.0.4版本有本人提交的少許代碼,(*^__^*) 嘻嘻……)

需求說明:
通過兩臺機器演示動態(tài)分片

step1. 引入框架的jar包
  1. <!-- 引入elastic-job-lite核心模塊 -->
  2. <dependency>
  3. <groupId>com.dangdang</groupId>
  4. <artifactId>elastic-job-lite-core</artifactId>
  5. <version>2.0.5</version>
  6. </dependency>
  7. <!-- 使用springframework自定義命名空間時引入 -->
  8. <dependency>
  9. <groupId>com.dangdang</groupId>
  10. <artifactId>elastic-job-lite-spring</artifactId>
  11. <version>2.0.5</version>
  12. </dependency>
step2. 編寫job
  1. package com.fanfan.sample001;
  2. import com.dangdang.ddframe.job.api.ShardingContext;
  3. import com.dangdang.ddframe.job.api.simple.SimpleJob;
  4. import java.util.Date;
  5. /**
  6. * Created by fanfan on 2016/12/20.
  7. */
  8. public class MySimpleJob implements SimpleJob {
  9. @Override
  10. public void execute(ShardingContext shardingContext) {
  11. System.out.println(String.format("------Thread ID: %s, 任務總片數(shù): %s, 當前分片項: %s",
  12. Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
  13. /**
  14. * 實際開發(fā)中,,有了任務總片數(shù)和當前分片項,,就可以對任務進行分片執(zhí)行了
  15. * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
  16. */
  17. }
  18. }
Step3. Spring配置
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www./schema/beans"
  3. xmlns:xsi="http://www./2001/XMLSchema-instance"
  4. xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
  5. xmlns:job="http://www.dangdang.com/schema/ddframe/job"
  6. xsi:schemaLocation="http://www./schema/beans
  7. http://www./schema/beans/spring-beans.xsd
  8. http://www.dangdang.com/schema/ddframe/reg
  9. http://www.dangdang.com/schema/ddframe/reg/reg.xsd
  10. http://www.dangdang.com/schema/ddframe/job
  11. http://www.dangdang.com/schema/ddframe/job/job.xsd">
  12. <!--配置作業(yè)注冊中心 -->
  13. <reg:zookeeper id="regCenter" server-lists="192.168.241.1:2181" namespace="dd-job"
  14. base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
  15. <!-- 配置作業(yè)-->
  16.     <job:simple id="mySimpleJob" class="com.fanfan.sample001.MySimpleJob" registry-center-ref="regCenter"
  17.                 sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true" />
  18. </beans>

Case1. 單節(jié)點








Case2. 增加一個節(jié)點













Case3. 斷開一個節(jié)點






作業(yè)類型

elastic-job提供了三種類型的作業(yè):Simple類型作業(yè)、Dataflow類型作業(yè),、Script類型作業(yè),。這里主要講解前兩者。Script類型作業(yè)意為腳本類型作業(yè),,支持shell,,python,perl等所有類型腳本,,使用不多,,可以參見github文檔,。

SimpleJob需要實現(xiàn)SimpleJob接口,意為簡單實現(xiàn),,未經(jīng)過任何封裝,,與quartz原生接口相似,比如示例代碼中所使用的job,。

Dataflow類型用于處理數(shù)據(jù)流,,需實現(xiàn)DataflowJob接口。該接口提供2個方法可供覆蓋,,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù),。
可通過DataflowJobConfiguration配置是否流式處理。
流式處理數(shù)據(jù)只有fetchData方法的返回值為null或集合長度為空時,,作業(yè)才停止抓取,,否則作業(yè)將一直運行下去,; 非流式處理數(shù)據(jù)則只會在每次作業(yè)執(zhí)行過程中執(zhí)行一次fetchData方法和processData方法,,隨即完成本次作業(yè)。
實際開發(fā)中,,Dataflow類型的job還是很有好用的,。

比如拿余額寶計息來說:

  1. package com.fanfan.sample001;
  2. import com.dangdang.ddframe.job.api.ShardingContext;
  3. import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. /**
  7. * Created by fanfan on 2016/12/23.
  8. */
  9. public class MyDataFlowJob implements DataflowJob<User> {
  10. /*
  11. status
  12. 0:待處理
  13. 1:已處理
  14. */
  15. @Override
  16. public List<User> fetchData(ShardingContext shardingContext) {
  17. List<User> users = null;
  18. /**
  19. * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
  20. */
  21. return users;
  22. }
  23. @Override
  24. public void processData(ShardingContext shardingContext, List<User> data) {
  25. for (User user: data) {
  26. System.out.println(String.format("用戶 %s 開始計息", user.getUserId()));
  27. user.setStatus(1);
  28. /**
  29. * update user
  30. */
  31. }
  32. }
  33. }

  1. <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
  2. sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />

其它功能

上述介紹的是最精簡常用的功能。elastic-job的功能集還不止這些,,比如像作業(yè)事件追蹤,、任務監(jiān)聽等,另外,,elastic-job-lite-console作為一個獨立的運維平臺還提供了用來查詢和操作任務的web頁面,。
這些增強的功能讀者可以在github/elastic-job上自行學習,相信有了本篇博文的基礎(chǔ),,再閱讀那些文檔就特別簡單了,。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點,。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,,謹防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報,。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多