diff --git a/docker-compose-2.yaml b/docker-compose-2.yaml deleted file mode 100644 index 1193b24..0000000 --- a/docker-compose-2.yaml +++ /dev/null @@ -1,59 +0,0 @@ -version: '3.5' - -services: - zookeeper: - image: strimzi/kafka:0.19.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" - ] - ports: - - "2181:2181" - environment: - LOG_DIR: /tmp/logs - networks: - - kafkastreams-network - - kafka: - image: strimzi/kafka:0.19.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS} --override group.min.session.timeout.ms=$${KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS}" - ] - depends_on: - - zookeeper - ports: - - "9092:9092" - environment: - LOG_DIR: "/tmp/logs" - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_NUM_PARTITIONS: 3 - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS: 100 - networks: - - kafkastreams-network - - producer: - image: quarkus-quickstarts/kafka-streams-producer:1.0 - build: - context: producer - dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} - environment: - KAFKA_BOOTSTRAP_SERVERS: kafka:9092 - networks: - - kafkastreams-network - - aggregator: - image: quarkus-quickstarts/kafka-streams-aggregator:1.0 - build: - context: aggregator - dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} - environment: - QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092 - networks: - - kafkastreams-network - -networks: - kafkastreams-network: - name: ks \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 86707b5..89419c1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,15 +1,79 @@ -version: '2' - -networks: - kafka: - driver: bridge +version: '3.5' services: - kafka: - image: bitnami/kafka + # Handle the coordination between the kafka images + zookeeper: + image: strimzi/kafka:0.19.0-kafka-2.5.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] ports: - - "9092:9092" + - "2181:2181" + environment: + LOG_DIR: /tmp/logs networks: - kafka - volumes: - - kafka_data:/bitnami/kafka + + # The event system + kafka: + image: strimzi/kafka:0.19.0-kafka-2.5.0 + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS} --override group.min.session.timeout.ms=$${KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_NUM_PARTITIONS: 3 + KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS: 100 + networks: + - kafka + + # Producer to a Kafka cluster + producer: + image: quarkus-quickstarts/kafka-streams-producer:1.0 + build: + context: producer + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafka + + # Aggregator to a Kafka cluster + aggregator: + image: quarkus-quickstarts/kafka-streams-aggregator:1.0 + build: + context: aggregator + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafka + + # Our application + application: + build: + context: . + dockerfile: ./src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + depends_on: + - zookeeper + - kafka + - producer + - aggregator + networks: + - kafka + +networks: + # the Kafka network + kafka: + name: kafka diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt index 4f0e59e..199b4fd 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Consumer.kt @@ -20,12 +20,12 @@ class Consumer { private val topic: String = "mon_beau_topic" init { - this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" this.properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" - this.properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" this.properties[ConsumerConfig.GROUP_ID_CONFIG] = "premier_groupe" this.properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + this.properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") this.consumer = KafkaConsumer(properties); } diff --git a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt index ce7f11d..837a32a 100644 --- a/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt +++ b/src/main/kotlin/fr/faraphel/m1_pe_kafka/Producer.kt @@ -15,9 +15,11 @@ class Producer { private val topic: String = "Température_Celsius" init { - this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092" this.properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" this.properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer" + this.properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = System.getenv("KAFKA_BOOTSTRAP_SERVERS") + ?: throw IllegalArgumentException("Missing environment variable: KAFKA_BOOTSTRAP_SERVERS") + this.producer = KafkaProducer(properties) }