Java 分布式缓存
什么是缓存?缓存有什么作用?
在讲定义之前,我们先看看开发过程中遇到的问题:
随着互联网的兴起,网民的数量越来越多,有些高权重的网站,流量特别多,我们这些小站是 多么羡慕嫉妒恨啊,但是流量多,也是是非多,为什么这么说呢?因为传统的数据库主要是 关系型数据库,而且应用最多的就是MySQL,想必大家都知道,MySQL是百万级量,当数据 达到千万级的时候,数据查询是非常慢,而且本身MySQL难以支持大并发!尽管MySQL 可以支持集群部署,但那也是杯水车薪,根本无法挡住流量的洪流!这个时候 缓存技术 就出现了, 它主要是:
这三个都是NoSQL,它们都能够支持大并发,因此有很多企业开始把部分读的功能放在缓存上,它 的步骤是这样的
- 先判断缓存中心里面有没有缓存
- 如果有缓存,则直接取缓存的数据
- 如果没有缓存,则从数据库提取数据,然后写到缓存里,并设置有效期;
经过这样改造的系统,很多系统的并发能力提高了不少!有些企业高并发的业务,甚至把缓存 搭建成分页式缓存,提供多节点服务,提供分页式解决方案有
- Twitter 开源的 twemproxy
- Codis 我们中国的(很多大企业都在用)
- Redis 自身也有提供分布式,但是很少商用!
注:
pika 也是实现了Redis 协议,它是 360 开源的,由于 Redis 的一些不足,例如
1. Redis重启时,如果数据里大,就会发生加载慢等问题,还有就是
2. 内存昂贵,多数公司还是承担不起的
因此Pika正是在这样的情况下诞生的,具说 alibaba 也开始研究此项目,并应用到阿里
自己的系统上!
当然在使用上,和Redis没有什么区别,Redis用什么客户端,Pika 都是能够兼容的!
redis和memcached的区别
Redis支持多种复杂数据结构
相比于memcached,redis拥有更多是数据结构,所以支持更多的数据操作,redis允许的value数据结构类型有5种:String(字符串)、List(列表)、Set(集合)、Hash(哈希)、Zset(有序集合)。
性能对比
redis只支持单核,memcached可以使用多核,所以平均每一个核上 redis 在存储小数据时比 memcached 性能更高。但是在大数据存储上的处理比起memcached,redis还是稍微逊色了点。
为什么redis单线程处理效率高?
1. 纯内存操作。
2. 核心是基于非阻塞的 IO 多路复用机制。
3. C 语言实现,一般来说,C 语言实现的程序“距离”操作系统更近,执行速度相对会更快。
4. 单线程反而避免了多线程的频繁上下文切换问题,预防了多线程可能产生的竞争问题。
分布式缓存搭建
缓存常见问题
代码示例
- 第一步 需要引入maven jar
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
- 第二步 代码
Redis 连接池初始化
public class SharedJedisManager {
private static Log LOG = LogFactory.getLog(SharedJedisManager.class);
private static final int DEFAULT_MAX_WAIT = 10000;
private static final int DEFAULT_MAXTOTAL = 2000;
private static final int DEFAULT_MAXIDEL = 50;
private static final SharedJedisManager instance = new SharedJedisManager();
private ShardedJedisPool pool;
private ShardedJedis mDefaultJedis;
public static SharedJedisManager getInstance()
{
return instance;
}
private SharedJedisManager()
{
int maxTotal = DEFAULT_MAXTOTAL;
int maxIdle = DEFAULT_MAXIDEL;
// int maxTotalEnv = StringUtils.asInt(System.getProperty("pika.max.total"));
// if(maxTotalEnv > 0) maxTotal = maxTotalEnv;
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxWaitMillis(DEFAULT_MAX_WAIT);
config.setMaxTotal(DEFAULT_MAXTOTAL);
config.setMaxIdle(maxIdle);
config.setTestOnBorrow(false);
config.setBlockWhenExhausted(false);
// 线程池异步
// this.mLoadBalance = new RoundRobinSharedLoadBalance(config);
MyConfiguration conf = MyConfiguration.getInstance();
String[] hosts = conf.getStrings("pika.master.server1");
int port = conf.getInt("pika.master.port", 9221);
// master
for(String host : hosts) {
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
JedisShardInfo sharedInfo = new JedisShardInfo(host, port, 3000);
shards.add(sharedInfo);
this.pool = new ShardedJedisPool(config, shards);
LOG.info("my shared client pika server = " + host);
break;
}
this.mDefaultJedis = getResource();
LOG.info("[pika-config]-[maxTotal=" + maxTotal + "]-[maxIdle=" + maxIdle + "]");
}
private ShardedJedis getResource() {
return pool.getResource();
}
// ============================== get ==============================
public String getString(String key) {
String rs = null;
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis != null) {
rs = jedis.get(key);
} else {
rs = mDefaultJedis.get(key);
}
} catch (Exception e) {
LOG.error("getString error:", e);
} finally {
close(jedis);
}
return rs;
}
public List<Object> getStringByPipeline(String... keys)
{
List<Object> rsList = null;
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis == null) return null;
ShardedJedisPipeline pipeline = jedis.pipelined();
for(String key : keys)
{
pipeline.get(key);
}
rsList = pipeline.syncAndReturnAll();
} catch (Exception e) {
} finally {
close(jedis);
}
return rsList;
}
public void setStringByPipeline(Map<String, Object> keyValue)
{
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis == null) return;
ShardedJedisPipeline pipeline = jedis.pipelined();
Set<Map.Entry<String, Object>> setMap= keyValue.entrySet();
boolean first = true;
boolean isString = false;
for(Map.Entry<String, Object> entry : setMap)
{
if(isString)
{
pipeline.set(entry.getKey(), (String)entry.getValue());
}
else
{
Object value = entry.getValue();
if(first)
{
if(value instanceof String)
{
pipeline.set(entry.getKey(), (String)value);
isString = true;
}
first = false;
}
else
{
String valueString = FastJsonHelper.jsonEncode(value);
pipeline.set(entry.getKey(), valueString);
}
}
}
pipeline.sync();
} catch (Exception e) {
} finally {
close(jedis);
}
}
// ============================== set ==============================
/**
* <p>
* 设置key value,如果key已经存在则返回0,nx==> not exist
* </p>
*
* @param key
* @param value
* @return 成功返回1 如果存在 和 发生异常 返回 0
*/
public void setString(String key, String value) {
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis != null)
{
jedis.set(key, value);
} else {
mDefaultJedis.set(key, value);
}
} catch (Exception e) {
LOG.error("setString error:", e);
} finally {
close(jedis);
}
}
/**
* <p>
* 设置key value并制定这个键值的有效期
* </p>
*
* @param key
* @param value
* @param seconds
* 单位:秒
* @return 成功返回OK 失败和异常返回null
*/
public void setString(String key, String value, int expire) {
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis != null)
{
jedis.setex(key, expire, value);
} else {
mDefaultJedis.setex(key, expire, value);
}
} catch (Exception e) {
LOG.error("setString error:", e);
} finally {
close(jedis);
}
}
// ============================== delete ==============================
public void delete(String key) {
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis != null)
{
jedis.del(key);
} else {
mDefaultJedis.del(key);
}
} catch (Exception e) {
LOG.error("delete error:", e);
} finally {
close(jedis);
}
}
public boolean exists(String key)
{
boolean rs = false;
ShardedJedis jedis = null;
try {
jedis = getResource();
if(jedis != null)
{
rs = jedis.exists(key);
} else {
mDefaultJedis.exists(key);
}
} catch (Exception e) {
LOG.error("delete error:", e);
} finally {
close(jedis);
}
return rs;
}
private void close(ShardedJedis redis) {
try {
if (redis != null)
redis.close();
} catch (Exception e) {
LOG.error("close error:", e);
}
}
public static void main(String[] args) throws IOException
{
SharedJedisManager client = SharedJedisManager.getInstance();
client.setString("a1", "1");
client.setString("a2", "2");
client.setString("a3", "3");
List<Object> list = client.getStringByPipeline("a1", "a2", "a3");
for(Object value : list)
{
System.out.println(value);
}
}
}
定义 Redis 接口层
public interface RedisService {
/**
* <p>设置key value并制定这个键值的有效期</p>
* @param key
* @param value
* @param seconds 单位:秒
*/
public void setString(String key,String value, int expire);
public List<Object> getStringByPipeline(String... keys);
public void setStringByPipeline(Map<String, Object> keyValue);
public String getString(String key);
public void delete(String key);
public boolean exists(String key);
}
实现Redis接口
public class SharedServiceImpl implements RedisService {
private SharedJedisManager client = SharedJedisManager.getInstance();
// ============================== get ==============================
public String getString(String key) {
return client.getString(key);
}
public List<Object> getList(String... keys)
{
return client.getStringByPipeline(keys);
}
public void setString(String key, String value, int expire) {
if(expire <= 0)
{
client.setString(key, value);
}
else
{
client.setString(key, value, expire);
}
}
public void delete(String key) {
client.delete(key);
}
public boolean exists(String key)
{
return client.exists(key);
}
public List<Object> getStringByPipeline(String... keys)
{
return client.getStringByPipeline(keys);
}
public void setStringByPipeline(Map<String, Object> keyValue)
{
client.setStringByPipeline(keyValue);
}
}
封装Api层
public class PikaManager {
private static RedisService mRedisService ;
private interface PikaManagerInternal {
public PikaManager mgr = new PikaManager();
}
public static PikaManager getIntance()
{
return PikaManagerInternal.mgr;
}
private PikaManager()
{
synchronized (RedisService.class) {
if(mRedisService == null)
{
mRedisService = new SharedServiceImpl();
}
}
}
// ============================== get ==============================
public String getString(String key)
{
return mRedisService.getString(key);
}
public String getList(String key)
{
return mRedisService.getString(key);
}
// ============================== set ==============================
/**
* <p>设置key value并制定这个键值的有效期</p>
* @param key
* @param value
* @param seconds 单位:秒
* @return 成功返回OK 失败和异常返回null
*/
public void setString(String key,String value, int expire)
{
mRedisService.setString(key, value, expire);
}
// ============================== delete ==============================
public void delete(String key)
{
mRedisService.delete(key);
}
public boolean exists(String key)
{
return mRedisService.exists(key);
}
public List<Object> getStringByPipeline(String... keys)
{
return mRedisService.getStringByPipeline(keys);
}
public void setStringByPipeline(Map<String, Object> keyValue)
{
mRedisService.setStringByPipeline(keyValue);
}
public void test1()
{
setString("test", "good", 3600);
String value = getString("test");
// if("OK".equalsIgnoreCase(rs)) System.out.println("ok================");
System.out.println("========> rs = " + ", value = " + value + " ");
// System.out.println("delete result = " + delete("test2"));
}
public void testExpire()
{
try {
String key = "category_list";
setString(key, "test3", -1);
} catch (Exception e) {
System.out.println("error");
}
//
// String value = getString(key);
//
// System.out.println("========> rs = " + null + ", value = " + value + " ");
// System.out.println(delete("test3"));
}
public static void test()
{
PikaManager redisManager = new PikaManager();
testThread(redisManager);
testThread(redisManager);
testThread(redisManager);
testThread(redisManager);
}
private static void testThread(PikaManager redisManager)
{
new Thread(new Runnable() {
@Override
public void run() {
boolean start = true;
while(start)
{
redisManager.setString("test", "111", 100);
System.out.println(redisManager.getString("test"));
//ThreadUtils.sleep(50);
}
}
}).start();
}
public static void main(String[] args) throws InterruptedException
{
test();
// PikaManager redisManager = new PikaManager();
//// redisManager.testExpire();
// while(true) {
// try {
// redisManager.test1();
// } catch (Exception e) {
// e.printStackTrace();
// System.out.println("error");
// }
// Thread.sleep(1000);
// }
}
}
接口Api说明
-
getString 获取缓存
-
setString 写入缓存
-
setStringByPipeline 通过管道写入缓存
-
delete 删除缓存
-
exists 判断缓存是不是存在