[WIP] implemented basic temperature feature with basic consumer and producer

This commit is contained in:
Faraphel 2024-06-11 14:03:41 +02:00
parent 86362c217f
commit 866ee1c95e
8 changed files with 227 additions and 1 deletions

View file

@ -21,6 +21,7 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("io.quarkus:quarkus-arc") implementation("io.quarkus:quarkus-arc")
testImplementation("io.quarkus:quarkus-junit5") testImplementation("io.quarkus:quarkus-junit5")
implementation("io.quarkus:quarkus-messaging-kafka")
} }
group = "fr.faraphel" group = "fr.faraphel"

59
docker-compose-2.yaml Normal file
View file

@ -0,0 +1,59 @@
version: '3.5'
services:
zookeeper:
image: strimzi/kafka:0.19.0-kafka-2.5.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
networks:
- kafkastreams-network
kafka:
image: strimzi/kafka:0.19.0-kafka-2.5.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS} --override group.min.session.timeout.ms=$${KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_NUM_PARTITIONS: 3
KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS: 100
networks:
- kafkastreams-network
producer:
image: quarkus-quickstarts/kafka-streams-producer:1.0
build:
context: producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
networks:
- kafkastreams-network
aggregator:
image: quarkus-quickstarts/kafka-streams-aggregator:1.0
build:
context: aggregator
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
networks:
- kafkastreams-network
networks:
kafkastreams-network:
name: ks

15
docker-compose.yaml Normal file
View file

@ -0,0 +1,15 @@
version: '2'
networks:
kafka:
driver: bridge
services:
kafka:
image: bitnami/kafka
ports:
- "9092:9092"
networks:
- kafka
volumes:
- kafka_data:/bitnami/kafka

View file

@ -4,3 +4,8 @@ quarkusPluginVersion=3.11.1
quarkusPlatformGroupId=io.quarkus.platform quarkusPlatformGroupId=io.quarkus.platform
quarkusPlatformArtifactId=quarkus-bom quarkusPlatformArtifactId=quarkus-bom
quarkusPlatformVersion=3.11.1 quarkusPlatformVersion=3.11.1
#Kafka Properties
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
quarkus.kafka-streams.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
quarkus.kafka-streams.application-id=my-quarkus-app

View file

@ -0,0 +1,45 @@
package fr.faraphel.m1_pe_kafka
/**
* The consumer
*/
import org.apache.kafka.clients.consumer.*
import java.time.Duration
import java.util.Collections
import java.util.Properties
/**
* The consumer
*/
class Consumer {
private val properties: Properties = Properties()
private var consumer: KafkaConsumer<String, String>
private val topic: String = "mon_beau_topic"
init {
this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe"
this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
this.consumer = KafkaConsumer(properties);
}
fun main() {
this.consumer.subscribe(Collections.singletonList(topic));
// Afficher les messages reçu
while (true) {
val messages: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(500));
messages.forEach { message ->
println(">" + message.value())
}
}
}
}

View file

@ -0,0 +1,27 @@
package fr.faraphel.m1_pe_kafka
import fr.faraphel.m1_pe_kafka.utils.Temperature
import fr.faraphel.m1_pe_kafka.utils.celsius
/**
* The main function
*/
fun main() {
println("Debut du programme")
val temperature: Temperature = 0.celsius
println(temperature.kelvin)
val producer = Producer()
val consumer = Consumer()
val thread1 = Thread { producer.main() }
val thread2 = Thread { consumer.main() }
thread1.start()
thread2.start()
thread1.join()
thread2.join()
}

View file

@ -0,0 +1,33 @@
package fr.faraphel.m1_pe_kafka
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties
/**
* The producer
*/
class Producer {
private val properties: Properties = Properties()
private var producer: KafkaProducer<String, String>
private val topic: String = "Température_Celsius"
init {
this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
this.properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
this.producer = KafkaProducer(properties)
}
fun main() {
while (true) {
// create a new record
val record: ProducerRecord<String, String> = ProducerRecord(this.topic, "bonjour el mundo")
// send the record
this.producer.send(record)
this.producer.flush()
}
}
}

View file

@ -0,0 +1,41 @@
package fr.faraphel.m1_pe_kafka.utils
/**
* Represent a temperature
* Allow to store temperature and convert the units in an easier way
* @param value the value of the temperature (Kelvin)
*/
class Temperature(
private val value: Double
) {
// convert Temperature to Double (Kelvin)
val kelvin: Double
get() = this.value
// convert Temperature to Double (Celcius)
val celcius: Double
get() = this.value - 275.15
// convert Temperature to Double (Fahrenheit)
val fahrenheit: Double
get() = ((this.value - 273.15) * 1.8) + 32
}
// convert Double (Kelvin) to Temperature
val Double.kelvin: Temperature
get() = Temperature(this)
// convert Double (Celsius) to Temperature
val Double.celsius: Temperature
get() = Temperature(this + 275.15)
// convert Double (Fahrenheit) to Temperature
val Double.fahrenheit: Temperature
get() = Temperature((this - 32 / 1.8) + 273.15)
// convert Int (Kelvin) to Temperature
val Int.kelvin: Temperature
get() = this.toDouble().kelvin
// convert Int (Celsius) to Temperature
val Int.celsius: Temperature
get() = this.toDouble().celsius
// convert Int (Fahrenheit) to Temperature
val Int.fahrenheit: Temperature
get() = this.toDouble().fahrenheit