Polishing the application #6

Merged
faraphel merged 4 commits from polishing into master 2024-06-23 14:53:22 +02:00
6 changed files with 90 additions and 39 deletions
Showing only changes of commit 009735dc63 - Show all commits

View file

@ -34,6 +34,7 @@ services:
dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment: environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092 KAFKA_BOOTSTRAP_SERVERS: kafka:9092
TEMPERATURE_LOCATION: 49.9, 2.3
networks: networks:
- kafka - kafka
depends_on: depends_on:

View file

@ -10,52 +10,71 @@ import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain import io.quarkus.runtime.annotations.QuarkusMain
val TOPIC_TEMPERATURE_CELSIUS: String = "temperature-celsius"
val TOPIC_TEMPERATURE_FAHRENHEIT: String = "temperature-fahrenheit"
/** /**
* The entrypoint of the program * The main class.
* Contains the entrypoint of the program.
*/ */
@QuarkusMain @QuarkusMain
class Main : QuarkusApplication { class Main : QuarkusApplication {
/**
* The entrypoint of the program
* @param args command line arguments
* @return the result code of the program
*/
override fun run(vararg args: String?): Int { override fun run(vararg args: String?): Int {
println("starting...") // get the kafka server address
val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
// get the topics name
val topicTemperatureCelsius: String = System.getenv("TOPIC_TEMPERATURE_CELSIUS")
?: "temperature-celsius"
val topicTemperatureFahrenheit: String = System.getenv("TOPIC_TEMPERATURE_FAHRENHEIT")
?: "temperature-fahrenheit"
// create an admin object // create an admin object
val admin = AdminUtils() val admin = AdminUtils(kafkaServer)
// create our topics // create our topics
admin.createTopics("temperature-celsius", "temperature-fahrenheit") admin.createTopics(topicTemperatureCelsius, topicTemperatureFahrenheit)
// get the location of the temperature to get
val location = System.getenv("TEMPERATURE_LOCATION")
?: throw IllegalArgumentException("Missing environment variable: TEMPERATURE_LOCATION")
// parse the location to get the latitude and longitude
val (latitude, longitude) = location.split(",").map { coordinate -> coordinate.trim().toDouble() }
// create a producer that will generate temperature values based on the current temperature at Amiens (France). // create a producer that will generate temperature values based on the current temperature at Amiens (France).
val producer = TemperatureProducer( val producer = TemperatureProducer(
latitude=49.9, server=kafkaServer,
longitude=2.3, latitude=latitude,
topic=TOPIC_TEMPERATURE_CELSIUS longitude=longitude,
topic=topicTemperatureCelsius
) )
// create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic // create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic
val converter = Converter( val converter = Converter(
inputTopic=TOPIC_TEMPERATURE_CELSIUS, server=kafkaServer,
outputTopic=TOPIC_TEMPERATURE_FAHRENHEIT inputTopic=topicTemperatureCelsius,
outputTopic=topicTemperatureFahrenheit
) { temperature -> temperature.celsius.asFahrenheit } ) { temperature -> temperature.celsius.asFahrenheit }
// create a consumer that will print the received values in the Fahrenheit topic // create a consumer that will print the received values in the Fahrenheit topic
val consumer = PrintConsumer( val consumer = PrintConsumer(
topic=TOPIC_TEMPERATURE_FAHRENHEIT server=kafkaServer,
topic=topicTemperatureFahrenheit
) )
// wrap the producer and the consumer in threads
// TODO(Faraphel): should be native to the producer and consumer class ?
val threadProducer = Thread { producer.produceForever() }
val threadConsumer = Thread { consumer.consumeForever() }
// run all the clients // run all the clients
threadConsumer.start() producer.start()
consumer.start()
converter.start() converter.start()
threadProducer.start()
// wait for them to finish before closing // wait for them to finish before closing
threadConsumer.join() producer.join()
threadProducer.join() consumer.join()
// close the converter if the others clients are done
converter.stop() converter.stop()
return 0 return 0
@ -63,8 +82,9 @@ class Main : QuarkusApplication {
} }
/** /**
* The main function * The main function.
* Call the entrypoint directly. Used to run the program directly with Kotlin * Simply call the entrypoint.
* Used to run the program directly with Kotlin.
*/ */
fun main(args: Array<String>) { fun main(args: Array<String>) {
Quarkus.run(Main::class.java, *args) Quarkus.run(Main::class.java, *args)

View file

@ -10,18 +10,19 @@ import java.util.*
/** /**
* A wrapper around KafkaAdminClient to simplify its configuration. * A wrapper around KafkaAdminClient to simplify its configuration.
* @throws IllegalArgumentException the environment variables are not properly set. * @throws IllegalArgumentException the environment variables are not properly set.
* @param server the kafka server address
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @see Admin * @see Admin
*/ */
class AdminUtils( class AdminUtils(
identifier: String = "admin" private val server: String,
private val identifier: String = "admin"
) { ) {
private val adminConfig = Properties().apply { private val adminConfig = Properties().apply {
// set the identifier // set the identifier
this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier
// set the server information // set the server information
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
private val admin = Admin.create(adminConfig) private val admin = Admin.create(adminConfig)

View file

@ -12,11 +12,13 @@ import java.util.*
/** /**
* A kafka converter. * A kafka converter.
* Take a value from a topic and convert it into another value in a second topic thanks to a conversion function. * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function.
* @param server the kafka server address
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @param inputTopic the input kafka topic name * @param inputTopic the input kafka topic name
* @param outputTopic the output kafka topic name * @param outputTopic the output kafka topic name
*/ */
class Converter( class Converter(
private val server: String,
private val identifier: String = "converter", private val identifier: String = "converter",
private val inputTopic: String, private val inputTopic: String,
private val outputTopic: String, private val outputTopic: String,
@ -30,8 +32,7 @@ class Converter(
this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name
this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name
// set the server information // set the server information
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
// define a topology for our streams model // define a topology for our streams model

View file

@ -4,24 +4,38 @@ package fr.faraphel.m1_pe_kafka.kafka
import org.apache.kafka.clients.consumer.* import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.DoubleDeserializer import org.apache.kafka.common.serialization.DoubleDeserializer
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringDeserializer
import java.time.DateTimeException
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.Collections import java.util.Collections
import java.util.Properties import java.util.Properties
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration import kotlin.time.toJavaDuration
/** /**
* A basic consumer that print everything in the selected topic. * A basic consumer that print everything in the selected topic.
* @param server the kafka server address
* @param topic the topic to get the records from * @param topic the topic to get the records from
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @throws IllegalArgumentException the environment variables are not properly set. * @throws IllegalArgumentException the environment variables are not properly set.
*/ */
class PrintConsumer( class PrintConsumer(
private val server: String,
private val identifier: String = "consumer", private val identifier: String = "consumer",
private val topic: String, private val topic: String,
) { ) : Thread() {
companion object {
// create a formatter to convert the timestamp to a string
private val timeFormatter = DateTimeFormatter
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault())
}
private val properties: Properties = Properties().apply { private val properties: Properties = Properties().apply {
// identifier // identifier
this[ConsumerConfig.GROUP_ID_CONFIG] = identifier this[ConsumerConfig.GROUP_ID_CONFIG] = identifier
@ -29,8 +43,7 @@ class PrintConsumer(
this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name
// the server information // the server information
this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
private val consumer = KafkaConsumer<String, Double>(this.properties).apply { private val consumer = KafkaConsumer<String, Double>(this.properties).apply {
@ -48,8 +61,10 @@ class PrintConsumer(
// print them with their timestamp and content // print them with their timestamp and content
messages.forEach { message -> messages.forEach { message ->
// TODO(Faraphel): format the timestamp // format the time of the message to a proper string
println("[${message.timestamp()}] ${message.value()}") val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp()))
// print the value
println("[${time}] ${message.value()}°F")
} }
} }
@ -60,4 +75,10 @@ class PrintConsumer(
fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) { fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) {
while (true) this.consume(timeout) while (true) this.consume(timeout)
} }
/**
* Thread entrypoint
* @see consumeForever
*/
override fun run() = this.consumeForever()
} }

View file

@ -17,17 +17,19 @@ import kotlin.time.toJavaDuration
/** /**
* A kafka temperature producer. * A kafka temperature producer.
* Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com) * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com)
* @param server the kafka server address
* @param identifier the kafka identifier for the configuration * @param identifier the kafka identifier for the configuration
* @param latitude the latitude of the temperature to fetch * @param latitude the latitude of the temperature to fetch
* @param longitude the longitude of the temperature to fetch * @param longitude the longitude of the temperature to fetch
* @param topic the kafka topic to publish the data to * @param topic the kafka topic to publish the data to
*/ */
class TemperatureProducer( class TemperatureProducer(
private val server: String,
private val identifier: String = "consumer", private val identifier: String = "consumer",
private val latitude: Double, private val latitude: Double,
private val longitude: Double, private val longitude: Double,
private val topic: String, private val topic: String,
) { ) : Thread() {
companion object { companion object {
private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API
.scheme("https") // use the https protocol .scheme("https") // use the https protocol
@ -46,8 +48,7 @@ class TemperatureProducer(
this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name
// set the server information // set the server information
this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
} }
private val kafkaProducer: KafkaProducer<String, Double> = KafkaProducer(this.properties) ///< the kafka producer private val kafkaProducer: KafkaProducer<String, Double> = KafkaProducer(this.properties) ///< the kafka producer
@ -101,7 +102,13 @@ class TemperatureProducer(
// produce a data // produce a data
this.produce() this.produce()
// wait for the cooldown // wait for the cooldown
Thread.sleep(frequency) sleep(frequency)
} }
} }
/**
* Thread entrypoint
* @see produceForever
*/
override fun run() = this.produceForever()
} }