M1-PE-Kafka/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt

93 lines
3.2 KiB
Kotlin

package fr.faraphel.m1_pe_kafka
import fr.faraphel.m1_pe_kafka.error.MissingEnvironmentException
import fr.faraphel.m1_pe_kafka.kafka.AdminUtils
import fr.faraphel.m1_pe_kafka.kafka.PrintConsumer
import fr.faraphel.m1_pe_kafka.kafka.Converter
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
import fr.faraphel.m1_pe_kafka.utils.celsius
import io.quarkus.runtime.Quarkus
import io.quarkus.runtime.QuarkusApplication
import io.quarkus.runtime.annotations.QuarkusMain
/**
 * The main class.
 * Contains the entrypoint of the program: it reads the configuration from the environment,
 * creates the Kafka topics, then runs a temperature producer, a Celsius to Fahrenheit
 * converter and a printing consumer.
 */
@QuarkusMain
class Main : QuarkusApplication {
    /**
     * The entrypoint of the program.
     *
     * Configuration is read from the environment:
     * - `KAFKA_BOOTSTRAP_SERVERS` (required): address of the Kafka server.
     * - `TEMPERATURE_LOCATION` (required): `"<latitude>,<longitude>"` of the place to measure.
     * - `TOPIC_TEMPERATURE_CELSIUS` (optional, defaults to `temperature-celsius`).
     * - `TOPIC_TEMPERATURE_FAHRENHEIT` (optional, defaults to `temperature-fahrenheit`).
     *
     * @param args command line arguments (unused)
     * @return the result code of the program
     * @throws MissingEnvironmentException a required environment variable from the configuration is missing
     * @throws IllegalArgumentException the `TEMPERATURE_LOCATION` value is not a valid `"<latitude>,<longitude>"` pair
     */
    override fun run(vararg args: String?): Int {
        // read and validate the whole configuration first, so that a configuration
        // error is reported before anything is created on the Kafka server

        // get the kafka server address
        val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
            ?: throw MissingEnvironmentException("KAFKA_BOOTSTRAP_SERVERS")

        // get the topics name
        val topicTemperatureCelsius: String = System.getenv("TOPIC_TEMPERATURE_CELSIUS")
            ?: "temperature-celsius"
        val topicTemperatureFahrenheit: String = System.getenv("TOPIC_TEMPERATURE_FAHRENHEIT")
            ?: "temperature-fahrenheit"

        // get the location of the temperature to get
        val location = System.getenv("TEMPERATURE_LOCATION")
            ?: throw MissingEnvironmentException("TEMPERATURE_LOCATION")
        // parse the location to get the latitude and longitude
        val (latitude, longitude) = parseLocation(location)

        // create an admin object
        val admin = AdminUtils(kafkaServer)
        // create our topics
        admin.createTopics(topicTemperatureCelsius, topicTemperatureFahrenheit)

        // create a producer that will generate temperature values based on the
        // current temperature at the configured location
        val producer = TemperatureProducer(
            server = kafkaServer,
            latitude = latitude,
            longitude = longitude,
            topic = topicTemperatureCelsius
        )

        // create a converter that will convert values coming from the Celsius topic to the Fahrenheit topic
        val converter = Converter(
            server = kafkaServer,
            inputTopic = topicTemperatureCelsius,
            outputTopic = topicTemperatureFahrenheit
        ) { temperature -> temperature.celsius.asFahrenheit }

        // create a consumer that will print the received values in the Fahrenheit topic
        val consumer = PrintConsumer(
            server = kafkaServer,
            topic = topicTemperatureFahrenheit
        )

        // run all the clients
        producer.start()
        consumer.start()
        converter.start()

        try {
            // wait for them to finish before closing
            producer.join()
            consumer.join()
        } finally {
            // always stop the converter, whether the other clients finished
            // normally or the wait was aborted by an exception
            converter.stop()
        }

        return 0
    }

    /**
     * Parse a `"<latitude>,<longitude>"` string into its two coordinates.
     * Whitespace around each coordinate is ignored.
     *
     * @param location the raw value of the `TEMPERATURE_LOCATION` environment variable
     * @return the latitude and the longitude, in that order
     * @throws IllegalArgumentException the value does not hold exactly two finite numbers
     */
    private fun parseLocation(location: String): Pair<Double, Double> {
        val parts = location.split(",")
        require(parts.size == 2) {
            "TEMPERATURE_LOCATION must be formatted as \"<latitude>,<longitude>\", got \"$location\""
        }

        val (latitude, longitude) = parts.map { part ->
            // toDoubleOrNull also accepts "NaN" and "Infinity", which are not usable coordinates
            part.trim().toDoubleOrNull()?.takeIf { value -> value.isFinite() }
                ?: throw IllegalArgumentException(
                    "TEMPERATURE_LOCATION contains an invalid coordinate \"${part.trim()}\" in \"$location\""
                )
        }

        return latitude to longitude
    }
}
/**
 * Program entry point, used to run the program directly with Kotlin.
 * Hands control over to Quarkus with [Main] as the application class.
 * @param args command line arguments, forwarded unchanged
 */
fun main(args: Array<String>): Unit = Quarkus.run(Main::class.java, *args)