Redis笔记(七)Java实现Redis消息队列

简介: 这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。 在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。

2000元阿里云代金券免费领取,2核4G云服务器仅664元/3年,新老用户都有优惠,立即抢购>>>


阿里云采购季(云主机223元/3年)活动入口:请点击进入>>>,


阿里云学生服务器(9.5元/月)购买入口:请点击进入>>>,

这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。

在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。


使用Redis实现消息队列

1.封装一个消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  class  Message  implements  Serializable{
 
private  static  final  long  serialVersionUID = 1L;
 
private  String titile;
private  String info;
 
public  Message(String titile,String info){
this .titile=titile;
this .info=info;
}
 
public  String getTitile() {
return  titile;
}
public  void  setTitile(String titile) {
this .titile = titile;
}
public  String getInfo() {
return  info;
}
public  void  setInfo(String info) {
this .info = info;
}
}

  

2.为这个消息对象提供序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  class  MessageUtil {
 
//convert To String
public  static  String convertToString(Object obj,String charset)  throws  IOException{
 
ByteArrayOutputStream bo =  new  ByteArrayOutputStream();
ObjectOutputStream oo =  new  ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return  str;
}
 
//convert To Message
public  static  Object convertToMessage( byte [] bytes)  throws  Exception{
ByteArrayInputStream in =  new  ByteArrayInputStream(bytes);
ObjectInputStream sIn =  new  ObjectInputStream(in);
return  sIn.readObject();
 
}
}

  

3.从Jedis连接池中获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public  class  RedisUtil {
 
/**
* Jedis connection pool
* @Title: config
*/
public  static  JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle( "redis" );
String host=bundle.getString( "host" );
int  port=Integer.valueOf(bundle.getString( "port" ));
int  timeout=Integer.valueOf(bundle.getString( "timeout" ));
//  String password=bundle.getString("password");
 
JedisPoolConfig config= new  JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString( "maxActive" )));
config.setMaxWait(Integer.valueOf(bundle.getString( "maxWait" )));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString( "testOnBorrow" )));
config.setTestOnReturn(Boolean.valueOf(bundle.getString( "testOnReturn" )));
 
JedisPool pool= new  JedisPool(config, host, port, timeout);
 
return  pool;
}
}

  

4.创建Provider类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public  class  Producer {
 
private  Jedis jedis;
private  JedisPool pool;
 
public  Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public  void  provide(String channel,Message message)  throws  IOException{
String str1=MessageUtil.convertToString(channel, "UTF-8" );
String str2=MessageUtil.convertToString(message, "UTF-8" );
jedis.publish(str1, str2);
}
 
//close the connection
public  void  close()  throws  IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

5.创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public  class  Consumer {
 
private  Jedis jedis;
private  JedisPool pool;
 
public  Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public  void  consum(String channel)  throws  IOException{
JedisPubSub jedisPubSub =  new  JedisPubSub() {
// 取得订阅的消息后的处理
public  void  onMessage(String channel, String message) {
System.out.println( "Channel:" +channel);
System.out.println( "Message:" +message.toString());
}
 
// 初始化订阅时候的处理
public  void  onSubscribe(String channel,  int  subscribedChannels) {
System.out.println( "onSubscribe:" +channel);
}
 
// 取消订阅时候的处理
public  void  onUnsubscribe(String channel,  int  subscribedChannels) {
System.out.println( "onUnsubscribe:" +channel);
}
 
// 初始化按表达式的方式订阅时候的处理
public  void  onPSubscribe(String pattern,  int  subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取消按表达式的方式订阅时候的处理
public  void  onPUnsubscribe(String pattern,  int  subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取得按表达式的方式订阅的消息后的处理
public  void  onPMessage(String pattern, String channel, String message) {
System.out.println(pattern +  "="  + channel +  "="  + message);
}
};
 
jedis.subscribe(jedisPubSub, channel);
}
 
//close the connection
public  void  close()  throws  IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

6.测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  void  main(String[] args){
 
Message msg= new  Message( "hello!" "this is the first message!" );
 
Producer producer= new  Producer();
Consumer consumer= new  Consumer();
try  {
producer.provide( "chn1" ,msg);
consumer.consum( "chn1" );
catch  (IOException e) {
e.printStackTrace();
}
}

  

 


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
13天前
|
JSON NoSQL Java
Redis入门到通关之Java客户端SpringDataRedis(RedisTemplate)
Redis入门到通关之Java客户端SpringDataRedis(RedisTemplate)
33 0
|
28天前
|
设计模式 Java
Java基础—笔记—多态、final、抽象类、接口篇
该文介绍了编程中的多态、final和抽象类、接口相关概念。多态允许子类重写父类方法,通过父类引用调用子类方法,实现解耦和提高代码灵活性,但也可能导致无法使用子类特有功能,需通过强制类型转换解决。final用于修饰不可变的类、方法或变量,防止继承、重写和多次赋值。抽象类是一种包含抽象方法的类,用于强制子类重写特定方法,实现多态,适用于模板方法设计模式,解决代码重复问题。
17 0
|
8天前
|
存储 NoSQL 安全
java 中通过 Lettuce 来操作 Redis
java 中通过 Lettuce 来操作 Redis
java 中通过 Lettuce 来操作 Redis
|
13天前
|
存储 消息中间件 缓存
jeecgboot运行磁盘不足问题( java.io.IOException)和redis闪退问题
jeecgboot运行磁盘不足问题( java.io.IOException)和redis闪退问题
18 0
|
20天前
|
人工智能 前端开发 Java
Java语言开发的AI智慧导诊系统源码springboot+redis 3D互联网智导诊系统源码
智慧导诊解决盲目就诊问题,减轻分诊工作压力。降低挂错号比例,优化就诊流程,有效提高线上线下医疗机构接诊效率。可通过人体画像选择症状部位,了解对应病症信息和推荐就医科室。
170 10
|
21天前
|
消息中间件 存储 安全
从零开始构建Java消息队列系统
【4月更文挑战第18天】构建一个简单的Java消息队列系统,包括`Message`类、遵循FIFO原则的`MessageQueue`(使用`LinkedList`实现)、`Producer`和`Consumer`类。在多线程环境下,`MessageQueue`的操作通过`synchronized`保证线程安全。测试代码中,生产者发送10条消息,消费者处理这些消息。实际应用中,可能需要考虑持久化、分布式队列和消息确认等高级特性,或者使用成熟的MQ系统如Kafka或RabbitMQ。
|
22天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
23天前
|
缓存 NoSQL Java
使用Redis进行Java缓存策略设计
【4月更文挑战第16天】在高并发Java应用中,Redis作为缓存中间件提升性能。本文探讨如何使用Redis设计缓存策略。Redis是开源内存数据结构存储系统,支持多种数据结构。Java中常用Redis客户端有Jedis和Lettuce。缓存设计遵循一致性、失效、雪崩、穿透和预热原则。常见缓存模式包括Cache-Aside、Read-Through、Write-Through和Write-Behind。示例展示了使用Jedis实现Cache-Aside模式。优化策略包括分布式锁、缓存预热、随机过期时间、限流和降级,以应对缓存挑战。
|
24天前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
28天前
|
运维 NoSQL 算法
Java开发-深入理解Redis Cluster的工作原理
综上所述,Redis Cluster通过数据分片、节点发现、主从复制、数据迁移、故障检测和客户端路由等机制,实现了一个分布式的、高可用的Redis解决方案。它允许数据分布在多个节点上,提供了自动故障转移和读写分离的功能,适用于需要大规模、高性能、高可用性的应用场景。
18 0
http://www.vxiaotou.com