From fe39acd48a97e47a2ed95f19a481696003d13a9a Mon Sep 17 00:00:00 2001 From: Faraphel Date: Sat, 22 Jun 2024 11:27:30 +0200 Subject: [PATCH] simplified and improved the docker configuration added a public run IDEA task to easily test the project --- build.gradle.kts | 16 ++- gradle.properties | 8 +- .../fr/faraphel/m1_pe_kafka/Consumer.kt | 45 -------- .../kotlin/fr/faraphel/m1_pe_kafka/Main.kt | 27 ----- .../fr/faraphel/m1_pe_kafka/Producer.kt | 35 ------ .../fr/faraphel/m1_pe_kafka/kafka/Consumer.kt | 45 ++++++++ .../faraphel/m1_pe_kafka/kafka/Converter.kt | 62 +++++++++++ .../m1_pe_kafka/kafka/TemperatureProducer.kt | 100 ++++++++++++++++++ .../kotlin/fr/faraphel/m1_pe_kafka/main.kt | 60 +++++++++++ src/main/resources/application.properties | 1 + 10 files changed, 284 insertions(+), 115 deletions(-) delete mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt delete mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt delete mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt diff --git a/build.gradle.kts b/build.gradle.kts index cbc82c6..0931aae 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -14,14 +14,22 @@ val quarkusPlatformArtifactId: String by project val quarkusPlatformVersion: String by project dependencies { - implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}")) - implementation("io.quarkus:quarkus-kafka-client") - implementation("io.quarkus:quarkus-kotlin") - implementation("io.quarkus:quarkus-kafka-streams") + // language implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") + + // quarkus + implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}")) + implementation("io.quarkus:quarkus-kotlin") implementation("io.quarkus:quarkus-arc") testImplementation("io.quarkus:quarkus-junit5") + + // libraries + implementation("io.quarkus:quarkus-kafka-client") + implementation("io.quarkus:quarkus-kafka-streams") implementation("io.quarkus:quarkus-messaging-kafka") + implementation("com.squareup.okhttp3:okhttp:4.12.0") + implementation("com.google.code.gson:gson:2.8.9") + } group = "fr.faraphel" diff --git a/gradle.properties b/gradle.properties index 9966905..f391fe5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,11 +1,11 @@ -#Gradle properties +# Gradle properties quarkusPluginId=io.quarkus quarkusPluginVersion=3.11.1 quarkusPlatformGroupId=io.quarkus.platform quarkusPlatformArtifactId=quarkus-bom quarkusPlatformVersion=3.11.1 -#Kafka Properties -kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} +# Kafka Properties +quarkus.analytics.disabled=true quarkus.kafka-streams.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} -quarkus.kafka-streams.application-id=my-quarkus-app +kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt deleted file mode 100644 index 199b4fd..0000000 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt +++ /dev/null @@ -1,45 +0,0 @@ -package fr.faraphel.m1_pe_kafka - -/** - * The consumer - */ - -import org.apache.kafka.clients.consumer.* - -import java.time.Duration -import java.util.Collections -import java.util.Properties - - -/** - * The consumer - */ -class Consumer { - private val properties: Properties = Properties() - private var consumer: KafkaConsumer - private val topic: String = "mon_beau_topic" - - init { - this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" - this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" - this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe" - this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" - this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") - - this.consumer = KafkaConsumer(properties); - } - - fun main() { - this.consumer.subscribe(Collections.singletonList(topic)); - - // Afficher les messages reçu - while (true) { - val messages: ConsumerRecords = consumer.poll(Duration.ofMillis(500)); - - messages.forEach { message -> - println(">" + message.value()) - } - } - } -} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt deleted file mode 100644 index 5072108..0000000 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt +++ /dev/null @@ -1,27 +0,0 @@ -package fr.faraphel.m1_pe_kafka - -import fr.faraphel.m1_pe_kafka.utils.Temperature -import fr.faraphel.m1_pe_kafka.utils.celsius - - -/** - * The main function - */ -fun main() { - println("Debut du programme") - - val temperature: Temperature = 0.celsius - println(temperature.kelvin) - - val producer = Producer() - val consumer = Consumer() - - val thread1 = Thread { producer.main() } - val thread2 = Thread { consumer.main() } - - thread1.start() - thread2.start() - - thread1.join() - thread2.join() -} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt deleted file mode 100644 index 837a32a..0000000 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt +++ /dev/null @@ -1,35 +0,0 @@ -package fr.faraphel.m1_pe_kafka - -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import java.util.Properties - - -/** - * The producer - */ -class Producer { - private val properties: Properties = Properties() - private var producer: KafkaProducer - private val topic: String = "Température_Celsius" - - init { - this.properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" - this.properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" - this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") - ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") - - this.producer = KafkaProducer(properties) - } - - fun main() { - while (true) { - // create a new record - val record: ProducerRecord = ProducerRecord(this.topic, "bonjour el mundo") - // send the record - this.producer.send(record) - this.producer.flush() - } - } -} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt new file mode 100644 index 0000000..5a71818 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt @@ -0,0 +1,45 @@ +package fr.faraphel.m1_pe_kafka.kafka + +/** + * The consumer + */ + +import org.apache.kafka.clients.consumer.* +import org.apache.kafka.common.serialization.DoubleDeserializer +import org.apache.kafka.common.serialization.StringDeserializer + +import java.time.Duration +import java.util.Collections +import java.util.Properties + + +/** + * The consumer + */ +class Consumer( + private val topic: String +) { + private val properties: Properties = Properties().apply { + this[ConsumerConfig.GROUP_ID_CONFIG] = "consumer-group" + this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name + this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + } + + private val consumer = KafkaConsumer(this.properties).apply { + this.subscribe(Collections.singletonList(topic)) + } + + fun consume() { + val messages: ConsumerRecords = this.consumer.poll(Duration.ofMillis(500)); + + messages.forEach { message -> + println(">" + message.value()) + } + } + + fun consumeForever() { + while (true) this.consume() + } +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt new file mode 100644 index 0000000..55ff439 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt @@ -0,0 +1,62 @@ +package fr.faraphel.m1_pe_kafka.kafka + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.KafkaStreams +import org.apache.kafka.streams.StreamsBuilder +import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.kstream.KStream +import java.util.* + + +/** + * A kafka converter. + * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function. + * @param inputTopic the input kafka topic name + * @param outputTopic the output kafka topic name + */ +class Converter( + private val inputTopic: String, + private val outputTopic: String, +) { + + private val properties = Properties().apply { ///< the kafka properties + this[StreamsConfig.APPLICATION_ID_CONFIG] = "converter" + this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name + this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name + this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + } + + private val topologyBuilder = StreamsBuilder().apply { ///< the topology builder + // input + val inputStream: KStream = this.stream(inputTopic) ///< the input stream + // output + val outputStream: KStream = inputStream ///< the output stream + .mapValues { value -> value * 2 } // convert the values of the pair stream with this function | TODO(Faraphel): use a custom function + outputStream.to(outputTopic) // set the output of the stream to this topic + } + private val topology: Topology = this.topologyBuilder.build() ///< the stream topology + + private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties) ///< the stream + + /** + * Start the conversion process + */ + fun start() = streams.start() + + /** + * Pause the conversion process + */ + fun pause() = streams.pause() + + /** + * Resume a paused conversion process + */ + fun resume() = streams.resume() + + /** + * Stop a conversion process + */ + fun stop() = streams.close() +} \ No newline at end of file diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt new file mode 100644 index 0000000..e1410e1 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/TemperatureProducer.kt @@ -0,0 +1,100 @@ +package fr.faraphel.m1_pe_kafka.kafka + +import com.google.gson.Gson +import com.google.gson.JsonObject +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.DoubleSerializer +import org.apache.kafka.common.serialization.StringSerializer +import java.util.Properties +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration + + +/** + * A kafka temperature producer. + * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com) + * @param latitude the latitude of the temperature to fetch + * @param longitude the longitude of the temperature to fetch + * @param topic the kafka topic to publish the data to + */ +class TemperatureProducer( + private val latitude: Double, + private val longitude: Double, + private val topic: String, +) { + companion object { + private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API + .scheme("https") // use the https protocol + .host("api.open-meteo.com") // the api url + .addPathSegment("v1") // version 1 + .addPathSegment("forecast") // forecast section of the API + .addQueryParameter("current", "temperature") // request the current temperature + } + + private val jsonParser: Gson = Gson() ///< the json parser + + private val properties: Properties = Properties().apply { ///< the kafka producer properties + this[ProducerConfig.CLIENT_ID_CONFIG] = "producer" + // configure the serializers + this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name + this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name + // configure the kafka server + this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + } + + private val kafkaProducer: KafkaProducer = KafkaProducer(this.properties) ///< the kafka producer + + private val httpClient: okhttp3.OkHttpClient = okhttp3.OkHttpClient.Builder() ///< the HTTP client + .callTimeout(10.seconds.toJavaDuration()) + .retryOnConnectionFailure(true) + .build() + + private val apiUrl: okhttp3.HttpUrl = BASE_API_BUILDER // the Weather API url + .addQueryParameter("latitude", this.latitude.toString()) + .addQueryParameter("longitude", this.longitude.toString()) + .build() + + private val apiRequest: okhttp3.Request = okhttp3.Request.Builder() // to API request to retrieve the data + .get() + .url(this.apiUrl) + .build() + + + /** + * Produce the current temperature at the given position into a kafka server. + */ + fun produce() { + // request the current weather data + val response = httpClient.newCall(this.apiRequest).execute() + // check for a successful response + if (!response.isSuccessful) + TODO("Faraphel: throw an error") + + // parse the response into a map of string to string + val data: JsonObject = jsonParser.fromJson(response.body!!.string(), JsonObject::class.java) + // access the current temperature + val currentTemperature: Double = data["current"].asJsonObject["temperature"].asDouble + + // create a new record + val record: ProducerRecord = ProducerRecord(this.topic, currentTemperature) + // send the record + this.kafkaProducer.send(record) + } + + /** + * indefinitely produce temperature data. + * @param frequency the refresh rate of the data + */ + fun produceForever(frequency: Duration = 60.seconds) { + while (true) { + // produce a data + this.produce() + // wait for the cooldown + Thread.sleep(frequency.toJavaDuration()) + } + } +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt new file mode 100644 index 0000000..d350b90 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt @@ -0,0 +1,60 @@ +package fr.faraphel.m1_pe_kafka + +import fr.faraphel.m1_pe_kafka.kafka.Consumer +import fr.faraphel.m1_pe_kafka.kafka.Converter +import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer +import io.quarkus.runtime.Quarkus +import io.quarkus.runtime.QuarkusApplication +import io.quarkus.runtime.annotations.QuarkusMain +import okhttp3.internal.wait +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic +import java.util.* + + +/** + * The entrypoint of the program + */ +@QuarkusMain +class Main : QuarkusApplication { + override fun run(vararg args: String?): Int { + println("starting...") + + val adminConfig = Properties().apply { + this[AdminClientConfig.CLIENT_ID_CONFIG] = "main" + this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + } + val admin = AdminClient.create(adminConfig) + admin.createTopics(mutableListOf( + NewTopic("temperature-celsius", 1, 1), + NewTopic("temperature-fahrenheit", 1, 1), + )).all().get() + + val producer = TemperatureProducer(49.9, 2.3, "temperature-celsius") + val converter = Converter("temperature-celsius", "temperature-fahrenheit") + val consumer = Consumer("temperature-fahrenheit") + + val threadProducer = Thread { producer.produceForever() } + val threadConsumer = Thread { consumer.consumeForever() } + + threadConsumer.start() + converter.start() + threadProducer.start() + + threadConsumer.join() + threadProducer.join() + converter.stop() + + return 0 + } +} + +/** + * The main function + * Call the entrypoint directly. Used to run the program directly with Kotlin + */ +fun main(args: Array) { + Quarkus.run(Main::class.java, *args) +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e69de29..9817ace 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -0,0 +1 @@ +quarkus.kafka-streams.application-id=fr.faraphel.m1_pe_kafka