原文連結:https://mp.weixin.qq.com/s/8mbNIBDQw2gMHPkmHfj1JQ
在前幾篇文章中,我們詳細的介紹了 Quartz 的架構原理以及應用實踐,雖然 Quartz 也可以透過叢集方式來保證服務高可用,但是它也有一個的弊端,那就是服務節點數量的增加,並不能提升任務的執行效率,即不能實現水平擴充套件!
之所以產生這樣的結果,是因為 Quartz 在分散式叢集環境下是透過資料庫鎖方式來實現有且只有一個有效的服務節點來執行服務,從而保證服務在叢集環境下定時任務不會被重複呼叫!
如果需要執行的定時任務很少的話,使用 Quartz 不會有太大的問題,但是如果 現在有這麼一個需求,例如理財產品,每天6點系統需要計算每個賬戶昨天的收益,假如這個理財產品,有幾個億的使用者,如果都在一個服務例項上跑,可能第二天都無法處理完這項任務!
類似這樣場景還有很多很多,很顯然 Quartz 很難滿足我們這種大批次、任務執行週期長的任務排程!
因此短板,噹噹網基於 Quartz 開發了一套適合在分散式環境下能高效率的使用伺服器資源的 Elastic-Job 定時任務框架!
Elastic-Job-Lite最大的亮點就是支援彈性擴容縮容,怎麼實現的呢?
比如現在有個任務要執行,如果將任務進行分片成10個,那麼可以同時在10個服務例項上並行執行,互相不影響,從而大大的提升了任務執行效率,並且充分的利用伺服器資源!
對於上面的理財產品,如果這個任務需要處理1個億使用者,那麼我們可以透過水平擴充套件,比如對任務進行分片為500,讓500個服務例項同時執行,每個服務例項處理20萬條資料,不出意外的話,1 - 2個小時可以全部跑完,如果時間還是很長,還可以繼續水平擴張,新增服務例項來執行!
2015 年,噹噹網將其開源,瞬間吸引了一大批程式設計師的關注,同時登頂開源中國第一名!
下面我們就一起來了解一下這款使用非常廣泛的分散式排程框架。
二、專案架構介紹Elastic-Job 最開始只有一個 elastic-job-core 的專案,定位輕量級、無中心化,最核心的服務就是支援彈性擴容和資料分片!
從 2.X 版本以後,主要分為 Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個子專案。
其中,Elastic-Job-Lite 定位為輕量級 無 中 心 化 解 決 方 案 , 使 用jar 包 的 形 式 提 供 分 布 式 任 務 的 協 調 服 務 。
而 Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應用分發以及程序隔離等服務(跟 Lite 的區別只是部署方式不同,他們使用相同的 API,只要開發一次)。
今天我們主要介紹的是Elastic-Job-Lite,最主要的功能特性如下:
分散式排程協調:採用 zookeeper 實現註冊中心,進行統一排程。支援任務分片:將需要執行的任務進行分片,實現並行排程。支援彈性擴容縮容:將任務拆分為 n 個任務項後,各個伺服器分別執行各自分配到的任務項。一旦有新的伺服器加入叢集,或現有伺服器下線,elastic-job 將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。當然,還有失效轉移、錯過執行作業重觸發等等功能,大家可以訪問官網文件,以獲取更多詳細資料。
應用在各自的節點執行任務,透過 zookeeper 註冊中心協調。節點註冊、節點選舉、任務分片、監聽都在 E-Job 的程式碼中完成。下圖是官網提供得架構圖。
啥也不用多說了,下面我們直接透過實踐介紹,更容易瞭解裡面是怎麼玩的!
三、應用實踐3.1、zookeeper 安裝elastic-job-lite,是直接依賴 zookeeper 的,因此在開發之前我們需要先準備好對應的 zookeeper 環境,關於 zookeeper 的安裝過程,就不多說了,非常簡單,網上都有教程!
3.2、elastic-job-lite-console 安裝elastic-job-lite-console,主要是一個任務作業視覺化介面管理系統。
可以單獨部署,與平臺不關,主要是透過配置註冊中心和資料來源來抓取資料。
獲取的方式也很簡單,直接訪問https://github.com/apache/shardingsphere-elasticjob地址,然後切換到2.1.5的版本號,然後執行mvn clean install進行打包,獲取對應的安裝包將其解壓,進行bin資料夾啟動服務即可!
如果你的網速像蝸牛一樣的慢,還有一個辦法就是從這個地址https://gitee.com/elasticjob/elastic-job獲取對應的原始碼!
啟動服務後,在瀏覽器訪問http://127.0.0.1:8899,輸入賬戶、密碼(都是root)即可進入控制檯頁面,類似如下介面!
進入之後,將上文所在的 zookeeper 註冊中心進行配置,包括資料庫 mysql 的資料來源也可以配置一下!
3.3、建立工程本文采用springboot來搭建工程為例,建立工程並新增elastic-job-lite依賴!
#zookeeper configzookeeper.serverList=127.0.0.1:2181zookeeper.namespace=example-elastic-job-test
3.4、新建 ZookeeperConfig 配置類@Configuration@ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0")public class ZookeeperConfig { /** * zookeeper 配置 * @return */ @Bean(initMethod = "init") public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList, @Value("${zookeeper.namespace}") String namespace){ return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace)); }}
3.5、新建任務處理類elastic-job支援三種類型的作業任務處理!
Simple 型別作業:Simple 型別用於一般任務的處理,只需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行,與Quartz原生介面相似。Dataflow 型別作業:Dataflow 型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。Script型別作業:Script 型別作業意為指令碼型別作業,支援 shell,python,perl等所有型別指令碼。只需透過控制檯或程式碼配置 scriptCommandLine 即可,無需編碼。執行指令碼路徑可包含引數,引數傳遞完畢後,作業框架會自動追加最後一個引數為作業執行時資訊。3.6、新建 Simple 型別作業編寫一個SimpleJob介面的實現類MySimpleJob,當前工作主要是列印一條日誌。
@Slf4jpublic class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info(String.format("Thread ID: %s, 作業分片總數: %s, " + "當前分片項: %s.當前引數: %s," + "作業名稱: %s.作業自定義引數: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); }}
建立一個MyElasticJobListener任務監聽器,用於監聽MySimpleJob的任務執行情況。
@Slf4jpublic class MyElasticJobListener implements ElasticJobListener { private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime); }}
建立一個MySimpleJobConfig類,將MySimpleJob其注入到zookeeper。
@Configurationpublic class MySimpleJobConfig { /** * 任務名稱 */ @Value("${simpleJob.mySimpleJob.name}") private String mySimpleJobName; /** * cron表示式 */ @Value("${simpleJob.mySimpleJob.cron}") private String mySimpleJobCron; /** * 作業分片總數 */ @Value("${simpleJob.mySimpleJob.shardingTotalCount}") private int mySimpleJobShardingTotalCount; /** * 作業分片引數 */ @Value("${simpleJob.mySimpleJob.shardingItemParameters}") private String mySimpleJobShardingItemParameters; /** * 自定義引數 */ @Value("${simpleJob.mySimpleJob.jobParameters}") private String mySimpleJobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean public MySimpleJob mySimpleJob() { return new MySimpleJob(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { //配置任務監聽器 MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); // 定義SIMPLE型別配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); // 定義Lite作業根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; }}
在配置檔案application.properties中配置好對應的mySimpleJob引數!
#elastic job#simpleJob型別的jobsimpleJob.mySimpleJob.name=mySimpleJobsimpleJob.mySimpleJob.cron=0/15 * * * * ?simpleJob.mySimpleJob.shardingTotalCount=3simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=csimpleJob.mySimpleJob.jobParameters=helloWorld
執行程式,看看效果如何?
在上圖demo中,配置的分片數為3,這個時候會有3個執行緒進行同時執行任務,因為都是在一臺機器上執行的,這個任務被執行來3次,下面修改一下埠配置,建立三個相同的服務例項,在看看效果如下:
很清晰的看到任務被執行一次!
3.7、新建 DataFlowJob 型別作業DataFlowJob 型別的任務配置和SimpleJob類似,操作也很簡單!
建立一個DataflowJob型別的實現類MyDataFlowJob。
@Slf4jpublic class MyDataFlowJob implements DataflowJob<String> { private boolean flag = false; @Override public List<String> fetchData(ShardingContext shardingContext) { log.info("開始獲取資料"); if (flag) { return null; } return Arrays.asList("qingshan", "jack", "seven"); } @Override public void processData(ShardingContext shardingContext, List<String> data) { for (String val : data) { // 處理完資料要移除掉,不然就會一直跑,處理可以在上面的方法裡執行。這裡採用 flag log.info("開始處理資料:" + val); } flag = true; }}
接著建立MyDataFlowJob的配置類,將其注入到zookeeper註冊中心。
Configurationpublic class MyDataFlowJobConfig { /** * 任務名稱 */ @Value("${dataflowJob.myDataflowJob.name}") private String jobName; /** * cron表示式 */ @Value("${dataflowJob.myDataflowJob.cron}") private String jobCron; /** * 作業分片總數 */ @Value("${dataflowJob.myDataflowJob.shardingTotalCount}") private int jobShardingTotalCount; /** * 作業分片引數 */ @Value("${dataflowJob.myDataflowJob.shardingItemParameters}") private String jobShardingItemParameters; /** * 自定義引數 */ @Value("${dataflowJob.myDataflowJob.jobParameters}") private String jobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean public MyDataFlowJob myDataFlowJob() { return new MyDataFlowJob(); } @Bean(initMethod = "init") public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) { MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業核心配置 JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); // 定義DATAFLOW型別配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false); // 定義Lite作業根配置 LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build(); return dataflowJobRootConfig; }}
最後,在配置檔案application.properties中配置好對應的myDataflowJob引數!
#dataflow型別的jobdataflowJob.myDataflowJob.name=myDataflowJobdataflowJob.myDataflowJob.cron=0/15 * * * * ?dataflowJob.myDataflowJob.shardingTotalCount=1dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=cdataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter
執行程式,看看效果如何?
需要注意的地方是,如果配置的是流式處理型別,它會不停的拉取資料、處理資料,在拉取的時候,如果返回為空,就不會處理資料!
如果配置的是非流式處理型別,和上面介紹的simpleJob型別,處理一樣!
3.8、新建 ScriptJob 型別作業ScriptJob 型別的任務配置和上面類似,主要是用於定時執行某個指令碼,一般用的比較少!
因為目標是指令碼,沒有執行的任務,所以無需編寫任務作業型別!
只需要編寫一個ScriptJob型別的配置類即可,命令是echo 'Hello World !內容!
@Configurationpublic class MyScriptJobConfig { /** * 任務名稱 */ @Value("${scriptJob.myScriptJob.name}") private String jobName; /** * cron表示式 */ @Value("${scriptJob.myScriptJob.cron}") private String jobCron; /** * 作業分片總數 */ @Value("${scriptJob.myScriptJob.shardingTotalCount}") private int jobShardingTotalCount; /** * 作業分片引數 */ @Value("${scriptJob.myScriptJob.shardingItemParameters}") private String jobShardingItemParameters; /** * 自定義引數 */ @Value("${scriptJob.myScriptJob.jobParameters}") private String jobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean(initMethod = "init") public JobScheduler scriptJobScheduler() { MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new JobScheduler(registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業核心配置 JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); // 定義SCRIPT型別配置 ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "echo 'Hello World !'"); // 定義Lite作業根配置 LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build(); return scriptJobRootConfig; }}
在配置檔案application.properties中配置好對應的myScriptJob引數!
#script型別的jobscriptJob.myScriptJob.name=myScriptJobscriptJob.myScriptJob.cron=0/15 * * * * ?scriptJob.myScriptJob.shardingTotalCount=3scriptJob.myScriptJob.shardingItemParameters=0=a,1=b,2=cscriptJob.myScriptJob.jobParameters=myScriptJobParamter
執行程式,看看效果如何?
3.9、將任務狀態持久化到資料庫可能有的人會發出疑問,elastic-job是如何儲存資料的,用ZooInspector客戶端連結zookeeper註冊中心,你發現對應的任務配置被儲存到相應的樹根上!
而具體作業任務執行軌跡和狀態結果是不會儲存到zookeeper,需要我們在專案中透過資料來源方式進行持久化!
將任務狀態持久化到資料庫配置過程也很簡單,只需要在對應的配置類上注入資料來源即可,以MySimpleJobConfig為例,程式碼如下:
@Configurationpublic class MySimpleJobConfig { /** * 任務名稱 */ @Value("${simpleJob.mySimpleJob.name}") private String mySimpleJobName; /** * cron表示式 */ @Value("${simpleJob.mySimpleJob.cron}") private String mySimpleJobCron; /** * 作業分片總數 */ @Value("${simpleJob.mySimpleJob.shardingTotalCount}") private int mySimpleJobShardingTotalCount; /** * 作業分片引數 */ @Value("${simpleJob.mySimpleJob.shardingItemParameters}") private String mySimpleJobShardingItemParameters; /** * 自定義引數 */ @Value("${simpleJob.mySimpleJob.jobParameters}") private String mySimpleJobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Autowired private DataSource dataSource;; @Bean public MySimpleJob stockJob() { return new MySimpleJob(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { //新增事件資料來源配置 JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), jobEventConfig, elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); // 定義SIMPLE型別配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); // 定義Lite作業根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; }}
同時,需要在配置檔案application.properties中配置好對應的datasource引數!
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/example-elastic-job-testspring.datasource.username=rootspring.datasource.password=rootspring.datasource.driver-class-name=com.mysql.jdbc.Driver
執行程式,然後在elastic-job-lite-console控制檯配置對應的資料來源!
四、小結本文主要圍繞elasticjob的使用進行簡單介紹,希望大家有所收穫!
在分散式環境環境下,elastic-job-lite支援的彈性擴容、任務分片是最大的亮點,在實際使用的時候,任務分片總數儘可能大於服務例項個數,並且是倍數關係,這樣任務在分片的時候,會更加均勻!
如果想深入的瞭解elasticjob,大家可以訪問官方文件,獲取更加詳細的使用教程!