博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka consumer 代码示例
阅读量:5291 次
发布时间:2019-06-14

本文共 3087 字,大约阅读时间需要 10 分钟。

 

 

使用者小组 使得许多进程的多台机器 在逻辑上作为一个单个的使用者 出现。

我们使用中,一种常见的情况是,我们按照逻辑划分出多个使用者小组,每个小组都是有作为一个逻辑整体的多台使用者计算机组成的集群。

consumer group 设计的目的之一也是为了应用多线程同时去消费一个topic中的数据

 

使用者API

我们有两个层次的使用者API。

 底层比较简单的API维护了一个同单个代理建立的还接,完全同収送给服务器的网绚请求相吻合。该API完全是无状态的,每个请求都带有一个偏秱量作为参数,仍而允许用户以自己选择的仸意方式维护该元数据。
高层API对使用者隐藏了代理的具体细节,使用者可运行于集群中的机器上而无需关心底层的拓扑结构。它维护着数据使用的状态。高局API迓提供了订阅同一个过滤表达式(例如,白名单或黑名单的正则表达式)相匹配的多个话题的能力。

 

高层api

 该API的中心是一个由KafkaStream返个类实现的迭代器(iterator)。每个KafkaStream都代表着一个仍一个戒多个分区刡一个戒多个服务器的消息流。每个流都是使用单个线程迕行处理的,所以,该API的使用者在该API的创建调用中可以提供所需的仸意个数的流。返样,一个流可能会代表多个服务器分区的合幵(同处理线程的数目相同),但每个分区叧会把数据収送给一个流中。

 createMessageStreams方法为使用者注册刡相应的话题乀上,返将导致需要对使用者/代理的分配情冴迕行重新平衡。为了将重新平衡操作减少刡最小。该API鼓励在一次调用中就创建多个话题流。createMessageStreamsByFilter方法为収现同其过滤条件想匹配的话题(额外地)注册了多个监规器(watchers)。应该注意,createMessageStreamsByFilter方法所迒回的每个流都可能会对多个话题迕行迭代(比如,在满赼过滤条件的话题有多个的情冴下)。

 

 

 

 

kafka consumer group总结

kafka消费者api分为high api和low api,目前上述demo是都是使用kafka high api,高级api不用关心维护消费状态信息和负载均衡,不用关心offset。

高级api的一些注意事项:
3,增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 
4,获取不到数据时,会block的
2,consumer group 通过zookeeper来消费kafka集群中的消息(这个过程由zookeeper进行管理);

相对于low api自己管理offset,high api把offset的管理交给了zookeeper,但是high api并不是消费一次就在zookeeper中更新一次,而是每间隔一个(默认1000ms)时间更新一次offset,可能在重启消费者时拿到重复的消息。此外,当分区leader发生变更时也可能拿到重复的消息。因此在关闭消费者时最好等待一定时间(10s)然后再shutdown。

例子:

    1. import kafka.consumer.ConsumerIterator;  
    2. import kafka.consumer.KafkaStream;  
    3.    
    4. public class ConsumerTest implements Runnable {  
    5.     private KafkaStream m_stream;  
    6.     private int m_threadNumber;  
    7.    
    8.     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {  
    9.         m_threadNumber = a_threadNumber;  
    10.         m_stream = a_stream;  
    11.     }  
    12.    
    13.     public void run() {  
    14.         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  
    15.         while (it.hasNext())  
    16.             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));  
    17.         System.out.println("Shutting down Thread: " + m_threadNumber);  
    18.     }  
    19. }  
    20.   
    21. //配置连接zookeeper的信息  
    22. private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {  
    23.         Properties props = new Properties();  
    24.         props.put("zookeeper.connect", a_zookeeper);        //zookeeper连接地址  
    25.         props.put("group.id", a_groupId);           //consumer group的id  
    26.         props.put("zookeeper.session.timeout.ms", "400");  
    27.         props.put("zookeeper.sync.time.ms", "200");  
    28.         props.put("auto.commit.interval.ms", "1000");  
    29.         return new ConsumerConfig(props);  
    30.     }  
    31.   
    32. //建立一个消费者线程池  
    33. public void run(int a_numThreads) {  
    34.     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
    35.     topicCountMap.put(topic, new Integer(a_numThreads));  
    36.     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);  
    37.     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  
    38.    
    39.    
    40.     // now launch all the threads  
    41.     //  
    42.     executor = Executors.newFixedThreadPool(a_numThreads);  
    43.    
    44.     // now create an object to consume the messages  
    45.     //  
    46.     int threadNumber = 0;  
    47.     for (final KafkaStream stream : streams) {  
    48.         executor.submit(new ConsumerTest(stream, threadNumber));  
    49.         threadNumber++;  
    50.     }  
    51. }  
    52.   
    53. //经过一段时间后关闭  
    54.        try {  
    55.             Thread.sleep(10000);  
    56.         } catch (InterruptedException ie) {  
    57.   
    58.         }  
    59.         example.shutdown();  

 

转载于:https://www.cnblogs.com/ydxblog/p/7802483.html

你可能感兴趣的文章
Spark基础脚本入门实践3:Pair RDD开发
查看>>
HDU4405--Aeroplane chess(概率dp)
查看>>
RIA Test:try catch 对 Error #1009 (无法访问空对象引用的属性或方法)的处理
查看>>
python使用easyinstall安装xlrd、xlwt、pandas等功能模块的方法
查看>>
一个杯子的测试用例
查看>>
前端面试总结——http、html和浏览器篇
查看>>
CS0103: The name ‘Scripts’ does not exist in the current context解决方法
查看>>
20130330java基础学习笔记-语句_for循环嵌套练习2
查看>>
openCV(一)---将openCV框架导入iOS工程中
查看>>
Spring面试题
查看>>
窥视SP2010--第一章节--SP2010开发者路线图
查看>>
一步步学习微软InfoPath2010和SP2010--第五章节--添加逻辑和规则到表单(2)--处理验证与格式化...
查看>>
在与 SQL Server 建立连接时出现与网络相关的或特定于实例的错误。未找到或无法访问服务器。请验证实例名称是否正确并且 SQL Server 已配置为允许远程连接。...
查看>>
MVC,MVP 和 MVVM 的图示,区别
查看>>
IDEA快速实现接口快捷方式
查看>>
用默认的打开方式打开本地文件
查看>>
JavaScript-jQuery报TypeError $(...) is null错误(jQuery失效)解决办法
查看>>
C语言栈的实现
查看>>
代码为什么需要重构
查看>>
SAP销售模块塑工常见问题和解决方案(自己收藏)
查看>>