Producer部分
Producer在实例化后, 对外提供send方法, 用于将数据送到指定的topic和partition; 以及在退出时需要的destroy方法.
接口 KafkaProducer.java
import java.util.List;
import java.util.Properties;

/**
 * Minimal producer abstraction: sends single records or batches of records to a
 * topic, optionally targeting a specific partition.
 *
 * @param <D> type of the payload handed to {@code send}
 */
public interface KafkaProducer<D> {

    /** Lifecycle hook invoked after construction; does nothing by default. */
    default void init() {
    }

    /** Lifecycle hook invoked on shutdown; does nothing by default. */
    default void destroy() {
    }

    /**
     * Sends one record to {@code topic}.
     *
     * @param topic destination topic
     * @param data  payload to send
     * @return whether the implementation considers the send successful
     */
    boolean send(String topic, D data);

    /**
     * Sends one record to a specific partition of {@code topic}.
     *
     * @param topic     destination topic
     * @param partition destination partition
     * @param data      payload to send
     * @return whether the implementation considers the send successful
     */
    boolean send(String topic, Integer partition, D data);

    /**
     * Sends every element of {@code dataList} to {@code topic}.
     *
     * @param topic    destination topic
     * @param dataList payloads to send
     * @return whether the implementation considers the send successful
     */
    boolean send(String topic, List<D> dataList);

    /**
     * Sends every element of {@code dataList} to a specific partition of {@code topic}.
     *
     * @param topic     destination topic
     * @param partition destination partition
     * @param dataList  payloads to send
     * @return whether the implementation considers the send successful
     */
    boolean send(String topic, Integer partition, List<D> dataList);

    /**
     * Default configuration.
     *
     * @return a fresh {@link Properties} holding the default values for
     *         {@code acks}, {@code retries}, {@code batch.size}, {@code linger.ms}
     *         and {@code buffer.memory}
     */
    default Properties getDefaultProps() {
        Properties props = new Properties();
        props.put("acks", "1");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // 32 MiB, stored as a Long.
        props.put("buffer.memory", 32 * 1024 * 1024L);
        return props;
    }
}
参数说明
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// The acks config controls the criteria under which requests are considered complete.
// The "all" setting we have specified will result in blocking on the full commit of
// the record, the slowest but most durable setting.
props.put("acks", "all");
// If the request fails, the producer can automatically retry, though since we have
// specified retries as 0 it won't. Enabling retries also opens up the possibility of
// duplicates (see the documentation on message delivery semantics for details).
props.put("retries", 0);
// The producer maintains buffers of unsent records for each partition. These buffers
// are of a size specified by the batch.size config. Making this larger can result in
// more batching, but requires more memory (since we will generally have one of these
// buffers for each active partition).
props.put("batch.size", 16384);
// By default a buffer is available to send immediately even if there is additional
// unused space in the buffer. However if you want to reduce the number of requests you
// can set linger.ms to something greater than 0. This will instruct the producer to
// wait up to that number of milliseconds before sending a request in hope that more
// records will arrive to fill up the same batch.
props.put("linger.ms", 1);
// Size of the producer's buffer. Once the buffer is exhausted, additional send calls
// block; if the blocking time exceeds max.block.ms a TimeoutException is thrown.
props.put("buffer.memory", 33554432);
// The key.serializer and value.serializer instruct how to turn the key and value
// objects the user provides with their ProducerRecord into bytes. You can use the
// included ByteArraySerializer or StringSerializer for simple string or byte types.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
实现 KafkaProducerImpl.java
import com.google.common.base.Strings;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.List;import java.util.Map;import java.util.Properties;public class KafkaProducerImplimplements KafkaProducer { private static final Logger logger = LoggerFactory.getLogger(KafkaProducerImpl.class); private final Producer producer; public KafkaProducerImpl() { Properties props = this.getDefaultProps(); props.put("bootstrap.servers", servers); props.put("key.serializer", serializer); props.put("value.serializer", serializer); producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); } @Override public void destroy() { if (producer != null) { producer.close(); } } @Override public boolean send(String topic, D data) { boolean isSuc = true; try { producer.send(new ProducerRecord<>(topic, data)); } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],data:[%s]", topic, data), e); } return isSuc; } @Override public boolean send(String topic, Integer partition, D data) { boolean isSuc = true; try { producer.send(new ProducerRecord<>(topic, partition, null, data)); } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],data:[%s]", topic, data), e); } return isSuc; } @Override public boolean send(String topic, List dataList) { boolean isSuc = true; try { if (dataList != null) { dataList.forEach(item -> producer.send(new ProducerRecord<>(topic, item))); } } catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],dataList:[%s]", topic, dataList), e); } return isSuc; } @Override public boolean send(String topic, Integer partition, List dataList) { boolean isSuc = true; try { if (dataList != null) { dataList.forEach(item -> producer.send(new ProducerRecord<>(topic, partition, null, item))); } } 
catch (Exception e) { isSuc = false; logger.error(String.format("KafkaStringProducer send error.topic:[%s],partition[%s],dataList:[%s]", topic, partition, dataList), e); } return isSuc; }}
Consumer 部分
Consumer 在实例化后会订阅指定的topic; 调用方通过addConsumerListener/setConsumerListeners将KafkaConsumerListener添加到列表; start()会启动一个后台线程, 在其中循环poll消息, 收到消息后依次调用列表中的KafkaConsumerListener进行处理
接口 KafkaConsumer.java
import java.util.Properties;

