A distributed event-streaming platform that everyone seems to be using…
Quick start
Don't bother fiddling with this on Windows; if you don't have a cloud host, spin up a VM and install Ubuntu instead. ZooKeeper keeps crashing inexplicably on Windows… It's best to read this alongside the official quickstart.
Download the package — grab the version 2 binary tarball directly from the official site. Version 3's built-in Raft (KRaft) mode is not yet widely used in production.
After downloading, extract it to a suitable directory and set the Kafka environment variables:

```shell
sudo vim /etc/profile.d/my_env.sh

export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
```
Edit the Kafka server configuration file — a few important settings to check or change:
```properties
broker.id=0
log.dirs=/opt/module/kafka/datas
zookeeper.connect=localhost:2181/kafka
```
Start it up — get a feel for the basics from the command line: create a topic, produce, consume.
```shell
# start ZooKeeper first, then Kafka, both as background daemons
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# list existing topics
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# create a topic with 2 partitions and a single replica
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic first

# consume from and produce to the new topic (run these in two terminals)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
```
Common startup problems
Cluster ID xxx doesn’t match stored clusterId in meta.properties
Step 1: find the conflicting cluster ID in the log, e.g. p2Ke6DSDzfdcxcfarkcxJscoQ

Step 2: locate the data directory: `cat $KAFKA_HOME/config/server.properties | grep log.dir`

Step 3: edit the meta.properties file under that directory, then restart:

```properties
cluster.id=P2Ka7bKGmJwBduCchqrhsP
version=0
broker.id=0
```
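The three steps above can also be scripted. Below is a minimal sketch of Step 3 — the class name and demo file are mine, and it runs against a throwaway temp file, not a live broker's data directory:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class FixClusterId {
    // Overwrite cluster.id in meta.properties with the ID the broker
    // reports in its startup error; every other key is left as-is.
    static void patch(Path metaFile, String newId) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(metaFile)) {
            props.load(in);
        }
        props.setProperty("cluster.id", newId);
        try (OutputStream out = Files.newOutputStream(metaFile)) {
            props.store(out, "patched cluster.id");
        }
    }

    public static void main(String[] args) throws IOException {
        // demo on a disposable copy of meta.properties
        Path meta = Files.createTempFile("meta", ".properties");
        Files.writeString(meta, "version=0\nbroker.id=0\ncluster.id=OLD_ID\n");
        patch(meta, "p2Ke6DSDzfdcxcfarkcxJscoQ");
        System.out.println(Files.readString(meta));
    }
}
```

Deleting meta.properties outright also works — the broker regenerates it — but patching keeps the rest of the file intact.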
Producers and consumers — add the client dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
```
Java producer

```java
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.90.164:9092");
// key/value serializers are mandatory, or the producer throws a ConfigException
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// the callback fires once per record when the broker acknowledges (or rejects) it
Callback callM = (metadata, exception) -> {
    if (exception == null) {
        System.out.println("topic: " + metadata.topic() + " -> partition: " + metadata.partition());
    } else {
        System.out.println("failed to produce message: " + exception.getMessage());
        exception.printStackTrace();
    }
};

for (int i = 0; i < 100; i++) {
    String msg = "[seq]" + i + "[time]" + LocalDateTime.now();
    // explicitly route record i to partition i % 3, with key String.valueOf(i)
    Future<RecordMetadata> future = kafkaProducer.send(
            new ProducerRecord<>("third", i % 3, String.valueOf(i), msg), callM);
    TimeUnit.MILLISECONDS.sleep(100L);
}
kafkaProducer.close();
```
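The producer above pins each record to partition `i % 3` explicitly. When no partition is passed, the client derives one from the record key instead. The sketch below is only a simplified stand-in for that idea — the real default partitioner hashes the serialized key with murmur2, not `String.hashCode()`, so actual assignments will differ:

```java
public class PartitionSketch {
    // Simplified key-based partitioning: hash the key, mask off the sign
    // bit, and take it modulo the partition count. This mimics the shape of
    // Kafka's default partitioner, not its exact murmur2 arithmetic.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always lands on the same partition, which is what
        // gives Kafka per-key ordering
        for (int i = 0; i < 5; i++) {
            String key = String.valueOf(i);
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The takeaway: records with the same key always map to the same partition, which is what preserves per-key ordering; records with null keys are spread across partitions instead.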
Java consumer

```java
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.90.164:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_001");
// auto-commit settings must be set BEFORE the consumer is constructed,
// otherwise they are silently ignored
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("third"));

while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    if (consumerRecords.count() == 0) {
        continue;
    }
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        System.out.println(consumerRecord);
        TimeUnit.MILLISECONDS.sleep(100L);
    }
}
```
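The consumer uses timer-based auto-commit, which gives at-least-once delivery: records polled after the last commit but before a crash are re-delivered when the group resumes. A broker-free toy illustration of that effect (all names and offsets below are made up for the demo):

```java
import java.util.List;

public class AtLeastOnceSketch {
    // Simulate two consumer "sessions" over one partition. The first session
    // processes offsets 0..2 but crashes with only offset 2 recorded as the
    // restart position (the auto-commit timer fired mid-batch), so the second
    // session re-processes one record.
    static int runSessions(List<String> log) {
        int committedOffset = 0;
        int processed = 0;

        // session 1: handle offsets 0, 1, 2, then "crash"
        for (int offset = committedOffset; offset < 3; offset++) {
            processed++;                           // record handled by the app
            if (offset == 1) committedOffset = 2;  // auto-commit fires here
        }

        // session 2: poll resumes at the committed offset, re-delivering m2
        for (int offset = committedOffset; offset < log.size(); offset++) {
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2", "m3");
        // processed exceeds the record count: duplicates are possible
        System.out.println("records=" + log.size() + " processed=" + runSessions(log));
    }
}
```

If duplicates matter, either make the processing idempotent or disable auto-commit and call `commitSync()` after each batch is fully handled.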
Kafka-Eagle monitoring — Eagle (EFAK) is a web dashboard for monitoring, with handy extras such as creating topics and browsing messages.
Installation
Ubuntu ships with a JDK preinstalled but doesn't set JAVA_HOME:
```shell
# find where the JDK actually lives
readlink -f `which java`

sudo vim /etc/profile.d/env.sh

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin

source /etc/profile
```
MySQL is required as well:
```shell
sudo apt install mysql-server
```
Create the database and user:
```sql
CREATE DATABASE `ke` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
CREATE USER 'ke'@'localhost' IDENTIFIED BY 'ke@123456';
GRANT ALL PRIVILEGES ON ke.* TO 'ke'@'localhost';
FLUSH PRIVILEGES;
```
Edit the configuration file:
```shell
vim conf/system-config.properties
# point EFAK at the ZooKeeper ensemble (note the /kafka chroot):
#   efak.zk.cluster.alias=cluster1
#   cluster1.zk.list=localhost:2181/kafka

bin/ke.sh restart
```