博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka实战-简单示例
阅读量:5223 次
发布时间:2019-06-14

本文共 5363 字,大约阅读时间需要 17 分钟。

1.概述

  上一篇博客《》中,为大家介绍了Kafka集群的安装部署,以及对Kafka集群Producer/Consumer、HA等做了相关测试,今天我们来开发一个Kafka示例,练习如何在Kafka中进行编程,下面是今天的分享的目录结构:

  • 开发环境
  • ConfigureAPI
  • Consumer
  • Producer
  • 截图预览

  下面开始今天的内容分享。

2.开发环境

  在开发Kafka相关应用之前,我们得将Kafka得开发环境搭建完成,这里我所使用得开发环境如下所示:

基础软件 工具名称
IDE JBoss Studio 8
JDK 1.7

  关于基础软件的下载及相关配置,大家可参考我写的《》一文的相关赘述,这里就不多做介绍了。在安装好相关基础软件后,我们开始项目工程的创建,这里我们所使用的工程结构是Maven,关于Maven环境的相关配置信息,可参考我在《》一文对Maven环境配置的赘述。

  在准备完成相关基础软件以及Maven环境后,我们大家创建的工程,在pom.xml文件中,添加Kafka的依赖包,添加代码如下所示:

org.apache.kafka
kafka_2.11
0.8.2.1

  下面开始编写今天的代码示例。

3.ConfigureAPI

  首先是一个配置结构类文件,配置Kafka的相关参数,代码如下所示:

package cn.hadoop.hdfs.conf;/** * @Date Apr 28, 2015 * * @Author dengjie * * @Note Set param path */public class ConfigureAPI {    public interface KafkaProperties {        public final static String ZK = "10.211.55.15:2181,10.211.55.17:2181,10.211.55.18:2181";        public final static String GROUP_ID = "test_group1";        public final static String TOPIC = "test2";        public final static String BROKER_LIST = "10.211.55.15:9092,10.211.55.17:9092,10.211.55.18:9092";        public final static int BUFFER_SIZE = 64 * 1024;        public final static int TIMEOUT = 20000;        public final static int INTERVAL = 10000;    }}

4.Consumer

  然后是一个消费程序,用于消费Kafka的消息,代码如下所示:

  • JConsumer

package cn.hadoop.hdfs.kafka;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;/** * @Date May 22, 2015 * * @Author dengjie * * @Note Kafka Consumer */public class JConsumer extends Thread {    private ConsumerConnector consumer;    private String topic;    private final int SLEEP = 1000 * 3;    public JConsumer(String topic) {        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());        this.topic = topic;    }    private ConsumerConfig consumerConfig() {        Properties props = new Properties();        props.put("zookeeper.connect", KafkaProperties.ZK);        props.put("group.id", KafkaProperties.GROUP_ID);        props.put("zookeeper.session.timeout.ms", "40000");        props.put("zookeeper.sync.time.ms", "200");        props.put("auto.commit.interval.ms", "1000");        return new ConsumerConfig(props);    }    @Override    public void run() {        Map
topicCountMap = new HashMap
(); topicCountMap.put(topic, new Integer(1)); Map
>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream
stream = consumerMap.get(topic).get(0); ConsumerIterator
it = stream.iterator(); while (it.hasNext()) { System.out.println("Receive->[" + new String(it.next().message()) + "]"); try { sleep(SLEEP); } catch (Exception ex) { ex.printStackTrace(); } } }}

5.Producer

  接着是Kafka的生产消息程序,用于产生Kafka的消息供Consumer去消费,具体代码如下所示:

  • JProducer

package cn.hadoop.hdfs.kafka;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/** * @Date May 22, 2015 * * @Author dengjie * * @Note Kafka JProducer */public class JProducer extends Thread {    private Producer
producer; private String topic; private Properties props = new Properties(); private final int SLEEP = 1000 * 3; public JProducer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "10.211.55.18:9092"); producer = new Producer
(new ProducerConfig(props)); this.topic = topic; } @Override public void run() { int offsetNo = 1; while (true) { String msg = new String("Message_" + offsetNo); System.out.println("Send->[" + msg + "]"); producer.send(new KeyedMessage
(topic, msg)); offsetNo++; try { sleep(SLEEP); } catch (Exception ex) { ex.printStackTrace(); } } }}

6.截图预览

  在开发完Consumer和Producer的代码后,我们来测试相关应用,下面给大家编写了一个Client去测试Consumer和Producer,具体代码如下所示:

  • KafkaClient

package cn.hadoop.hdfs.kafka.client;import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;import cn.hadoop.hdfs.kafka.JConsumer;import cn.hadoop.hdfs.kafka.JProducer;/** * @Date May 22, 2015 * * @Author dengjie * * @Note To run Kafka Code */public class KafkaClient {    public static void main(String[] args) {        JProducer pro = new JProducer(KafkaProperties.TOPIC);        pro.start();        JConsumer con = new JConsumer(KafkaProperties.TOPIC);        con.start();    }}

  运行截图如下所示:

7.总结

  大家在开发Kafka的应用时,需要注意相关事项。若是使用Maven项目工程,在添加相关Kafka依赖JAR包时,有可能依赖JAR会下载失败,若出现这种情况,可手动将Kafka的依赖JAR包添加到Maven仓库即可,在编写Consumer和Producer程序,这里只是给出一个示例让大家先熟悉Kafka的代码如何去编写,后面会给大家更加详细复杂的代码模块案例。

8.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

 

  

 

转载于:https://www.cnblogs.com/smartloli/p/4543211.html

你可能感兴趣的文章
JVM常见配置汇总
查看>>
HDU 5278 PowMod 数论公式推导
查看>>
MIT研发的新型匿名网络Riffle,下一个Tor
查看>>
怎样编写高质量的java代码
查看>>
django 连接mssql 数据库(django 1.11.11 sql server 2008 R2)
查看>>
python subprocess 小例子
查看>>
第8910章
查看>>
ORACLE 取字段值首字母
查看>>
Access-Control-Allow-Origin跨域请求处理
查看>>
STM32个人笔记
查看>>
linux虚拟机mysql5.6.4-m7搭建MS主从复制。
查看>>
Codeforces 712E Memory and Casinos
查看>>
ABB Server(服务端)代码解析
查看>>
[11-3] Gradient Boosting regression
查看>>
leetcode 翻转二叉树
查看>>
MySQL 数据类型
查看>>
javascript高级程序设计第一章至第三章读书笔记
查看>>
[Luogu5241]序列(DP)
查看>>
第10课_dg
查看>>
【读书分享】读腾讯《在你身边,为你设计》有感
查看>>