Kafka:分布式,支持分区,复制,基于zookeeper协调的消息中间件
Kafka结构: 生产者:生产信息,并将消息推送到指定的kafka节点的分区中 kafka-server:消息队列,存储生产者推送的消息,并将消息推送给消费者消费 topic:主题,对不同类型的消息可以通过创建不同的主题进行区分 partition:分区,消息的载体,是一个日志文件,文件中存储的是生产者推送的消息,文件中的消息被成功消费之后不会被清除,在指定的时间周期内会被保留,分区支持复制,可以在其他节点创建从分区,消息的接收和推动都是由主分区完成,存储同一类型消息的分区属于同一个主题,也就是说可以把一个主题划分为一个或多个分区存储消息 Offset:偏移量,因为被消费成功的消息不会在分区中删除,所以消费者可以通过指定偏移量来消费不同的消息 消费者组:消费者以组的形式向kafk获取消息,组中是多个消费者实例,要求属于同一个组的消费者不能消费同一个分区中的消息,不同组的消费者可以消费同一个分区中的消息 Zookeeper:服务注册中心,分布式架构中的协调程序,用于记录每隔kafka节点的信息以及创建的主题,分区,消费者所使用的偏移量的信息,实现kafka节点之间的通信
节点状态: follower:从节点,备份 leader:主节点,工作
服务器信息
IP
Hostname
172.16.1.12
zoo-kafka-A
172.16.1.13
zoo-kafka-B
172.16.1.14
zoo-kafka-C
加压软件包 1 2 3 4 5 6 #所有节点执行 tar zxf kafka_2.11-1.0.1.tgz tar zxf zookeeper-3.3.6.tar.gz mv zookeeper-3.3.6 /usr/local/zookeeper cp zoo_sample.cfg zoo.cfg mv kafka_2.11-1.0.1 /usr/local/kafka
配置zookeeper 创建数据、日志存储目录 1 2 mkdir /usr/local/zookeeper/data mkdir /usr/local/zookeeper/datalog
修改配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # The number of milliseconds of each tick tickTime=2000 # 节点之间发送心跳包的时间,单位毫秒 # The number of ticks that the initial # synchronization phase can take initLimit=10 # 单位个,乘以心跳包就是时间,新加入节点初始化的时间 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # 单位个,乘以心跳包就是时间,节点连接的超时等待时间 vi zoo.cfg dataDir=/usr/local/zookeeper/data # 数据存放目录 dataLogDir=/usr/local/zookeeper/datalog # 日志存放目录 clientPort=2181 # 集群信息 server.1=172.16.1.12:2888:3888 # 节点之间通讯的端口和节点之间选取leader的端口 server.2=172.16.1.13:2888:3888 server.3=172.16.1.14:2888:3888
2181端口是zookeeper和kafka通信的端口 2888端口是三个节点互相通信的端口 3888端口是三个节点之间选举主主节点的端口
拷贝配置文件到其他节点 1 2 scp /usr/local/zookeeper/conf/zoo.cfg root@172.16.1.13:/usr/local/zookeeper/conf/ scp /usr/local/zookeeper/conf/zoo.cfg root@172.16.1.14:/usr/local/zookeeper/conf/
进入data目录创建一个myid文件 三台节点都需要创建,每个节点的id不同
1 2 3 4 5 6 7 8 9 #NodeA: cd /usr/local/zookeeper/data/ echo 1 > myid #NodeB: cd /usr/local/zookeeper/data/ echo 2 > myid #NodeC: cd /usr/local/zookeeper/data/ echo 3 > myid
放行防火墙 1 2 3 4 5 firewall-cmd --add-port=2181/tcp --permanent firewall-cmd --add-port=2888/tcp --permanent firewall-cmd --add-port=3888/tcp --permanent firewall-cmd --reload setenforce 0
启动服务 1 2 3 4 5 6 7 8 9 10 11 12 #NodeA: ln -s /usr/local/zookeeper/bin/* zkServer.sh start zkServer.sh status #NodeB: ln -s /usr/local/zookeeper/bin/* zkServer.sh start zkServer.sh status #NodeC: ln -s /usr/local/zookeeper/bin/* zkServer.sh start zkServer.sh status # 查看状态,要确定有两个follower,一个leader
配置kafka 修改配置文件 1 2 3 4 5 6 7 8 9 vi /usr/local/kafka/config/server.properties broker.id=1 //kafka节点的唯一标识,与myid符合 listeners=PLAINTEXT://172.16.1.12:9092 //指定当前节点监听的IP与端口 log.dirs=/usr/local/kafka/data //指定该节点主题与分区存放位置,也就是消息存放位置需要创建 message.max.byte=1024000 //指定生产者推送单个消息的最大字节数 defaults.replication.factors=2 //默认follower的个数 replica.fetch.max.bytes=1024000 //指定消费者单个消息的最大字节数 num.rtition=1 //每个parttion创建一个队列 zookeeper.connect=172.16.1.12:2181,172.16.1.13:2181,172.16.1.14:2181 //指定zookeeper节点的IP与端口
将配置文件拷贝到其他节点 1 2 scp server.properties root@172.16.1.13:/usr/local/kafka/config/ scp server.properties root@172.16.1.14:/usr/local/kafka/config/
配置其他节点文件 1 2 3 4 5 6 7 [root@zoo-kafka-B ~]# vi /usr/local/kafka/config/server.properties broker.id=2 listeners=PLAINTEXT://172.16.1.13:9092 [root@zoo-kafka-C ~]# vi /usr/local/kafka/config/server.properties broker.id=3 listeners=PLAINTEXT://172.16.1.14:9092
创建日志文件存放目录 1 mkdir /usr/local/kafka/data
放行端口 1 2 firewall-cmd --add-port=9092/tcp --permanent firewall-cmd --reload
启动服务 1 2 3 4 5 6 7 8 9 ln -s /usr/local/kafka/bin/* /usr/local/bin [root@zoo-kafka-A ~]# cd /usr/local/kafka/bin [root@zoo-kafka-A bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties [root@zoo-kafka-B ~]# cd /usr/local/kafka/bin [root@zoo-kafka-B bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties [root@zoo-kafka-C ~]# cd /usr/local/kafka/bin/ [root@zoo-kafka-C bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties netstat -anpt | grep 9092
选项: –daemon:后台运行
创建主题分区 1 [root@zoo-kafka-A ~]# sh kafka-topics.sh --create --zookeeper 172.16.1.12:2181 --replication-factor 3 --partitions 1 --topic pay
选项: –create:创建 –zookeeper:指定将kafka节点信息注册到哪个zookeeper节点 –replication-factor:指定该主题和分区创建在几个几点上 –partitions:该主题创建几个分区 –topic:主题的名字
模拟生产者 1 [root@zoo-kafka-A ~]# sh kafka-console-producer.sh --broker-list 172.16.1.12:9092 --topic pay
选项: –broker-list:指定连接哪个kafka节点 –topic:指定将消息推送到哪个主题
模拟消费者 1 [root@zoo-kafka-B ~]# sh kafka-console-consumer.sh --zookeeper 172.16.1.12:2181 --from-beginning --topic pay
选项: –zookeeper:将消费者的偏移量记录到指定的zookeeper –from-beginning:消费者从第一条消息开始消费 –topic:指定从哪个主题消费信息