diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt new file mode 100644 index 0000000..4dea472 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt @@ -0,0 +1,71 @@ +package fr.faraphel.m1_pe_kafka + +import fr.faraphel.m1_pe_kafka.kafka.AdminUtils +import fr.faraphel.m1_pe_kafka.kafka.PrintConsumer +import fr.faraphel.m1_pe_kafka.kafka.Converter +import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer +import fr.faraphel.m1_pe_kafka.utils.celsius +import io.quarkus.runtime.Quarkus +import io.quarkus.runtime.QuarkusApplication +import io.quarkus.runtime.annotations.QuarkusMain + + +val TOPIC_TEMPERATURE_CELSIUS: String = "temperature-celsius" +val TOPIC_TEMPERATURE_FAHRENHEIT: String = "temperature-fahrenheit" + + +/** + * The entrypoint of the program + */ +@QuarkusMain +class Main : QuarkusApplication { + override fun run(vararg args: String?): Int { + println("starting...") + + // create an admin object + val admin = AdminUtils() + // create our topics + admin.createTopics("temperature-celsius", "temperature-fahrenheit") + + // create a producer that will generate temperature values based on the current temperature at Amiens (France). + val producer = TemperatureProducer( + latitude=49.9, + longitude=2.3, + topic=TOPIC_TEMPERATURE_CELSIUS + ) + // create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic + val converter = Converter( + inputTopic=TOPIC_TEMPERATURE_CELSIUS, + outputTopic=TOPIC_TEMPERATURE_FAHRENHEIT + ) { temperature -> temperature.celsius.asFahrenheit } + // create a consumer that will print the received values in the Fahrenheit topic + val consumer = PrintConsumer( + topic=TOPIC_TEMPERATURE_FAHRENHEIT + ) + + // wrap the producer and the consumer in threads + // TODO(Faraphel): should be native to the producer and consumer class ? + val threadProducer = Thread { producer.produceForever() } + val threadConsumer = Thread { consumer.consumeForever() } + + // run all the clients + threadConsumer.start() + converter.start() + threadProducer.start() + + // wait for them to finish before closing + threadConsumer.join() + threadProducer.join() + converter.stop() + + return 0 + } +} + +/** + * The main function + * Call the entrypoint directly. Used to run the program directly with Kotlin + */ +fun main(args: Array) { + Quarkus.run(Main::class.java, *args) +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/error/http/HttpException.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/error/http/HttpException.kt new file mode 100644 index 0000000..8ef9daf --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/error/http/HttpException.kt @@ -0,0 +1,14 @@ +package fr.faraphel.m1_pe_kafka.error.http + +import java.io.IOException + + +/** + * An HTTP exception. + * OkHttp does not implement its own exception but instead simply return the value. + * This exception allow to use the try / catch syntax to handle these errors if wanted. + */ +class HttpException( + code: Int, + message: String, +) : IOException("HTTP Exception $code: $message") diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt new file mode 100644 index 0000000..b78ecb0 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/AdminUtils.kt @@ -0,0 +1,41 @@ +package fr.faraphel.m1_pe_kafka.kafka + +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.CreateTopicsResult +import org.apache.kafka.clients.admin.NewTopic +import java.util.* + + +/** + * A wrapper around KafkaAdminClient to simplify its configuration. + * @throws IllegalArgumentException the environment variables are not properly set. + * @param identifier the kafka identifier for the configuration + * @see Admin + */ +class AdminUtils( + identifier: String = "admin" +) { + private val adminConfig = Properties().apply { + // set the identifier + this[AdminClientConfig.CLIENT_ID_CONFIG] = identifier + // set the server information + this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + } + + private val admin = Admin.create(adminConfig) + + /** + * Create the topics in the kafka server. + * @param topics the names of the topics to create. + * @return the result of the operation. + * @see Admin.createTopics + */ + fun createTopics(vararg topics: String): CreateTopicsResult { + // convert the topics name into the corresponding operation + val operations = topics.map { topic -> NewTopic(topic, 1, 1) } + // run the command + return this.admin.createTopics(operations) + } +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt deleted file mode 100644 index 5a71818..0000000 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt +++ /dev/null @@ -1,45 +0,0 @@ -package fr.faraphel.m1_pe_kafka.kafka - -/** - * The consumer - */ - -import org.apache.kafka.clients.consumer.* -import org.apache.kafka.common.serialization.DoubleDeserializer -import org.apache.kafka.common.serialization.StringDeserializer - -import java.time.Duration -import java.util.Collections -import java.util.Properties - - -/** - * The consumer - */ -class Consumer( - private val topic: String -) { - private val properties: Properties = Properties().apply { - this[ConsumerConfig.GROUP_ID_CONFIG] = "consumer-group" - this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name - this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name - this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") - } - - private val consumer = KafkaConsumer(this.properties).apply { - this.subscribe(Collections.singletonList(topic)) - } - - fun consume() { - val messages: ConsumerRecords = this.consumer.poll(Duration.ofMillis(500)); - - messages.forEach { message -> - println(">" + message.value()) - } - } - - fun consumeForever() { - while (true) this.consume() - } -} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt index 55ff439..33cf019 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt @@ -12,51 +12,64 @@ import java.util.* /** * A kafka converter. * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function. + * @param identifier the kafka identifier for the configuration * @param inputTopic the input kafka topic name * @param outputTopic the output kafka topic name */ class Converter( + private val identifier: String = "converter", private val inputTopic: String, private val outputTopic: String, + private val converter: (Double) -> Double, ) { - private val properties = Properties().apply { ///< the kafka properties - this[StreamsConfig.APPLICATION_ID_CONFIG] = "converter" + private val properties = Properties().apply { + // set the identifier + this[StreamsConfig.APPLICATION_ID_CONFIG] = identifier + // set the serializers this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name + // set the server information this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") } - private val topologyBuilder = StreamsBuilder().apply { ///< the topology builder - // input - val inputStream: KStream = this.stream(inputTopic) ///< the input stream - // output - val outputStream: KStream = inputStream ///< the output stream - .mapValues { value -> value * 2 } // convert the values of the pair stream with this function | TODO(Faraphel): use a custom function - outputStream.to(outputTopic) // set the output of the stream to this topic + // define a topology for our streams model + private val topologyBuilder = StreamsBuilder().apply { + // the input stream take values from the input topic + val inputStream: KStream = this.stream(inputTopic) + // define an output stream as a stream taking values from the input stream and applying the conversion function + val outputStream: KStream = inputStream.mapValues(converter) + // set the output of the stream to the configured setup + outputStream.to(outputTopic) } - private val topology: Topology = this.topologyBuilder.build() ///< the stream topology + // create this topology + private val topology: Topology = this.topologyBuilder.build() - private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties) ///< the stream + // create the streams system from the topology and the properties + private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties) /** * Start the conversion process + * @see KafkaStreams.start */ fun start() = streams.start() /** * Pause the conversion process + * @see KafkaStreams.pause */ fun pause() = streams.pause() /** * Resume a paused conversion process + * @see KafkaStreams.resume */ fun resume() = streams.resume() /** * Stop a conversion process + * @see KafkaStreams.close */ fun stop() = streams.close() -} \ No newline at end of file +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt new file mode 100644 index 0000000..ac7687b --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt @@ -0,0 +1,62 @@ +package fr.faraphel.m1_pe_kafka.kafka + + +import org.apache.kafka.clients.consumer.* +import org.apache.kafka.common.serialization.DoubleDeserializer +import org.apache.kafka.common.serialization.StringDeserializer + +import java.time.Duration +import java.util.Collections +import java.util.Properties +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration + + +/** + * A basic consumer that print everything in the selected topic. + * @param topic the topic to get the records from + * @param identifier the kafka identifier for the configuration + * @throws IllegalArgumentException the environment variables are not properly set. + */ +class PrintConsumer( + private val identifier: String = "consumer", + private val topic: String, +) { + private val properties: Properties = Properties().apply { + // identifier + this[ConsumerConfig.GROUP_ID_CONFIG] = identifier + // serializers + this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name + // the server information + this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + } + + private val consumer = KafkaConsumer(this.properties).apply { + // subscribe this consumer to the selected topic to receive its messages + this.subscribe(Collections.singletonList(topic)) + } + + /** + * Look for new messages in the selected topic and display them. + * @param timeout the timeout for the messages to be read + */ + fun consume(timeout: Duration = 1.seconds.toJavaDuration()) { + // receive the new messages + val messages: ConsumerRecords = this.consumer.poll(timeout) + + // print them with their timestamp and content + messages.forEach { message -> + println("[${message.timestamp()}] ${message.value()}") + } + } + + /** + * Look indefinitely for new messages in the selected topic and display them. + * @param timeout the timeout for the messages to be read + */ + fun consumeForever(timeout: Duration = 1.seconds.toJavaDuration()) { + while (true) this.consume(timeout) + } +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt index e1410e1..7604dd1 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt @@ -2,13 +2,14 @@ package fr.faraphel.m1_pe_kafka.kafka import com.google.gson.Gson import com.google.gson.JsonObject +import fr.faraphel.m1_pe_kafka.error.http.HttpException import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.DoubleSerializer import org.apache.kafka.common.serialization.StringSerializer import java.util.Properties -import kotlin.time.Duration +import java.time.Duration import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -16,11 +17,13 @@ import kotlin.time.toJavaDuration /** * A kafka temperature producer. * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com) + * @param identifier the kafka identifier for the configuration * @param latitude the latitude of the temperature to fetch * @param longitude the longitude of the temperature to fetch * @param topic the kafka topic to publish the data to */ class TemperatureProducer( + private val identifier: String = "consumer", private val latitude: Double, private val longitude: Double, private val topic: String, @@ -37,11 +40,12 @@ class TemperatureProducer( private val jsonParser: Gson = Gson() ///< the json parser private val properties: Properties = Properties().apply { ///< the kafka producer properties - this[ProducerConfig.CLIENT_ID_CONFIG] = "producer" - // configure the serializers + // set the identifier + this[ProducerConfig.CLIENT_ID_CONFIG] = identifier + // set the serializers this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name - // configure the kafka server + // set the server information this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") } @@ -66,13 +70,15 @@ class TemperatureProducer( /** * Produce the current temperature at the given position into a kafka server. + * @throws HttpException the API call encountered an HTTP error. */ fun produce() { // request the current weather data val response = httpClient.newCall(this.apiRequest).execute() // check for a successful response if (!response.isSuccessful) - TODO("Faraphel: throw an error") + // in case of error, raise a http exception + throw HttpException(response.code, response.message) // parse the response into a map of string to string val data: JsonObject = jsonParser.fromJson(response.body!!.string(), JsonObject::class.java) @@ -88,13 +94,14 @@ class TemperatureProducer( /** * indefinitely produce temperature data. * @param frequency the refresh rate of the data + * @throws HttpException the API encountered an HTTP error. */ - fun produceForever(frequency: Duration = 60.seconds) { + fun produceForever(frequency: Duration = 60.seconds.toJavaDuration()) { while (true) { // produce a data this.produce() // wait for the cooldown - Thread.sleep(frequency.toJavaDuration()) + Thread.sleep(frequency) } } } diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt deleted file mode 100644 index d350b90..0000000 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt +++ /dev/null @@ -1,60 +0,0 @@ -package fr.faraphel.m1_pe_kafka - -import fr.faraphel.m1_pe_kafka.kafka.Consumer -import fr.faraphel.m1_pe_kafka.kafka.Converter -import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer -import io.quarkus.runtime.Quarkus -import io.quarkus.runtime.QuarkusApplication -import io.quarkus.runtime.annotations.QuarkusMain -import okhttp3.internal.wait -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.clients.admin.NewTopic -import java.util.* - - -/** - * The entrypoint of the program - */ -@QuarkusMain -class Main : QuarkusApplication { - override fun run(vararg args: String?): Int { - println("starting...") - - val adminConfig = Properties().apply { - this[AdminClientConfig.CLIENT_ID_CONFIG] = "main" - this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") - } - val admin = AdminClient.create(adminConfig) - admin.createTopics(mutableListOf( - NewTopic("temperature-celsius", 1, 1), - NewTopic("temperature-fahrenheit", 1, 1), - )).all().get() - - val producer = TemperatureProducer(49.9, 2.3, "temperature-celsius") - val converter = Converter("temperature-celsius", "temperature-fahrenheit") - val consumer = Consumer("temperature-fahrenheit") - - val threadProducer = Thread { producer.produceForever() } - val threadConsumer = Thread { consumer.consumeForever() } - - threadConsumer.start() - converter.start() - threadProducer.start() - - threadConsumer.join() - threadProducer.join() - converter.stop() - - return 0 - } -} - -/** - * The main function - * Call the entrypoint directly. Used to run the program directly with Kotlin - */ -fun main(args: Array) { - Quarkus.run(Main::class.java, *args) -} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/Temperature.kt similarity index 89% rename from src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt rename to src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/Temperature.kt index 43a0180..c06da1b 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/Temperature.kt @@ -10,13 +10,13 @@ class Temperature( private val value: Double ) { // convert Temperature to Double (Kelvin) - val kelvin: Double + val asKelvin: Double get() = this.value - // convert Temperature to Double (Celcius) - val celcius: Double + // convert Temperature to Double (Celsius) + val asCelsius: Double get() = this.value - 275.15 // convert Temperature to Double (Fahrenheit) - val fahrenheit: Double + val asFahrenheit: Double get() = ((this.value - 273.15) * 1.8) + 32 }