Merge pull request 'implemented the different kafka clients' (#3) from clients into master

Reviewed-on: #3
This commit is contained in:
faraphel 2024-06-22 09:33:42 +00:00
commit 1cbfa574e1
10 changed files with 284 additions and 115 deletions

View file

@ -14,14 +14,22 @@ val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project val quarkusPlatformVersion: String by project
dependencies { dependencies {
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}")) // language
implementation("io.quarkus:quarkus-kafka-client")
implementation("io.quarkus:quarkus-kotlin")
implementation("io.quarkus:quarkus-kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// quarkus
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation("io.quarkus:quarkus-kotlin")
implementation("io.quarkus:quarkus-arc") implementation("io.quarkus:quarkus-arc")
testImplementation("io.quarkus:quarkus-junit5") testImplementation("io.quarkus:quarkus-junit5")
// libraries
implementation("io.quarkus:quarkus-kafka-client")
implementation("io.quarkus:quarkus-kafka-streams")
implementation("io.quarkus:quarkus-messaging-kafka") implementation("io.quarkus:quarkus-messaging-kafka")
implementation("com.squareup.okhttp3:okhttp:4.12.0")
implementation("com.google.code.gson:gson:2.8.9")
} }
group = "fr.faraphel" group = "fr.faraphel"

View file

@ -1,11 +1,11 @@
#Gradle properties # Gradle properties
quarkusPluginId=io.quarkus quarkusPluginId=io.quarkus
quarkusPluginVersion=3.11.1 quarkusPluginVersion=3.11.1
quarkusPlatformGroupId=io.quarkus.platform quarkusPlatformGroupId=io.quarkus.platform
quarkusPlatformArtifactId=quarkus-bom quarkusPlatformArtifactId=quarkus-bom
quarkusPlatformVersion=3.11.1 quarkusPlatformVersion=3.11.1
#Kafka Properties # Kafka Properties
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} quarkus.analytics.disabled=true
quarkus.kafka-streams.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} quarkus.kafka-streams.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
quarkus.kafka-streams.application-id=my-quarkus-app kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}

View file

@ -1,45 +0,0 @@
package fr.faraphel.m1_pe_kafka
/**
* The consumer
*/
import org.apache.kafka.clients.consumer.*
import java.time.Duration
import java.util.Collections
import java.util.Properties
/**
* The consumer
*/
class Consumer {
private val properties: Properties = Properties()
private var consumer: KafkaConsumer<String, String>
private val topic: String = "mon_beau_topic"
init {
this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe"
this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
this.consumer = KafkaConsumer(properties);
}
fun main() {
this.consumer.subscribe(Collections.singletonList(topic));
// Afficher les messages reçu
while (true) {
val messages: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(500));
messages.forEach { message ->
println(">" + message.value())
}
}
}
}

View file

@ -1,27 +0,0 @@
package fr.faraphel.m1_pe_kafka
import fr.faraphel.m1_pe_kafka.utils.Temperature
import fr.faraphel.m1_pe_kafka.utils.celsius
/**
* The main function
*/
fun main() {
println("Debut du programme")
val temperature: Temperature = 0.celsius
println(temperature.kelvin)
val producer = Producer()
val consumer = Consumer()
val thread1 = Thread { producer.main() }
val thread2 = Thread { consumer.main() }
thread1.start()
thread2.start()
thread1.join()
thread2.join()
}

View file

@ -1,35 +0,0 @@
package fr.faraphel.m1_pe_kafka
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties
/**
* The producer
*/
class Producer {
private val properties: Properties = Properties()
private var producer: KafkaProducer<String, String>
private val topic: String = "Température_Celsius"
init {
this.properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
this.producer = KafkaProducer(properties)
}
fun main() {
while (true) {
// create a new record
val record: ProducerRecord<String, String> = ProducerRecord(this.topic, "bonjour el mundo")
// send the record
this.producer.send(record)
this.producer.flush()
}
}
}

View file

