Add a REST API to access the current temperature #13
7 changed files with 87 additions and 26 deletions
|
@ -40,8 +40,8 @@ The container `application` can be easily modified with the following environmen
|
|||
| TEMPERATURE_LOCATION | true | \<latitude>, \<longitude> | 49.9, 2.3 ([Amiens, France](https://fr.wikipedia.org/wiki/Amiens)) | The coordinates where to get the temperatures from |
|
||||
|
||||
## Expectation
|
||||
The `application` container shall print the current temperature at the selected place in
|
||||
Fahrenheit every minute.
|
||||
The `application` container shall print the current temperature at the selected place in Fahrenheit every minute.
|
||||
You can also access this value with the REST api at the `http://localhost:8080/temperature` endpoint.
|
||||
|
||||
## References
|
||||
The project use the [Open-Meteo API](https://open-meteo.com/) to fetch the current temperature at the
|
||||
|
|
|
@ -27,9 +27,9 @@ dependencies {
|
|||
implementation("io.quarkus:quarkus-kafka-client")
|
||||
implementation("io.quarkus:quarkus-kafka-streams")
|
||||
implementation("io.quarkus:quarkus-messaging-kafka")
|
||||
implementation("io.quarkus:quarkus-rest")
|
||||
implementation("com.squareup.okhttp3:okhttp:4.12.0")
|
||||
implementation("com.google.code.gson:gson:2.8.9")
|
||||
|
||||
}
|
||||
|
||||
group = "fr.faraphel"
|
||||
|
|
|
@ -32,9 +32,17 @@ services:
|
|||
build:
|
||||
context: .
|
||||
dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
|
||||
ports:
|
||||
- "8080:8080"
|
||||
environment:
|
||||
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
|
||||
TEMPERATURE_LOCATION: 49.9, 2.3
|
||||
healthcheck:
|
||||
test: curl --fail http://localhost:8080/ping
|
||||
start_period: 10s
|
||||
timeout: 5s
|
||||
interval: 60s
|
||||
retries: 3
|
||||
networks:
|
||||
- kafka
|
||||
depends_on:
|
||||
|
|
|
@ -2,13 +2,17 @@ package fr.faraphel.m1_pe_kafka
|
|||
|
||||
import fr.faraphel.m1_pe_kafka.error.MissingEnvironmentException
|
||||
import fr.faraphel.m1_pe_kafka.kafka.AdminUtils
|
||||
import fr.faraphel.m1_pe_kafka.kafka.PrintConsumer
|
||||
import fr.faraphel.m1_pe_kafka.kafka.Consumer
|
||||
import fr.faraphel.m1_pe_kafka.kafka.Converter
|
||||
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
|
||||
import fr.faraphel.m1_pe_kafka.rest.TemperatureEndpoint
|
||||
import fr.faraphel.m1_pe_kafka.utils.celsius
|
||||
import io.quarkus.runtime.Quarkus
|
||||
import io.quarkus.runtime.QuarkusApplication
|
||||
import io.quarkus.runtime.annotations.QuarkusMain
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId
|
||||
import java.time.format.DateTimeFormatter
|
||||
|
||||
|
||||
/**
|
||||
|
@ -24,6 +28,11 @@ class Main : QuarkusApplication {
|
|||
* @throws MissingEnvironmentException a required environment variable from the configuration is missing
|
||||
*/
|
||||
override fun run(vararg args: String?): Int {
|
||||
// create a time formatter
|
||||
val timeFormatter = DateTimeFormatter
|
||||
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
||||
.withZone(ZoneId.systemDefault())
|
||||
|
||||
// get the kafka server address
|
||||
val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
|
||||
?: throw MissingEnvironmentException("KAFKA_BOOTSTRAP_SERVERS")
|
||||
|
@ -62,10 +71,17 @@ class Main : QuarkusApplication {
|
|||
) { temperature -> temperature.celsius.asFahrenheit }
|
||||
|
||||
// create a consumer that will print the received values in the Fahrenheit topic
|
||||
val consumer = PrintConsumer(
|
||||
val consumer = Consumer(
|
||||
server=kafkaServer,
|
||||
topic=topicTemperatureFahrenheit
|
||||
)
|
||||
) { message ->
|
||||
// format the time of the message to a proper string
|
||||
val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp()))
|
||||
// print the value
|
||||
println("[${time}] ${message.value()}°F")
|
||||
// update the value for the API
|
||||
TemperatureEndpoint.setTemperature(message.value())
|
||||
}
|
||||
|
||||
// run all the clients
|
||||
producer.start()
|
||||
|
|
|
@ -4,37 +4,27 @@ package fr.faraphel.m1_pe_kafka.kafka
|
|||
import org.apache.kafka.clients.consumer.*
|
||||
import org.apache.kafka.common.serialization.DoubleDeserializer
|
||||
import org.apache.kafka.common.serialization.StringDeserializer
|
||||
import java.time.DateTimeException
|
||||
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.Collections
|
||||
import java.util.Properties
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.toJavaDuration
|
||||
|
||||
|
||||
/**
|
||||
* A basic consumer that print everything in the selected topic.
|
||||
* A basic consumer that call a function on all received messages
|
||||
* @param server the kafka server address
|
||||
* @param topic the topic to get the records from
|
||||
* @param identifier the kafka identifier for the configuration
|
||||
* @param callback the function to call on all messages
|
||||
*/
|
||||
class PrintConsumer(
|
||||
class Consumer(
|
||||
private val server: String,
|
||||
private val identifier: String = "consumer",
|
||||
private val topic: String,
|
||||
private val callback: (ConsumerRecord<String, Double>) -> Unit,
|
||||
) : Thread() {
|
||||
companion object {
|
||||
// create a formatter to convert the timestamp to a string
|
||||
private val timeFormatter = DateTimeFormatter
|
||||
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
||||
.withZone(ZoneId.systemDefault())
|
||||
}
|
||||
|
||||
private val properties: Properties = Properties().apply {
|
||||
// identifier
|
||||
this[ConsumerConfig.GROUP_ID_CONFIG] = identifier
|
||||
|
@ -59,12 +49,7 @@ class PrintConsumer(
|
|||
val messages: ConsumerRecords<String, Double> = this.consumer.poll(timeout)
|
||||
|
||||
// print them with their timestamp and content
|
||||
messages.forEach { message ->
|
||||
// format the time of the message to a proper string
|
||||
val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp()))
|
||||
// print the value
|
||||
println("[${time}] ${message.value()}°F")
|
||||
}
|
||||
messages.forEach { message -> this.callback(message) }
|
||||
}
|
||||
|
||||
/**
|
20
src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt
Normal file
20
src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt
Normal file
|
@ -0,0 +1,20 @@
|
|||
package fr.faraphel.m1_pe_kafka.rest
|
||||
|
||||
import jakarta.ws.rs.GET
|
||||
import jakarta.ws.rs.Path
|
||||
|
||||
|
||||
/**
|
||||
* A simple ping endpoint for the API
|
||||
* Always answer "Pong!"
|
||||
* Can be used to test if the API can be reached
|
||||
*/
|
||||
@Path("ping")
|
||||
class PingEndpoint {
|
||||
/**
|
||||
* Handler for a GET request on this endpoint
|
||||
* @return "Pong!"
|
||||
*/
|
||||
@GET
|
||||
fun get(): String = "Pong!"
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package fr.faraphel.m1_pe_kafka.rest
|
||||
|
||||
import jakarta.ws.rs.GET
|
||||
import jakarta.ws.rs.Path
|
||||
|
||||
|
||||
/**
|
||||
* This API endpoint return the latest temperature measured (in Fahrenheit)
|
||||
*/
|
||||
@Path("temperature")
|
||||
class TemperatureEndpoint {
|
||||
companion object {
|
||||
private var temperature: Double? = null ///< the latest temperature value
|
||||
|
||||
/**
|
||||
* Setter to update the latest temperature value
|
||||
* @param temperature the new temperature value
|
||||
*/
|
||||
fun setTemperature(temperature: Double) {
|
||||
this.temperature = temperature
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for a GET request on this endpoint
|
||||
* @return the latest temperature measured
|
||||
*/
|
||||
@GET
|
||||
fun get(): Double? {
|
||||
return temperature
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue