|
@@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.apache.kafka.common.serialization.StringSerializer;
|
|
import org.apache.kafka.common.serialization.StringSerializer;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
import org.springframework.context.annotation.Bean;
|
|
import org.springframework.context.annotation.Bean;
|
|
import org.springframework.context.annotation.Configuration;
|
|
import org.springframework.context.annotation.Configuration;
|
|
import org.springframework.kafka.annotation.EnableKafka;
|
|
import org.springframework.kafka.annotation.EnableKafka;
|
|
@@ -34,6 +35,7 @@ public class KafkaConfig {
|
|
* @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.Object>
|
|
* @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.Object>
|
|
*/
|
|
*/
|
|
@Bean
|
|
@Bean
|
|
|
|
+ @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
|
public ProducerFactory<String, Object> producerFactory() {
|
|
public ProducerFactory<String, Object> producerFactory() {
|
|
Map<String, Object> props = new HashMap<>();
|
|
Map<String, Object> props = new HashMap<>();
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);//kafka 地址
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);//kafka 地址
|
|
@@ -51,6 +53,7 @@ public class KafkaConfig {
|
|
* @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object>
|
|
* @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object>
|
|
*/
|
|
*/
|
|
@Bean
|
|
@Bean
|
|
|
|
+ @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
|
public KafkaTemplate<String, Object> kafkaTemplate() {
|
|
public KafkaTemplate<String, Object> kafkaTemplate() {
|
|
return new KafkaTemplate<>(producerFactory());
|
|
return new KafkaTemplate<>(producerFactory());
|
|
}
|
|
}
|
|
@@ -63,6 +66,7 @@ public class KafkaConfig {
|
|
* @return org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.Object>
|
|
* @return org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.Object>
|
|
*/
|
|
*/
|
|
@Bean
|
|
@Bean
|
|
|
|
+ @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
|
public ConsumerFactory<String, Object> consumerFactory() {
|
|
public ConsumerFactory<String, Object> consumerFactory() {
|
|
Map<String, Object> props = new HashMap<>();
|
|
Map<String, Object> props = new HashMap<>();
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
|
|
@@ -83,6 +87,7 @@ public class KafkaConfig {
|
|
* @return org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<java.lang.String,java.lang.Object>
|
|
* @return org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<java.lang.String,java.lang.Object>
|
|
*/
|
|
*/
|
|
@Bean
|
|
@Bean
|
|
|
|
+ @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
|
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
|
|
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
|
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
factory.setConsumerFactory(consumerFactory());
|
|
factory.setConsumerFactory(consumerFactory());
|
|
@@ -95,6 +100,7 @@ public class KafkaConfig {
|
|
* @return org.apache.kafka.clients.admin.AdminClient
|
|
* @return org.apache.kafka.clients.admin.AdminClient
|
|
*/
|
|
*/
|
|
@Bean
|
|
@Bean
|
|
|
|
+ @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
|
public AdminClient adminClient() {
|
|
public AdminClient adminClient() {
|
|
Properties props = new Properties();
|
|
Properties props = new Properties();
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
|