From 5174d1bf3e6e2b413d3e5f68ab37b176c442c6a8 Mon Sep 17 00:00:00 2001 From: Faraphel Date: Tue, 2 Jul 2024 10:41:00 +0200 Subject: [PATCH] added a REST API to access the current temperature --- README.md | 4 +-- build.gradle.kts | 2 +- docker-compose.yaml | 8 +++++ .../kotlin/fr/faraphel/m1_pe_kafka/Main.kt | 22 +++++++++++-- .../kafka/{PrintConsumer.kt => Consumer.kt} | 25 +++------------ .../faraphel/m1_pe_kafka/rest/PingEndpoint.kt | 20 ++++++++++++ .../m1_pe_kafka/rest/TemperatureEndpoint.kt | 32 +++++++++++++++++++ 7 files changed, 87 insertions(+), 26 deletions(-) rename src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/{PrintConsumer.kt => Consumer.kt} (73%) create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/TemperatureEndpoint.kt diff --git a/README.md b/README.md index b16cc56..94d59e9 100644 --- a/README.md +++ b/README.md @@ -40,8 +40,8 @@ The container `application` can be easily modified with the following environmen | TEMPERATURE_LOCATION | true | \, \ | 49.9, 2.3 ([Amiens, France](https://fr.wikipedia.org/wiki/Amiens)) | The coordinates where to get the temperatures from | ## Expectation -The `application` container shall print the current temperature at the selected place in -Fahrenheit every minute. +The `application` container shall print the current temperature at the selected place in Fahrenheit every minute. +You can also access this value with the REST api at the `http://localhost:8080/temperature` endpoint. ## References The project use the [Open-Meteo API](https://open-meteo.com/) to fetch the current temperature at the diff --git a/build.gradle.kts b/build.gradle.kts index 0931aae..1468849 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -27,9 +27,9 @@ dependencies { implementation("io.quarkus:quarkus-kafka-client") implementation("io.quarkus:quarkus-kafka-streams") implementation("io.quarkus:quarkus-messaging-kafka") + implementation("io.quarkus:quarkus-rest") implementation("com.squareup.okhttp3:okhttp:4.12.0") implementation("com.google.code.gson:gson:2.8.9") - } group = "fr.faraphel" diff --git a/docker-compose.yaml b/docker-compose.yaml index a020af1..c84c84b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -32,9 +32,17 @@ services: build: context: . dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + ports: + - "8080:8080" environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 TEMPERATURE_LOCATION: 49.9, 2.3 + healthcheck: + test: curl --fail http://localhost:8080/ping + start_period: 10s + timeout: 5s + interval: 60s + retries: 3 networks: - kafka depends_on: diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt index 2492e67..b1f0dfb 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt @@ -2,13 +2,17 @@ package fr.faraphel.m1_pe_kafka import fr.faraphel.m1_pe_kafka.error.MissingEnvironmentException import fr.faraphel.m1_pe_kafka.kafka.AdminUtils -import fr.faraphel.m1_pe_kafka.kafka.PrintConsumer +import fr.faraphel.m1_pe_kafka.kafka.Consumer import fr.faraphel.m1_pe_kafka.kafka.Converter import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer +import fr.faraphel.m1_pe_kafka.rest.TemperatureEndpoint import fr.faraphel.m1_pe_kafka.utils.celsius import io.quarkus.runtime.Quarkus import io.quarkus.runtime.QuarkusApplication import io.quarkus.runtime.annotations.QuarkusMain +import java.time.Instant +import java.time.ZoneId +import java.time.format.DateTimeFormatter /** @@ -24,6 +28,11 @@ class Main : QuarkusApplication { * @throws MissingEnvironmentException a required environment variable from the configuration is missing */ override fun run(vararg args: String?): Int { + // create a time formatter + val timeFormatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(ZoneId.systemDefault()) + // get the kafka server address val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS") ?: throw MissingEnvironmentException("KAFKA_BOOTSTRAP_SERVERS") @@ -62,10 +71,17 @@ class Main : QuarkusApplication { ) { temperature -> temperature.celsius.asFahrenheit } // create a consumer that will print the received values in the Fahrenheit topic - val consumer = PrintConsumer( + val consumer = Consumer( server=kafkaServer, topic=topicTemperatureFahrenheit - ) + ) { message -> + // format the time of the message to a proper string + val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp())) + // print the value + println("[${time}] ${message.value()}°F") + // update the value for the API + TemperatureEndpoint.setTemperature(message.value()) + } // run all the clients producer.start() diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt similarity index 73% rename from src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt rename to src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt index 912501d..ba318f8 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/PrintConsumer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/kafka/Consumer.kt @@ -4,37 +4,27 @@ package fr.faraphel.m1_pe_kafka.kafka import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.serialization.DoubleDeserializer import org.apache.kafka.common.serialization.StringDeserializer -import java.time.DateTimeException import java.time.Duration -import java.time.Instant -import java.time.ZoneId -import java.time.format.DateTimeFormatter import java.util.Collections import java.util.Properties -import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration /** - * A basic consumer that print everything in the selected topic. + * A basic consumer that call a function on all received messages * @param server the kafka server address * @param topic the topic to get the records from * @param identifier the kafka identifier for the configuration + * @param callback the function to call on all messages */ -class PrintConsumer( +class Consumer( private val server: String, private val identifier: String = "consumer", private val topic: String, + private val callback: (ConsumerRecord) -> Unit, ) : Thread() { - companion object { - // create a formatter to convert the timestamp to a string - private val timeFormatter = DateTimeFormatter - .ofPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZone(ZoneId.systemDefault()) - } - private val properties: Properties = Properties().apply { // identifier this[ConsumerConfig.GROUP_ID_CONFIG] = identifier @@ -59,12 +49,7 @@ class PrintConsumer( val messages: ConsumerRecords = this.consumer.poll(timeout) // print them with their timestamp and content - messages.forEach { message -> - // format the time of the message to a proper string - val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp())) - // print the value - println("[${time}] ${message.value()}°F") - } + messages.forEach { message -> this.callback(message) } } /** diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt new file mode 100644 index 0000000..03a93b8 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt @@ -0,0 +1,20 @@ +package fr.faraphel.m1_pe_kafka.rest + +import jakarta.ws.rs.GET +import jakarta.ws.rs.Path + + +/** + * A simple ping endpoint for the API + * Always answer "Pong!" + * Can be used to test if the API can be reached + */ +@Path("ping") +class PingEndpoint { + /** + * Handler for a GET request on this endpoint + * @return "Pong!" + */ + @GET + fun get(): String = "Pong!" +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/TemperatureEndpoint.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/TemperatureEndpoint.kt new file mode 100644 index 0000000..7472ba1 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/TemperatureEndpoint.kt @@ -0,0 +1,32 @@ +package fr.faraphel.m1_pe_kafka.rest + +import jakarta.ws.rs.GET +import jakarta.ws.rs.Path + + +/** + * This API endpoint return the latest temperature measured (in Fahrenheit) + */ +@Path("temperature") +class TemperatureEndpoint { + companion object { + private var temperature: Double? = null ///< the latest temperature value + + /** + * Setter to update the latest temperature value + * @param temperature the new temperature value + */ + fun setTemperature(temperature: Double) { + this.temperature = temperature + } + } + + /** + * Handler for a GET request on this endpoint + * @return the latest temperature measured + */ + @GET + fun get(): Double? { + return temperature + } +} -- 2.45.2