Merge pull request 'cleaned the code and made some structure more generic' (#4) from clean into master

Reviewed-on: #4
This commit is contained in:
faraphel 2024-06-22 17:08:16 +00:00
commit bc8d62eff6
9 changed files with 231 additions and 128 deletions

View file

@ -0,0 +1,71 @@
package fr.faraphel.m1_pe_kafka
import fr.faraphel.m1_pe_kafka.kafka.AdminUtils
import fr.faraphel.m1_pe_kafka.kafka.PrintConsumer
import fr.faraphel.m1_pe_kafka.kafka.Converter
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
import fr.faraphel.m1_pe_kafka.utils.celsius
import io.quarkus.runtime.Quarkus
import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain
val TOPIC_TEMPERATURE_CELSIUS: String = "temperature-celsius"
val TOPIC_TEMPERATURE_FAHRENHEIT: String = "temperature-fahrenheit"
/**
* The entrypoint of the program
*/
@QuarkusMain
class Main : QuarkusApplication {
override fun run(vararg args: String?): Int {
println("starting...")
// create an admin object
val admin = AdminUtils()
// create our topics
admin.createTopics("temperature-celsius", "temperature-fahrenheit")
// create a producer that will generate temperature values based on the current temperature at Amiens (France).
val producer = TemperatureProducer(
latitude=49.9,
longitude=2.3,
topic=TOPIC_TEMPERATURE_CELSIUS
)
// create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic
val converter = Converter(
inputTopic=TOPIC_TEMPERATURE_CELSIUS,
outputTopic=TOPIC_TEMPERATURE_FAHRENHEIT
) { temperature -> temperature.celsius.asFahrenheit }
// create a consumer that will print the received values in the Fahrenheit topic
val consumer = PrintConsumer(
topic=TOPIC_TEMPERATURE_FAHRENHEIT
)
// wrap the producer and the consumer in threads
// TODO(Faraphel): should be native to the producer and consumer class ?
val threadProducer = Thread { producer.produceForever() }
val threadConsumer = Thread { consumer.consumeForever() }
// run all the clients
threadConsumer.start()
converter.start()
threadProducer.start()
// wait for them to finish before closing
threadConsumer.join()
threadProducer.join()
converter.stop()
return 0
}
}
/**
* The main function
* Call the entrypoint directly. Used to run the program directly with Kotlin
*/
fun main(args: Array<String>) {
Quarkus.run(Main::class.java, *args)
}

View file

@ -0,0 +1,14 @@
package fr.faraphel.m1_pe_kafka.error.http
import java.io.IOException
/**
* An HTTP exception.
* OkHttp does not implement its own exception but instead simply return the value.
* This exception allow to use the try / catch syntax to handle these errors if wanted.
*/
class HttpException(
code: Int,
message: String,
) : IOException("HTTP Exception $code: $message")

View file

@ -0,0 +1,41 @@
package fr.faraphel.m1_pe_kafka.kafka
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.CreateTopicsResult
import org.apache.kafka.clients.admin.NewTopic
import java.util.*
/**
* A wrapper around KafkaAdminClient to simplify its configuration.
* @throws IllegalArgumentException the environment variables are not properly set.
* @param identifier the kafka identifier for the configuration
* @see Admin
*/
class AdminUtils(
identifier: String = "admin"
) {
private val adminConfig = Properties().apply {
// set the identifier
this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier
// set the server information
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val admin = Admin.create(adminConfig)
/**
* Create the topics in the kafka server.
* @param topics the names of the topics to create.
* @return the result of the operation.
* @see Admin.createTopics
*/
fun createTopics(vararg topics: String): CreateTopicsResult {
// convert the topics name into the corresponding operation
val operations = topics.map { topic -> NewTopic(topic, 1, 1) }
// run the command
return this.admin.createTopics(operations)
}
}

View file

@ -1,45 +0,0 @@
package fr.faraphel.m1_pe_kafka.kafka
/**
* The consumer
*/
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.DoubleDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Collections
import java.util.Properties
/**
* The consumer
*/
class Consumer(
private val topic: String
) {
private val properties: Properties = Properties().apply {
this[ConsumerConfig.GROUP_ID_CONFIG] = "consumer-group"
this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name
this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val consumer = KafkaConsumer<String, Double>(this.properties).apply {
this.subscribe(Collections.singletonList(topic))
}
fun consume() {
val messages: ConsumerRecords<String, Double> = this.consumer.poll(Duration.ofMillis(500));
messages.forEach { message ->
println(">" + message.value())
}
}
fun consumeForever() {
while (true) this.consume()
}
}

View file

@ -12,51 +12,64 @@ import java.util.*
/**
* A kafka converter.
* Take a value from a topic and convert it into another value in a second topic thanks to a conversion function.
* @param identifier the kafka identifier for the configuration
* @param inputTopic the input kafka topic name
* @param outputTopic the output kafka topic name
*/
class Converter(
private val identifier: String = "converter",
private val inputTopic: String,
private val outputTopic: String,
private val converter: (Double) -> Double,
) {
private val properties = Properties().apply { ///< the kafka properties
this[StreamsConfig.APPLICATION_ID_CONFIG] = "converter"
private val properties = Properties().apply {
// set the identifier
this[StreamsConfig.APPLICATION_ID_CONFIG] = identifier
// set the serializers
this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name
this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name
// set the server information
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val topologyBuilder = StreamsBuilder().apply { ///< the topology builder
// input
val inputStream: KStream<String, Double> = this.stream(inputTopic) ///< the input stream
// output
val outputStream: KStream<String, Double> = inputStream ///< the output stream
.mapValues { value -> value * 2 } // convert the values of the pair stream with this function | TODO(Faraphel): use a custom function
outputStream.to(outputTopic) // set the output of the stream to this topic
// define a topology for our streams model
private val topologyBuilder = StreamsBuilder().apply {
// the input stream take values from the input topic
val inputStream: KStream<String, Double> = this.stream(inputTopic)
// define an output stream as a stream taking values from the input stream and applying the conversion function
val outputStream: KStream<String, Double> = inputStream.mapValues(converter)
// set the output of the stream to the configured setup
outputStream.to(outputTopic)
}
private val topology: Topology = this.topologyBuilder.build() ///< the stream topology
// create this topology
private val topology: Topology = this.topologyBuilder.build()
private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties) ///< the stream
// create the streams system from the topology and the properties
private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties)
/**
* Start the conversion process
* @see KafkaStreams.start
*/
fun start() = streams.start()
/**
* Pause the conversion process
* @see KafkaStreams.pause
*/
fun pause() = streams.pause()
/**
* Resume a paused conversion process
* @see KafkaStreams.resume
*/
fun resume() = streams.resume()
/**
* Stop a conversion process
* @see KafkaStreams.close
*/
fun stop() = streams.close()
}
}

View file

@ -0,0 +1,62 @@
package fr.faraphel.m1_pe_kafka.kafka
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.DoubleDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Collections
import java.util.Properties
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
/**
* A basic consumer that print everything in the selected topic.
* @param topic the topic to get the records from
* @param identifier the kafka identifier for the configuration
* @throws IllegalArgumentException the environment variables are not properly set.
*/
class PrintConsumer(
private val identifier: String = "consumer",
private val topic: String,
) {
private val properties: Properties = Properties().apply {
// identifier
this[ConsumerConfig.GROUP_ID_CONFIG] = identifier
// serializers
this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name
// the server information
this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val consumer = KafkaConsumer<String, Double>(this.properties).apply {
// subscribe this consumer to the selected topic to receive its messages
this.subscribe(Collections.singletonList(topic))
}
/**
* Look for new messages in the selected topic and display them.
* @param timeout the timeout for the messages to be read
*/
fun consume(timeout: Duration = 1.seconds.toJavaDuration()) {
// receive the new messages
val messages: ConsumerRecords<String, Double> = this.consumer.poll(timeout)
// print them with their timestamp and content
messages.forEach { message ->
println("[${message.timestamp()}] ${message.value()}")
}
}
/**
* Look indefinitely for new messages in the selected topic and display them.
* @param timeout the timeout for the messages to be read
*/
fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) {
while (true) this.consume(timeout)
}
}

View file

@ -2,13 +2,14 @@ package fr.faraphel.m1_pe_kafka.kafka
import com.google.gson.Gson
import com.google.gson.JsonObject
import fr.faraphel.m1_pe_kafka.error.http.HttpException
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.DoubleSerializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
import kotlin.time.Duration
import java.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@ -16,11 +17,13 @@ import kotlin.time.toJavaDuration
/**
* A kafka temperature producer.
* Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com)
* @param identifier the kafka identifier for the configuration
* @param latitude the latitude of the temperature to fetch
* @param longitude the longitude of the temperature to fetch
* @param topic the kafka topic to publish the data to
*/
class TemperatureProducer(
private val identifier: String = "consumer",
private val latitude: Double,
private val longitude: Double,
private val topic: String,
@ -37,11 +40,12 @@ class TemperatureProducer(
private val jsonParser: Gson = Gson() ///< the json parser
private val properties: Properties = Properties().apply { ///< the kafka producer properties
this[ProducerConfig.CLIENT_ID_CONFIG] = "producer"
// configure the serializers
// set the identifier
this[ProducerConfig.CLIENT_ID_CONFIG] = identifier
// set the serializers
this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name
// configure the kafka server
// set the server information
this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
@ -66,13 +70,15 @@ class TemperatureProducer(
/**
* Produce the current temperature at the given position into a kafka server.
* @throws HttpException the API call encountered an HTTP error.
*/
fun produce() {
// request the current weather data
val response = httpClient.newCall(this.apiRequest).execute()
// check for a successful response
if (!response.isSuccessful)
TODO("Faraphel: throw an error")
// in case of error, raise a http exception
throw HttpException(response.code, response.message)
// parse the response into a map of string to string
val data: JsonObject = jsonParser.fromJson(response.body!!.string(), JsonObject::class.java)
@ -88,13 +94,14 @@ class TemperatureProducer(
/**
* indefinitely produce temperature data.
* @param frequency the refresh rate of the data
* @throws HttpException the API encountered an HTTP error.
*/
fun produceForever(frequency: Duration = 60.seconds) {
fun produceForever(frequency: Duration = 60.seconds.toJavaDuration()) {
while (true) {
// produce a data
this.produce()
// wait for the cooldown
Thread.sleep(frequency.toJavaDuration())
Thread.sleep(frequency)
}
}
}

View file

@ -1,60 +0,0 @@
package fr.faraphel.m1_pe_kafka
import fr.faraphel.m1_pe_kafka.kafka.Consumer
import fr.faraphel.m1_pe_kafka.kafka.Converter
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
import io.quarkus.runtime.Quarkus
import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain
import okhttp3.internal.wait
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.*
/**
* The entrypoint of the program
*/
@QuarkusMain
class Main : QuarkusApplication {
override fun run(vararg args: String?): Int {
println("starting...")
val adminConfig = Properties().apply {
this[AdminClientConfig.CLIENT_ID_CONFIG] = "main"
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
val admin = AdminClient.create(adminConfig)
admin.createTopics(mutableListOf(
NewTopic("temperature-celsius", 1, 1),
NewTopic("temperature-fahrenheit", 1, 1),
)).all().get()
val producer = TemperatureProducer(49.9, 2.3, "temperature-celsius")
val converter = Converter("temperature-celsius", "temperature-fahrenheit")
val consumer = Consumer("temperature-fahrenheit")
val threadProducer = Thread { producer.produceForever() }
val threadConsumer = Thread { consumer.consumeForever() }
threadConsumer.start()
converter.start()
threadProducer.start()
threadConsumer.join()
threadProducer.join()
converter.stop()
return 0
}
}
/**
* The main function
* Call the entrypoint directly. Used to run the program directly with Kotlin
*/
fun main(args: Array<String>) {
Quarkus.run(Main::class.java, *args)
}

View file

@ -10,13 +10,13 @@ class Temperature(
private val value: Double
) {
// convert Temperature to Double (Kelvin)
val kelvin: Double
val asKelvin: Double
get() = this.value
// convert Temperature to Double (Celcius)
val celcius: Double
// convert Temperature to Double (Celsius)
val asCelsius: Double
get() = this.value - 275.15
// convert Temperature to Double (Fahrenheit)
val fahrenheit: Double
val asFahrenheit: Double
get() = ((this.value - 273.15) * 1.8) + 32
}