@@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
6
6
import org.apache.kafka.clients.producer.ProducerConfig
7
7
import org.apache.kafka.common.serialization.StringDeserializer
8
8
import org.apache.kafka.common.serialization.StringSerializer
9
+ import org.springframework.beans.factory.annotation.Value
9
10
import org.springframework.context.annotation.Bean
10
11
import org.springframework.context.annotation.Configuration
11
12
import org.springframework.context.annotation.Profile
@@ -24,6 +25,12 @@ import org.springframework.kafka.support.serializer.JsonSerializer
24
25
@EnableKafka
25
26
class KafkaConfig {
26
27
28
+ @Value(" \$ {spring.kafka.bootstrap-servers}" )
29
+ private lateinit var bootstrapServer: String
30
+
31
+ @Value(" \$ {spring.kafka.consumer.group-id}" )
32
+ private lateinit var groupId: String
33
+
27
34
@Bean
28
35
fun kafkaListenerContainerFactory (consumerFactory : ConsumerFactory <String , JsonNode >) =
29
36
ConcurrentKafkaListenerContainerFactory <String , JsonNode >().also { it.consumerFactory = consumerFactory }
@@ -32,6 +39,8 @@ class KafkaConfig {
32
39
fun consumerFactory () = DefaultKafkaConsumerFactory <String , JsonNode >(consumerProps)
33
40
34
41
val consumerProps = mapOf (
42
+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
43
+ ConsumerConfig .GROUP_ID_CONFIG to groupId,
35
44
ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer ::class .java,
36
45
ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer ::class .java,
37
46
JsonDeserializer .USE_TYPE_INFO_HEADERS to false ,
@@ -44,6 +53,7 @@ class KafkaConfig {
44
53
fun producerFactory () = DefaultKafkaProducerFactory <String , KafkaMessage >(senderProps)
45
54
46
55
val senderProps = mapOf (
56
+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
47
57
ProducerConfig .LINGER_MS_CONFIG to 10 ,
48
58
ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG to StringSerializer ::class .java,
49
59
ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer ::class .java
0 commit comments