首頁>技術>

一、簡介

tbschedule是淘寶開源的,能夠讓批次任務或變化的任務,被動態的分配到不同主機(可分散式)的jvm,不同的執行緒組中並行執行。所有任務能夠不重複,不遺漏的執行。

二、tbschedule知識點

2.1 tbschedule特點

tbschedule特點如下:

能夠讓批次任務或變化的任務,被動態的分配到不同主機(可分散式)的jvm,不同的執行緒組中並行執行。

所有任務能夠不重複,不遺漏的執行。

2.2 tbschedule實現

tbschedule的任務、策略等排程資料是儲存在zookeeper中的。

tbschedule的執行是基於jdk的Timer和TimerTask實現的。

tbschedule中的任務,是依附於策略而執行的。也就是說,任務定義了要執行的行為,包括任務名稱、取資料和資料處理的bean、每次取數的數量、執行的開始與結束時間、任務項等資訊,而策略定義了要執行的任務、在哪臺機器上執行、所有機器最大執行緒組,單個機器執行緒組數等,並控制任務的執行與停止。

三、tbschedule關鍵類

3.1 ZKManager

ZKManager就是最基本的zookeeper會話管理類,內容包括zookeeper的建立、會話的連線或重連線、關閉會話等。

3.2 TBScheduleManagerFactory

TBScheduleManagerFactory是tbschedule管理類,包含的功能有:

3.2.1 配置zookeeper

配置zookeeper,並建立zookeeper會話,zookeeper的配置資訊有:zkConnectString, rootPath, userName, password, zkSessionTimeout, isCheckParentPath

3.2.2 排程任務和排程策略的管理器生成

ScheduleDataManager4ZK,排程任務管理器(對應在zookeeper中的資料),在此進行初始化和生成。

ScheduleStrategyDataManager4ZK,排程策略管理器(對應在zookeeper中的資料),在此進行初始化和生成。

3.2.3 排程服務的重啟、停止等

如stopServer(String strategyName)、stopAll()、reStart()等。

3.3 ScheduleServer

任務處理器(可以理解為執行緒組),由一組執行緒(n個執行緒)組成,每個任務處理器有全域性唯一的識別符號,一般以IP$UUID[例如192.168.1.100$0C78F0C0FA084E54B6665F4D00FA73DC]的形式出現,一個任務型別的資料可以n個任務處理器處理。內分為Sleep模式和NotSleep模式:

3.3.1 sleep模式

sleep模式,當一執行緒處理完任務,同時從任務池取不到任務時,若其它執行緒仍工作,則自己休眠,若其它執行緒已休眠,則新調取需要處理的資料,同時喚醒其它休眠執行緒處理資料;

3.3.2 NotSleep模式

NotSleep模式,當一執行緒處理完任務,同時從任務池取不到任務時,則新調取需要處理的資料,同時喚醒其它休眠執行緒處理資料;

3.4 TaskItem

任務項,也就是將待處理的任務(資料),進行分片劃分,

如:可以按資料的id按10取模,這樣就將數年資料劃分成了0、1、2、3、4、5、6、7、8、9共10個任務項;

也可按資料的首字母分成了A、B、C、D、E、F、G、H、I、J、K、L、M、N、O、P、Q、R、S、T、U、V、W、X、Y、Z供26個任務項。

這個可以根據需要自行定義的。

3.5 TaskDealBean

自定義的任務處理類,需要實現Schedule的介面IScheduleTaskDealMulti(批處理)或者IScheduleTaskDealSingle(單任務處理),內部主要有兩個方法,一個是篩選需當前任務處理器處理的資料,另一個是處理已篩選好的資料。

3.6 OwnSign

環境,指定執行環境,如:開發環境、測試環境、預發環境、生產環境。在篩選當前任務處理器需處理的資料時,會傳入該引數。

3.7 ScheduleTaskType

任務的配置類,包括執行的執行緒數(threadNumber)、執行時間,任務項分組、沒資料時的休眠時間、每次取數的量等

3.8 ScheduleStrategy

策略的配置類,所有機器的最大執行緒組數(assignNum),單個jvm的執行緒數(numOfSingleServer),執行機器(IPList)等資訊。

四、tbschedule使用

這裡以tbschedule與spring結合為例,介紹使用的開發步驟。其中儘可能多的理解tbschedule和spring後,能理解tbschedule與spring整合的方式其實有多種形式的,在此介紹一種方式 ,任務和策略在spring啟動時進行自動註冊。

4.1 新增maven依賴

