Merge pull request 'Polishing the application' (#6) from polishing into master

Reviewed-on: #6
This commit is contained in:
faraphel 2024-06-23 12:53:21 +00:00
commit 946b804bda
7 changed files with 119 additions and 90 deletions

View file

@ -1,61 +1,38 @@
# m1_pe_kafka # Kafka + Quarkus + Docker : Demonstration
A small university project to discover the [Quarkus Framework](https://quarkus.io/) to build the
application combined with the [Kafka platform](https://kafka.apache.org/), the whole compatible
with [Docker](https://www.docker.com/).
This project uses Quarkus, the Supersonic Subatomic Java Framework. ## Run
You can run the project very simply with either [Intellij IDEA](https://www.jetbrains.com/idea/),
or directly within a terminal.
If you want to learn more about Quarkus, please visit its website: https://quarkus.io/ . ### Intellij IDEA
Simply clone and load the project into the IDE and in the
[run configurations](https://www.jetbrains.com/help/idea/run-debug-configuration.html),
choose `compose`.
## Running the application in dev mode ### Terminal
Alternatively, you can build and run the project manually by using
[gradle](https://gradle.org/) and launching the application with [docker-compose](https://docs.docker.com/compose/).
You can run your application in dev mode that enables live coding using: ```shell
gradlew build
```shell script docker compose up
./gradlew quarkusDev
``` ```
> **_NOTE:_** Quarkus now ships with a Dev UI, which is available in dev mode only at http://localhost:8080/q/dev/. ## Configuration
If wished, you can modify the configuration of the `docker-compose.yaml` file to fit your need.
## Packaging and running the application The container `application` can be easily modified with the following environment variables :
The application can be packaged using: | Name | Required | Format | Default | Description |
|------------------------------|----------|-----------------------------------------------------|--------------------------------------------------------------------|---------------------------------------------------------------|
| KAFKA_BOOTSTRAP_SERVERS | true | \<ip>[:port] | / | The Kafka server address |
| TOPIC_TEMPERATURE_CELSIUS | false | string of alphanumeric characters, ".", "_" and "-" | temperature-celsius | The name of the Kafka topic for the temperature in Celsius |
| TOPIC_TEMPERATURE_FAHRENHEIT | false | string of alphanumeric characters, ".", "_" and "-" | temperature-fahrenheit | The name of the Kafka topic for the temperature in Fahrenheit | |
| TEMPERATURE_LOCATION | true | \<latitude>, \<longitude> | 49.9, 2.3 ([Amiens, France](https://fr.wikipedia.org/wiki/Amiens)) | The coordinates were to get the temperatures from |
```shell script ## Expectation
./gradlew build The `application` container shall print the current temperature at the selected place in
``` Fahrenheit every minute.
It produces the `quarkus-run.jar` file in the `build/quarkus-app/` directory.
Be aware that its not an _über-jar_ as the dependencies are copied into the `build/quarkus-app/lib/` directory.
The application is now runnable using `java -jar build/quarkus-app/quarkus-run.jar`.
If you want to build an _über-jar_, execute the following command:
```shell script
./gradlew build -Dquarkus.package.jar.type=uber-jar
```
The application, packaged as an _über-jar_, is now runnable using `java -jar build/*-runner.jar`.
## Creating a native executable
You can create a native executable using:
```shell script
./gradlew build -Dquarkus.native.enabled=true
```
Or, if you don't have GraalVM installed, you can run the native executable build in a container using:
```shell script
./gradlew build -Dquarkus.native.enabled=true -Dquarkus.native.container-build=true
```
You can then execute your native executable with: `./build/m1_pe_kafka-1.0-SNAPSHOT-runner`
If you want to learn more about building native executables, please consult https://quarkus.io/guides/gradle-tooling.
## Related Guides
- Apache Kafka Client ([guide](https://quarkus.io/guides/kafka)): Connect to Apache Kafka with its native API
- Kotlin ([guide](https://quarkus.io/guides/kotlin)): Write your services in Kotlin
- Apache Kafka Streams ([guide](https://quarkus.io/guides/kafka-streams)): Implement stream processing applications
based on Apache Kafka

View file

@ -34,6 +34,7 @@ services:
dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment: environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092 KAFKA_BOOTSTRAP_SERVERS: kafka:9092
TEMPERATURE_LOCATION: 49.9, 2.3
networks: networks:
- kafka - kafka
depends_on: depends_on:

View file

@ -10,52 +10,71 @@ import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain import io.quarkus.runtime.annotations.QuarkusMain
val TOPIC_TEMPERATURE_CELSIUS: String = "temperature-celsius"
val TOPIC_TEMPERATURE_FAHRENHEIT: String = "temperature-fahrenheit"
/** /**
* The entrypoint of the program * The main class.
* Contains the entrypoint of the program.
*/ */
@QuarkusMain @QuarkusMain
class Main : QuarkusApplication { class Main : QuarkusApplication {
/**
* The entrypoint of the program
* @param args command line arguments
* @return the result code of the program
*/
override fun run(vararg args: String?): Int { override fun run(vararg args: String?): Int {
println("starting...") // get the kafka server address
val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
// get the topics name
val topicTemperatureCelsius: String = System.getenv("TOPIC_TEMPERATURE_CELSIUS")
?: "temperature-celsius"
val topicTemperatureFahrenheit: String = System.getenv("TOPIC_TEMPERATURE_FAHRENHEIT")
?: "temperature-fahrenheit"
// create an admin object // create an admin object
val admin = AdminUtils() val admin = AdminUtils(kafkaServer)
// create our topics // create our topics
admin.createTopics("temperature-celsius", "temperature-fahrenheit") admin.createTopics(topicTemperatureCelsius, topicTemperatureFahrenheit)
// get the location of the temperature to get
val location = System.getenv("TEMPERATURE_LOCATION")
?: throw IllegalArgumentException("Missing environment variable: TEMPERATURE_LOCATION")
// parse the location to get the latitude and longitude
val (latitude, longitude) = location.split(",").map { coordinate -> coordinate.trim().toDouble() }
// create a producer that will generate temperature values based on the current temperature at Amiens (France). // create a producer that will generate temperature values based on the current temperature at Amiens (France).
val producer = TemperatureProducer( val producer = TemperatureProducer(
latitude=49.9, server=kafkaServer,
longitude=2.3, latitude=latitude,
topic=TOPIC_TEMPERATURE_CELSIUS longitude=longitude,
topic=topicTemperatureCelsius
) )
// create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic // create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic
val converter = Converter( val converter = Converter(
inputTopic=TOPIC_TEMPERATURE_CELSIUS, server=kafkaServer,
outputTopic=TOPIC_TEMPERATURE_FAHRENHEIT inputTopic=topicTemperatureCelsius,
outputTopic=topicTemperatureFahrenheit
) { temperature -> temperature.celsius.asFahrenheit } ) { temperature -> temperature.celsius.asFahrenheit }
// create a consumer that will print the received values in the Fahrenheit topic // create a consumer that will print the received values in the Fahrenheit topic
val consumer = PrintConsumer( val consumer = PrintConsumer(
topic=TOPIC_TEMPERATURE_FAHRENHEIT server=kafkaServer,
topic=topicTemperatureFahrenheit
) )
// wrap the producer and the consumer in threads
// TODO(Faraphel): should be native to the producer and consumer class ?
val threadProducer = Thread { producer.produceForever() }
val threadConsumer = Thread { consumer.consumeForever() }
// run all the clients // run all the clients
threadConsumer.start() producer.start()
consumer.start()
converter.start() converter.start()
threadProducer.start()
// wait for them to finish before closing // wait for them to finish before closing
threadConsumer.join() producer.join()
threadProducer.join() consumer.join()
// close the converter if the others clients are done
converter.stop() converter.stop()
return 0 return 0
@ -63,8 +82,9 @@ class Main : QuarkusApplication {
} }
/** /**
* The main function * The main function.
* Call the entrypoint directly. Used to run the program directly with Kotlin * Simply call the entrypoint.
* Used to run the program directly with Kotlin.
*/ */
fun main(args: Array<String>) { fun main(args: Array<String>) {
Quarkus.run(Main::class.java, *args) Quarkus.run(Main::class.java, *args)

View file

@ -10,18 +10,19 @@ import java.util.*
/** /**
* A wrapper around KafkaAdminClient to simplify its configuration. * A wrapper around KafkaAdminClient to simplify its configuration.
* @throws IllegalArgumentException the environment variables are not properly set. * @throws IllegalArgumentException the environment variables are not properly set.
* @param server the kafka server address
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @see Admin * @see Admin
*/ */
class AdminUtils( class AdminUtils(
identifier: String = "admin" private val server: String,
private val identifier: String = "admin"
) { ) {
private val adminConfig = Properties().apply { private val adminConfig = Properties().apply {
// set the identifier // set the identifier
this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier
// set the server information // set the server information
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
private val admin = Admin.create(adminConfig) private val admin = Admin.create(adminConfig)

View file

@ -12,11 +12,13 @@ import java.util.*
/** /**
* A kafka converter. * A kafka converter.
* Take a value from a topic and convert it into another value in a second topic thanks to a conversion function. * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function.
* @param server the kafka server address
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @param inputTopic the input kafka topic name * @param inputTopic the input kafka topic name
* @param outputTopic the output kafka topic name * @param outputTopic the output kafka topic name
*/ */
class Converter( class Converter(
private val server: String,
private val identifier: String = "converter", private val identifier: String = "converter",
private val inputTopic: String, private val inputTopic: String,
private val outputTopic: String, private val outputTopic: String,
@ -30,8 +32,7 @@ class Converter(
this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name
this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name
// set the server information // set the server information
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
// define a topology for our streams model // define a topology for our streams model

View file

@ -4,24 +4,38 @@ package fr.faraphel.m1_pe_kafka.kafka
import org.apache.kafka.clients.consumer.* import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.DoubleDeserializer import org.apache.kafka.common.serialization.DoubleDeserializer
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringDeserializer
import java.time.DateTimeException
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.Collections import java.util.Collections
import java.util.Properties import java.util.Properties
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration import kotlin.time.toJavaDuration
/** /**
* A basic consumer that print everything in the selected topic. * A basic consumer that print everything in the selected topic.
* @param server the kafka server address
* @param topic the topic to get the records from * @param topic the topic to get the records from
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @throws IllegalArgumentException the environment variables are not properly set. * @throws IllegalArgumentException the environment variables are not properly set.
*/ */
class PrintConsumer( class PrintConsumer(
private val server: String,
private val identifier: String = "consumer", private val identifier: String = "consumer",
private val topic: String, private val topic: String,
) { ) : Thread() {
companion object {
// create a formatter to convert the timestamp to a string
private val timeFormatter = DateTimeFormatter
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault())
}
private val properties: Properties = Properties().apply { private val properties: Properties = Properties().apply {
// identifier // identifier
this[ConsumerConfig.GROUP_ID_CONFIG] = identifier this[ConsumerConfig.GROUP_ID_CONFIG] = identifier
@ -29,8 +43,7 @@ class PrintConsumer(
this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name
// the server information // the server information
this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
private val consumer = KafkaConsumer<String, Double>(this.properties).apply { private val consumer = KafkaConsumer<String, Double>(this.properties).apply {
@ -48,7 +61,10 @@ class PrintConsumer(
// print them with their timestamp and content // print them with their timestamp and content
messages.forEach { message -> messages.forEach { message ->
println("[${message.timestamp()}] ${message.value()}") // format the time of the message to a proper string
val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp()))
// print the value
println("[${time}] ${message.value()}°F")
} }
} }
@ -59,4 +75,10 @@ class PrintConsumer(
fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) { fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) {
while (true) this.consume(timeout) while (true) this.consume(timeout)
} }
/**
* Thread entrypoint
* @see consumeForever
*/
override fun run() = this.consumeForever()
} }

View file

@ -17,17 +17,19 @@ import kotlin.time.toJavaDuration
/** /**
* A kafka temperature producer. * A kafka temperature producer.
* Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com) * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com)
* @param server the kafka server address
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @param latitude the latitude of the temperature to fetch * @param latitude the latitude of the temperature to fetch
* @param longitude the longitude of the temperature to fetch * @param longitude the longitude of the temperature to fetch
* @param topic the kafka topic to publish the data to * @param topic the kafka topic to publish the data to
*/ */
class TemperatureProducer( class TemperatureProducer(
private val server: String,
private val identifier: String = "consumer", private val identifier: String = "consumer",
private val latitude: Double, private val latitude: Double,
private val longitude: Double, private val longitude: Double,
private val topic: String, private val topic: String,
) { ) : Thread() {
companion object { companion object {
private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API
.scheme("https") // use the https protocol .scheme("https") // use the https protocol
@ -46,8 +48,7 @@ class TemperatureProducer(
this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name
// set the server information // set the server information
this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
private val kafkaProducer: KafkaProducer<String, Double> = KafkaProducer(this.properties) ///< the kafka producer private val kafkaProducer: KafkaProducer<String, Double> = KafkaProducer(this.properties) ///< the kafka producer
@ -101,7 +102,13 @@ class TemperatureProducer(
// produce a data // produce a data
this.produce() this.produce()
// wait for the cooldown // wait for the cooldown
Thread.sleep(frequency) sleep(frequency)
} }
} }
/**
* Thread entrypoint
* @see produceForever
*/
override fun run() = this.produceForever()
} }