Using Redis and Kafka in Spring Boot is very convenient thanks to the integration support Spring provides (Spring Data Redis and Spring for Apache Kafka). The examples below walk through how to integrate and use each of them.
I. Using Redis in Spring Boot
In Spring Boot, Redis is typically used as a cache, for distributed locks, or as temporary data storage.
1. Add the dependency
First, add the Spring Data Redis starter to pom.xml.
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
```
A connection-pool dependency is commonly added as well (the Lettuce client is already included by default; add commons-pool2 if you want to configure pooling, or switch to Jedis as shown below):
```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
```
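If you prefer Jedis over the default Lettuce client, one common approach (shown here as a sketch; versions are managed by the Spring Boot BOM) is to exclude lettuce-core from the starter and add the Jedis client:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
```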
2. Configure the connection
Configure the Redis server connection in application.yml (or application.properties).
```yaml
spring:
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
```
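Note that the keys above are for Spring Boot 2.x; in Spring Boot 3.x the Redis properties moved under the spring.data.redis prefix, for example:

```yaml
spring:
  data:
    redis:
      host: localhost
      port: 6379
```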
3. Operate on Redis directly with RedisTemplate
Spring provides two core classes for working with Redis: RedisTemplate and StringRedisTemplate.
Example: a service class that reads and writes data
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class RedisService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void set(String key, Object value, long expire) {
        ValueOperations<String, Object> ops = redisTemplate.opsForValue();
        ops.set(key, value, expire, TimeUnit.SECONDS);
    }

    public Object get(String key) {
        ValueOperations<String, Object> ops = redisTemplate.opsForValue();
        return ops.get(key);
    }

    public Boolean delete(String key) {
        return redisTemplate.delete(key);
    }
}
```
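Spring Boot only auto-configures a RedisTemplate<Object, Object> (with JDK value serialization) and a StringRedisTemplate, so in practice you usually define your own RedisTemplate<String, Object> bean so that injection by that generic type works and stored values stay readable. A minimal sketch, where the class name RedisTemplateConfig and the choice of GenericJackson2JsonRedisSerializer are illustrative:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisTemplateConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Plain String keys keep entries readable in redis-cli
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // JSON values instead of JDK serialization
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```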
4. Use the cache abstraction via annotations (recommended)
The Spring Cache abstraction lets you cache method results in Redis with nothing more than annotations.
a. Enable caching on the main application class
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;

@SpringBootApplication
@EnableCaching
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
```
b. Use the annotations on service methods
```java
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    // Cache the result; the method body runs only on a cache miss
    @Cacheable(value = "user", key = "#id")
    public User getUserById(Long id) {
        System.out.println("Loading user from the database: " + id);
        return new User(id, "User" + id);
    }

    // Remove the cached entry when the user is deleted
    @CacheEvict(value = "user", key = "#id")
    public void deleteUserById(Long id) {
    }

    // Always execute the method and refresh the cached entry with its result
    @CachePut(value = "user", key = "#user.id")
    public User updateUser(User user) {
        return user;
    }
}
```
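The User class above is not shown in the original; a minimal placeholder might look like the following. With the default JDK value serializer used by Redis caching it must implement Serializable (a JSON cache serializer, shown after the next block, removes that requirement):

```java
import java.io.Serializable;

public class User implements Serializable {

    private Long id;
    private String name;

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "User{id=" + id + ", name='" + name + "'}";
    }
}
```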
To control TTLs and other cache defaults, provide a configuration class that sets up Redis as the cache manager:
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;

import java.time.Duration;

@Configuration
public class RedisCacheConfig {

    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(10))
                .disableCachingNullValues();

        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(config)
                .build();
    }
}
```
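By default the cached values are written with JDK serialization. If you would rather store JSON (no Serializable requirement, human-readable in redis-cli), one option, sketched here with the placeholder class name RedisJsonCacheConfig replacing the bean above, is to attach a Jackson serializer to the cache configuration:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

import java.time.Duration;

@Configuration
public class RedisJsonCacheConfig {

    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(10))
                .disableCachingNullValues()
                // Store cache values as JSON instead of JDK-serialized bytes
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()));

        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(config)
                .build();
    }
}
```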
II. Using Kafka in Spring Boot
Kafka is mainly used for asynchronous messaging, stream processing, and decoupling systems.
1. Add the dependency
Add the Spring for Apache Kafka dependency to pom.xml.
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
2. Configure the connection
Configure the Kafka broker addresses and the producer/consumer properties in application.yml.
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate
```
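Spring Boot also auto-configures a KafkaAdmin bean, so topics can be declared as beans and created automatically at startup. A small sketch (the partition and replica counts are illustrative; the topic name matches the producer below):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic myTopic() {
        // Created (if absent) by the auto-configured KafkaAdmin at startup
        return TopicBuilder.name("my_topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}
```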
3. Create a message producer
Sending messages with KafkaTemplate is straightforward.
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

    private static final String TOPIC_NAME = "my_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Sent message: [" + message + "] with offset: ["
                        + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Unable to send message: [" + message + "] due to: " + ex.getMessage());
            }
        });
    }
}
```
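The ListenableFuture callback style above applies to Spring Boot 2.x / Spring for Apache Kafka 2.x. From Spring for Apache Kafka 3.0 (Spring Boot 3.x), KafkaTemplate.send() returns a CompletableFuture, so the equivalent producer (class name KafkaProducerService3 is just a placeholder) looks like this:

```java
import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService3 {

    private static final String TOPIC_NAME = "my_topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService3(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);

        // whenComplete replaces the onSuccess/onFailure callback pair
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent message: [" + message + "] with offset: ["
                        + result.getRecordMetadata().offset() + "]");
            } else {
                System.err.println("Unable to send message: [" + message + "] due to: " + ex.getMessage());
            }
        });
    }
}
```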
4. Create a message consumer
Use the @KafkaListener annotation to listen to a topic and consume its messages.
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "my-group", concurrency = "3")
    public void listen(@Payload String message, Acknowledgment ack) {
        try {
            System.out.println("Received Message: " + message);
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("Error processing message: " + message);
        }
    }
}
```
5. Sending complex objects (JSON)
In practice you rarely send plain strings; more often you send objects serialized as JSON.
a. Configure the JSON serializer and deserializer
```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.model"
```
b. Send and receive the object
```java
// Producer side: the template is now typed for the object payload, e.g. KafkaTemplate<String, MyObject>
kafkaTemplate.send(TOPIC_NAME, new MyObject(1L, "Name"));

// Consumer side: Spring deserializes the JSON payload back into MyObject
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void listen(MyObject myObject, Acknowledgment ack) {
    System.out.println("Received Object: " + myObject);
    ack.acknowledge();
}
```
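MyObject is not defined in the original; for JsonDeserializer it only needs to be a plain POJO with a no-argument constructor living in a trusted package (com.example.model in the configuration above). A minimal sketch:

```java
package com.example.model;

public class MyObject {

    private Long id;
    private String name;

    // No-arg constructor required by JsonDeserializer
    public MyObject() {
    }

    public MyObject(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "MyObject{id=" + id + ", name='" + name + "'}";
    }
}
```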
Summary
| Component | Primary use | Core Spring Boot classes | Key annotations/interfaces |
| --- | --- | --- | --- |
| Redis | Caching, distributed locks, session storage | RedisTemplate, StringRedisTemplate | @Cacheable, @CacheEvict, @CachePut |
| Kafka | Asynchronous messaging, decoupling, stream processing | KafkaTemplate | @KafkaListener |
With that, you can use Redis and Kafka in a Spring Boot project to handle caching and asynchronous messaging.