kafka java 基于Java API 方式使用Kafka
kafka java 基于Java API 方式使用Kafka
用户不仅能通过命令行的形式操作Kafka服务,Kafka还提供了许多编程语言的客户端工具,用户在开发独立项目时,通过调用Kafka API 来操作Kafka集群kafka java,其核心API主要有以下5种。
(1) API:构建应用程序发送数据流到Kafka集群中的主题。(2) API:构建应用程序从Kafka集群中的主题读取数据流。(3):构建流处理程序的库,能够处理流式数据。
(4) API:实现连接器,用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。
(5):构建集群管理工具,查看Kafka集群组件信息。
Kafka作为流数据处理平台kafka java 基于Java API 方式使用Kafka,本身功能强大,技术难度较高,有兴趣的读者可以通过官网深入学习。本章将介绍常用 API 以及 API 来辅助学习 Spark实时计算框架。
在开发生产者客户端时,提供了类,该类的实例化对象用来代表一个生产者进程,生产者发送消息时,并不是直接发送给服务端,而是先在客户端中把消息存入队列中,然后由一个发送线程从队列中消费消息,并以批量的方式发送消息给服务端,关于类常用的方法如下图所示。
生产者客户端用来向Kafka集群中发送消息,消费者客户端则是从Kafka集群中消费消息。作为分布式消息系统,Kafka支支持多个生产者和多个消费者,生产者可以将消息发布到集群中不同节点的不同分区上,消费费者也可以消费集群中多个节点的多个分区上的消息kafka java 基于Java API 方式使用Kafka,
消费者应用程序是由 对象代表的一个消费者客户端进程,
类常用的方法如下图所示。
接下来,以实例演示的分式介绍Kafka的Java API操作方式。
1、创建工程kafka java,添加以下依赖:
创建一个Maven工程kafka java,在pom.xml文件中添加Kafka依赖包。
org..kafka
kafka-
2.0.0
2、编写生产者客户端,代码如下:
org..kafka…;
org..kafka…;
java.util.;
class {
void main( args) {
props = new ();
//指定Kafka集群的IP地址和端口号
props.put(“.”,
“:9092,:9092,:9092”);
//指定等待所有副本节点的应答
props.put(“acks”,”all”);
//指定消息发送最大尝试次数
props.put(“”,0);
//指定一批消息处理大小
props.put(“batch.size”,16384);
//指定请求延时
props.put(“.ms”,1);
//指定缓存区内存大小
props.put(“.”,);
//设置key序列化
props.put(“key.”,”mon..”);
//设置value序列化
props.put(“value.”,
“mon..”);
//生产数据
= new (props);
for (int i =0; i < 50; i++){
.send(new
(“”,.(i),”hello world-” + i));
.close();
(1).:设置Kafka集群的IP地址和端口号。
2)acks:消息确认机制,该值设置为all,这种策略会保证只要有一个备份存活就不会丢失数据,这种方案是最安全可靠的,但同时效率也会降低。
(3):如果当前请求失败,则生产者可以自动重新连接,但是设置=0参数,则意味请求失败不会重复连接,这样可以避免消息重复发送的可能。
(4)batch.size:生产者为每个分区维护了未发送数据的内存缓冲区,该缓冲区设置的越大,吞吐量和效率就越高,但也会浪费更多的内存。
(5).ms:指定请求延时,意味着如果在缓冲区没有被填满的情况下,会增加1ms的延迟,等待更多的数据进入缓冲区从而增加内存利用率。在默认情况下,即使缓冲区中有其他未使用的空间,也可以立即发送缓冲区。
(6).:指定缓冲区大小。
(7)key.、value.:数据在网络中传输需要进行序列化。
第27~32行代码,作用是模拟消息源,向名为的主题中发送消息数据。向 Kafka集群发送消息数据时,只需要调用类的send(()方法,该方法是异步的,调用时,它会将消息数据添加到待处理消息数据发送的缓冲区中,最终以批处理的方式处理消息数据,从而提高效率。send()方法中有3个参数,第1个参数是指定发送主题,第2个参数是设置消息的Key,第3个参数是消息的Value。
3、编写消费者客户端
通过Kafka API 创建对象,用来消费Kafka集群中名为主题的消息数据。在工程下创建.java文件,代码如下:
org..kafka…;
org..kafka…;
java.util.;
class {
void main( args) {
props = new ();
//指定Kafka集群的IP地址和端口号
props.put(“.”,
“:9092,:9092,:9092”);
//指定等待所有副本节点的应答
props.put(“acks”,”all”);
//指定消息发送最大尝试次数
props.put(“”,0);
//指定一批消息处理大小
props.put(“batch.size”,16384);
//指定请求延时
props.put(“.ms”,1);
//指定缓存区内存大小
props.put(“.”,);
//设置key序列化
props.put(“key.”,”mon..”);
//设置value序列化
props.put(“value.”,
“mon..”);
//生产数据
= new (props);
for (int i =0; i < 50; i++){
.send(new
(“”,.(i),”hello world-” + i));
.close();
生产者客户端和消费者客户端编写完之后,就先运行生产者的代码,再运行消费者代码,此时消费者客户端并没有数据,需要再次运行生产者,运行结果如下:
+ Kafka +API
1. 本站所有资源来源于用户上传和网络,如有侵权请联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系站长处理!
6. 本站不售卖代码,资源标价只是站长收集整理的辛苦费!如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。
7. 站长QQ号码 2205675299
资源库 - 资源分享下载网 » kafka java 基于Java API 方式使用Kafka
常见问题FAQ
- 关于资源售价和售后服务的说明?
- 代码有没有售后服务和技术支持?
- 有没有搭建服务?
- 链接地址失效了怎么办?
- 关于解压密码