How to Use the Kafka Message Queue on Debian

2025/4/13 9:32:47
Category: Intelligent O&M

To use the Kafka message queue on Debian, follow these steps to install and configure it:

Installing Kafka

  1. Update the package list
sudo apt-get update
  2. Install Kafka (note: Kafka is not shipped in Debian's official repositories, so this command only works if a kafka package is available from one of your configured repositories; otherwise, download the binary release from the Apache Kafka website and set up a systemd unit for it)
sudo apt-get install kafka
  3. Start the Kafka service
sudo systemctl start kafka
  4. Enable Kafka to start on boot
sudo systemctl enable kafka
  5. Verify that Kafka is running
sudo systemctl status kafka
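Once the broker is up, you can create a test topic with the kafka-topics tool that ships with Kafka (a sketch — the commands below assume the tool is on your PATH; if you installed from the Apache tarball, it lives under bin/ in the installation directory):

```shell
# Create a topic named my-topic with one partition and one replica
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 1 \
  --replication-factor 1

# Confirm the topic exists
kafka-topics.sh --list --bootstrap-server localhost:9092
```

The producer and consumer examples further below assume this topic already exists (or that auto topic creation is enabled on the broker).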

Configuring Kafka

Kafka's main configuration file is located at /etc/kafka/server.properties (or config/server.properties under the installation directory if you installed from the tarball). You can adjust the following settings as needed:

  • broker.id: the unique identifier of this Kafka broker.
  • listeners: the address and port the broker listens on.
  • advertised.listeners: the address and port advertised to clients.
  • zookeeper.connect: the ZooKeeper connection string.
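The settings above can be sketched as a minimal single-broker configuration (the host names and ports are placeholders — substitute your own):

```properties
# Unique id of this broker within the cluster
broker.id=0
# Listen on all interfaces, port 9092
listeners=PLAINTEXT://0.0.0.0:9092
# Address that clients are told to connect to
advertised.listeners=PLAINTEXT://localhost:9092
# ZooKeeper connection string (host:port)
zookeeper.connect=localhost:2181
```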

Producer and Consumer Examples

Producer Example (Java)

Below is a simple Java producer example using the Spring Kafka library:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    // The KafkaTemplate is configured by Spring (bootstrap servers and
    // serializers come from the application properties), so there is no
    // need to build a Properties object by hand here.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
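The service above assumes a KafkaTemplate bean exists. With Spring Boot and the spring-kafka dependency on the classpath, one is auto-configured from the spring.kafka.* properties; without Spring Boot, a minimal configuration class might look like the following sketch (the kafka.bootstrap-servers property name matches the application.properties example in this article):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```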

Consumer Example (Java)

Below is a simple Java consumer example using the Spring Kafka library:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Service
public class KafkaConsumerService {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.topic}")
    private String topic;

    // Preferred with Spring Kafka: the framework runs the poll loop and
    // invokes this method for each record received on the topic.
    @KafkaListener(topics = "${kafka.consumer.topic}")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }

    // Alternative: a manual poll loop with the plain Kafka client.
    // Use either this or the @KafkaListener above, not both at once.
    public void startConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources ensures the consumer is closed on exit
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic (not the group id)
            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    listen(record);
                }
            }
        }
    }
}

Example Configuration File (application.properties)

kafka.bootstrap-servers=localhost:9092
kafka.consumer.group-id=my-group
kafka.consumer.topic=my-topic


