環境:CentOS Linux release 8.1.1911 (Core) + zookeeper-3.6.2+kafka_2.13-2.7.0(2.13是scala版本,2.7.0是kafka版本,kafa使用scala開發的)+ jdk8
建立主題bin/kafka-topics.sh --create --zookeeper localhost:2182 --topic test --partitions 4 --replication-factor 2
解釋:建立名為test的主題,其中partitions 4 為4個分割槽(分片),--replication-factor 2 每個分割槽(分片)都有2個副本(1個leader,1個replication)。
檢視主題資訊bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
檢視名為test的主題資訊
結果:
解釋:
第一行:Topic: test PartitionCount: 4 ReplicationFactor: 2 Configs:
統計資訊:主題test,分割槽(分片)4個,副本數2個。
第二行:Topic: test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
主題test 分割槽(分片)編號為0的領導者(Leader)是broker.id=1的這個節點,其中副本在broker.id=1,2的兩個節點上,isr 副本都已同步的的節點集合,這個集合中的所有節點都是存活狀態,並且跟leader同步。
檢視所有的主題資訊bin/kafka-topics.sh --list --zookeeper localhost:2181
修改主題的分割槽(分片)數量
bin/kafka-topics.sh --alter --zookeeper localhost:2182 --topic test --partitions 6
分割槽數只能增加不能減少
刪除主題bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
生產訊息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
消費訊息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group g1
--from-beginning:從頭開始消費訊息。當停止消費端後再執行上面的命令訊息會又從頭開始接受一遍。
--group xx: 組名。
檢視消費者組資訊 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members
檢視具體組的連線資訊。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1
檢視當前消費者組每個分割槽訊息偏移量資訊及客戶端資訊。
檢視訊息目錄資訊bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test
檢視日誌檔案
bin/kafka-dump-log.sh --files /root/sf/datas/kafka/9094/test-0/00000000000000000000.log --print-data-log
Java程式碼測試生產及消費訊息
pom.xml新增依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
生產者:
public class KafkaProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094"); // group.id,指定了消費者所屬群組 props.put("group.id", "g1"); // 開啟自動提交 offset,關於offset提交 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "2000"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props) ; int i = 0; for (int n = 0; n < 10; n++) { ProducerRecord<String, String> record = new ProducerRecord<>("test", ++i + "", "msg + " + i) ; producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace() ; } } }) ; TimeUnit.SECONDS.sleep(1) ; } } }
消費者:
public class KafkaDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "10.100.101.105:9092, 10.100.101.105:9093, 10.100.101.105:9094"); // group.id,指定了消費者所屬群組 props.put("group.id", "g1"); // 開啟自動提交 offset,關於offset提交,我們後續再來詳細說說 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "2000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); /** * What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul> */ // 想要從頭開始消費需要具備兩個條件:1,換組名;2,auto.offset.reset 設定為earliest // 如果只是換了組而沒有配置auto.offset.reset為earliest(預設:latest)將不會生效 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") ; KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props) ; // 檢視分割槽資訊 // List<PartitionInfo> partitions = consumer.partitionsFor("second") ; // System.out.println(partitions) ; // 手動控制分割槽 /*TopicPartition p0 = new TopicPartition("second", 0) ; TopicPartition p1 = new TopicPartition("second", 1) ; consumer.assign(Arrays.asList(p0, p1)) ;*/ //訂閱消費主題,可以是多個主題 consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("接受到 = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); TimeUnit.MILLISECONDS.sleep(500) ; } } }
結果:
注意:kafka和我測試程式不在一臺電腦上,所以如果你是遠端訪問kafka那麼需要修改對應的配置檔案%KAFKA_HOME%\config\server.properties檔案開啟遠端訪問功能
下期出個springboot的整合
完畢!!!