> 文章列表 > 【Kafka】SASL认证的Kafka客户端代码示例(spring-kafka和原生客户端)

【Kafka】SASL认证的Kafka客户端代码示例(spring-kafka和原生客户端)

【Kafka】SASL认证的Kafka客户端代码示例(spring-kafka和原生客户端)

文章目录

spring-kafka

添加依赖:

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.3</version></dependency>

添加spring-kafka相关配置:

#=============== 集群通用配置 ================
spring.kafka.bootstrap-servers=kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="zhurunhua" password="pwd";#=============== producer  =================
spring.kafka.producer.retries=5
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
spring.kafka.producer.buffer-memory=1000000
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer  ==================
spring.kafka.consumer.group-id=zhurunhua-test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费者:

@Component
public class TestPlainConsumer {@KafkaListener(topics = {"zhurunhua-test-topic"})public void consumer(ConsumerRecord<String, String> record) {System.out.println(record.value());}}

topic可以从配置文件读取,代码中通过${}的方式获取配置的topic:

@Component
public class TestPlainConsumer {@KafkaListener(topics = {"${kafka.subscribe.topic}"})public void consumer(ConsumerRecord<String, String> record) {System.out.println(record.value());}}

生产者:

@Component
public class TestPlainProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${kafka.subscribe.topic}")private String topic;public void send(String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {System.err.println(throwable);}@Overridepublic void onSuccess(SendResult<String, String> sendResult) {System.out.println("发送结果:" + sendResult);}});}
}

原生客户端

引入依赖

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version><scope>compile</scope></dependency>

客户端代码

public class TestSaslClient {private final static String TOPIC = "zhurunhua-test-topic";private final static String BROKERS = "kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092";private static KafkaConsumer<String, String> consumer;private static KafkaProducer<String, String> producer;static {Properties c = initConfig();consumer = new KafkaConsumer<>(c);producer = new KafkaProducer<>(c);}/* 初始化配置 @return java.util.Properties* @date 2023/04/17 5:51 下午*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhurunhua-test-1");// SASL/PLAIN 认证配置props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT);props.put(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM);props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"zhurunhua\\" password=\\"pwd\\";");// 可选设置属性props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 自动提交offset,每1s提交一次props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);return props;}/* 消费者示例 @date 2023/04/17 5:51 下午*/public void subscribe() {consumer.subscribe(Collections.singleton(TOPIC));AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record);}}} catch (Exception e) {System.out.println("consumer error : " + e);} finally {consumer.close();}}/* 生产者示例 @date 2023/04/17 5:52 下午*/public void send() {ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "zhurunhua", "测试消息");producer.send(record, (recordMetadata, e) -> {if (Objects.nonNull(e)) {System.out.println("send error: " + e);} else {System.out.println(recordMetadata);}});}
}

Tips

  1. SecurityProtocol定义在枚举:org.apache.kafka.common.security.auth.SecurityProtocol中:

【Kafka】SASL认证的Kafka客户端代码示例(spring-kafka和原生客户端)

  1. sasl.mechanism配置项有:
    1. GSSAPI
    2. PLAIN
    3. SCRAM-SHA-256
    4. SCRAM-SHA-512
    5. OAUTHBEARER

更多安全和认证相关资料,参考:https://kafka.apache.org/documentation/#security_overview

  1. 若使用ssl,相关配置示例:

    spring.kafka.ssl.trust-store-type=JKS
    spring.kafka.ssl.key-store-type=JKS
    spring.kafka.ssl.protocol=SSL
    spring.kafka.ssl.key-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.trust-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.key-store-location=/kafka-jks/keystore.jks
    spring.kafka.ssl.trust-store-location=/kafka-jks/keystore.jks
    
  2. 使用不同的sasl.mechanism时,需要注意sasl.jaas.config的配置中,LoginModule不同:

    1. PLAIN对应的是:PlainLoginModule
    2. SCRAM相关对应的是:ScramLoginModule
  3. kafka认证的参数相对来说比较复杂,需要理解每个参数的含义,错一个就会失败,启动日志会打印客户端配置,可用于协助排查问题