simplified and improved the docker configuration
added a public IDEA run task to easily test the project
This commit is contained in: parent 90d64434a9, commit fe39acd48a
10 changed files with 284 additions and 115 deletions
@@ -14,14 +14,22 @@ val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project

dependencies {
    implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
    implementation("io.quarkus:quarkus-kafka-client")
    implementation("io.quarkus:quarkus-kotlin")
    implementation("io.quarkus:quarkus-kafka-streams")
    // language
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

    // quarkus
    implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
    implementation("io.quarkus:quarkus-kotlin")
    implementation("io.quarkus:quarkus-arc")
    testImplementation("io.quarkus:quarkus-junit5")

    // libraries
    implementation("io.quarkus:quarkus-kafka-client")
    implementation("io.quarkus:quarkus-kafka-streams")
    implementation("io.quarkus:quarkus-messaging-kafka")
    implementation("com.squareup.okhttp3:okhttp:4.12.0")
    implementation("com.google.code.gson:gson:2.8.9")

}

group = "fr.faraphel"
@@ -1,11 +1,11 @@
#Gradle properties
# Gradle properties
quarkusPluginId=io.quarkus
quarkusPluginVersion=3.11.1
quarkusPlatformGroupId=io.quarkus.platform
quarkusPlatformArtifactId=quarkus-bom
quarkusPlatformVersion=3.11.1

#Kafka Properties
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
# Kafka Properties
quarkus.analytics.disabled=true
quarkus.kafka-streams.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
quarkus.kafka-streams.application-id=my-quarkus-app
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
@@ -1,45 +0,0 @@
package fr.faraphel.m1_pe_kafka

/**
 * The consumer
 */

import org.apache.kafka.clients.consumer.*

import java.time.Duration
import java.util.Collections
import java.util.Properties


/**
 * The consumer
 */
class Consumer {
    private val properties: Properties = Properties()
    private var consumer: KafkaConsumer<String, String>
    private val topic: String = "mon_beau_topic"

    init {
        this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe"
        this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")

        this.consumer = KafkaConsumer(properties);
    }

    fun main() {
        this.consumer.subscribe(Collections.singletonList(topic));

        // Display the received messages
        while (true) {
            val messages: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(500));

            messages.forEach { message ->
                println(">" + message.value())
            }
        }
    }
}
@@ -1,27 +0,0 @@
package fr.faraphel.m1_pe_kafka

import fr.faraphel.m1_pe_kafka.utils.Temperature
import fr.faraphel.m1_pe_kafka.utils.celsius


/**
 * The main function
 */
fun main() {
    println("Debut du programme")

    val temperature: Temperature = 0.celsius
    println(temperature.kelvin)

    val producer = Producer()
    val consumer = Consumer()

    val thread1 = Thread { producer.main() }
    val thread2 = Thread { consumer.main() }

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
}
@@ -1,35 +0,0 @@
package fr.faraphel.m1_pe_kafka

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties


/**
 * The producer
 */
class Producer {
    private val properties: Properties = Properties()
    private var producer: KafkaProducer<String, String>
    private val topic: String = "Température_Celsius"

    init {
        this.properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        this.properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")

        this.producer = KafkaProducer(properties)
    }

    fun main() {
        while (true) {
            // create a new record
            val record: ProducerRecord<String, String> = ProducerRecord(this.topic, "bonjour el mundo")
            // send the record
            this.producer.send(record)
            this.producer.flush()
        }
    }
}
src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt (new file, 45 lines)
@@ -0,0 +1,45 @@
package fr.faraphel.m1_pe_kafka.kafka

/**
 * The consumer
 */

import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.DoubleDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

import java.time.Duration
import java.util.Collections
import java.util.Properties


/**
 * The consumer
 */
class Consumer(
    private val topic: String
) {
    private val properties: Properties = Properties().apply {
        this[ConsumerConfig.GROUP_ID_CONFIG] = "consumer-group"
        this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DoubleDeserializer::class.java.name
        this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
    }

    private val consumer = KafkaConsumer<String, Double>(this.properties).apply {
        this.subscribe(Collections.singletonList(topic))
    }

    fun consume() {
        val messages: ConsumerRecords<String, Double> = this.consumer.poll(Duration.ofMillis(500));

        messages.forEach { message ->
            println(">" + message.value())
        }
    }

    fun consumeForever() {
        while (true) this.consume()
    }
}
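For reference, a minimal usage sketch of this Consumer (not part of the commit; the topic name mirrors the one created in main.kt below, and KAFKA_BOOTSTRAP_SERVERS must be set in the environment):

import fr.faraphel.m1_pe_kafka.kafka.Consumer

fun main() {
    val consumer = Consumer("temperature-fahrenheit")  // subscribes to the topic on construction
    consumer.consume()            // poll once for up to 500 ms and print the received values
    // consumer.consumeForever()  // or keep polling in a loop
}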
src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Converter.kt (new file, 62 lines)
@@ -0,0 +1,62 @@
package fr.faraphel.m1_pe_kafka.kafka

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import java.util.*


/**
 * A kafka converter.
 * Take a value from a topic and convert it into another value in a second topic thanks to a conversion function.
 * @param inputTopic the input kafka topic name
 * @param outputTopic the output kafka topic name
 */
class Converter(
    private val inputTopic: String,
    private val outputTopic: String,
) {

    private val properties = Properties().apply { ///< the kafka properties
        this[StreamsConfig.APPLICATION_ID_CONFIG] = "converter"
        this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name
        this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name
        this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
    }

    private val topologyBuilder = StreamsBuilder().apply { ///< the topology builder
        // input
        val inputStream: KStream<String, Double> = this.stream(inputTopic) ///< the input stream
        // output
        val outputStream: KStream<String, Double> = inputStream ///< the output stream
            .mapValues { value -> value * 2 } // convert the values of the pair stream with this function | TODO(Faraphel): use a custom function
        outputStream.to(outputTopic) // set the output of the stream to this topic
    }
    private val topology: Topology = this.topologyBuilder.build() ///< the stream topology

    private val streams: KafkaStreams = KafkaStreams(this.topology, this.properties) ///< the stream

    /**
     * Start the conversion process
     */
    fun start() = streams.start()

    /**
     * Pause the conversion process
     */
    fun pause() = streams.pause()

    /**
     * Resume a paused conversion process
     */
    fun resume() = streams.resume()

    /**
     * Stop a conversion process
     */
    fun stop() = streams.close()
}
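The mapping above is hard-coded to value * 2 and the TODO asks for a custom function. As a hedged illustration only (not part of this commit; the class name FunctionConverter and the Celsius-to-Fahrenheit formula are assumptions), the TODO could be addressed by injecting the conversion as a constructor parameter:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import java.util.Properties

// Hypothetical variant of Converter where the conversion function is injected
class FunctionConverter(
    private val inputTopic: String,
    private val outputTopic: String,
    private val convert: (Double) -> Double,  // e.g. Celsius -> Fahrenheit
) {
    private val properties = Properties().apply {
        this[StreamsConfig.APPLICATION_ID_CONFIG] = "converter"
        this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.StringSerde::class.java.name
        this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.DoubleSerde::class.java.name
        this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
    }

    private val topology = StreamsBuilder().apply {
        val input: KStream<String, Double> = this.stream(inputTopic)   // read from the input topic
        input.mapValues { value -> convert(value) }.to(outputTopic)    // apply the injected function and publish
    }.build()

    private val streams = KafkaStreams(topology, properties)

    fun start() = streams.start()
    fun stop() = streams.close()
}

// Usage, matching the topics created in main.kt below:
// val converter = FunctionConverter("temperature-celsius", "temperature-fahrenheit") { c -> c * 9.0 / 5.0 + 32.0 }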
@@ -0,0 +1,100 @@
package fr.faraphel.m1_pe_kafka.kafka

import com.google.gson.Gson
import com.google.gson.JsonObject
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.DoubleSerializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration


/**
 * A kafka temperature producer.
 * Produced values are fetched from the [Open-Meteo API](https://api.open-meteo.com)
 * @param latitude the latitude of the temperature to fetch
 * @param longitude the longitude of the temperature to fetch
 * @param topic the kafka topic to publish the data to
 */
class TemperatureProducer(
    private val latitude: Double,
    private val longitude: Double,
    private val topic: String,
) {
    companion object {
        private val BASE_API_BUILDER: okhttp3.HttpUrl.Builder = okhttp3.HttpUrl.Builder() ///< the Url builder for the API
            .scheme("https") // use the https protocol
            .host("api.open-meteo.com") // the api url
            .addPathSegment("v1") // version 1
            .addPathSegment("forecast") // forecast section of the API
            .addQueryParameter("current", "temperature") // request the current temperature
    }

    private val jsonParser: Gson = Gson() ///< the json parser

    private val properties: Properties = Properties().apply { ///< the kafka producer properties
        this[ProducerConfig.CLIENT_ID_CONFIG] = "producer"
        // configure the serializers
        this[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = DoubleSerializer::class.java.name
        // configure the kafka server
        this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
    }

    private val kafkaProducer: KafkaProducer<String, Double> = KafkaProducer(this.properties) ///< the kafka producer

    private val httpClient: okhttp3.OkHttpClient = okhttp3.OkHttpClient.Builder() ///< the HTTP client
        .callTimeout(10.seconds.toJavaDuration())
        .retryOnConnectionFailure(true)
        .build()

    private val apiUrl: okhttp3.HttpUrl = BASE_API_BUILDER // the Weather API url
        .addQueryParameter("latitude", this.latitude.toString())
        .addQueryParameter("longitude", this.longitude.toString())
        .build()

    private val apiRequest: okhttp3.Request = okhttp3.Request.Builder() // the API request to retrieve the data
        .get()
        .url(this.apiUrl)
        .build()


    /**
     * Produce the current temperature at the given position into a kafka server.
     */
    fun produce() {
        // request the current weather data
        val response = httpClient.newCall(this.apiRequest).execute()
        // check for a successful response
        if (!response.isSuccessful)
            TODO("Faraphel: throw an error")

        // parse the response into a map of string to string
        val data: JsonObject = jsonParser.fromJson(response.body!!.string(), JsonObject::class.java)
        // access the current temperature
        val currentTemperature: Double = data["current"].asJsonObject["temperature"].asDouble

        // create a new record
        val record: ProducerRecord<String, Double> = ProducerRecord(this.topic, currentTemperature)
        // send the record
        this.kafkaProducer.send(record)
    }

    /**
     * Indefinitely produce temperature data.
     * @param frequency the refresh rate of the data
     */
    fun produceForever(frequency: Duration = 60.seconds) {
        while (true) {
            // produce a data
            this.produce()
            // wait for the cooldown
            Thread.sleep(frequency.toJavaDuration())
        }
    }
}
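produce() above assumes the JSON body contains a current object with a temperature field. A minimal, self-contained sketch of just that parsing step (the sample payload is invented for illustration and may not match the real Open-Meteo response):

import com.google.gson.Gson
import com.google.gson.JsonObject

fun main() {
    // invented payload with the shape produce() expects
    val body = """{"current": {"temperature": 21.4}}"""

    val data: JsonObject = Gson().fromJson(body, JsonObject::class.java)
    val temperature: Double = data["current"].asJsonObject["temperature"].asDouble

    println(temperature)  // prints 21.4
}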
src/main/kotlin/fr/faraphel/m1_pe_kafka/main.kt (new file, 60 lines)
@@ -0,0 +1,60 @@
package fr.faraphel.m1_pe_kafka

import fr.faraphel.m1_pe_kafka.kafka.Consumer
import fr.faraphel.m1_pe_kafka.kafka.Converter
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
import io.quarkus.runtime.Quarkus
import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain
import okhttp3.internal.wait
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.*


/**
 * The entrypoint of the program
 */
@QuarkusMain
class Main : QuarkusApplication {
    override fun run(vararg args: String?): Int {
        println("starting...")

        val adminConfig = Properties().apply {
            this[AdminClientConfig.CLIENT_ID_CONFIG] = "main"
            this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
                ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS")
        }
        val admin = AdminClient.create(adminConfig)
        admin.createTopics(mutableListOf(
            NewTopic("temperature-celsius", 1, 1),
            NewTopic("temperature-fahrenheit", 1, 1),
        )).all().get()

        val producer = TemperatureProducer(49.9, 2.3, "temperature-celsius")
        val converter = Converter("temperature-celsius", "temperature-fahrenheit")
        val consumer = Consumer("temperature-fahrenheit")

        val threadProducer = Thread { producer.produceForever() }
        val threadConsumer = Thread { consumer.consumeForever() }

        threadConsumer.start()
        converter.start()
        threadProducer.start()

        threadConsumer.join()
        threadProducer.join()
        converter.stop()

        return 0
    }
}

/**
 * The main function
 * Call the entrypoint directly. Used to run the program directly with Kotlin
 */
fun main(args: Array<String>) {
    Quarkus.run(Main::class.java, *args)
}
@@ -0,0 +1 @@
quarkus.kafka-streams.application-id=fr.faraphel.m1_pe_kafka