博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在Java中使用Kafka
阅读量:6168 次
发布时间:2019-06-21

本文共 10148 字,大约阅读时间需要 33 分钟。

Producer部分

Producer在实例化后, 对外提供send方法, 用于将数据送到指定的topic和partition; 以及在退出时需要的destroy方法.

接口 KafkaProducer.java

import java.util.List;import java.util.Properties;public interface KafkaProducer
{ default void init() { } default void destroy() { } boolean send(String topic, D data); boolean send(String topic, Integer partition, D data); boolean send(String topic, List
dataList); boolean send(String topic, Integer partition, List
dataList); /** * 默认配置 */ default Properties getDefaultProps() { Properties props = new Properties(); props.put("acks", "1"); props.put("retries", 1); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 32 * 1024 * 1024L); return props; }}

参数说明

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. props.put("acks", "all"); // If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details). props.put("retries", 0); // The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition). props.put("batch.size", 16384); // By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch.props.put("linger.ms", 1); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException props.put("buffer.memory", 33554432); // The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple string or byte types. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

实现 KafkaProducerImpl.java

import com.google.common.base.Strings;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.List;import java.util.Map;import java.util.Properties;public class KafkaProducerImpl
implements KafkaProducer
{ private static final Logger logger = LoggerFactory.getLogger(KafkaProducerImpl.class); private final Producer
producer; public KafkaProducerImpl() { Properties props = this.getDefaultProps(); props.put("bootstrap.servers", servers); props.put("key.serializer", serializer); props.put("value.serializer", serializer); producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); } @Override public void destroy() { if (producer != null) { producer.close(); } } @Override public boolean send(String topic, D data) { boolean isSuc = true; try { producer.send(new ProducerRecord<>(topic, data)); } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],data:[%s]", topic, data), e); } return isSuc; } @Override public boolean send(String topic, Integer partition, D data) { boolean isSuc = true; try { producer.send(new ProducerRecord<>(topic, partition, null, data)); } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],data:[%s]", topic, data), e); } return isSuc; } @Override public boolean send(String topic, List
dataList) { boolean isSuc = true; try { if (dataList != null) { dataList.forEach(item -> producer.send(new ProducerRecord<>(topic, item))); } } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],dataList:[%s]", topic, dataList), e); } return isSuc; } @Override public boolean send(String topic, Integer partition, List
dataList) { boolean isSuc = true; try { if (dataList != null) { dataList.forEach(item -> producer.send(new ProducerRecord<>(topic, partition, null, item))); } } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],partition[%s],dataList:[%s]", topic, partition, dataList), e); } return isSuc; }}

 

Consumer 部分

Consumer 在实例化后, 负责将ConsumerListener添加到列表, 并订阅指定的topic, 启动一个阻塞的循环, 在收到消息后依次调用ConsumerListener进行处理

接口 KafkaConsumer.java

import java.util.Properties;public interface KafkaConsumer {    default void init() {    }    default void destroy() {    }    void start();    /**     * 默认配置     */    default Properties getDefaultProps() {        Properties props = new Properties();        props.put("enable.auto.commit", "true");        props.put("auto.commit.interval.ms", "1000");        props.put("session.timeout.ms", "30000");        return props;    }}  

参数说明

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");// Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms.props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");// The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we are saying that our record's key and value will just be simple strings. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer
consumer = new KafkaConsumer<>(props);// This consumer is subscribing to the topics foo and bar as part of a group of consumers called test as configured with group.id. consumer.subscribe(Arrays.asList("foo", "bar"));while (true) { ConsumerRecords
records = consumer.poll(100); for (ConsumerRecord
record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}

实现 KafkaConsumerImpl.java

import com.google.common.base.Strings;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.*;public class KafkaConsumerImpl
implements KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerImpl.class); private final List
> consumerListeners = new ArrayList<>(); private Consumer
consumer; private boolean running = true; private final int waitingTimeout = 100; public KafkaConsumerImpl(String topic, String groupId, String deserializer) { Properties props = this.getDefaultProps(); props.put("group.id", groupId); props.put("bootstrap.servers", servers); props.put("key.deserializer", deserializer); props.put("value.deserializer", deserializer); consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); } public void setConsumerListeners(List
> consumerListeners) { synchronized (this) { this.consumerListeners.clear(); if (null != consumerListeners && 0 != consumerListeners.size()) { consumerListeners.forEach(this.consumerListeners::add); } } } public void addConsumerListener(KafkaConsumerListener
consumerListener) { synchronized (this) { if (null != consumerListener && !this.consumerListeners.contains(consumerListener)) { this.consumerListeners.add(consumerListener); } } } public void removeConsumerListener(KafkaConsumerListener
consumerListener) { synchronized (this) { if (null != consumerListener && this.consumerListeners.contains(consumerListener)) { this.consumerListeners.remove(consumerListener); } } } @Override public void init() { this.start(); } @Override public void destroy() { running = false; } @Override public void start() { new Thread(() -> { while (running) { ConsumerRecords
records = consumer.poll(waitingTimeout); for (ConsumerRecord
record : records) { if (consumerListeners != null) { K key = record.key(); if (key == null) consumerListeners.forEach(consumer -> consumer.consume(record.value())); else consumerListeners.forEach(consumer -> consumer.consume(record.key(), record.value())); } } } //should use consumer in different thread, or it will throw ConcurrentModificationException if (consumer != null) { try { logger.info("start to close consumer."); consumer.close(); } catch (Exception e) { logger.error("close kafka consumer error.", e); } consumer = null; } }).start(); }}

接口 KafkaConsumerListener.java

public interface KafkaConsumerListener
{ void consume(V value); default void consume(K key, V value) { consume(value); }}

.

 

转载地址:http://hojba.baihongyu.com/

你可能感兴趣的文章
富士通仍执着SPARC架构芯片 将坚持推新
查看>>
易宪容:企业要利用大数据挖掘潜在需求
查看>>
微软声称Win10周年更新为Edge浏览器带来更好电池寿命
查看>>
混合云是企业IT的未来吗?
查看>>
LINE在日本取得成功 但全球化之路还很长
查看>>
红帽云套件新增QuickStart Cloud Installer,加快私有云部署
查看>>
MapXtreme 2005 学习心得 一些问题(八)
查看>>
流量精细化运营时代,营销SaaS之使命——流量掘金
查看>>
哥伦比亚大学牙科学院使用RFID系统,更好管理牙科器械
查看>>
雅虎同意出售核心资产
查看>>
Win10大丰收的节奏 微软收编iOS全部150万应用
查看>>
智慧城市要除“城市病” 中兴通讯开辟新增长极
查看>>
华平蝉联“视频会议十大卓越品牌”
查看>>
Opera已确认解散iOS开发团队
查看>>
DevOps:新的业务浪潮
查看>>
CERT:启用EMET的Windows 7比Windows 10更加安全
查看>>
LINE上市:一场迟到、勇敢又无奈的IPO
查看>>
OA选型:OA系统工作流是核心
查看>>
如何发现“利用DNS放大攻击”的服务器
查看>>
《Arduino开发实战指南:LabVIEW卷》——第2章 Arduino软件
查看>>