From 009735dc63140ec8db1825f37d7fc381a624595d Mon Sep 17 00:00:00 2001 From: Faraphel Date: Sun, 23 Jun 2024 14:49:41 +0200 Subject: [PATCH] cleaned the code to be as configurable as possible --- docker-compose.yaml | 1 + .../kotlin/fr/faraphel/m1_pe_kafka/Main.kt | 70 ++++++++++++------- .../faraphel/m1_pe_kafka/kafka/AdminUtils.kt | 7 +- .../faraphel/m1_pe_kafka/kafka/Converter.kt | 5 +- .../m1_pe_kafka/kafka/PrintConsumer.kt | 31 ++++++-- .../m1_pe_kafka/kafka/TemperatureProducer.kt | 15 ++-- 6 files changed, 90 insertions(+), 39 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 90cd2ea..a020af1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -34,6 +34,7 @@ services: dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + TEMPERATURE_LOCATION: 49.9, 2.3 networks: - kafka depends_on: diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt index 4dea472..8bed7f0 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt @@ -10,52 +10,71 @@ import io.quarkus.runtime.QuarkusApplication import io.quarkus.runtime.annotations.QuarkusMain -val TOPIC_TEMPERATURE_CELSIUS: String = "temperature-celsius" -val TOPIC_TEMPERATURE_FAHRENHEIT: String = "temperature-fahrenheit" - - /** - * The entrypoint of the program + * The main class. + * Contains the entrypoint of the program. */ @QuarkusMain class Main : QuarkusApplication { + /** + * The entrypoint of the program + * @param args command line arguments + * @return the result code of the program + */ override fun run(vararg args: String?): Int { - println("starting...") + // get the kafka server address + val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + + // get the topics name + val topicTemperatureCelsius: String = System.getenv("TOPIC_TEMPERATURE_CELSIUS") + ?: "temperature-celsius" + val topicTemperatureFahrenheit: String = System.getenv("TOPIC_TEMPERATURE_FAHRENHEIT") + ?: "temperature-fahrenheit" // create an admin object - val admin = AdminUtils() + val admin = AdminUtils(kafkaServer) // create our topics - admin.createTopics("temperature-celsius", "temperature-fahrenheit") + admin.createTopics(topicTemperatureCelsius, topicTemperatureFahrenheit) + + // get the location of the temperature to get + val location = System.getenv("TEMPERATURE_LOCATION") + ?: throw IllegalArgumentException("Missing environment variable: TEMPERATURE_LOCATION") + + // parse the location to get the latitude and longitude + val (latitude, longitude) = location.split(",").map { coordinate -> coordinate.trim().toDouble() } // create a producer that will generate temperature values based on the current temperature at Amiens (France). val producer = TemperatureProducer( - latitude=49.9, - longitude=2.3, - topic=TOPIC_TEMPERATURE_CELSIUS + server=kafkaServer, + latitude=latitude, + longitude=longitude, + topic=topicTemperatureCelsius ) + // create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic val converter = Converter( - inputTopic=TOPIC_TEMPERATURE_CELSIUS, - outputTopic=TOPIC_TEMPERATURE_FAHRENHEIT + server=kafkaServer, + inputTopic=topicTemperatureCelsius, + outputTopic=topicTemperatureFahrenheit ) { temperature -> temperature.celsius.asFahrenheit } + // create a consumer that will print the received values in the Fahrenheit topic val consumer = PrintConsumer( - topic=TOPIC_TEMPERATURE_FAHRENHEIT + server=kafkaServer, + topic=topicTemperatureFahrenheit ) - // wrap the producer and the consumer in threads - // TODO(Faraphel): should be native to the producer and consumer class ? - val threadProducer = Thread { producer.produceForever() } - val threadConsumer = Thread { consumer.consumeForever() } - // run all the clients - threadConsumer.start() + producer.start() + consumer.start() converter.start() - threadProducer.start() // wait for them to finish before closing - threadConsumer.join() - threadProducer.join() + producer.join() + consumer.join() + + // close the converter if the others clients are done converter.stop() return 0 @@ -63,8 +82,9 @@ class Main : QuarkusApplication { } /** - * The main function - * Call the entrypoint directly. Used to run the program directly with Kotlin + * The main function. + * Simply call the entrypoint. + * Used to run the program directly with Kotlin. */ fun main(args: Array) { Quarkus.run(Main::class.java, *args) diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt index b78ecb0..e4321ea 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt @@ -10,18 +10,19 @@ import java.util.* /** * A wrapper around KafkaAdminClient to simplify its configuration. * @throws IllegalArgumentException the environment variables are not properly set. + * @param server the kafka server address * @param identifier the kafka identifier for the configuration * @see Admin */ class AdminUtils( - identifier: String = "admin" + private val server: String, + private val identifier: String = "admin" ) { private val adminConfig = Properties().apply { // set the identifier this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier // set the server information - this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = server } private val admin = Admin.create(adminConfig) diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt index 33cf019..6b8bec3 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt @@ -12,11 +12,13 @@ import java.util.* /** * A kafka converter. * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function. + * @param server the kafka server address * @param identifier the kafka identifier for the configuration * @param inputTopic the input kafka topic name * @param outputTopic the output kafka topic name */ class Converter( + private val server: String, private val identifier: String = "converter", private val inputTopic: String, private val outputTopic: String, @@ -30,8 +32,7 @@ class Converter( this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name // set the server information - this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = server } // define a topology for our streams model diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt index 9d5fe19..7e956e3 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt @@ -4,24 +4,38 @@ package fr.faraphel.m1_pe_kafka.kafka import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.serialization.DoubleDeserializer import org.apache.kafka.common.serialization.StringDeserializer +import java.time.DateTimeException import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.format.DateTimeFormatter import java.util.Collections import java.util.Properties +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration /** * A basic consumer that print everything in the selected topic. + * @param server the kafka server address * @param topic the topic to get the records from * @param identifier the kafka identifier for the configuration * @throws IllegalArgumentException the environment variables are not properly set. */ class PrintConsumer( + private val server: String, private val identifier: String = "consumer", private val topic: String, -) { +) : Thread() { + companion object { + // create a formatter to convert the timestamp to a string + private val timeFormatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(ZoneId.systemDefault()) + } + private val properties: Properties = Properties().apply { // identifier this[ConsumerConfig.GROUP_ID_CONFIG] = identifier @@ -29,8 +43,7 @@ class PrintConsumer( this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name // the server information - this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server } private val consumer = KafkaConsumer(this.properties).apply { @@ -48,8 +61,10 @@ class PrintConsumer( // print them with their timestamp and content messages.forEach { message -> - // TODO(Faraphel): format the timestamp - println("[${message.timestamp()}] ${message.value()}") + // format the time of the message to a proper string + val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp())) + // print the value + println("[${time}] ${message.value()}°F") } } @@ -60,4 +75,10 @@ class PrintConsumer( fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) { while (true) this.consume(timeout) } + + /** + * Thread entrypoint + * @see consumeForever + */ + override fun run() = this.consumeForever() } diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt index 7604dd1..6436119 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt @@ -17,17 +17,19 @@ import kotlin.time.toJavaDuration /** * A kafka temperature producer. * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com) + * @param server the kafka server address * @param identifier the kafka identifier for the configuration * @param latitude the latitude of the temperature to fetch * @param longitude the longitude of the temperature to fetch * @param topic the kafka topic to publish the data to */ class TemperatureProducer( + private val server: String, private val identifier: String = "consumer", private val latitude: Double, private val longitude: Double, private val topic: String, -) { +) : Thread() { companion object { private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API .scheme("https") // use the https protocol @@ -46,8 +48,7 @@ class TemperatureProducer( this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name // set the server information - this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = server } private val kafkaProducer: KafkaProducer = KafkaProducer(this.properties) ///< the kafka producer @@ -101,7 +102,13 @@ class TemperatureProducer( // produce a data this.produce() // wait for the cooldown - Thread.sleep(frequency) + sleep(frequency) } } + + /** + * Thread entrypoint + * @see produceForever + */ + override fun run() = this.produceForever() }