搭建Zookeeper、Kafka集群 Zookeeper、Kafka集群系统环境配置 配置IP 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 ssh root@192.168.1.190 "rm -rf /etc/machine-id; systemd-machine-id-setup;reboot" ssh root@192.168.1.192 "rm -rf /etc/machine-id; systemd-machine-id-setup;reboot" ssh root@192.168.1.194 "rm -rf /etc/machine-id; systemd-machine-id-setup;reboot" ssh root@192.168.1.190 "nmcli con delete uuid d1141403-18c6-3149-907c-ed5f09663a7f;nmcli con add type ethernet ifname ens160 con-name ens160;nmcli con up ens160" ssh root@192.168.1.192 "nmcli con delete uuid d1141403-18c6-3149-907c-ed5f09663a7f;nmcli con add type ethernet ifname ens160 con-name ens160;nmcli con up ens160" ssh root@192.168.1.194 "nmcli con delete uuid d1141403-18c6-3149-907c-ed5f09663a7f;nmcli con add type ethernet ifname ens160 con-name ens160;nmcli con up ens160" ssh root@192.168.1.190 "nmcli con mod ens160 ipv4.addresses 192.168.1.61/24; nmcli con mod ens160 ipv4.gateway 192.168.1.1; nmcli con mod ens160 ipv4.method manual; nmcli con mod ens160 ipv4.dns "8.8.8.8"; nmcli con up ens160" ssh root@192.168.1.192 "nmcli con mod ens160 ipv4.addresses 192.168.1.62/24; nmcli con mod ens160 ipv4.gateway 192.168.1.1; nmcli con mod ens160 ipv4.method manual; nmcli con mod ens160 ipv4.dns "8.8.8.8"; nmcli con up ens160" ssh root@192.168.1.194 "nmcli con mod ens160 ipv4.addresses 192.168.1.63/24; nmcli con mod ens160 ipv4.gateway 192.168.1.1; nmcli con mod ens160 ipv4.method manual; nmcli con mod ens160 ipv4.dns "8.8.8.8"; nmcli con up ens160" ssh root@192.168.1.61 "nmcli con mod ens160 ipv6.addresses fc00::61/8; nmcli con up ens160" ssh root@192.168.1.62 "nmcli con mod ens160 ipv6.addresses fc00::62/8; nmcli con up ens160" ssh root@192.168.1.63 "nmcli con mod ens160 ipv6.addresses fc00::63/8; nmcli con up ens160" [root@localhost ~]# cat /etc/NetworkManager/system-connections/ens160.nmconnection [connection] id=ens160 uuid=94c63fdf-cd5a-427c-9846-5a447de2a4f5 
type=ethernet interface-name=ens160 timestamp=1744436596 [ethernet] [ipv4] address1=192.168.1.61/24,192.168.1.1 dns=192.168.1.99; method=manual [ipv6] addr-gen-mode=default address1=fc00::61/8 method=auto [proxy]
设置主机名 1 2 3 hostnamectl set-hostname zk-1 hostnamectl set-hostname zk-2 hostnamectl set-hostname zk-3
关闭防火墙、selinux 1 2 3 4 5 # 关闭防火墙 systemctl disable --now firewalld # 关闭selinux setenforce 0 sed -i 's#SELINUX=enforcing#SELINUX=disabled#g' /etc/selinux/config
安装JDK 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # 查看是否已安装jdk,如果已安装则先卸载 rpm -qa|grep jdk # 下载jdk https://www.oracle.com/java/technologies/downloads/#java8 # 解压jdk tar -xvf jdk-8u441-linux-x64.tar.gz # 移动到安装目录 mv jdk1.8.0_441/ /usr/local/ # 编辑环境变量 vim /etc/profile export JAVA_HOME=/usr/local/jdk1.8.0_441 export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin # 刷新环境变量 source /etc/profile # 查看版本 [root@zk-1 ~]# java -version java version "1.8.0_441" Java(TM) SE Runtime Environment (build 1.8.0_441-b07) Java HotSpot(TM) 64-Bit Server VM (build 25.441-b07, mixed mode)
安装Zookeeper集群 1 2 3 4 5 6 7 8 9 10 11 12 13 # 下载Zookeeper # https://archive.apache.org/dist/zookeeper/ wget https://archive.apache.org/dist/zookeeper/zookeeper-3.9.3/apache-zookeeper-3.9.3-bin.tar.gz # 创建应用目录 mkdir -vp /app/zookeeper-cluster cd /app/zookeeper-cluster # 解压安装 mv /root/apache-zookeeper-3.9.3-bin.tar.gz . tar xvf apache-zookeeper-3.9.3-bin.tar.gz mv apache-zookeeper-3.9.3-bin zk
修改zookeeper配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # 创建目录来存放数据和日志 cd zk mkdir data logs # 拷贝配置样例 cd conf cp zoo_sample.cfg zoo.cfg # 修改配置文件 vi zoo.cfg # 设置日志和存储目录 dataDir=/app/zookeeper-cluster/zk/data dataLogDir=/app/zookeeper-cluster/zk/logs # 添加节点信息 server.1=192.168.1.61:2888:3888 server.2=192.168.1.62:2888:3888 server.3=192.168.1.63:2888:3888 # 添加sasl authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider jaasLoginRenew=3600000 requireClientAuthScheme=sasl zookeeper.sasl.client=true
我的完整配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 [root@zk-1 conf]# cat zoo.cfg | grep -Ev '^$|#' tickTime=2000 initLimit=10 syncLimit=5 dataDir=/app/zookeeper-cluster/zk/data dataLogDir=/app/zookeeper-cluster/zk/logs clientPort=2181 server.1=192.168.1.61:2888:3888 server.2=192.168.1.62:2888:3888 server.3=192.168.1.63:2888:3888 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider jaasLoginRenew=3600000 requireClientAuthScheme=sasl zookeeper.sasl.client=true
设置sasl认证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # 设置sasl认证 cat <<EOF | tee /app/zookeeper-cluster/zk/conf/zk_jaas.conf Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_cby="Cby123.."; }; Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="cby" password="Cby123.."; }; EOF # 修改zookeeper环境变量 # 在最下添加变量 vim zkEnv.sh export JVMFLAGS="-Djava.security.auth.login.config=/app/zookeeper-cluster/zk/conf/zk_jaas.conf ${JVMFLAGS}" # 我的结果 [root@zk-1 bin]# cat zkEnv.sh | grep JVMFLAGS export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS" export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS" export JVMFLAGS="-Djava.security.auth.login.config=/app/zookeeper-cluster/zk/conf/zk_jaas.conf ${JVMFLAGS}"
设置启动方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # 在data目录中创建myid文件 # 节点1上配置 echo 1 >/app/zookeeper-cluster/zk/data/myid # 节点2上配置 echo 2 >/app/zookeeper-cluster/zk/data/myid # 节点3上配置 echo 3 >/app/zookeeper-cluster/zk/data/myid # 创建系统启动文件 cat <<EOF | tee /usr/lib/systemd/system/zookeeper.service [Unit] Description=zookeeper After=network.target [Service] Type=forking Environment=JAVA_HOME=/usr/local/jdk1.8.0_441 ExecStart=/app/zookeeper-cluster/zk/bin/zkServer.sh start ExecStop=/app/zookeeper-cluster/zk/bin/zkServer.sh stop PIDFile=/app/zookeeper-cluster/zk/data/zookeeper_server.pid KillMode=none User=root Group=root Restart=on-failure [Install] WantedBy=multi-user.target EOF
启动停止 1 2 3 4 5 6 7 8 9 10 # 重载 systemctl daemon-reload # 启动 systemctl start zookeeper # 停止 systemctl stop zookeeper # 设置开机自启并启动 systemctl enable --now zookeeper # 查看状态 systemctl status zookeeper
使用测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # 登录每个节点 进行设置sasl /app/zookeeper-cluster/zk/bin/zkCli.sh -server 192.168.1.61:2181 /app/zookeeper-cluster/zk/bin/zkCli.sh -server 192.168.1.62:2181 /app/zookeeper-cluster/zk/bin/zkCli.sh -server 192.168.1.63:2181 # 设置Acl读写权限 setAcl / sasl:cby:crdwa # 创建目录及Acl验证命令: addauth digest super:cby create /cby getAcl / ......略..... [zk: 192.168.1.63:2181(CONNECTED) 0] setAcl / sasl:cby:crdwa [zk: 192.168.1.63:2181(CONNECTED) 1] [zk: 192.168.1.63:2181(CONNECTED) 1] addauth digest super:cby [zk: 192.168.1.63:2181(CONNECTED) 2] create /cby Node already exists: /cby [zk: 192.168.1.63:2181(CONNECTED) 3] getAcl / 'sasl,'cby : cdrwa [zk: 192.168.1.63:2181(CONNECTED) 4]
安装Kafka集群 1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 下载Kafka # http://kafka.apache.org/downloads # wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.9.0/kafka_2.13-3.9.0.tgz # 创建应用目录 mkdir -vp /app/kafka-cluster cd /app/kafka-cluster # 解压安装 mv /root/kafka_2.13-3.9.0.tgz . tar xvf kafka_2.13-3.9.0.tgz mv kafka_2.13-3.9.0 kafka mkdir -p /app/kafka-cluster/kafka/kafka-logs
修改配置 1 2 3 4 5 6 7 8 9 10 11 12 # 修改配置 cd /app/kafka-cluster/kafka/config vim server.properties # 修改项:broker.id、log.dirs、zookeeper.connect、listeners、advertised.listeners,并新增以下配置 # 三台节点的 broker.id 必须互不相同 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.authorizer.AclAuthorizer allow.everyone.if.no.acl.found=false super.users=User:cby
我的配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # 我的配置 [root@zk-1 config]# cat server.properties | grep -Ev '^$|#' broker.id=1 listeners=SASL_PLAINTEXT://:9092 advertised.listeners=SASL_PLAINTEXT://192.168.1.61:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/app/kafka-cluster/kafka/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.1.61:2181,192.168.1.62:2181,192.168.1.63:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.authorizer.AclAuthorizer allow.everyone.if.no.acl.found=false super.users=User:cby # 修改配置 cd /app/kafka-cluster/kafka/config vim producer.properties [root@zk-1 config]# cat producer.properties | grep -Ev '^$|#' bootstrap.servers=192.168.1.61:9092,192.168.1.62:9092,192.168.1.63:9092 compression.type=none
服务端配置认证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 服务端配置登录认证 cat <<EOF | tee /app/kafka-cluster/kafka/config/kafka_server_jaas.conf KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="cby" password="Cby123.." user_cby="Cby123.."; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="cby" password="Cby123.."; }; EOF
客户端配置认证 1 2 3 4 5 6 7 8 9 # 客户端配置登录认证 cat <<EOF | tee /app/kafka-cluster/kafka/config/kafka_client_jaas.conf KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="cby" password="Cby123.."; }; EOF
配置启动认证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 # 修改kafka-server-start.sh脚本,新增 -Djava.security.auth.login.config=/app/kafka-cluster/kafka/config/kafka_server_jaas.conf vim kafka-server-start.sh [root@zk-3 bin]# cat kafka-server-start.sh | grep -Ev '^$|#' then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/app/kafka-cluster/kafka/config/kafka_server_jaas.conf" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" [root@zk-3 bin]#
配置生产消费认证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # 配置生产和消费者的登录验证 # 在export KAFKA_HEAP_OPTS="-Xmx512M后添加 -Djava.security.auth.login.config=/app/kafka-cluster/kafka/config/kafka_client_jaas.conf vim kafka-console-consumer.sh [root@zk-1 bin]# cat kafka-console-consumer.sh | grep -Ev '^$|#' if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/app/kafka-cluster/kafka/config/kafka_client_jaas.conf" fi exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.ConsoleConsumer "$@" [root@zk-1 bin]# vim kafka-console-producer.sh [root@zk-1 bin]# cat kafka-console-producer.sh | grep -Ev '^$|#' if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/app/kafka-cluster/kafka/config/kafka_client_jaas.conf" fi exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@" [root@zk-1 bin]#
配置启动停止 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # 创建系统启动文件 cat <<EOF | tee /usr/lib/systemd/system/kafka.service [Unit] Description=kafka-node01 After=network.target [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_441/bin" User=root Group=root LimitNOFILE=100000 ExecStart=/app/kafka-cluster/kafka/bin/kafka-server-start.sh /app/kafka-cluster/kafka/config/server.properties ExecStop=/app/kafka-cluster/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target EOF
启动停止 1 2 3 4 5 6 7 8 9 10 11 # 重载 systemctl daemon-reload # 启动 systemctl start kafka # 停止 systemctl stop kafka # 设置开机自启并启动 systemctl enable --now kafka # 查看状态 systemctl status kafka
配置账号密码 1 2 3 4 5 6 7 # 因为启用了SASL认证,所以需要配置客户端认证文件 cat <<EOF | tee /app/kafka-cluster/kafka/config/admin.conf security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="cby" password="Cby123.."; EOF
测试使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # 创建topic ./kafka-topics.sh --create --topic cby --replication-factor 3 --partitions 3 --bootstrap-server 192.168.1.61:9092,192.168.1.62:9092,192.168.1.63:9092 --command-config ../config/admin.conf # 查看topic ./kafka-topics.sh --list --bootstrap-server 192.168.1.61:9092,192.168.1.62:9092,192.168.1.63:9092 --command-config ../config/admin.conf # 发送消息 ./kafka-console-producer.sh --broker-list 192.168.1.61:9092,192.168.1.62:9092,192.168.1.63:9092 --topic cby --producer.config ../config/admin.conf > 123 > 321 # 查看消息 ./kafka-console-consumer.sh --bootstrap-server 192.168.1.61:9092,192.168.1.62:9092,192.168.1.63:9092 --topic cby --from-beginning --consumer.config ../config/admin.conf 123 321 # 删除topic ./kafka-topics.sh --delete --topic cby --bootstrap-server 192.168.1.61:9092,192.168.1.62:9092,192.168.1.63:9092 --command-config ../config/admin.conf
关于
https://www.oiox.cn/
https://www.oiox.cn/index.php/start-page.html
CSDN、GitHub、知乎、开源中国、思否、掘金、简书、华为云、阿里云、腾讯云、哔哩哔哩、今日头条、新浪微博、个人博客
全网可搜《小陈运维》