From d4104e577164d4a41a71045fad90e7c58c686c83 Mon Sep 17 00:00:00 2001 From: Faraphel Date: Sun, 23 Jun 2024 11:18:32 +0200 Subject: [PATCH 1/4] improved the README file with the run instruction --- README.md | 75 +++++-------------- .../m1_pe_kafka/kafka/PrintConsumer.kt | 1 + 2 files changed, 21 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 2a82e53..753ef80 100644 --- a/README.md +++ b/README.md @@ -1,61 +1,26 @@ -# m1_pe_kafka +# Kafka + Quarkus + Docker : Demonstration +A small university project to discover the [Quarkus Framework](https://quarkus.io/) to build the +application combined with the [Kafka platform](https://kafka.apache.org/), the whole compatible +with [Docker](https://www.docker.com/). -This project uses Quarkus, the Supersonic Subatomic Java Framework. +## Run +You can run the project very simply with either [Intellij IDEA](https://www.jetbrains.com/idea/), +or directly with the terminal. -If you want to learn more about Quarkus, please visit its website: https://quarkus.io/ . +### Intellij IDEA +Simply clone and load the project into the IDE and in the +[run configurations](https://www.jetbrains.com/help/idea/run-debug-configuration.html), +and run the `compose` configuration. -## Running the application in dev mode +### Terminal +Alternatively, you can build and run the project manually by using +[gradle](https://gradle.org/) and launching with [docker-compose](https://docs.docker.com/compose/). -You can run your application in dev mode that enables live coding using: - -```shell script -./gradlew quarkusDev +```shell +gradlew build +docker compose up ``` -> **_NOTE:_** Quarkus now ships with a Dev UI, which is available in dev mode only at http://localhost:8080/q/dev/. - -## Packaging and running the application - -The application can be packaged using: - -```shell script -./gradlew build -``` - -It produces the `quarkus-run.jar` file in the `build/quarkus-app/` directory. -Be aware that it’s not an _über-jar_ as the dependencies are copied into the `build/quarkus-app/lib/` directory. - -The application is now runnable using `java -jar build/quarkus-app/quarkus-run.jar`. - -If you want to build an _über-jar_, execute the following command: - -```shell script -./gradlew build -Dquarkus.package.jar.type=uber-jar -``` - -The application, packaged as an _über-jar_, is now runnable using `java -jar build/*-runner.jar`. - -## Creating a native executable - -You can create a native executable using: - -```shell script -./gradlew build -Dquarkus.native.enabled=true -``` - -Or, if you don't have GraalVM installed, you can run the native executable build in a container using: - -```shell script -./gradlew build -Dquarkus.native.enabled=true -Dquarkus.native.container-build=true -``` - -You can then execute your native executable with: `./build/m1_pe_kafka-1.0-SNAPSHOT-runner` - -If you want to learn more about building native executables, please consult https://quarkus.io/guides/gradle-tooling. - -## Related Guides - -- Apache Kafka Client ([guide](https://quarkus.io/guides/kafka)): Connect to Apache Kafka with its native API -- Kotlin ([guide](https://quarkus.io/guides/kotlin)): Write your services in Kotlin -- Apache Kafka Streams ([guide](https://quarkus.io/guides/kafka-streams)): Implement stream processing applications - based on Apache Kafka +## Expectation +The `application` container shall print the current temperature at the selected place in +Fahrenheit every minute. diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt index ac7687b..9d5fe19 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt @@ -48,6 +48,7 @@ class PrintConsumer( // print them with their timestamp and content messages.forEach { message -> + // TODO(Faraphel): format the timestamp println("[${message.timestamp()}] ${message.value()}") } } -- 2.45.2 From ddc8097779bf3a30f2270a13163f34756dc18a35 Mon Sep 17 00:00:00 2001 From: Faraphel Date: Sun, 23 Jun 2024 12:49:09 +0200 Subject: [PATCH 2/4] fixed some typos --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 753ef80..b19f812 100644 --- a/README.md +++ b/README.md @@ -5,16 +5,16 @@ with [Docker](https://www.docker.com/). ## Run You can run the project very simply with either [Intellij IDEA](https://www.jetbrains.com/idea/), -or directly with the terminal. +or directly within a terminal. ### Intellij IDEA Simply clone and load the project into the IDE and in the [run configurations](https://www.jetbrains.com/help/idea/run-debug-configuration.html), -and run the `compose` configuration. +choose `compose`. ### Terminal Alternatively, you can build and run the project manually by using -[gradle](https://gradle.org/) and launching with [docker-compose](https://docs.docker.com/compose/). +[gradle](https://gradle.org/) and launching the application with [docker-compose](https://docs.docker.com/compose/). ```shell gradlew build -- 2.45.2 From 009735dc63140ec8db1825f37d7fc381a624595d Mon Sep 17 00:00:00 2001 From: Faraphel Date: Sun, 23 Jun 2024 14:49:41 +0200 Subject: [PATCH 3/4] cleaned the code to be as configurable as possible --- docker-compose.yaml | 1 + .../kotlin/fr/faraphel/m1_pe_kafka/Main.kt | 70 ++++++++++++------- .../faraphel/m1_pe_kafka/kafka/AdminUtils.kt | 7 +- .../faraphel/m1_pe_kafka/kafka/Converter.kt | 5 +- .../m1_pe_kafka/kafka/PrintConsumer.kt | 31 ++++++-- .../m1_pe_kafka/kafka/TemperatureProducer.kt | 15 ++-- 6 files changed, 90 insertions(+), 39 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 90cd2ea..a020af1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -34,6 +34,7 @@ services: dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + TEMPERATURE_LOCATION: 49.9, 2.3 networks: - kafka depends_on: diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt index 4dea472..8bed7f0 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt @@ -10,52 +10,71 @@ import io.quarkus.runtime.QuarkusApplication import io.quarkus.runtime.annotations.QuarkusMain -val TOPIC_TEMPERATURE_CELSIUS: String = "temperature-celsius" -val TOPIC_TEMPERATURE_FAHRENHEIT: String = "temperature-fahrenheit" - - /** - * The entrypoint of the program + * The main class. + * Contains the entrypoint of the program. */ @QuarkusMain class Main : QuarkusApplication { + /** + * The entrypoint of the program + * @param args command line arguments + * @return the result code of the program + */ override fun run(vararg args: String?): Int { - println("starting...") + // get the kafka server address + val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + + // get the topics name + val topicTemperatureCelsius: String = System.getenv("TOPIC_TEMPERATURE_CELSIUS") + ?: "temperature-celsius" + val topicTemperatureFahrenheit: String = System.getenv("TOPIC_TEMPERATURE_FAHRENHEIT") + ?: "temperature-fahrenheit" // create an admin object - val admin = AdminUtils() + val admin = AdminUtils(kafkaServer) // create our topics - admin.createTopics("temperature-celsius", "temperature-fahrenheit") + admin.createTopics(topicTemperatureCelsius, topicTemperatureFahrenheit) + + // get the location of the temperature to get + val location = System.getenv("TEMPERATURE_LOCATION") + ?: throw IllegalArgumentException("Missing environment variable: TEMPERATURE_LOCATION") + + // parse the location to get the latitude and longitude + val (latitude, longitude) = location.split(",").map { coordinate -> coordinate.trim().toDouble() } // create a producer that will generate temperature values based on the current temperature at Amiens (France). val producer = TemperatureProducer( - latitude=49.9, - longitude=2.3, - topic=TOPIC_TEMPERATURE_CELSIUS + server=kafkaServer, + latitude=latitude, + longitude=longitude, + topic=topicTemperatureCelsius ) + // create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic val converter = Converter( - inputTopic=TOPIC_TEMPERATURE_CELSIUS, - outputTopic=TOPIC_TEMPERATURE_FAHRENHEIT + server=kafkaServer, + inputTopic=topicTemperatureCelsius, + outputTopic=topicTemperatureFahrenheit ) { temperature -> temperature.celsius.asFahrenheit } + // create a consumer that will print the received values in the Fahrenheit topic val consumer = PrintConsumer( - topic=TOPIC_TEMPERATURE_FAHRENHEIT + server=kafkaServer, + topic=topicTemperatureFahrenheit ) - // wrap the producer and the consumer in threads - // TODO(Faraphel): should be native to the producer and consumer class ? - val threadProducer = Thread { producer.produceForever() } - val threadConsumer = Thread { consumer.consumeForever() } - // run all the clients - threadConsumer.start() + producer.start() + consumer.start() converter.start() - threadProducer.start() // wait for them to finish before closing - threadConsumer.join() - threadProducer.join() + producer.join() + consumer.join() + + // close the converter if the others clients are done converter.stop() return 0 @@ -63,8 +82,9 @@ class Main : QuarkusApplication { } /** - * The main function - * Call the entrypoint directly. Used to run the program directly with Kotlin + * The main function. + * Simply call the entrypoint. + * Used to run the program directly with Kotlin. */ fun main(args: Array) { Quarkus.run(Main::class.java, *args) diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt index b78ecb0..e4321ea 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt @@ -10,18 +10,19 @@ import java.util.* /** * A wrapper around KafkaAdminClient to simplify its configuration. * @throws IllegalArgumentException the environment variables are not properly set. + * @param server the kafka server address * @param identifier the kafka identifier for the configuration * @see Admin */ class AdminUtils( - identifier: String = "admin" + private val server: String, + private val identifier: String = "admin" ) { private val adminConfig = Properties().apply { // set the identifier this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier // set the server information - this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = server } private val admin = Admin.create(adminConfig) diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt index 33cf019..6b8bec3 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt @@ -12,11 +12,13 @@ import java.util.* /** * A kafka converter. * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function. + * @param server the kafka server address * @param identifier the kafka identifier for the configuration * @param inputTopic the input kafka topic name * @param outputTopic the output kafka topic name */ class Converter( + private val server: String, private val identifier: String = "converter", private val inputTopic: String, private val outputTopic: String, @@ -30,8 +32,7 @@ class Converter( this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name // set the server information - this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = server } // define a topology for our streams model diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt index 9d5fe19..7e956e3 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt @@ -4,24 +4,38 @@ package fr.faraphel.m1_pe_kafka.kafka import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.serialization.DoubleDeserializer import org.apache.kafka.common.serialization.StringDeserializer +import java.time.DateTimeException import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.format.DateTimeFormatter import java.util.Collections import java.util.Properties +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration /** * A basic consumer that print everything in the selected topic. + * @param server the kafka server address * @param topic the topic to get the records from * @param identifier the kafka identifier for the configuration * @throws IllegalArgumentException the environment variables are not properly set. */ class PrintConsumer( + private val server: String, private val identifier: String = "consumer", private val topic: String, -) { +) : Thread() { + companion object { + // create a formatter to convert the timestamp to a string + private val timeFormatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(ZoneId.systemDefault()) + } + private val properties: Properties = Properties().apply { // identifier this[ConsumerConfig.GROUP_ID_CONFIG] = identifier @@ -29,8 +43,7 @@ class PrintConsumer( this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name // the server information - this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server } private val consumer = KafkaConsumer(this.properties).apply { @@ -48,8 +61,10 @@ class PrintConsumer( // print them with their timestamp and content messages.forEach { message -> - // TODO(Faraphel): format the timestamp - println("[${message.timestamp()}] ${message.value()}") + // format the time of the message to a proper string + val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp())) + // print the value + println("[${time}] ${message.value()}°F") } } @@ -60,4 +75,10 @@ class PrintConsumer( fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) { while (true) this.consume(timeout) } + + /** + * Thread entrypoint + * @see consumeForever + */ + override fun run() = this.consumeForever() } diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt index 7604dd1..6436119 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt @@ -17,17 +17,19 @@ import kotlin.time.toJavaDuration /** * A kafka temperature producer. * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com) + * @param server the kafka server address * @param identifier the kafka identifier for the configuration * @param latitude the latitude of the temperature to fetch * @param longitude the longitude of the temperature to fetch * @param topic the kafka topic to publish the data to */ class TemperatureProducer( + private val server: String, private val identifier: String = "consumer", private val latitude: Double, private val longitude: Double, private val topic: String, -) { +) : Thread() { companion object { private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API .scheme("https") // use the https protocol @@ -46,8 +48,7 @@ class TemperatureProducer( this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name // set the server information - this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = server } private val kafkaProducer: KafkaProducer = KafkaProducer(this.properties) ///< the kafka producer @@ -101,7 +102,13 @@ class TemperatureProducer( // produce a data this.produce() // wait for the cooldown - Thread.sleep(frequency) + sleep(frequency) } } + + /** + * Thread entrypoint + * @see produceForever + */ + override fun run() = this.produceForever() } -- 2.45.2 From bf3e1310af667a7d29b6df8ca74be704fa80c10a Mon Sep 17 00:00:00 2001 From: Faraphel Date: Sun, 23 Jun 2024 14:51:09 +0200 Subject: [PATCH 4/4] added the configuration section to the README file --- README.md | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b19f812..3934654 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,18 @@ gradlew build docker compose up ``` +## Configuration +If wished, you can modify the configuration of the `docker-compose.yaml` file to fit your need. + +The container `application` can be easily modified with the following environment variables : + +| Name | Required | Format | Default | Description | +|------------------------------|----------|-----------------------------------------------------|--------------------------------------------------------------------|---------------------------------------------------------------| +| KAFKA_BOOTSTRAP_SERVERS | true | \[:port] | / | The Kafka server address | +| TOPIC_TEMPERATURE_CELSIUS | false | string of alphanumeric characters, ".", "_" and "-" | temperature-celsius | The name of the Kafka topic for the temperature in Celsius | +| TOPIC_TEMPERATURE_FAHRENHEIT | false | string of alphanumeric characters, ".", "_" and "-" | temperature-fahrenheit | The name of the Kafka topic for the temperature in Fahrenheit | | +| TEMPERATURE_LOCATION | true | \, \ | 49.9, 2.3 ([Amiens, France](https://fr.wikipedia.org/wiki/Amiens)) | The coordinates were to get the temperatures from | + ## Expectation -The `application` container shall print the current temperature at the selected place in -Fahrenheit every minute. +The `application` container shall print the current temperature at the selected place in +Fahrenheit every minute. \ No newline at end of file -- 2.45.2