1. 引入 jar
< dependency>
< groupId> org. springframework. kafka < / groupId>
< artifactId> spring- kafka < / artifactId>
< / dependency>
2. yml 配置
spring:
kafka :
bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
producer:
retries: 0
acks: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org. apache. kafka . common. serialization. StringSerializer
value-serializer: org. apache. kafka . common. serialization. StringSerializer
consumer:
enable-auto - commit: false
auto-commit-interval: 100
auto-offset-reset: earliest
max-poll-records: 500
key-deserializer: org. apache. kafka . common. serialization. StringDeserializer
value-deserializer: org. apache. kafka . common. serialization. StringDeserializer
group-id : ${ APP_NAME}
listener:
ack-mode: manual_immediate
poll-timeout: 500ms
3. 实现代码
生产者:
@Slf4j
@Component
public class KafkaProducer {
private final ExecutorService executorService = Executors . newFixedThreadPool ( 10 ) ;
@Resource
private KafkaTemplate < String , String > kafka Template;
public void send ( String topic, String msg) {
executorService. submit ( ( ) -> kafka Template. send ( topic, msg)
. addCallback ( result -> {
if ( result != null && result. getRecordMetadata ( ) != null ) {
log. info ( "消息发送成功,offset = {}" , result. getRecordMetadata ( ) . offset ( ) ) ;
} else {
log. warn ( "消息发送完成,但结果或其元数据为空" ) ;
}
} , throwable -> log. error ( "消息发送失败,原因 = {}" , throwable. getMessage ( ) ) ) ) ;
}
@PreDestroy
public void shutdown ( ) {
executorService. shutdown ( ) ;
try {
if ( ! executorService. awaitTermination ( 500 , TimeUnit . MILLISECONDS ) ) {
executorService. shutdownNow ( ) ;
}
} catch ( InterruptedException e) {
executorService. shutdownNow ( ) ;
Thread . currentThread ( ) . interrupt ( ) ;
}
}
}
消费者:
@Component
public class KafkaConsumer {
@KafkaListener ( topics = "myTopic" , groupId = "xxx" , properties = "max.poll.records:5" , concurrency = "3" )
public void listen ( ConsumerRecord < ? , ? > record) {
ack. acknowledge ( ) ;
}
}