AMQ Streams

功能概述

AMQ Streams 基于 Strimzi(http://strimzi.io/) 社区产品,核心组件包括 Apache Kafka 和 Apache Zookeeper,是一个可大规模扩展,分布式和高性能的数据流平台。详细组件如下所示:

amq streams 478987 0918 01.png

名称 功能

Zookeeper

高可靠分布式协调器

Kafka Broker

可靠消息传输代理,负责将生产者创建的消息传输给消费者

Kafka Connect

流数据小工具,将 Kafka Broker 和其它系统通过 Connector 插件连接

Kafka Consumer and Producer APIs

Java API,用来向 Kafka Broker 发送消息或从 Kafka Broker 接收消息

Kafka Streams API

开发流处理应用

本地运行

安装与部署

1. 创建 kafka 用户和组
# groupadd kafka
# useradd -g kafka kafka
# passwd kafka
2. 创建 /opt/kafka 路径
# mkdir /opt/kafka
3. 解压安装
# unzip amq-streams-1.0.0-bin.zip -d /opt/kafka
4. 设定 /opt/kafka 组属关系
# chown -R kafka:kafka /opt/kafka/
5. 创建 /var/lib/zookeeper,并设定组属关系
# mkdir /var/lib/zookeeper
# chown -R kafka:kafka /var/lib/zookeeper/
6. 创建 /var/lib/kafka,并设定组属关系
# mkdir /var/lib/kafka
# chown -R kafka:kafka /var/lib/kafka/

单节点运行

1. 切换到 kafka 用户
# su - kafka
2. 编辑 /opt/kafka/config/zookeeper.properties,设定 dataDir
dataDir=/var/lib/zookeeper
3. 启动 Zookeeper
$ /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
4. 确保 Zookeeper 运行
$ jcmd | grep zookeeper
12433 org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/kafka/config/zookeeper.properties
5. 编辑 /opt/kafka/config/server.properties,设定 log.dirs
log.dirs=/var/lib/kafka
6. 启动 Kafka
$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
7. 确保 Kafka 运行
$ jcmd | grep kafka
12778 kafka.Kafka /opt/kafka/config/server.properties
8. 向 my-topic 发送消息
$ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
Hello World Kafka
9. 从 my-topic 接收消息
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Hllo World Kafka
10. 停止 Kafka
$ /opt/kafka/bin/kafka-server-stop.sh
$ /opt/kafka/bin/zookeeper-server-stop.sh

OpenShift 上部署 Kafka

集群部署

1. 下载安装脚本
# wget https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.8.2/strimzi-0.8.2.zip
2. 创建工程
# oc new-project streams
3. 部署 Cluster Operator
# sed -i 's/namespace: .*/namespace: streams/' install/cluster-operator/*RoleBinding*.yaml
# oc apply -f install/cluster-operator
# oc apply -f examples/templates/cluster-operator
4. 部署 Kafka cluster
# oc apply -f examples/kafka/kafka-persistent.yaml
5. 部署 Kafka Connect
# oc apply -f examples/kafka-connect/kafka-connect.yaml
6. 验证 Kafka cluster 启动状况
# oc get pods
NAME                                          READY     STATUS    RESTARTS   AGE
my-cluster-entity-operator-54457f4548-x8mvq   3/3       Running   1          32m
my-cluster-kafka-0                            2/2       Running   1          34m
my-cluster-kafka-1                            2/2       Running   1          34m
my-cluster-kafka-2                            2/2       Running   1          34m
my-cluster-zookeeper-0                        2/2       Running   0          36m
my-cluster-zookeeper-1                        2/2       Running   0          36m
my-cluster-zookeeper-2                        2/2       Running   0          36m
my-connect-cluster-connect-64669fb655-6dpzb   1/1       Running   0          22m
strimzi-cluster-operator-696658566-bfmbf      1/1       Running   0          43m

部署测试

1. 发送消息到 my-topic
# oc run kafka-producer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic
>Hello World
>AMQ STREAMS
>Kafka
2. 从 my-topic 接收消息
# oc run kafka-consumer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
Hello World
AMQ STREAMS
Kafka
**
**

results matching ""

    No results matching ""