@ -0,0 +1,45 @@
package fr.faraphel.m1_pe_kafka.kafka
/**
* The consumer
*/
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.DoubleDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Collections
import java.util.Properties
/**
* The consumer
*/
class Consumer(
private val topic: String
) {
private val properties: Properties = Properties().apply {
this[ConsumerConfig.GROUP_ID_CONFIG] = "consumer-group"
this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name
this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val consumer = KafkaConsumer<String, Double>(this.properties).apply {
this.subscribe(Collections.singletonList(topic))
}
fun consume() {
val messages: ConsumerRecords<String, Double> = this.consumer.poll(Duration.ofMillis(500));
messages.forEach { message ->
println(">" + message.value())
}
}
fun consumeForever() {
while (true) this.consume()
}
}

View file

@ -0,0 +1,62 @@
package fr.faraphel.m1_pe_kafka.kafka
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import java.util.*
/**
* A kafka converter.
* Take a value from a topic and convert it into another value in a second topic thanks to a conversion function.
* @param inputTopic the input kafka topic name
* @param outputTopic the output kafka topic name
*/
class Converter(
private val inputTopic: String,
private val outputTopic: String,
) {
private val properties = Properties().apply { ///< the kafka properties
this[StreamsConfig.APPLICATION_ID_CONFIG] = "converter"
this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name
this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val topologyBuilder = StreamsBuilder().apply { ///< the topology builder
// input
val inputStream: KStream<String, Double> = this.stream(inputTopic) ///< the input stream
// output
val outputStream: KStream<String, Double> = inputStream ///< the output stream
.mapValues { value -> value * 2 } // convert the values of the pair stream with this function | TODO(Faraphel): use a custom function
outputStream.to(outputTopic) // set the output of the stream to this topic
}
private val topology: Topology = this.topologyBuilder.build() ///< the stream topology
private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties) ///< the stream
/**
* Start the conversion process
*/
fun start() = streams.start()
/**
* Pause the conversion process
*/
fun pause() = streams.pause()
/**
* Resume a paused conversion process
*/
fun resume() = streams.resume()
/**
* Stop a conversion process
*/
fun stop() = streams.close()
}

View file

@ -0,0 +1,100 @@
package fr.faraphel.m1_pe_kafka.kafka
import com.google.gson.Gson
import com.google.gson.JsonObject
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.DoubleSerializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
/**
* A kafka temperature producer.
* Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com)
* @param latitude the latitude of the temperature to fetch
* @param longitude the longitude of the temperature to fetch
* @param topic the kafka topic to publish the data to
*/
class TemperatureProducer(
private val latitude: Double,
private val longitude: Double,
private val topic: String,
) {
companion object {
private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API
.scheme("https") // use the https protocol
.host("api.open-meteo.com") // the api url
.addPathSegment("v1") // version 1
.addPathSegment("forecast") // forecast section of the API
.addQueryParameter("current", "temperature") // request the current temperature
}
private val jsonParser: Gson = Gson() ///< the json parser
private val properties: Properties = Properties().apply { ///< the kafka producer properties
this[ProducerConfig.CLIENT_ID_CONFIG] = "producer"
// configure the serializers
this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name
// configure the kafka server
this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
private val kafkaProducer: KafkaProducer<String, Double> = KafkaProducer(this.properties) ///< the kafka producer
private val httpClient: okhttp3.OkHttpClient = okhttp3.OkHttpClient.Builder() ///< the HTTP client
.callTimeout(10.seconds.toJavaDuration())
.retryOnConnectionFailure(true)
.build()
private val apiUrl: okhttp3.HttpUrl = BASE_API_BUILDER // the Weather API url
.addQueryParameter("latitude", this.latitude.toString())
.addQueryParameter("longitude", this.longitude.toString())
.build()
private val apiRequest: okhttp3.Request = okhttp3.Request.Builder() // to API request to retrieve the data
.get()
.url(this.apiUrl)
.build()
/**
* Produce the current temperature at the given position into a kafka server.
*/
fun produce() {
// request the current weather data
val response = httpClient.newCall(this.apiRequest).execute()
// check for a successful response
if (!response.isSuccessful)
TODO("Faraphel: throw an error")
// parse the response into a map of string to string
val data: JsonObject = jsonParser.fromJson(response.body!!.string(), JsonObject::class.java)
// access the current temperature
val currentTemperature: Double = data["current"].asJsonObject["temperature"].asDouble
// create a new record
val record: ProducerRecord<String, Double> = ProducerRecord(this.topic, currentTemperature)
// send the record
this.kafkaProducer.send(record)
}
/**
* indefinitely produce temperature data.
* @param frequency the refresh rate of the data
*/
fun produceForever(frequency: Duration = 60.seconds) {
while (true) {
// produce a data
this.produce()
// wait for the cooldown
Thread.sleep(frequency.toJavaDuration())
}
}
}

View file

@ -0,0 +1,60 @@
package fr.faraphel.m1_pe_kafka
import fr.faraphel.m1_pe_kafka.kafka.Consumer
import fr.faraphel.m1_pe_kafka.kafka.Converter
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
import io.quarkus.runtime.Quarkus
import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain
import okhttp3.internal.wait
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.*
/**
* The entrypoint of the program
*/
@QuarkusMain
class Main : QuarkusApplication {
override fun run(vararg args: String?): Int {
println("starting...")
val adminConfig = Properties().apply {
this[AdminClientConfig.CLIENT_ID_CONFIG] = "main"
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
}
val admin = AdminClient.create(adminConfig)
admin.createTopics(mutableListOf(
NewTopic("temperature-celsius", 1, 1),
NewTopic("temperature-fahrenheit", 1, 1),
)).all().get()
val producer = TemperatureProducer(49.9, 2.3, "temperature-celsius")
val converter = Converter("temperature-celsius", "temperature-fahrenheit")
val consumer = Consumer("temperature-fahrenheit")
val threadProducer = Thread { producer.produceForever() }
val threadConsumer = Thread { consumer.consumeForever() }
threadConsumer.start()
converter.start()
threadProducer.start()
threadConsumer.join()
threadProducer.join()
converter.stop()
return 0
}
}
/**
* The main function
* Call the entrypoint directly. Used to run the program directly with Kotlin
*/
fun main(args: Array<String>) {
Quarkus.run(Main::class.java, *args)
}

View file

@ -0,0 +1 @@
quarkus.kafka-streams.application-id=fr.faraphel.m1_pe_kafka