一、簡介
本文介紹如何在 Spring Boot 中透過 Apache Curator 使用 ZooKeeper,以下直接以程式碼示例說明。
二、示例
2.1 新增maven依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the spring-boot-zookeeper demo module. -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherits its build configuration from the Spring Boot 2.3.3.RELEASE starter parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
</parent>
<groupId>com.dragon.study</groupId>
<artifactId>spring-boot-zookeeper</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-zookeeper</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Boot starters; no explicit versions are given here, so they come from the parent POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Development-time helpers: both are optional, so they are not passed on to dependants. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test starter with the JUnit Vintage engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Apache Curator (ZooKeeper client library): framework, recipes and client, all pinned to 2.12.0. -->
<!-- NOTE(review): keep the three Curator versions in sync when upgrading; confirm the version matches the ZooKeeper server in use. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin, declared without extra configuration. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2 新增配置檔案application.yaml
application.yaml檔案內容如下:
# Embedded web server settings.
server:
  port: 10020
spring:
  application:
    name: spring-boot-zookeeper
# Settings read by ZkClientFactoryBean through ${zookeeper.*} placeholders.
zookeeper:
  connectString: localhost:2181
  maxRetries: 3 # maximum number of retries
  baseSleepTimeMs: 1000 # initial sleep time between retries, in milliseconds
2.3 定義zookeeper客戶端
ZkClientFactoryBean.java類如下:
package com.dragon.study.springbootzookeeper.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class ZkClientFactoryBean implements FactoryBean<CuratorFramework> {
@Value("${zookeeper.connectString:localhost:2181}")
private String connectString;
@Value("${zookeeper.maxRetries:3}")
private int maxRetries;
@Value("${zookeeper.baseSleepTimeMs:1000}")
private int baseSleepTimeMs;
private CuratorFramework curatorClient;
@PostConstruct
public void init() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
curatorClient = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(retryPolicy)
.build();
curatorClient.start();
}
@Override
public CuratorFramework getObject() throws Exception {
return curatorClient;
}
@Override
public Class<?> getObjectType() {
return CuratorFramework.class;
}
}
2.4 定義zookeeper監聽器
ZkListener.java類如下:
package com.dragon.study.springbootzookeeper.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Optional;
@Component
public class ZkListener {
@Resource
private CuratorFramework curatorClient;
@PostConstruct
public void init() throws Exception {
String path = "/zk";
//監聽當前節點變化,不監聽子節點變化
NodeCache nodeCache = new NodeCache(curatorClient, path);
nodeCache.start();
nodeCache.getListenable().addListener(() -> {
String nodePath = nodeCache.getCurrentData().getPath();
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("nodeChanged,nodePath:" + nodePath + " data:" + data);
});
//監聽子節點變化,不監聽當前節點變化
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorClient, path, true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener((curatorFramework, event) -> {
String type = event.getType().name();
System.out.println("pathChildrenCache, type:" + type);
Optional.ofNullable(event.getData()).ifPresent(t->{
String nodePath = t.getPath();
String data = new String(t.getData());
System.out.println("pathChildrenCache, nodePath:" + nodePath + " data:" + data + " type:" + type);
});
});
//監聽當前節點及其所有子節點變化
TreeCache treeCache = new TreeCache(curatorClient, path);
treeCache.start();
treeCache.getListenable().addListener((curatorFramework, event) -> {
String type = event.getType().name();
System.out.println("treeCache, type:" + type);
Optional.ofNullable(event.getData()).ifPresent(t->{
String nodePath = t.getPath();
String data = new String(t.getData());
System.out.println("treeCache, nodePath:" + nodePath + " data:" + data + " type:" + type);
});
});
}
}
2.5 啟動類
package com.dragon.study.springbootzookeeper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the spring-boot-zookeeper demo application.
 */
@SpringBootApplication
public class SpringBootZookeeperApplication {

    /**
     * Launches the Spring application with this class as its primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootZookeeperApplication.class);
        application.run(args);
    }
}
2.6 測試
先確認 ZooKeeper 服務已在 localhost:2181 啟動,再執行啟動類SpringBootZookeeperApplication,以開啟前面定義的zookeeper監聽。接下來定義測試類:
package com.dragon.study.springbootzookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
 * Walks through the basic Curator operations (create, set data, transaction)
 * against the {@code /zk} subtree so that the registered listeners have
 * something to report.
 *
 * <p>The test removes any existing {@code /zk} subtree first, so it can be
 * run repeatedly against the same ZooKeeper server.
 */
@SpringBootTest
class SpringBootZookeeperApplicationTests {

    /** Root node used by every step of the walkthrough. */
    private static final String ROOT = "/zk";

    @Resource
    private CuratorFramework curatorClient;

    @Test
    void zkTest() throws Exception {
        // Start from a clean slate: persistent nodes left by an earlier run
        // would otherwise make the create calls fail with NodeExists.
        removeRootIfPresent();

        // Create a node. Listener output recorded in the original write-up:
        // treeCache, type:NODE_ADDED
        // treeCache, nodePath:/zk data: type:NODE_ADDED
        // nodeChanged,nodePath:/zk data:
        curatorClient.create().creatingParentsIfNeeded().forPath(ROOT);

        // Remove the node again so that the next step can create it with a value;
        // creating an existing node is rejected by ZooKeeper.
        // NOTE(review): the listeners presumably report a removal here - not
        // recorded in the original write-up.
        curatorClient.delete().forPath(ROOT);

        // Create a node and set its value at the same time. Output recorded in the original write-up:
        // treeCache, type:NODE_ADDED
        // treeCache, nodePath:/zk data: type:NODE_ADDED
        // nodeChanged,nodePath:/zk data:
        curatorClient.create().creatingParentsIfNeeded().forPath(ROOT, utf8("this is zk1"));

        // Set the value of an existing node.
        curatorClient.setData().forPath(ROOT, utf8("this is zk6"));

        // Create a child node (parents are created if needed). Output recorded in the original write-up:
        // treeCache, type:NODE_ADDED
        // treeCache, nodePath:/zk/one data:this is one type:NODE_ADDED
        curatorClient.create().creatingParentsIfNeeded().forPath(ROOT + "/one", utf8("this is one"));

        // Set the value of the child node. Output recorded in the original write-up:
        // treeCache, type:NODE_UPDATED
        // treeCache, nodePath:/zk/one data:this is zk one2 type:NODE_UPDATED
        curatorClient.setData().forPath(ROOT + "/one", utf8("this is zk one2"));

        // Transaction: several operations committed together.
        CuratorTransaction curatorTransaction = curatorClient.inTransaction();
        curatorTransaction.create().withMode(CreateMode.EPHEMERAL).forPath(ROOT + "/three", utf8("this is three"))
                .and().create().withMode(CreateMode.PERSISTENT).forPath(ROOT + "/four", utf8("this is four"))
                .and().commit();
    }

    /** Deletes the {@code /zk} node together with its children when it exists. */
    private void removeRootIfPresent() throws Exception {
        if (curatorClient.checkExists().forPath(ROOT) != null) {
            curatorClient.delete().deletingChildrenIfNeeded().forPath(ROOT);
        }
    }

    /** Encodes a node value as UTF-8 instead of relying on the platform default charset. */
    private static byte[] utf8(String value) {
        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}