另一種方式是直接下載原始碼,這種方式更好,因為目前版本的tbschedule是有bug的,如檢視zookeeper連線資訊時,tbschedule原始碼是新建已在程式碼寫死的屬性,或者讀取tomcat中未有的tbschedule配置類,這是不對,應直接讀取已有的zookeeper屬性,當然直接maven依賴也不影響使用),spring依賴包在此略過。

<dependency>

<groupId>com.taobao.pamirs.schedule</groupId>

<artifactId>tbschedule</artifactId>

<version>3.2.18</version>

</dependency>

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.4.6</version>

</dependency>

4.2 下載tbschedule原始碼

下載地址:http://code.taobao.org/p/tbschedule, 將路徑下的tbschedule/branches /3.1.0/src/WebRoot/schedule資料夾複製到工程webapp下,裡面tbschdule的jsp檔案,便於頁面檢視與控制任務執行。

4.3 編寫自定義抽象任務類

編寫自定義抽象任務類,內包含排程任務配置類scheduleTaskType和排程策略配置類scheduleStrategy,便於過會對任務進行自動註冊任務和策略資訊,該類同時可實現介面IScheduleTaskDealSingle,繼承該抽象類的任務類實現其中的方法,如AbstractBaseScheduleTask.java:

public abstract class AbstractBaseScheduleTask<T> implements IScheduleTaskDealSingle<T> {

/**

* 排程任務的配置

*/

private ScheduleTaskType scheduleTaskType;

/**

* 排程策略的配置

*/

private ScheduleStrategy scheduleStrategy;

public ScheduleTaskType getScheduleTaskType() {

return scheduleTaskType;

}

public void setScheduleTaskType(ScheduleTaskType scheduleTaskType) {

this.scheduleTaskType = scheduleTaskType;

}

public ScheduleStrategy getScheduleStrategy() {

return scheduleStrategy;

}

public void setScheduleStrategy(ScheduleStrategy scheduleStrategy) {

this.scheduleStrategy = scheduleStrategy;

}

}

4.4 編寫啟動類

編寫啟動類,繼承TBScheduleManagerFactory類,用於配置zookeeper資訊,實現ApplicationListener介面,並實現其內部方法,用於在spring容器啟動後,載入排程任務和排程策略的配置資訊到zookeeper中,如類SystemTBScheduleManagerFactory.java :

public class SystemTBScheduleManagerFactory extends TBScheduleManagerFactory implements ApplicationListener<ContextRefreshedEvent>{

@Override

public void onApplicationEvent(ContextRefreshedEvent event) { //註冊排程任務和排程策略

try {

super.init(); //預設初始化資訊

IScheduleDataManager iScheduleDataManager = null;

ScheduleStrategyDataManager4ZK scheduleStrategyDataManager4ZK = null;

int waitSecond = 120; //預設初始化等待時間,最長120秒

while((null == iScheduleDataManager || null == scheduleStrategyDataManager4ZK) && waitSecond>0){

waitSecond--;

TimeUnit.SECONDS.sleep(1); //等待1秒

try{

iScheduleDataManager = super.getScheduleDataManager();//獲取排程任務管理器

scheduleStrategyDataManager4ZK = super.getScheduleStrategyManager();//獲取排程策略管理器

}catch (Exception e){

}

}

Assert.notNull(iScheduleDataManager,"初始化tbschedule配置資訊失敗"); //若仍初始化失敗,則拋異常

Assert.notNull(scheduleStrategyDataManager4ZK,"初始化tbschedule配置資訊失敗"); //若仍初始化失敗,則拋異常

Map<String,AbstractBaseScheduleTask> taskMap = event.getApplicationContext().getBeansOfType(AbstractBaseScheduleTask.class);

for(Map.Entry<String,AbstractBaseScheduleTask> m : taskMap.entrySet()){

String key = m.getKey();

AbstractBaseScheduleTask task = m.getValue();

ScheduleTaskType taskType = task.getScheduleTaskType();

taskType.setBaseTaskType("task_"+key); //任務型別(任務名稱)

taskType.setDealBeanName(key);

ScheduleStrategy scheduleStrategy = task.getScheduleStrategy();

scheduleStrategy.setStrategyName("strategy_"+key); //策略名稱

scheduleStrategy.setTaskName(taskType.getBaseTaskType()); //任務名稱

scheduleStrategy.setKind(ScheduleStrategy.Kind.Schedule);

iScheduleDataManager.updateBaseTaskType(taskType);

scheduleStrategyDataManager4ZK.updateScheduleStrategy(scheduleStrategy);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

4.5 編寫任務類

編寫任務類,繼承前面自定義的抽象類AbstractBaseScheduleTask.java,如:

public class SimpleTask extends AbstractBaseScheduleTask<Date> {

/**

* 執行單個任務

* @param task Object

* @param ownSign 當前環境名稱

* @throws Exception

*/

public boolean execute(Date task, String ownSign) throws Exception{

System.out.println("dispose task : "+task.getTime()); //當前任務處理器內的執行緒處理資料

return true;

}

/**

* 根據條件,查詢當前排程伺服器可處理的任務

* @param taskParameter 任務的自定義引數

* @param ownSign 當前環境名稱

* @param taskItemNum 當前任務型別的任務佇列數量

* @param taskItemList 當前排程伺服器,分配到的可處理佇列

* @param eachFetchDataNum 每次獲取資料的數量

* @return

* @throws Exception

*/

public List<Date> selectTasks(String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {

List<Date> dateList = new ArrayList<>();

List<Long> taskIdList = new ArrayList<>();

for(TaskItemDefine t : taskItemList){ //確定當前任務處理器需處理的任務項id

taskIdList.add(Long.valueOf(t.getTaskItemId()));

}

for(int i=0;i<eachFetchDataNum;i++){ // 新增最多指定數量的待處理資料

Date date = new Date(); //生成待處理資料

Long remainder = date.getTime() % taskItemNum ;

if(taskIdList.contains(remainder)){ //根據資料取模,判斷當前待處理資料,是否應由當前任務處理器處理

dateList.add(date);

}

TimeUnit.SECONDS.sleep(1);

}

return dateList; //返回當前任務處理器需要處理的資料

}

/**

* 獲取任務的比較器,主要在NotSleep模式下需要用到

* @return

*/

public Comparator<Date> getComparator() {

return null;

}

}

6、配置spring檔案spring-tbschedule.xml

配置spring檔案spring-tbschedule.xml,包含啟動類和任務類配置。

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd

">

<bean id="systemTBScheduleManagerFactory" class="com.dragon.tbscheduleStudy.system.SystemTBScheduleManagerFactory">

<property name="zkConfig">

<map>

<entry key="zkConnectString" value="127.0.0.1:2181" />

<entry key="rootPath" value="/myself/tbscheduleStudy" />

<entry key="userName" value="root" />

<entry key="password" value="123456" />

<entry key="zkSessionTimeout" value="8000" />

<entry key="isCheckParentPath" value="true" />

</map>

</property>

</bean>

<bean id="simpleTask" class="com.dragon.tbscheduleStudy.task.SimpleTask" >

<property name="scheduleTaskType">

<bean class="com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType">

<property name="permitRunStartTime" value="0 0 0 * * ?" />

<property name="permitRunEndTime" value="59 59 23 * * ?" />

<property name="sleepTimeNoData" value="3000" />

<property name="sleepTimeInterval" value="1000" />

<property name="fetchDataNumber" value="10" />

<property name="taskItems">

<list>

<value>0:{TYPE=A,KIND=1}</value>

<value>1:{TYPE=B,KIND=2}</value>

<value>2:{TYPE=C,KIND=3}</value>

</list>

</property>

</bean>

</property>

<property name="scheduleStrategy">

<bean class="com.taobao.pamirs.schedule.strategy.ScheduleStrategy">

<property name="assignNum" value="9" />

<property name="numOfSingleServer" value="3" />

<property name="IPList">

<list>

<value>127.0.0.1</value>

</list>

</property>

</bean>

</property>

</bean>

</beans>

4.7 配置web.xml檔案

配置web.xml檔案,如:

<?xml version="1.0" encoding="UTF-8"?>

<web-app xmlns="http://java.sun.com/xml/ns/javaee"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://java.sun.com/xml/ns/javaee

http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"

version="3.0">

<context-param>

<param-name>contextConfigLocation</param-name>

<param-value>classpath:spring-tbschedule.xml</param-value>

</context-param>

<listener>

<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>

</listener>

</web-app>

4.8 修改專案index.jsp頁面

修改專案的index.jsp頁面,重定向到tbschedule首頁,如:

<%

response.sendRedirect("schedule/index.jsp");

%>

至此,開發流程結束,

啟動專案,在瀏覽器輸入http://localhost:8080/schedule/index.jsp,可檢視任務情況,

若想修改配置,則輸入地址http://localhost:8080/schedule/index.jsp?manager=true即可

13
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 一致性hash演算法