import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
/**
 * Spring configuration that declares the Kafka consumer and producer beans.
 *
 * Connection settings are read from the `spring.kafka.*` properties. Keys are handled as plain
 * strings and values as JSON, on both the consuming and the producing side.
 *
 * NOTE(review): the `@Bean` methods below call each other directly; this presumably relies on the
 * class being opened for Spring (e.g. by the `kotlin-spring` compiler plugin) — confirm.
 */
@EnableKafka
@Configuration
class KafkaConfig {

    // Populated by Spring after construction. `lateinit var` keeps the values non-null and avoids
    // having Spring reflectively overwrite a final field that was initialised to null.
    @Value("\${spring.kafka.bootstrap-servers}")
    private lateinit var bootstrapServer: String

    @Value("\${spring.kafka.consumer.group-id}")
    private lateinit var groupId: String

    @Value("\${spring.kafka.consumer.auto-offset-reset}")
    private lateinit var autoOffsetReset: String

    /**
     * Defines the listener container factory bean for consumers.
     *
     * Concurrency is set to 3 so that multiple consumer threads are used.
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(3)
        return factory
    }

    /**
     * Defines the consumer factory bean: string keys, JSON values.
     */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val valueDeserializer = JsonDeserializer(Any::class.java)
        // SECURITY(review): "*" trusts every package for JSON deserialization. If the consumed
        // topics can carry untrusted messages, restrict this to the packages that hold the
        // expected payload classes.
        valueDeserializer.addTrustedPackages("*")

        val config: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
            ConsumerConfig.GROUP_ID_CONFIG to groupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            // This key is class-typed, so it holds the class; the configured instance itself is
            // handed to the factory constructor below.
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset
        )
        return DefaultKafkaConsumerFactory(config, StringDeserializer(), valueDeserializer)
    }

    /**
     * Defines the producer factory bean: string keys, JSON values.
     */
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val config: Map<String, Any> = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
        )
        return DefaultKafkaProducerFactory(config)
    }

    /**
     * Defines the [KafkaTemplate] bean, backed by [producerFactory].
     */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> = KafkaTemplate(producerFactory())
}
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
/**
 * Helpers for converting generic Kafka payloads into typed [Message] objects and for publishing
 * result messages.
 */
@Component
class KafkaMessageUtils(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    // Created once and reused; building a new mapper on every call is unnecessary work.
    private val objectMapper = jacksonObjectMapper()

    /**
     * Converts a generic map payload into a [Message] whose `data` field has the given type.
     *
     * @param value the payload as a map
     * @param type the class to bind the message's `data` field to
     * @return the payload bound to `Message<T>`
     */
    fun <T> messageFormatter(value: Map<String, Any>, type: Class<T>): Message<T> {
        val messageType = objectMapper.typeFactory.constructParametricType(Message::class.java, type)
        // Serialise the map to JSON text and bind that text to the parametric Message type.
        val json = objectMapper.writeValueAsString(value)
        return objectMapper.readValue(json, messageType)
    }

    /**
     * Publishes a result message for [message] to the topic named `"<topic>-result"`.
     *
     * The return value of `send` is discarded, so the outcome of the send is not inspected here.
     *
     * @param message the message whose `test_id` and `data` are echoed in the result
     * @param topic the originating topic; also written to the `event` field
     * @param status the value written to the `status` field
     */
    fun sendResultMessage(message: Message<*>, topic: String, status: String) {
        val resultPayload = mapOf(
            "test_id" to message.test_id,
            "service" to "BOARD_SERVICE",
            "event" to topic,
            "data" to message.data,
            "status" to status
        )
        kafkaTemplate.send("$topic-result", resultPayload)
    }

    /**
     * Message envelope: an identifier plus a typed payload.
     */
    data class Message<T>(
        val test_id: String,
        val data: T
    )
}
// "Leave a comment" — web-page residue, not part of the source code.