Merge pull request 'Add a REST API to access the current temperature' (#13) from rest into master
Reviewed-on: #13
This commit is contained in:
commit
cc150a01e5
7 changed files with 87 additions and 26 deletions
|
@ -40,8 +40,8 @@ The container `application` can be easily modified with the following environmen
|
||||||
| TEMPERATURE_LOCATION | true | \<latitude>, \<longitude> | 49.9, 2.3 ([Amiens, France](https://fr.wikipedia.org/wiki/Amiens)) | The coordinates where to get the temperatures from |
|
| TEMPERATURE_LOCATION | true | \<latitude>, \<longitude> | 49.9, 2.3 ([Amiens, France](https://fr.wikipedia.org/wiki/Amiens)) | The coordinates where to get the temperatures from |
|
||||||
|
|
||||||
## Expectation
|
## Expectation
|
||||||
The `application` container shall print the current temperature at the selected place in
|
The `application` container shall print the current temperature at the selected place in Fahrenheit every minute.
|
||||||
Fahrenheit every minute.
|
You can also access this value with the REST api at the `http://localhost:8080/temperature` endpoint.
|
||||||
|
|
||||||
## References
|
## References
|
||||||
The project use the [Open-Meteo API](https://open-meteo.com/) to fetch the current temperature at the
|
The project use the [Open-Meteo API](https://open-meteo.com/) to fetch the current temperature at the
|
||||||
|
|
|
@ -27,9 +27,9 @@ dependencies {
|
||||||
implementation("io.quarkus:quarkus-kafka-client")
|
implementation("io.quarkus:quarkus-kafka-client")
|
||||||
implementation("io.quarkus:quarkus-kafka-streams")
|
implementation("io.quarkus:quarkus-kafka-streams")
|
||||||
implementation("io.quarkus:quarkus-messaging-kafka")
|
implementation("io.quarkus:quarkus-messaging-kafka")
|
||||||
|
implementation("io.quarkus:quarkus-rest")
|
||||||
implementation("com.squareup.okhttp3:okhttp:4.12.0")
|
implementation("com.squareup.okhttp3:okhttp:4.12.0")
|
||||||
implementation("com.google.code.gson:gson:2.8.9")
|
implementation("com.google.code.gson:gson:2.8.9")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
group = "fr.faraphel"
|
group = "fr.faraphel"
|
||||||
|
|
|
@ -32,9 +32,17 @@ services:
|
||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
|
dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
|
||||||
|
ports:
|
||||||
|
- "8080:8080"
|
||||||
environment:
|
environment:
|
||||||
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
|
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
|
||||||
TEMPERATURE_LOCATION: 49.9, 2.3
|
TEMPERATURE_LOCATION: 49.9, 2.3
|
||||||
|
healthcheck:
|
||||||
|
test: curl --fail http://localhost:8080/ping
|
||||||
|
start_period: 10s
|
||||||
|
timeout: 5s
|
||||||
|
interval: 60s
|
||||||
|
retries: 3
|
||||||
networks:
|
networks:
|
||||||
- kafka
|
- kafka
|
||||||
depends_on:
|
depends_on:
|
||||||
|
|
|
@ -2,13 +2,17 @@ package fr.faraphel.m1_pe_kafka
|
||||||
|
|
||||||
import fr.faraphel.m1_pe_kafka.error.MissingEnvironmentException
|
import fr.faraphel.m1_pe_kafka.error.MissingEnvironmentException
|
||||||
import fr.faraphel.m1_pe_kafka.kafka.AdminUtils
|
import fr.faraphel.m1_pe_kafka.kafka.AdminUtils
|
||||||
import fr.faraphel.m1_pe_kafka.kafka.PrintConsumer
|
import fr.faraphel.m1_pe_kafka.kafka.Consumer
|
||||||
import fr.faraphel.m1_pe_kafka.kafka.Converter
|
import fr.faraphel.m1_pe_kafka.kafka.Converter
|
||||||
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
|
import fr.faraphel.m1_pe_kafka.kafka.TemperatureProducer
|
||||||
|
import fr.faraphel.m1_pe_kafka.rest.TemperatureEndpoint
|
||||||
import fr.faraphel.m1_pe_kafka.utils.celsius
|
import fr.faraphel.m1_pe_kafka.utils.celsius
|
||||||
import io.quarkus.runtime.Quarkus
|
import io.quarkus.runtime.Quarkus
|
||||||
import io.quarkus.runtime.QuarkusApplication
|
import io.quarkus.runtime.QuarkusApplication
|
||||||
import io.quarkus.runtime.annotations.QuarkusMain
|
import io.quarkus.runtime.annotations.QuarkusMain
|
||||||
|
import java.time.Instant
|
||||||
|
import java.time.ZoneId
|
||||||
|
import java.time.format.DateTimeFormatter
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -24,6 +28,11 @@ class Main : QuarkusApplication {
|
||||||
* @throws MissingEnvironmentException a required environment variable from the configuration is missing
|
* @throws MissingEnvironmentException a required environment variable from the configuration is missing
|
||||||
*/
|
*/
|
||||||
override fun run(vararg args: String?): Int {
|
override fun run(vararg args: String?): Int {
|
||||||
|
// create a time formatter
|
||||||
|
val timeFormatter = DateTimeFormatter
|
||||||
|
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
||||||
|
.withZone(ZoneId.systemDefault())
|
||||||
|
|
||||||
// get the kafka server address
|
// get the kafka server address
|
||||||
val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
|
val kafkaServer = System.getenv("KAFKA_BOOTSTRAP_SERVERS")
|
||||||
?: throw MissingEnvironmentException("KAFKA_BOOTSTRAP_SERVERS")
|
?: throw MissingEnvironmentException("KAFKA_BOOTSTRAP_SERVERS")
|
||||||
|
@ -62,10 +71,17 @@ class Main : QuarkusApplication {
|
||||||
) { temperature -> temperature.celsius.asFahrenheit }
|
) { temperature -> temperature.celsius.asFahrenheit }
|
||||||
|
|
||||||
// create a consumer that will print the received values in the Fahrenheit topic
|
// create a consumer that will print the received values in the Fahrenheit topic
|
||||||
val consumer = PrintConsumer(
|
val consumer = Consumer(
|
||||||
server=kafkaServer,
|
server=kafkaServer,
|
||||||
topic=topicTemperatureFahrenheit
|
topic=topicTemperatureFahrenheit
|
||||||
)
|
) { message ->
|
||||||
|
// format the time of the message to a proper string
|
||||||
|
val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp()))
|
||||||
|
// print the value
|
||||||
|
println("[${time}] ${message.value()}°F")
|
||||||
|
// update the value for the API
|
||||||
|
TemperatureEndpoint.setTemperature(message.value())
|
||||||
|
}
|
||||||
|
|
||||||
// run all the clients
|
// run all the clients
|
||||||
producer.start()
|
producer.start()
|
||||||
|
|
|
@ -4,37 +4,27 @@ package fr.faraphel.m1_pe_kafka.kafka
|
||||||
import org.apache.kafka.clients.consumer.*
|
import org.apache.kafka.clients.consumer.*
|
||||||
import org.apache.kafka.common.serialization.DoubleDeserializer
|
import org.apache.kafka.common.serialization.DoubleDeserializer
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer
|
import org.apache.kafka.common.serialization.StringDeserializer
|
||||||
import java.time.DateTimeException
|
|
||||||
|
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
|
||||||
import java.time.ZoneId
|
|
||||||
import java.time.format.DateTimeFormatter
|
|
||||||
import java.util.Collections
|
import java.util.Collections
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import kotlin.time.Duration.Companion.milliseconds
|
|
||||||
import kotlin.time.Duration.Companion.seconds
|
import kotlin.time.Duration.Companion.seconds
|
||||||
import kotlin.time.toJavaDuration
|
import kotlin.time.toJavaDuration
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A basic consumer that print everything in the selected topic.
|
* A basic consumer that call a function on all received messages
|
||||||
* @param server the kafka server address
|
* @param server the kafka server address
|
||||||
* @param topic the topic to get the records from
|
* @param topic the topic to get the records from
|
||||||
* @param identifier the kafka identifier for the configuration
|
* @param identifier the kafka identifier for the configuration
|
||||||
|
* @param callback the function to call on all messages
|
||||||
*/
|
*/
|
||||||
class PrintConsumer(
|
class Consumer(
|
||||||
private val server: String,
|
private val server: String,
|
||||||
private val identifier: String = "consumer",
|
private val identifier: String = "consumer",
|
||||||
private val topic: String,
|
private val topic: String,
|
||||||
|
private val callback: (ConsumerRecord<String, Double>) -> Unit,
|
||||||
) : Thread() {
|
) : Thread() {
|
||||||
companion object {
|
|
||||||
// create a formatter to convert the timestamp to a string
|
|
||||||
private val timeFormatter = DateTimeFormatter
|
|
||||||
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
|
||||||
.withZone(ZoneId.systemDefault())
|
|
||||||
}
|
|
||||||
|
|
||||||
private val properties: Properties = Properties().apply {
|
private val properties: Properties = Properties().apply {
|
||||||
// identifier
|
// identifier
|
||||||
this[ConsumerConfig.GROUP_ID_CONFIG] = identifier
|
this[ConsumerConfig.GROUP_ID_CONFIG] = identifier
|
||||||
|
@ -59,12 +49,7 @@ class PrintConsumer(
|
||||||
val messages: ConsumerRecords<String, Double> = this.consumer.poll(timeout)
|
val messages: ConsumerRecords<String, Double> = this.consumer.poll(timeout)
|
||||||
|
|
||||||
// print them with their timestamp and content
|
// print them with their timestamp and content
|
||||||
messages.forEach { message ->
|
messages.forEach { message -> this.callback(message) }
|
||||||
// format the time of the message to a proper string
|
|
||||||
val time = timeFormatter.format(Instant.ofEpochMilli(message.timestamp()))
|
|
||||||
// print the value
|
|
||||||
println("[${time}] ${message.value()}°F")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
20
src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt
Normal file
20
src/main/kotlin/fr/faraphel/m1_pe_kafka/rest/PingEndpoint.kt
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package fr.faraphel.m1_pe_kafka.rest
|
||||||
|
|
||||||
|
import jakarta.ws.rs.GET
|
||||||
|
import jakarta.ws.rs.Path
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple ping endpoint for the API
|
||||||
|
* Always answer "Pong!"
|
||||||
|
* Can be used to test if the API can be reached
|
||||||
|
*/
|
||||||
|
@Path("ping")
|
||||||
|
class PingEndpoint {
|
||||||
|
/**
|
||||||
|
* Handler for a GET request on this endpoint
|
||||||
|
* @return "Pong!"
|
||||||
|
*/
|
||||||
|
@GET
|
||||||
|
fun get(): String = "Pong!"
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
package fr.faraphel.m1_pe_kafka.rest
|
||||||
|
|
||||||
|
import jakarta.ws.rs.GET
|
||||||
|
import jakarta.ws.rs.Path
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This API endpoint return the latest temperature measured (in Fahrenheit)
|
||||||
|
*/
|
||||||
|
@Path("temperature")
|
||||||
|
class TemperatureEndpoint {
|
||||||
|
companion object {
|
||||||
|
private var temperature: Double? = null ///< the latest temperature value
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setter to update the latest temperature value
|
||||||
|
* @param temperature the new temperature value
|
||||||
|
*/
|
||||||
|
fun setTemperature(temperature: Double) {
|
||||||
|
this.temperature = temperature
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handler for a GET request on this endpoint
|
||||||
|
* @return the latest temperature measured
|
||||||
|
*/
|
||||||
|
@GET
|
||||||
|
fun get(): Double? {
|
||||||
|
return temperature
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue