注:接上篇IDEA整合Redis,本篇實(shí)現(xiàn)Redis的任務(wù)隊(duì)列,,Redis連接池具體配置看上篇,。 一:寫一個(gè)Jedis的工具類JedisUtil,將Jedis中的部分方法實(shí)現(xiàn),,代碼如下:
package com.wq.Util; import com.wq.RedisPool.RedisPool; import redis.clients.jedis.Jedis; import java.util.List; public class JedisUtil { private static Jedis jedis = null; /** * 存儲(chǔ)REDIS隊(duì)列 順序存儲(chǔ) * @param key 字節(jié)類型 * @param value 字節(jié)類型 */ public static void lpush(byte[] key,byte[] value){ try { jedis = RedisPool.getJedis(); jedis.lpush(key,value); } catch (Exception e) { e.printStackTrace(); }finally { RedisPool.returnResource(jedis); } } /** * 存儲(chǔ)REDIS隊(duì)列 反序存儲(chǔ) * @param key 字節(jié)類型 * @param value 字節(jié)類型 */ public static void rpush(byte[] key,byte[] value){ try { jedis = RedisPool.getJedis(); jedis.rpush(key,value); } catch (Exception e) { e.printStackTrace(); }finally { RedisPool.returnResource(jedis); } } /** * 移除列表的最后一個(gè)元素,,并將該元素添加到另一個(gè)列表并返回,就可以實(shí)現(xiàn)任務(wù)隊(duì)列 * @param srckey 原隊(duì)列的key * @param dstkey 目標(biāo)隊(duì)列的key */ public static byte[] rpoplpush(byte[] srckey,byte[] dstkey){ byte[] value = null; try { jedis = RedisPool.getJedis(); value= jedis.rpoplpush(srckey,dstkey); } catch (Exception e) { e.printStackTrace(); }finally { RedisPool.returnResource(jedis); } return value; } /** * 從列表中彈出一個(gè)值,將彈出的元素插入到另外一個(gè)列表中并返回它,; 如果列表沒有元素會(huì)阻塞列表直到等待超時(shí)或發(fā)現(xiàn)可彈出元素為止。 * @param srckey * @param dstkey * @param timout * @return */ public static byte[] brpoplpush(byte[] srckey,byte[] dstkey,int timout){ byte[] value = null; try { jedis = RedisPool.getJedis(); value = jedis.brpoplpush(srckey,dstkey,timout); } catch (Exception e) { e.printStackTrace(); } finally { RedisPool.returnResource(jedis); } return value; } /** * 設(shè)置實(shí)現(xiàn)任務(wù)隊(duì)列的鍵和過期時(shí)間 * @param key * @param timeout */ public static List<byte[]> brpop(byte[] key, int timeout){ List<byte[]> result = null; try { jedis = RedisPool.getJedis(); result=jedis.brpop(0,key); } catch (Exception e) { e.printStackTrace(); } finally { RedisPool.returnResource(jedis); } return result; } /** * 移除隊(duì)列中的最后一個(gè)元素并顯示最后一個(gè)元素 * @param key * @return */ public static byte[] rpop(byte[] key) { byte[] bytes = null; try { jedis = RedisPool.getJedis(); bytes = jedis.rpop(key); } catch (Exception e) { e.printStackTrace(); } finally { RedisPool.returnResource(jedis); } return bytes; } }二:寫一個(gè)實(shí)體類MessageUtil,,實(shí)現(xiàn)存入Redis中的是對象,,不是單單的基本類型,存入Redis中的對象需要實(shí)現(xiàn)序列化接口,,代碼如下:
package com.wq.Util; import java.io.Serializable; public class MessageUtil implements Serializable{ private static final long serialVersionUID = -8785806144878640550L; private int id; private String content; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }三:使用兩個(gè)Redis列表,,一個(gè)隊(duì)列作為生成者,一個(gè)隊(duì)列作為消費(fèi)者,,加上線程實(shí)現(xiàn)兩個(gè)列表,,一個(gè)列表產(chǎn)生任務(wù),通過任務(wù)隊(duì)列,,另一個(gè)列表處理任務(wù),,代碼如下:
1.使用jedis中的flushAll方法,將Redis數(shù)據(jù)庫中的所有key清空:
控制臺(tái)信息說明,,兩個(gè)隊(duì)列都不存在了,; 2.再使用initList類,創(chuàng)建一個(gè)列表,,并將對象序列化過后存入列表,,并有線程持續(xù)產(chǎn)生對象并插入列表,代碼如下:
package com.wq.Util; import com.wq.RedisPool.RedisPool; import redis.clients.jedis.Jedis; public class initList { public static byte[] rediskey = "key".getBytes(); public static byte[] dstkey = "dstkey".getBytes(); public static long time=0; public static int i=0; public static void main(String args[]) { Jedis jedis = RedisPool.getJedis(); while (true){ try { MessageUtil msg1 = new MessageUtil(); msg1.setId(i); msg1.setContent("wq"+i); JedisUtil.lpush(rediskey,SerialoizebleUtil.serialize(msg1)); time=2000; System.out.println("success"+i); System.out.println(jedis.lrange(rediskey,0,100)); i++; Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } } }控制臺(tái)輸出如下,,說明一直在向列表中插入新產(chǎn)生的對象,,這里插入了6個(gè)對象后,停止線程:
2.再開啟一個(gè)線程,,使用Redis中的brpoplpush方法,,實(shí)現(xiàn)任務(wù)隊(duì)列原理,代碼如下:
package com.wq.Util; import com.wq.RedisPool.RedisPool; import junit.framework.TestCase; import redis.clients.jedis.Jedis; public class JedisUtilTest extends TestCase { public static byte[] rediskey = "key".getBytes(); public static byte[] dstkey = "dstkey".getBytes(); public static long time=0; public static void main(String args[]){ Jedis jedis = RedisPool.getJedis(); while (true) { try { byte[] bytes = JedisUtil.brpoplpush(rediskey, dstkey, 0); MessageUtil msg = (MessageUtil) SerialoizebleUtil.unSerialize(bytes); if (msg != null) { System.out.println(msg.getId() + " " + msg.getContent()); } time=3000; System.out.println(jedis.lrange(rediskey,0,100)); System.out.println(jedis.lrange(dstkey,0,100)); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } } }控制臺(tái)輸出如下:
上面的列表存的是剛才產(chǎn)生的6個(gè)對象,下面圈出來的是新的列表,,可以看出新的列表的對象在遞增,,說明成功實(shí)現(xiàn)了任務(wù)隊(duì)列原理,下面是全部完成了的圖片
這里將兩個(gè)線程同時(shí)啟動(dòng),,更容易理解任務(wù)隊(duì)列以及生產(chǎn)者消費(fèi)者概念,。
To be continue.......... |
|