-
1 # 愛可生雲資料庫
-
2 # YagrXu
分散式任務本身涉及到分散式構架,基於不同的任務處理量和反應時間可以考慮 hadoop spark flink 等。 定時最好用serverless的scheduler 觸發。
-
3 # tryetry
需要一個全域性協調器,記master。接下來就是要幹活的應用伺服器,記slave。定時器而且是分散式的,就要求每個伺服器在指定時間都執行相同任務,而且實現對任務的管理如暫停,啟動,停止等!甚至可以建立指定的定時器。還要保證任務執行的原子性!定時任務由master統一分發給每個slave!之後每個slave將處理結果返回給master。由master統一儲存或是回滾!
實現的技術比較多:支援分散式的如zookeeper,系統資訊互動的如netty,支援定時器的如quartz。
-
4 # IT實戰聯盟
一、在JAVA開發領域,目前可以透過以下幾種方式進行定時任務
1、單機部署模式
Timer:jdk中自帶的一個定時排程類,可以簡單的實現按某一頻度進行任務執行。提供的功能比較單一,無法實現複雜的排程任務。
ScheduledExecutorService:也是jdk自帶的一個基於執行緒池設計的定時任務類。其每個排程任務都會分配到執行緒池中的一個執行緒執行,所以其任務是併發執行的,互不影響。
Spring Task:Spring提供的一個任務排程工具,支援註解和配置檔案形式,支援Cron表示式,使用簡單但功能強大。
Quartz:一款功能強大的任務排程器,可以實現較為複雜的排程功能,如每月一號執行、每天凌晨執行、每週五執行等等,還支援分散式排程,就是配置稍顯複雜。
2、分散式叢集模式(不多介紹,簡單提一下)
問題:
I、如何解決定時任務的多次執行?
II、如何解決任務的單點問題,實現任務的故障轉移?
問題I的簡單思考:
1、固定執行定時任務的機器(可以有效避免多次執行的情況 ,缺點就是單點故障問題)。
2、藉助Redis的過期機制和分散式鎖。
3、藉助mysql的鎖機制等。
成熟的解決方案:
1、Quartz:可以去看看這篇文章[Quartz分散式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。
2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)噹噹開發的彈性分散式任務排程系統,採用zookeeper實現分散式協調,實現任務高可用以及分片。
3、xxl-job:(https://github.com/xuxueli/xxl-job)是大眾點評員釋出的分散式任務排程平臺,是一個輕量級分散式任務排程框架。
4、saturn:(https://github.com/vipshop/Saturn) 是唯品會提供一個分散式、容錯和高可用的作業排程服務框架。
二、SpringTask實現定時任務(這裡是基於springboot)
1、簡單的定時任務實現
使用方式:
使用@EnableScheduling註解開啟對定時任務的支援。
使用@Scheduled 註解即可,基於corn、fixedRate、fixedDelay等一些定時策略來實現定時任務。
使用缺點:
1、多個定時任務使用的是同一個排程執行緒,所以任務是阻塞執行的,執行效率不高。
2、其次如果出現任務阻塞,導致一些場景的定時計算沒有實際意義,比如每天12點的一個計算任務被阻塞到1點去執行,會導致結果並非我們想要的。
使用優點:
1、配置簡單
2、適用於單個後臺執行緒執行週期任務,並且保證順序一致執行的場景
原始碼分析:
//預設使用的排程器
if(this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//可以看到SingleThreadScheduledExecutor指定的核心執行緒為1,說白了就是單執行緒執行
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
//利用了DelayedWorkQueue延時佇列作為任務的存放佇列,這樣便可以實現任務延遲執行或者定時執行
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2、實現併發的定時任務
使用方式:
方式一:由1中我們知道之所以定時任務是阻塞執行,是配置的執行緒池決定的,那就好辦了,換一個不就行了!直接上程式碼:
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {
@Autowired
private TaskScheduler myThreadPoolTaskScheduler;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
//簡單粗暴的方式直接指定
//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
//也可以自定義的執行緒池,方便執行緒的使用與維護,這裡不多說了
scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
}
}
@Bean(name = "myThreadPoolTaskScheduler")
public TaskScheduler getMyThreadPoolTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//排程器shutdown被呼叫時等待當前被排程的任務完成
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
//等待時長
taskScheduler.setAwaitTerminationSeconds(60);
return taskScheduler;
}
方式二:方式一的本質改變了任務排程器預設使用的執行緒池,接下來這種是不改變排程器的預設執行緒池,而是把當前任務交給一個非同步執行緒池去執行
廢話太多,直接上程式碼:
@Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
@Async("myThreadPoolTaskExecutor")
public void scheduledTest02(){
System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
}
//自定義執行緒池
@Bean(name = "myThreadPoolTaskExecutor")
public TaskExecutor getMyThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(200);
taskExecutor.setQueueCapacity(25);
taskExecutor.setKeepAliveSeconds(200);
taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
// 執行緒池對拒絕任務(無執行緒可用)的處理策略,目前只支援AbortPolicy、CallerRunsPolicy;預設為後者
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//排程器shutdown被呼叫時等待當前被排程的任務完成
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待時長
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.initialize();
return taskExecutor;
}
首先使用@EnableAsync 啟用非同步任務
然後在定時任務的方法加上@Async即可,預設使用的執行緒池為SimpleAsyncTaskExecutor(該執行緒池預設來一個任務建立一個執行緒,就會不斷建立大量執行緒,極有可能壓爆伺服器記憶體。當然它有自己的限流機制,這裡就不多說了,有興趣的自己翻翻原始碼~)
專案中為了更好的控制執行緒的使用,我們可以自定義我們自己的執行緒池,使用方式@Async("myThreadPool")
執行緒池的使用心得(後續有專門文章來探討)
java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,對應與spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基礎上增加了新的特性,在spring環境下更容易使用和控制。
使用自定義的執行緒池能夠避免一些預設執行緒池造成的記憶體溢位、阻塞等等問題,更貼合自己的服務特性
使用自定義的執行緒池便於對專案中執行緒的管理、維護以及監控。
即便在非spring環境下也不要使用java預設提供的那幾種執行緒池,坑很多,阿里程式碼規約不說了嗎,得相信大廠!!!
三、動態定時任務的實現
問題:
使用@Scheduled註解來完成設定定時任務,但是有時候我們往往需要對週期性的時間的設定會做一些改變,或者要動態的啟停一個定時任務,那麼這個時候使用此註解就不太方便了,原因在於這個註解中配置的cron表示式必須是常量,那麼當我們修改定時引數的時候,就需要停止服務,重新部署。
解決辦法:
方式一:實現SchedulingConfigurer介面,重寫configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不過需要注意的是,此種方式修改了配置值後,需要在下一次排程結束後,才會更新排程器,並不會在修改配置值時實時更新,實時更新需要在修改配置值時額外增加相關邏輯處理。
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {
@Autowired
private TaskScheduler myThreadPoolTaskScheduler;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
//可以實現動態調整定時任務的執行頻率
scheduledTaskRegistrar.addTriggerTask(
//1.新增任務內容(Runnable)
() -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
//2.設定執行週期(Trigger)
triggerContext -> {
//2.1 從資料庫動態獲取執行週期
String cron = "0/2 * * * * ? ";
//2.2 合法性校驗.
// if (StringUtils.isEmpty(cron)) {
// // Omitted Code ..
// }
//2.3 返回執行週期(Date)
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
首先,我們要認識下這個排程類,它其實是對java中ScheduledThreadPoolExecutor的一個封裝改進後的產物,主要改進有以下幾點:
1、提供預設配置,因為是ScheduledThreadPoolExecutor,所以只有poolSize這一個預設引數。
2、支援自定義任務,透過傳入Trigger引數。
3、對任務出錯處理進行最佳化,如果是重複性的任務,不丟擲異常,透過日誌記錄下來,不影響下次執行,如果是隻執行一次的任務,將異常往上拋。
順便說下ThreadPoolTaskExecutor相對於ThreadPoolExecutor的改進點:
1、提供預設配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他沒有預設配置
2、實現AsyncListenableTaskExecutor介面,支援對FutureTask新增success和fail的回撥,任務成功或失敗的時候回執行對應回撥方法。
3、因為是spring的工具類,所以丟擲的RejectedExecutionException也會被轉換為spring框架的TaskRejectedException異常(這個無所謂)
4、提供預設ThreadFactory實現,直接透過引數過載配置
扯了這麼多,還是直接上程式碼:
@Component
public class DynamicTimedTask {
private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
//利用建立好的排程類統一管理
//private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
//接受任務的返回結果
private ScheduledFuture<?> future;
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
//例項化一個執行緒池任務排程類,可以使用自定義的ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
return new ThreadPoolTaskScheduler();
}
/**
* 啟動定時任務
* @return
*/
public boolean startCron() {
boolean flag = false;
//從資料庫動態獲取執行週期
String cron = "0/2 * * * * ? ";
future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
if (future!=null){
flag = true;
logger.info("定時check訓練模型檔案,任務啟動成功!!!");
}else {
logger.info("定時check訓練模型檔案,任務啟動失敗!!!");
}
return flag;
}
/**
* 停止定時任務
* @return
*/
public boolean stopCron() {
boolean flag = false;
if (future != null) {
boolean cancel = future.cancel(true);
if (cancel){
flag = true;
logger.info("定時check訓練模型檔案,任務停止成功!!!");
}else {
logger.info("定時check訓練模型檔案,任務停止失敗!!!");
}
}else {
flag = true;
logger.info("定時check訓練模型檔案,任務已經停止!!!");
}
return flag;
}
class CheckModelFile implements Runnable{
@Override
public void run() {
//編寫你自己的業務邏輯
System.out.print("模型檔案檢查完畢!!!")
}
}
}
四、總結
基於分散式叢集下的定時任務使用,後續有時間再繼續!!!
-
5 # 會點程式碼的大叔
雖說有現成的框架可以實現,不過還是一步一步地說一下思路。
需求為方便大家的理解,先給大家講一個真實的需求,這是我在第二家公司的一個專案,定時任務每天凌晨執行,需求很簡單:把原始的業務資料,加工處理成待發送的簡訊。
原始資料:姓名-小明,所在地-北京,電話-13800000000,賬單最後還款日期-2018年4月30日。
加工後的資料是:親愛的小明,您的賬單最後還款日期為2018年4月30日,請提前繳費。然後把需要把這條簡訊傳送到13800000000這個手機號上。
定時任務定時任務框架裡面,最有名的就是quartz了,相信大部分Java程式設計師都用過。
我們專案最開始也用的是quartz,只有一個伺服器跑定時任務。但是待處理的資料越來越多,定時服務執行的時間也越來越長,終於有一天,定時任務從晚上跑到了第二天白天也沒有跑完,耽誤了簡訊的傳送。
改造後的定時任務有人就有疑問了,能不能直接把定時服務部署多套不就行了。但是部署多套quartz的話,就會出現問題:待處理的任務有可能會被重複執行。
應對這種問題,我們當時有兩種處理方案:
方案一:定時服務只部署一套,但是定時任務的工作只是提取待處理的任務。
實際的業務處理服務叢集化部署,然後由定式服務提取資料後,傳送給業務處理伺服器進行實際的處理。
方案二:這個是我當時自己想出的一個奇葩的方法,不過這個方案想明白了,對分散式定式服務的理解很有幫助!
定時任務程式部署多套,並且多套環境都是獨立的IP。每套程式定時將IP寫入到資料中(一分鐘對錶update一次,並更新時間戳)。多套服務選舉出一臺主伺服器。主伺服器把所有的待處理任務,儘可能平均分配給每一臺伺服器。(IP和待處理任務對應上,也就是每一條待處理任務只能讓分配的IP處理)處理任務的時候,只處理自己IP對應的任務。一臺伺服器掛了,主伺服器負責把它的IP從資料庫中抹掉(三分鐘沒有對錶進行更新的IP,刪除掉),並重新分配這個IP對應的待處理任務。主伺服器掛了,重新選舉出主伺服器。分散式定時任務框架我只用過Elastic-job,所以只給大家介紹一下這個框架。
任務分片:把一個任務拆分成幾個獨立的任務,然後由分散式伺服器分別執行一個或者多個子任務。比如還是上面那個需求,那麼可以按照【所在地】拆分任務,北京的待處理資料是一個子任務,天津的待處理資料是第二個子任務。
Elastic-Job並不直接提供資料處理的功能,實際的資料處理還是需要自己寫,Elastic-Job會將分片任務分配到各個執行中的作業伺服器。
其實發現了沒有,Elastic-Job做的工作,就是我那個主伺服器做的任務分配的工作,把所在地=北京的,分配給伺服器1處理,把所在地=天津的,分配給伺服器2處理;甚至包括監控每臺作業伺服器是否存活,掛掉一臺重新分配待處理任務,也都是Elastic-Job來做的。
-
6 # 濺濺123321
需要一個任務分發叢集和一個任務執行叢集,任務分發叢集三臺機器即可,任務執行叢集依賴你的業務量。任務分發叢集執行分發任務,按照一定的策略將任務分發給執行叢集各個機器。需要解決任務重複分發問題,以及執行任務的server和分發任務的server掛掉的問題,總之就是兩個叢集的高可用。
解決重複分發問題,很簡單,採用主從模式,這涉及選主操作,分發叢集只有一臺主server工作,其他server在主server掛掉後選主繼續分發,選主使用zk非常方便。
執行叢集中server掛掉後,其上任務需要重新分發給其他server,這個問題使用zk也是很方便可以解決
手機打字 細節就不寫了
回覆列表
Apache Zookeeper是我最近遇到的最酷的技術,我是在研究Solr Cloud功能的時候發現的。Solr的分散式計算讓我印象深刻。你只要開啟一個新的例項就能自動在Solr Cloud中找到。它會將自己分派到某個分片中,並確定出自己是一個Leader(源)還是一個副本。不一會兒,你就可以在你的那些伺服器上查詢到了。即便某些伺服器宕機了也可以繼續工作。非常動態、聰明、酷。
將執行多個應用程式作為一個邏輯程式並不是什麼新玩意。事實上,我在幾年前就已寫過類似的軟體。這種架構比較讓人迷惑,使用起來也費勁。為此Apache Zookeeper提供了一套工具用於管理這種軟體。
為什麼叫Zoo?“因為要協調的分散式系統是一個動物園”。
在本篇文章中,我將說明如何使用PHP安裝和整合Apache ZooKeeper。我們將透過service來協調各個獨立的PHP指令碼,並讓它們同意某個成為Leader(所以稱作Leader選舉)。當Leader退出(或崩潰)時,worker可檢測到並再選出新的leader。
ZooKeeper是一箇中性化的Service,用於管理配置資訊、命名、提供分散式同步,還能組合Service。所有這些種類的Service都會在分散式應用程式中使用到。每次編寫這些Service都會涉及大量的修bug和競爭情況。正因為這種編寫這些Service有一定難度,所以通常都會忽視它們,這就使得在應用程式有變化時變得難以管理應用程式。即使處理得當,實現這些服務的不同方法也會使得部署應用程式變得難以管理。
雖然ZooKeeper是一個Java應用程式,但C也可以使用。這裡就有個PHP的擴充套件,由Andrei Zmievski在2009建立並維護。你可以從PECL中下載,或從GitHub中直接獲取PHP-ZooKeeper。
$ tar zxfv zookeeper-3.4.5.tar.gz $ cd zookeeper-3.4.5/src/c $ ./configure --prefix=/usr/ $ make $ sudo make install
這樣就會安裝ZooKeeper的庫和標頭檔案。現在準備編譯PHP擴充套件。
$ cd$ git clone https://github.com/andreiz/php-zookeeper.git $ cd php-zookeeper $ phpize $ ./configure $ make $ sudo make install
將“zookeeper.so”新增到PHP配置中。
$ vim /etc/php5/cli/conf.d/20-zookeeper.ini
因為我不需要執行在web服務環境下,所以這裡我只編輯了CLI的配置。將下面的行復制到ini檔案中。
extension=zookeeper.so
使用如下命令來確定擴充套件是否已起作用。
$ php -m | grep zookeeper zookeeper
現在是時候執行ZooKeeper了。目前唯一還沒有做的是配置。建立一個用於存放所有service資料的目錄。
$ mkdir /home/you-account/zoo $ cd$ cd zookeeper-3.4.5/ $ cp conf/zoo_sample.cfg conf/zoo.cfg $ vim conf/zoo.cfg
找到名為“dataDir”的屬性,將其指向“/home/you-account/zoo”目錄。
$ bin/zkServer.sh start $ bin/zkCli.sh -server 127.0.0.1:2181[zk: 127.0.0.1:2181(CONNECTED) 14] create /test 1 Created /test[zk: 127.0.0.1:2181(CONNECTED) 19] ls /[test, zookeeper]
此時,你已成功連到了ZooKeeper,並建立了一個名為“/test”的znode(稍後我們會用到)。ZooKeeper以樹形結構儲存資料。這很類似於檔案系統,但“資料夾”(譯者注:這裡指非最底層的節點)又和檔案很像。znode是ZooKeeper儲存的實體。Node(節點)的說法很容易被混淆,所以為了避免混淆這裡使用了znode。
因為我們稍後還會使用,所以這裡我們讓客戶端保持連線狀態。開啟一個新視窗,並建立一個zookeeperdemo1.php檔案。
<?php class ZookeeperDemo extends Zookeeper { public function watcher( $i, $type, $key ) { echo "Insider Watcher\n"; // Watcher gets consumed so we need to set a new one $this->get( "/test", array($this, "watcher" ) ); } } $zoo = new ZookeeperDemo("127.0.0.1:2181");$zoo->get( "/test", array($zoo, "watcher" ) ); while( true ) { echo "."; sleep(2);}
現在執行該指令碼。
$ php zookeeperdemo1.php
此處應該會每隔2秒產生一個點。現在切換到ZooKeeper客戶端,並更新“/test”值。
[zk: 127.0.0.1:2181(CONNECTED) 20] set /test foo
這樣就會靜默觸發PHP指令碼中的“Insider Watcher”訊息。怎麼會這樣的?
ZooKeeper提供了可以繫結在znode的監視器。如果監視器發現znode發生變化,該service會立即通知所有相關的客戶端。這就是PHP指令碼如何知道變化的。Zookeeper::get方法的第二個引數是回撥函式。當觸發事件時,監視器會被消費掉,所以我們需要在回撥函式中再次設定監視器。
現在你可以準備建立分散式應用程式了。其中的挑戰是讓這些獨立的程式決定哪個(是leader)協調它們的工作,以及哪些(是worker)需要執行。這個處理過程叫做leader選舉,在ZooKeeper Recipes and Solutions你能看到相關的實現方法。
這裡簡單來說就是,每個處理(或伺服器)緊盯著相鄰的那個處理(或伺服器)。如果一個已被監視的處理(也即Leader)退出或者崩潰了,監視程式就會查詢其相鄰(此時最老)的那個處理作為Leader。
在真實的應用程式中,leader會給worker分配任務、監控程序和儲存結果。這裡為了簡化,我跳過了這些部分。
建立一個新的PHP檔案,命名為worker.php。
<?php class Worker extends Zookeeper { const CONTAINER = "/cluster"; protected $acl = array( array( "perms" => Zookeeper::PERM_ALL, "scheme" => "world", "id" => "anyone" ) ); private $isLeader = false; private $znode; public function __construct( $host = "", $watcher_cb = null, $recv_timeout = 10000 ) { parent::__construct( $host, $watcher_cb, $recv_timeout ); } public function register() { if( ! $this->exists( self::CONTAINER ) ) { $this->create( self::CONTAINER, null, $this->acl ); } $this->znode = $this->create( self::CONTAINER . "/w-", null, $this->acl, Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE ); $this->znode = str_replace( self::CONTAINER ."/", "", $this->znode ); printf( "I"m registred as: %s\n", $this->znode ); $watching = $this->watchPrevious(); if( $watching == $this->znode ) { printf( "Nobody here, I"m the leader\n" ); $this->setLeader( true ); } else { printf( "I"m watching %s\n", $watching ); } } public function watchPrevious() { $workers = $this->getChildren( self::CONTAINER ); sort( $workers ); $size = sizeof( $workers ); for( $i = 0 ; $i < $size ; $i++ ) { if( $this->znode == $workers[ $i ] ) { if( $i > 0 ) { $this->get( self::CONTAINER . "/" . $workers[ $i - 1 ], array( $this, "watchNode" ) ); return $workers[ $i - 1 ]; } return $workers[ $i ]; } } throw new Exception( sprintf( "Something went very wrong! I can"t find myself: %s/%s", self::CONTAINER, $this->znode ) ); } public function watchNode( $i, $type, $name ) { $watching = $this->watchPrevious(); if( $watching == $this->znode ) { printf( "I"m the new leader!\n" ); $this->setLeader( true ); } else { printf( "Now I"m watching %s\n", $watching ); } } public function isLeader() { return $this->isLeader; } public function setLeader($flag) { $this->isLeader = $flag; } public function run() { $this->register(); while( true ) { if( $this->isLeader() ) { $this->doLeaderJob(); } else { $this->doWorkerJob(); } sleep( 2 ); } } public function doLeaderJob() { echo "Leading\n"; } public function doWorkerJob() { echo "Working\n"; } } $worker = new Worker( "127.0.0.1:2181" );$worker->run();
開啟至少3個終端,在每個終端中執行以下指令碼:
# term1 $ php worker.php I"m registred as: w-0000000001Nobody here, I"m the leader Leading # term2 $ php worker.php I"m registred as: w-0000000002I"m watching w-0000000001 Working # term3 $ php worker.php I"m registred as: w-0000000003I"m watching w-0000000002 Working
現在模擬Leader崩潰的情形。使用Ctrl+c或其他方法退出第一個指令碼。剛開始不會有任何變化,worker可以繼續工作。後來,ZooKeeper會發現超時,並選舉出新的leader。
雖然這些指令碼很容易理解,但是還是有必要對已使用的Zookeeper標誌作註釋。
$this->znode = $this->create( self::CONTAINER . "/w-", null, $this->acl, Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
每個znode都是EPHEMERAL和SEQUENCE的。
EPHEMRAL代表當客戶端失去連線時移除該znode。這就是為何PHP指令碼會知道超時。SEQUENCE代表在每個znode名稱後新增順序標識。我們透過這些唯一標識來標記worker。
在PHP部分還有些問題要注意。該擴充套件目前還是beta版,如果使用不當很容易發生segmentation fault。比如,不能傳入普通函式作為回撥函式,傳入的必須為方法。我希望更多PHP社群的同仁可以看到Apache ZooKeeper的好,同時該擴充套件也會獲得更多的支援。
ZooKeeper是一個強大的軟體,擁有簡潔和簡單的API。由於文件和示例都做的很好,任何人都可以很容易的編寫分散式軟體。讓我們開始吧,這會很有趣的。