/**
 * Minimal consumer abstraction with lifecycle hooks and a set of default
 * client properties.
 */
public interface KafkaConsumer {

    /** Lifecycle hook; does nothing unless overridden. */
    default void init() {
    }

    /** Lifecycle hook; does nothing unless overridden. */
    default void destroy() {
    }

    /** Starts consuming. */
    void start();

    /**
     * Default configuration.
     *
     * @return a fresh {@link Properties} with {@code enable.auto.commit},
     *         {@code auto.commit.interval.ms} and {@code session.timeout.ms} set
     */
    default Properties getDefaultProps() {
        final Properties defaults = new Properties();
        defaults.setProperty("enable.auto.commit", "true");
        defaults.setProperty("auto.commit.interval.ms", "1000");
        defaults.setProperty("session.timeout.ms", "30000");
        return defaults;
    }
}
参数说明
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Setting enable.auto.commit means that offsets are committed automatically with a
// frequency controlled by the config auto.commit.interval.ms.
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// The deserializer settings specify how to turn bytes into objects. For example, by
// specifying string deserializers, we are saying that our record's key and value will
// just be simple strings.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// This consumer is subscribing to the topics foo and bar as part of a group of
// consumers called test as configured with group.id.
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
实现 KafkaConsumerImpl.java
import com.google.common.base.Strings;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.*;public class KafkaConsumerImplimplements KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerImpl.class); private final List > consumerListeners = new ArrayList<>(); private Consumer consumer; private boolean running = true; private final int waitingTimeout = 100; public KafkaConsumerImpl(String topic, String groupId, String deserializer) { Properties props = this.getDefaultProps(); props.put("group.id", groupId); props.put("bootstrap.servers", servers); props.put("key.deserializer", deserializer); props.put("value.deserializer", deserializer); consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); } public void setConsumerListeners(List > consumerListeners) { synchronized (this) { this.consumerListeners.clear(); if (null != consumerListeners && 0 != consumerListeners.size()) { consumerListeners.forEach(this.consumerListeners::add); } } } public void addConsumerListener(KafkaConsumerListener consumerListener) { synchronized (this) { if (null != consumerListener && !this.consumerListeners.contains(consumerListener)) { this.consumerListeners.add(consumerListener); } } } public void removeConsumerListener(KafkaConsumerListener consumerListener) { synchronized (this) { if (null != consumerListener && this.consumerListeners.contains(consumerListener)) { this.consumerListeners.remove(consumerListener); } } } @Override public void init() { this.start(); } @Override public void destroy() { running = false; } @Override public void start() { new Thread(() -> { while (running) { ConsumerRecords records = consumer.poll(waitingTimeout); for (ConsumerRecord record : records) { if (consumerListeners != null) { K 
key = record.key(); if (key == null) consumerListeners.forEach(consumer -> consumer.consume(record.value())); else consumerListeners.forEach(consumer -> consumer.consume(record.key(), record.value())); } } } //should use consumer in different thread, or it will throw ConcurrentModificationException if (consumer != null) { try { logger.info("start to close consumer."); consumer.close(); } catch (Exception e) { logger.error("close kafka consumer error.", e); } consumer = null; } }).start(); }}
接口 KafkaConsumerListener.java
/**
 * Callback invoked for each consumed record.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public interface KafkaConsumerListener<K, V> {

    /**
     * Handles a record's value.
     *
     * @param value the record value
     */
    void consume(V value);

    /**
     * Handles a record that carries a key. By default the key is ignored and the
     * call is forwarded to {@link #consume(Object)}.
     *
     * @param key   the record key
     * @param value the record value
     */
    default void consume(K key, V value) {
        consume(value);
    }
}
.