A widely used distributed event streaming platform…
 
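Quick Start
Messing around on Windows is not recommended: ZooKeeper keeps crashing there for no obvious reason. If you do not have a cloud host, install a virtual machine and try things out on Ubuntu. This section is best read alongside the quickstart on the official site.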
 
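Download the package
Download the version 2 binary package directly from the official site. The mode in version 3 that relies on built-in Raft (KRaft) is not yet widely used in production.
After downloading, extract it to a suitable directory and set the Kafka environment variables:

    sudo vim /etc/profile.d/my_env.sh

    # contents of my_env.sh
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

    # reload the profile
    source /etc/profile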
 
 
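Edit the Kafka server configuration
A few important settings in config/server.properties need to be checked or changed:

    # unique ID of this broker within the cluster
    broker.id=0
    # directory where Kafka stores its data
    log.dirs=/opt/module/kafka/datas
    # ZooKeeper address; the /kafka suffix is the chroot path
    zookeeper.connect=localhost:2181/kafka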
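
The `/kafka` suffix on `zookeeper.connect` is a ZooKeeper chroot: Kafka keeps its znodes under `/kafka` instead of at the ZooKeeper root. The sketch below only illustrates how such a connection string splits into a host list and a chroot path. `ZkConnectString` and its methods are hypothetical names made up for this example; they are not part of Kafka.

```java
import java.util.Arrays;
import java.util.List;

final class ZkConnectString {
    private ZkConnectString() {}

    /** Returns the host:port entries, i.e. everything before the first '/'. */
    static List<String> hosts(String connect) {
        int slash = connect.indexOf('/');
        String hostPart = slash < 0 ? connect : connect.substring(0, slash);
        return Arrays.asList(hostPart.split(","));
    }

    /** Returns the chroot path (for example "/kafka"), or "/" when none is given. */
    static String chroot(String connect) {
        int slash = connect.indexOf('/');
        return slash < 0 ? "/" : connect.substring(slash);
    }

    public static void main(String[] args) {
        String connect = "localhost:2181/kafka"; // value used in this post
        System.out.println("hosts  = " + hosts(connect));
        System.out.println("chroot = " + chroot(connect));
    }
}
```

Any other tool that talks to the same ZooKeeper on behalf of this cluster needs the same chroot, which is why `localhost:2181/kafka` shows up again in the Eagle configuration.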
 
Start
Get a feel for the basics from the command line: create a topic, then produce and consume messages.

    # start ZooKeeper first, then Kafka
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties

    # list existing topics
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

    # create a topic with 2 partitions and 1 replica
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic first

    # consume from the topic (run in one terminal)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

    # produce to the topic (run in another terminal)
    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
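
Because both processes are started with `-daemon`, a failed startup is not visible in the terminal. A quick sanity check is to see whether the ports accept TCP connections. The sketch below assumes the ports used in this post (2181 for ZooKeeper, 9092 for Kafka); `PortCheck` is a hypothetical helper name. An open port only shows that something is listening, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    private PortCheck() {}

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // refused, unreachable, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper (2181): " + isOpen("localhost", 2181, 1000));
        System.out.println("Kafka     (9092): " + isOpen("localhost", 9092, 1000));
    }
}
```

If either line prints `false`, the log files under the Kafka directory are the next place to look.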
 
Common startup problems
Cluster ID xxx doesn’t match stored clusterId in meta.properties 
 
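Step 1: Find the ID reported by the exception in the log: p2Ke6DSDzfdcxcfarkcxJscoQ
Step 2: Find the data directory, which is where meta.properties lives: cat $KAFKA_HOME/config/server.properties | grep log.dir
Step 3: Edit meta.properties and restart:

    cluster.id=P2Ka7bKGmJwBduCchqrhsP
    version=0
    broker.id=0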
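
The edit in Step 3 can also be scripted. The following is a minimal sketch, assuming that the ID reported in the error message is the one the broker should use and that meta.properties is a plain Java properties file in the `log.dirs` directory. `MetaPropertiesFix` is a hypothetical name, not a Kafka tool.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

final class MetaPropertiesFix {
    private MetaPropertiesFix() {}

    /**
     * Sets cluster.id in the given meta.properties file to expectedId.
     * Returns true if the file was rewritten, false if it already matched.
     */
    static boolean setClusterId(Path metaFile, String expectedId) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(metaFile)) {
            props.load(in);
        }
        if (expectedId.equals(props.getProperty("cluster.id"))) {
            return false;
        }
        props.setProperty("cluster.id", expectedId);
        // Other keys (version, broker.id) are kept; key order may change.
        try (OutputStream out = Files.newOutputStream(metaFile)) {
            props.store(out, null);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to meta.properties inside the log.dirs directory
        // args[1]: cluster ID taken from the broker's error message
        boolean changed = setClusterId(Paths.get(args[0]), args[1]);
        System.out.println(changed ? "cluster.id updated; restart the broker"
                                   : "cluster.id already matches");
    }
}
```

Stop the broker before running it, and keep a copy of the original file in case the change has to be reverted.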
 
 
Producers and consumers
Add the dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
 
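Java producer

    import java.time.LocalDateTime;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerDemo {
        public static void main(String[] args) throws InterruptedException {
            // Connection settings; key and value serializers are required
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.90.164:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

            // Invoked once the broker acknowledges the record or the send fails
            Callback callM = (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("topic: " + metadata.topic() + " -> partition: " + metadata.partition());
                } else {
                    System.out.println("Failed to produce message: " + exception.getMessage());
                    exception.printStackTrace();
                }
            };

            for (int i = 0; i < 100; i++) {
                String msg = "[seq]" + i + "[time]" + LocalDateTime.now();
                // Explicit partition i % 3: the topic "third" needs at least 3 partitions
                kafkaProducer.send(new ProducerRecord<>("third", i % 3, String.valueOf(i), msg), callM);
                TimeUnit.MILLISECONDS.sleep(100L);
            }

            // Flushes pending records and releases resources
            kafkaProducer.close();
        }
    }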
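
The producer passes `i % 3` as an explicit partition number, so the records are spread round-robin over partitions 0, 1 and 2 regardless of their keys. A small sketch of that arithmetic follows; `PartitionSpread` is a hypothetical name and the code does not touch Kafka at all.

```java
final class PartitionSpread {
    private PartitionSpread() {}

    /** Counts how many of the indices 0..messages-1 map to each partition via i % partitions. */
    static int[] spread(int messages, int partitions) {
        int[] counts = new int[partitions];
        for (int i = 0; i < messages; i++) {
            counts[i % partitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = spread(100, 3); // same loop bounds as the producer
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + ": " + counts[p] + " records");
        }
    }
}
```

For 100 records this gives 34, 33 and 33 records for partitions 0, 1 and 2. It also shows why the topic needs at least three partitions: a send to a partition that does not exist fails.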
 
Java consumer

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ConsumerDemo {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.90.164:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_001");
            // Auto-commit settings must be set before the consumer is created
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("third"));

            while (true) {
                // Wait up to 1 second for new records
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                if (consumerRecords.count() == 0) {
                    continue;
                }
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                    TimeUnit.MILLISECONDS.sleep(100L);
                }
            }
        }
    }
 
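Kafka-Eagle monitoring
Eagle is a web-based monitoring tool with a big-screen dashboard. It also supports creating topics, viewing messages, and other useful features.
Installation
Ubuntu may already have a JDK installed, but JAVA_HOME is not set, so set it manually.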
 
    # find where the JDK actually lives
    readlink -f `which java`

    sudo vim /etc/profile.d/env.sh

    # contents of env.sh
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
    export PATH=$PATH:$JAVA_HOME/bin

    # reload the profile
    source /etc/profile
 
MySQL is required

    sudo apt install mysql-server
 
Create the database and user

    CREATE DATABASE `ke` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
    CREATE USER 'ke'@'localhost' IDENTIFIED BY 'ke@123456';
    GRANT ALL PRIVILEGES ON ke.* TO 'ke'@'localhost';
    FLUSH PRIVILEGES;
 
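Edit the configuration file
Point Eagle at the same ZooKeeper address and chroot that Kafka uses. The MySQL connection settings in the same file should refer to the `ke` database and user created above.

    vim conf/system-config.properties

    # in system-config.properties
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=localhost:2181/kafka

    # restart Eagle
    bin/ke.sh restart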