From 866ee1c95ef3c3c1515c12c64a3a14e166d98fa3 Mon Sep 17 00:00:00 2001 From: Faraphel Date: Tue, 11 Jun 2024 14:03:41 +0200 Subject: [PATCH] [WIP] implemented basic temperature feature with basic consumer and producer --- build.gradle.kts | 1 + docker-compose-2.yaml | 59 +++++++++++++++++++ docker-compose.yaml | 15 +++++ gradle.properties | 7 ++- .../fr/faraphel/m1_pe_kafka/Consumer.kt | 45 ++++++++++++++ .../kotlin/fr/faraphel/m1_pe_kafka/Main.kt | 27 +++++++++ .../fr/faraphel/m1_pe_kafka/Producer.kt | 33 +++++++++++ .../faraphel/m1_pe_kafka/utils/temperature.kt | 41 +++++++++++++ 8 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 docker-compose-2.yaml create mode 100644 docker-compose.yaml create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt create mode 100644 src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt diff --git a/build.gradle.kts b/build.gradle.kts index c59ac73..cbc82c6 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -21,6 +21,7 @@ dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("io.quarkus:quarkus-arc") testImplementation("io.quarkus:quarkus-junit5") + implementation("io.quarkus:quarkus-messaging-kafka") } group = "fr.faraphel" diff --git a/docker-compose-2.yaml b/docker-compose-2.yaml new file mode 100644 index 0000000..1193b24 --- /dev/null +++ b/docker-compose-2.yaml @@ -0,0 +1,59 @@ +version: '3.5' + +services: + zookeeper: + image: strimzi/kafka:0.19.0-kafka-2.5.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] + ports: + - "2181:2181" + environment: + LOG_DIR: /tmp/logs + networks: + - kafkastreams-network + + kafka: + image: strimzi/kafka:0.19.0-kafka-2.5.0 + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS} --override group.min.session.timeout.ms=$${KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_NUM_PARTITIONS: 3 + KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS: 100 + networks: + - kafkastreams-network + + producer: + image: quarkus-quickstarts/kafka-streams-producer:1.0 + build: + context: producer + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafkastreams-network + + aggregator: + image: quarkus-quickstarts/kafka-streams-aggregator:1.0 + build: + context: aggregator + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafkastreams-network + +networks: + kafkastreams-network: + name: ks \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..86707b5 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,15 @@ +version: '2' + +networks: + kafka: + driver: bridge + +services: + kafka: + image: bitnami/kafka + ports: + - "9092:9092" + networks: + - kafka + volumes: + - kafka_data:/bitnami/kafka diff --git a/gradle.properties b/gradle.properties index 0f9d829..9966905 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,4 +3,9 @@ quarkusPluginId=io.quarkus quarkusPluginVersion=3.11.1 quarkusPlatformGroupId=io.quarkus.platform quarkusPlatformArtifactId=quarkus-bom -quarkusPlatformVersion=3.11.1 \ No newline at end of file +quarkusPlatformVersion=3.11.1 + +#Kafka Properties +kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} +quarkus.kafka-streams.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:kafka:9092} +quarkus.kafka-streams.application-id=my-quarkus-app diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt new file mode 100644 index 0000000..4f0e59e --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt @@ -0,0 +1,45 @@ +package fr.faraphel.m1_pe_kafka + +/** + * The consumer + */ + +import org.apache.kafka.clients.consumer.* + +import java.time.Duration +import java.util.Collections +import java.util.Properties + + +/** + * The consumer + */ +class Consumer { + private val properties: Properties = Properties() + private var consumer: KafkaConsumer + private val topic: String = "mon_beau_topic" + + init { + this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" + this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" + + this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" + this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe" + this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + + this.consumer = KafkaConsumer(properties); + } + + fun main() { + this.consumer.subscribe(Collections.singletonList(topic)); + + // Afficher les messages reçu + while (true) { + val messages: ConsumerRecords = consumer.poll(Duration.ofMillis(500)); + + messages.forEach { message -> + println(">" + message.value()) + } + } + } +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt new file mode 100644 index 0000000..5072108 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Main.kt @@ -0,0 +1,27 @@ +package fr.faraphel.m1_pe_kafka + +import fr.faraphel.m1_pe_kafka.utils.Temperature +import fr.faraphel.m1_pe_kafka.utils.celsius + + +/** + * The main function + */ +fun main() { + println("Debut du programme") + + val temperature: Temperature = 0.celsius + println(temperature.kelvin) + + val producer = Producer() + val consumer = Consumer() + + val thread1 = Thread { producer.main() } + val thread2 = Thread { consumer.main() } + + thread1.start() + thread2.start() + + thread1.join() + thread2.join() +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt new file mode 100644 index 0000000..ce7f11d --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt @@ -0,0 +1,33 @@ +package fr.faraphel.m1_pe_kafka + +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import java.util.Properties + + +/** + * The producer + */ +class Producer { + private val properties: Properties = Properties() + private var producer: KafkaProducer + private val topic: String = "Température_Celsius" + + init { + this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092" + this.properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" + this.properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" + this.producer = KafkaProducer(properties) + } + + fun main() { + while (true) { + // create a new record + val record: ProducerRecord = ProducerRecord(this.topic, "bonjour el mundo") + // send the record + this.producer.send(record) + this.producer.flush() + } + } +} diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt new file mode 100644 index 0000000..43a0180 --- /dev/null +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/utils/temperature.kt @@ -0,0 +1,41 @@ +package fr.faraphel.m1_pe_kafka.utils + + +/** + * Represent a temperature + * Allow to store temperature and convert the units in an easier way + * @param value the value of the temperature (Kelvin) + */ +class Temperature( + private val value: Double +) { + // convert Temperature to Double (Kelvin) + val kelvin: Double + get() = this.value + // convert Temperature to Double (Celcius) + val celcius: Double + get() = this.value - 275.15 + // convert Temperature to Double (Fahrenheit) + val fahrenheit: Double + get() = ((this.value - 273.15) * 1.8) + 32 +} + +// convert Double (Kelvin) to Temperature +val Double.kelvin: Temperature + get() = Temperature(this) +// convert Double (Celsius) to Temperature +val Double.celsius: Temperature + get() = Temperature(this + 275.15) +// convert Double (Fahrenheit) to Temperature +val Double.fahrenheit: Temperature + get() = Temperature((this - 32 / 1.8) + 273.15) + +// convert Int (Kelvin) to Temperature +val Int.kelvin: Temperature + get() = this.toDouble().kelvin +// convert Int (Celsius) to Temperature +val Int.celsius: Temperature + get() = this.toDouble().celsius +// convert Int (Fahrenheit) to Temperature +val Int.fahrenheit: Temperature + get() = this.toDouble().fahrenheit