在Spring Boot中使用Redis和Kafka非常方便,这得益于Spring提供的强大集成支持(Spring Data RedisSpring for Apache Kafka)。下面我将通过示例详细介绍如何集成和使用它们。


一、在Spring Boot中使用Redis

Redis在Spring Boot中通常用作缓存、分布式锁或临时数据存储。

1. 添加依赖

首先,在pom.xml中添加Spring Data Redis的起步依赖(Starter)。

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

常用的连接池依赖(如Lettuce默认已包含,如需使用Jedis或配置连接池可添加):

1
2
3
4
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

2. 配置连接信息

application.yml(或application.properties)中配置Redis服务器连接信息。

1
2
3
4
5
6
7
8
9
10
11
spring:
redis:
host: localhost # Redis服务器地址
port: 6379 # Redis服务器端口
password: # 密码,如果没有则不配置
database: 0 # 数据库索引(0-15)
lettuce:
pool:
max-active: 8 # 连接池最大连接数
max-idle: 8 # 连接池最大空闲连接数
min-idle: 0 # 连接池最小空闲连接数

3. 使用RedisTemplate直接操作Redis

Spring提供了RedisTemplateStringRedisTemplate两个核心类来操作Redis。

示例:创建一个Service类来操作数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

@Service
public class RedisService {

@Autowired
private RedisTemplate<String, Object> redisTemplate; // 可存储任意Java对象
// 也可注入 StringRedisTemplate,专门用于操作字符串

/**
* 设置键值对(带过期时间)
* @param key 键
* @param value 值
* @param expire 过期时间(秒)
*/
public void set(String key, Object value, long expire) {
ValueOperations<String, Object> ops = redisTemplate.opsForValue();
ops.set(key, value, expire, TimeUnit.SECONDS);
}

/**
* 获取值
* @param key 键
* @return
*/
public Object get(String key) {
ValueOperations<String, Object> ops = redisTemplate.opsForValue();
return ops.get(key);
}

/**
* 删除键
* @param key 键
* @return 是否删除成功
*/
public Boolean delete(String key) {
return redisTemplate.delete(key);
}
}

4. 使用注解进行缓存抽象(推荐)

Spring Cache抽象允许你通过注解轻松地将方法结果缓存到Redis中。

a. 在主应用类上开启缓存支持

1
2
3
4
5
6
7
@SpringBootApplication
@EnableCaching // 开启缓存注解功能
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}

b. 在Service方法上使用注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
public class UserService {

@Cacheable(value = "user", key = "#id") // 如果缓存中有,直接返回;没有则执行方法并存入缓存
public User getUserById(Long id) {
// 模拟从数据库查询
System.out.println("从数据库查询用户: " + id);
return new User(id, "User" + id);
}

@CacheEvict(value = "user", key = "#id") // 删除缓存
public void deleteUserById(Long id) {
// 先删除数据库数据,此注解会自动清除缓存
}

@CachePut(value = "user", key = "#user.id") // 更新缓存
public User updateUser(User user) {
// 更新数据库...
return user; // 返回的结果会被缓存
}
}

你需要一个配置类来指定使用Redis作为缓存管理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import java.time.Duration;

@Configuration
public class RedisCacheConfig {

@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10)) // 设置默认过期时间10分钟
.disableCachingNullValues(); // 不缓存null值

return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(config)
.build();
}
}

二、在Spring Boot中使用Kafka

Kafka主要用于异步消息传递、流处理和解耦系统。

1. 添加依赖

pom.xml中添加Spring for Apache Kafka的依赖。

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

2. 配置连接信息

application.yml中配置Kafka集群地址和消费者/生产者属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka集群地址,多个用逗号分隔
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3 # 发送失败后的重试次数
consumer:
group-id: my-group # 消费者组ID
auto-offset-reset: earliest # 如果没有初始偏移量,从最早的消息开始消费
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual_immediate # 手动提交偏移量并立即确认

3. 创建消息生产者(Producer)

使用KafkaTemplate来发送消息非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

private static final String TOPIC_NAME = "my_topic";

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 发送消息
* @param message 消息内容
*/
public void sendMessage(String message) {
// 发送消息,topic不存在时会自动创建
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);

// 添加回调,处理发送成功或失败后的逻辑
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message: [" + message + "] with offset: [" + result.getRecordMetadata().offset() + "]");
}

@Override
public void onFailure(Throwable ex) {
System.err.println("Unable to send message: [" + message + "] due to: " + ex.getMessage());
}
});
}
}

4. 创建消息消费者(Consumer)

使用@KafkaListener注解来监听Topic并消费消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

// 监听名为'my_topic'的Topic,concurrency表示并发消费者线程数
@KafkaListener(topics = "my_topic", groupId = "my-group", concurrency = "3")
public void listen(@Payload String message, Acknowledgment ack) {
try {
System.out.println("Received Message: " + message);
// 模拟业务处理...
// 处理成功后,手动提交偏移量
ack.acknowledge();
} catch (Exception e) {
// 处理异常,可根据业务决定是重试还是记录日志等
System.err.println("Error processing message: " + message);
// 不确认消息,Kafka会根据配置进行重试
}
}
}

5. 发送复杂对象(JSON)

通常我们不会只发送字符串,而是发送JSON格式的对象。

a. 配置生产者序列化器

1
2
3
4
5
6
7
8
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example.model" # 信任的反序列化包名

b. 发送和接收对象

1
2
3
4
5
6
7
8
9
// 生产者
kafkaTemplate.send(TOPIC_NAME, new MyObject(1L, "Name"));

// 消费者
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void listen(MyObject myObject, Acknowledgment ack) {
System.out.println("Received Object: " + myObject);
ack.acknowledge();
}

总结对比

组件 主要用途 Spring Boot集成核心类 关键注解/接口
Redis 缓存、分布式锁、会话存储 RedisTemplate, StringRedisTemplate @Cacheable, @CacheEvict, @CachePut
Kafka 异步消息、解耦、流处理 KafkaTemplate @KafkaListener

这样,你就可以在Spring Boot项目中轻松地使用Redis和Kafka来处理缓存和异步消息了。