M1-PE-Kafka/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt
Faraphel 500cdbd95b [WIP] made the project more docker friendly
- fixed docker-compose.yaml
- using environment variable instead of hardcoded values
2024-06-20 08:57:15 +02:00

45 lines
1.4 KiB
Kotlin

package fr.faraphel.m1_pe_kafka
/**
* The consumer
*/
import org.apache.kafka.clients.consumer.*
import java.time.Duration
import java.util.Collections
import java.util.Properties
/**
* The consumer
*/
class Consumer {
private val properties: Properties = Properties()
private var consumer: KafkaConsumer<String, String>
private val topic: String = "mon_beau_topic"
init {
this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe"
this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
this.consumer = KafkaConsumer(properties);
}
fun main() {
this.consumer.subscribe(Collections.singletonList(topic));
// Afficher les messages reçu
while (true) {
val messages: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(500));
messages.forEach { message ->
println(">" + message.value())
}
}
}
}