在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
Apache Kafka - 简介在大数据中,使用了大量的数据。 关于数据,我们有两个主要挑战。第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据。 为了克服这些挑战,您必须需要一个消息系统。 Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。 什么是消息系统?消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它。 分布式消息传递基于可靠消息队列的概念。 消息在客户端应用程序和消息传递系统之间异步排队。 有两种类型的消息模式可用 - 一种是点对点,另一种是发布 - 订阅(pub-sub)消息系统。 大多数消息模式遵循 pub-sub 。 点对点消息系统在点对点系统中,消息被保留在队列中。 一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。 一旦消费者读取队列中的消息,它就从该队列中消失。 该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。 下图描述了结构。 发布 - 订阅消息系统在发布 - 订阅系统中,消息被保留在主题中。 与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。 一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。 什么是Kafka?Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。 好处以下是Kafka的几个好处 -
Kafka非常快,并保证零停机和零数据丢失。 用例Kafka可以在许多用例中使用。 其中一些列出如下 -
需要KafkaKafka是一个统一的平台,用于处理所有实时数据Feed。 Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。 Kafka非常快,执行2百万写/秒。 Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存。 这使得将数据从页面缓存传输到网络套接字非常有效。 Apache Kafka - 基础在深入学习Kafka之前,您必须了解主题,经纪人,生产者和消费者等主要术语。 下图说明了主要术语,表格详细描述了图表组件。 在上图中,主题配置为三个分区。 分区1具有两个偏移因子0和1.分区2具有四个偏移因子0,1,2和3.分区3具有一个偏移因子0.副本的id与承载它的服务器的id相同。 假设,如果主题的复制因子设置为3,那么Kafka将创建每个分区的3个相同的副本,并将它们放在集群中以使其可用于其所有操作。 为了平衡集群中的负载,每个代理都存储一个或多个这些分区。 多个生产者和消费者可以同时发布和检索消息。
Apache Kafka - 集群架构看看下面的插图。 它显示Kafka的集群图。 下表描述了上图中显示的每个组件。
Apache Kafka - WorkFlow到目前为止,我们讨论了Kafka的核心概念。 让我们现在来看一下Kafka的工作流程。 Kafka只是分为一个或多个分区的主题的集合。 Kafka分区是消息的线性有序序列,其中每个消息由它们的索引(称为偏移)来标识。 Kafka集群中的所有数据都是不相连的分区联合。 传入消息写在分区的末尾,消息由消费者顺序读取。 通过将消息复制到不同的代理提供持久性。 Kafka以快速,可靠,持久,容错和零停机的方式提供基于pub-sub和队列的消息系统。 在这两种情况下,生产者只需将消息发送到主题,消费者可以根据自己的需要选择任何一种类型的消息传递系统。 让我们按照下一节中的步骤来了解消费者如何选择他们选择的消息系统。 发布 - 订阅消息的工作流程以下是Pub-Sub消息的逐步工作流程 -
队列消息/用户组的工作流在队列消息传递系统而不是单个消费者中,具有相同
ZooKeeper的作用Apache Kafka的一个关键依赖是Apache Zookeeper,它是一个分布式配置和同步服务。 Zookeeper是Kafka代理和消费者之间的协调接口。 Kafka服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。 由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态,一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。 要了解有关Zookeeper的详细信息,请参阅 zookeeper 让我们继续进一步关于如何在您的机器上安装Java,ZooKeeper和Kafka在下一章。 Apache Kafka - 安装步骤以下是在机器上安装Java的步骤。 步骤1 - 验证Java安装希望你已经在你的机器上安装了java,所以你只需使用下面的命令验证它。 $ java -version 如果java在您的机器上成功安装,您可以看到已安装的Java的版本。 步骤1.1 - 下载JDK如果没有下载Java,请通过访问以下链接并下载最新版本来下载最新版本的JDK。 http://www.oracle.com/technetwork/java/javase/downloads/index.html现在最新的版本是JDK 8u 60,文件是“jdk-8u60-linux-x64.tar.gz"。 请在您的机器上下载该文件。 步骤1.2 - 提取文件通常,正在下载的文件存储在下载文件夹中,验证它并使用以下命令提取tar设置。 $ cd /go/to/download/path $ tar -zxf jdk-8u60-linux-x64.gz 步骤1.3 - 移动到选择目录要将java提供给所有用户,请将提取的java内容移动到 $ su password: (type password of root user) $ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/ 步骤1.4 - 设置路径要设置路径和JAVA_HOME变量,请将以下命令添加到〜/ .bashrc文件。 export JAVA_HOME =/usr/jdk/jdk-1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin 现在将所有更改应用到当前运行的系统。 $ source ~/.bashrc 步骤1.5 - Java替代使用以下命令更改Java Alternatives。 update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100 步骤1.6 - 现在使用第1步中说明的验证命令(java -version)验证java。 步骤2 - ZooKeeper框架安装步骤2.1 - 下载ZooKeeper要在您的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper。 http://zookeeper.apache.org/releases.html现在,最新版本的ZooKeeper是3.4.6(ZooKeeper-3.4.6.tar.gz)。 步骤2.2 - 提取tar文件使用以下命令提取tar文件 $ cd opt/ $ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6 $ mkdir data 步骤2.3 - 创建配置文件使用命令vi“conf / zoo.cfg"打开名为 $ vi conf/zoo.cfg tickTime=2000 dataDir=/path/to/zookeeper/data clientPort=2181 initLimit=5 syncLimit=2 一旦配置文件成功保存并再次返回终端,您可以启动zookeeper服务器。 步骤2.4 - 启动ZooKeeper服务器$ bin/zkServer.sh start 执行此命令后,您将得到如下所示的响应 - $ JMX enabled by default $ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED 步骤2.5 - 启动CLI$ bin/zkCli.sh 输入上面的命令后,您将被连接到zookeeper服务器,并将获得以下响应。 Connecting to localhost:2181 ................ ................ ................ Welcome to ZooKeeper! ................ ................ WATCHER:: WatchedEvent state:SyncConnected type: None path:null [zk: localhost:2181(CONNECTED) 0] 步骤2.6 - 停止Zookeeper服务器连接服务器并执行所有操作后,可以使用以下命令停止zookeeper服务器 - $ bin/zkServer.sh stop 现在你已经在你的机器上成功安装了Java和ZooKeeper。 让我们看看安装Apache Kafka的步骤。 步骤3 - Apache Kafka安装让我们继续以下步骤在您的机器上安装Kafka。 步骤3.1 - 下载Kafka要在您的机器上安装Kafka,请点击以下链接 - https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz现在最新版本,即 - kafka_2.11_0.9.0.0.tgz 将下载到您的计算机上。 步骤3.2 - 解压tar文件使用以下命令提取tar文件 - $ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz $ cd kafka_2.11.0.9.0.0 现在您已经在您的机器上下载了最新版本的Kafka。 步骤3.3 - 启动服务器您可以通过给出以下命令来启动服务器 - $ bin/kafka-server-start.sh config/server.properties 服务器启动后,您会在屏幕上看到以下响应: $ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. ……………………………………………. 步骤4 - 停止服务器执行所有操作后,可以使用以下命令停止服务器 - $ bin/kafka-server-stop.sh config/server.properties 现在我们已经讨论了Kafka安装,我们可以在下一章中学习如何对Kafka执行基本操作。 Apache Kafka - 基本操作首先让我们开始实现 希望你现在可以在你的机器上安装Java,ZooKeeper和Kafka。 在迁移到Kafka Cluster Setup之前,首先需要启动ZooKeeper,因为Kafka Cluster使用ZooKeeper。 启动ZooKeeper打开一个新终端并键入以下命令 - bin/zookeeper-server-start.sh config/zookeeper.properties 要启动Kafka Broker,请键入以下命令 - bin/kafka-server-start.sh config/server.properties 启动Kafka Broker后,在ZooKeeper终端上键入命令 821 QuorumPeerMain 928 Kafka 931 Jps 现在你可以看到两个守护进程运行在终端上,QuorumPeerMain是ZooKeeper守护进程,另一个是Kafka守护进程。 单节点 - 单代理配置在此配置中,您有一个ZooKeeper和代理id实例。 以下是配置它的步骤 - 创建Kafka主题 - Kafka提供了一个名为 语法 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name 示例 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka 我们刚刚创建了一个名为 输出 - 创建主题 创建主题后,您可以在Kafka代理终端窗口中获取通知,并在config / server.properties文件中的“/ tmp / kafka-logs /"中指定的创建主题的日志。 主题列表要获取Kafka服务器中的主题列表,可以使用以下命令 - 语法 bin/kafka-topics.sh --list --zookeeper localhost:2181 输出 Hello-Kafka 由于我们已经创建了一个主题,它将仅列出 启动生产者以发送消息语法 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name 从上面的语法,生产者命令行客户端需要两个主要参数 - 代理列表 - 我们要发送邮件的代理列表。 在这种情况下,我们只有一个代理。 Config / server.properties文件包含代理端口ID,因为我们知道我们的代理正在侦听端口9092,因此您可以直接指定它。 主题名称 - 以下是主题名称的示例。 示例 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka 生产者将等待来自stdin的输入并发布到Kafka集群。 默认情况下,每个新行都作为新消息发布,然后在 输出 $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message My second message 启动消费者以接收消息与生产者类似,在 语法 bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning 示例 bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning 输出 Hello My first message My second message 最后,您可以从制作商的终端输入消息,并看到他们出现在消费者的终端。 到目前为止,您对具有单个代理的单节点群集有非常好的了解。 现在让我们继续讨论多个代理配置。 单节点多代理配置在进入多个代理集群设置之前,首先启动ZooKeeper服务器。 创建多个Kafka Brokers - 我们在配置/ server.properties中已有一个Kafka代理实例。 现在我们需要多个代理实例,因此将现有的server.prop-erties文件复制到两个新的配置文件中,并将其重命名为server-one.properties和server-two.properties。 然后编辑这两个新文件并分配以下更改 - config / server-one.properties# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1 config / server-two.properties# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2 启动多个代理 - 在三台服务器上进行所有更改后,打开三个新终端,逐个启动每个代理。 Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties 现在我们有三个不同的经纪人在机器上运行。 自己尝试,通过在ZooKeeper终端上键入 jps 检查所有守护程序,然后您将看到响应。 创建主题让我们为此主题将复制因子值指定为三个,因为我们有三个不同的代理运行。 如果您有两个代理,那么分配的副本值将是两个。 语法 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name 示例 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication 输出 created topic “Multibrokerapplication"
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation 输出 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1 从上面的输出,我们可以得出结论,第一行给出所有分区的摘要,显示主题名称,分区数量和我们已经选择的复制因子。 在第二行中,每个节点将是分区的随机选择部分的领导者。 在我们的例子中,我们看到我们的第一个broker(with broker.id 0)是领导者。 然后Replicas:0,2,1意味着所有代理复制主题最后 启动生产者以发送消息此过程保持与单代理设置中相同。 示例 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication 输出 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message 启动消费者以接收消息此过程保持与单代理设置中所示的相同。 示例 bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion --from-beginning 输出 bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion —from-beginning This is single node-multi broker demo This is the second message 基本主题操作在本章中,我们将讨论各种基本主题操作。 修改主题您已经了解如何在Kafka Cluster中创建主题。 现在让我们使用以下命令修改已创建的主题 语法 bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count 示例 We have already created a topic “Hello-Kafka" with single partition count and one replica factor. Now using “alter" command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2 输出 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! 删除主题要删除主题,可以使用以下语法。 语法 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name 示例 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka 输出 > Topic Hello-kafka marked for deletion 注意 - 如果 delete.topic.enable 未设置为true,则此操作不会产生任何影响 Apache Kafka - 简单生产者示例让我们使用Java客户端创建一个用于发布和使用消息的应用程序。 Kafka生产者客户端包括以下API。 KafkaProducer API让我们了解本节中最重要的一组Kafka生产者API。 KafkaProducer API的中心部分是
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
public void flush()
public Map metrics() 它返回由生产者维护的内部度量的映射。
生产者API生产者API的中心部分是 生产者类生产者类提供send方法以使用以下签名向单个或多个主题发送消息。 public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,"async") ProducerConfig config = new ProducerConfig(prop); 有两种类型的生产者 - 同步和异步。 相同的API配置也适用于 public void close()生产者类提供关闭方法以关闭与所有Kafka代理的生产者池连接。 配置设置下表列出了Producer API的主要配置设置,以便更好地理解 -
ProducerRecord APIProducerRecord是发送到Kafka cluster.ProducerRecord类构造函数的键/值对,用于使用以下签名创建具有分区,键和值对的记录。 public ProducerRecord (string topic, int partition, k key, v value)
public ProducerRecord (string topic, k key, v value) ProducerRecord类构造函数用于创建带有键,值对和无分区的记录。
public ProducerRecord (string topic, v value) ProducerRecord类创建一个没有分区和键的记录。
ProducerRecord类方法列在下表中 -
SimpleProducer应用程序在创建应用程序之前,首先启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建自己的主题。 之后,创建一个名为 //import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer" public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully"); producer.close(); } } 编译 - 可以使用以下命令编译应用程序。 javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java 执行 - 可以使用以下命令执行应用程序。 java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name> 输出 Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10 简单消费者示例到目前为止,我们已经创建了一个发送消息到Kafka集群的生产者。 现在让我们创建一个消费者来消费Kafka集群的消息。 KafkaConsumer API用于消费来自Kafka集群的消息。 KafkaConsumer类的构造函数定义如下。 public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs) configs - 返回消费者配置的地图。 KafkaConsumer类具有下表中列出的以下重要方法。
ConsumerRecord APIConsumerRecord API用于从Kafka集群接收记录。 此API由主题名称,分区号(从中接收记录)和指向Kafka分区中的记录的偏移量组成。 ConsumerRecord类用于创建具有特定主题名称,分区计数和< key,value>的消费者记录。 对。 它有以下签名。 public ConsumerRecord(string topic,int partition, long offset,K key, V value)
ConsumerRecords APIConsumerRecords API充当ConsumerRecord的容器。 此API用于保存特定主题的每个分区的ConsumerRecord列表。 它的构造器定义如下。 public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
ConsumerRecords类定义了以下方法。
配置设置Consumer客户端API主配置设置的配置设置如下所示 -
SimpleConsumer应用程序生产者应用程序步骤在此保持不变。 首先,启动你的ZooKeeper和Kafka代理。 然后使用名为 import java.util.Properties; import java.util.Arrays; import org.apache.kafka |
请发表评论