Java 分布式缓存

什么是缓存?缓存有什么作用?

在讲定义之前,我们先看看开发过程中遇到的问题:

随着互联网的兴起,网民的数量越来越多,有些高权重的网站,流量特别多,我们这些小站是 多么羡慕嫉妒恨啊,但是流量多,也是是非多,为什么这么说呢?因为传统的数据库主要是 关系型数据库,而且应用最多的就是MySQL,想必大家都知道,MySQL是百万级量,当数据 达到千万级的时候,数据查询是非常慢,而且本身MySQL难以支持大并发!尽管MySQL 可以支持集群部署,但那也是杯水车薪,根本无法挡住流量的洪流!这个时候 缓存技术 就出现了, 它主要是:

这三个都是NoSQL,它们都能够支持大并发,因此有很多企业开始把部分读的功能放在缓存上,它 的步骤是这样的

  • 先判断缓存中心里面有没有缓存
  • 如果有缓存,则直接取缓存的数据
  • 如果没有缓存,则从数据库提取数据,然后写到缓存里,并设置有效期;

经过这样改造的系统,很多系统的并发能力提高了不少!有些企业高并发的业务,甚至把缓存 搭建成分页式缓存,提供多节点服务,提供分页式解决方案有

  • Twitter 开源的 twemproxy
  • Codis 我们中国的(很多大企业都在用)
  • Redis 自身也有提供分布式,但是很少商用!

注:

pika 也是实现了Redis 协议,它是 360 开源的,由于 Redis 的一些不足,例如

1. Redis重启时,如果数据里大,就会发生加载慢等问题,还有就是
2. 内存昂贵,多数公司还是承担不起的

因此Pika正是在这样的情况下诞生的,具说 alibaba 也开始研究此项目,并应用到阿里
自己的系统上!

当然在使用上,和Redis没有什么区别,Redis用什么客户端,Pika 都是能够兼容的!

redis和memcached的区别

Redis支持多种复杂数据结构

相比于memcached,redis拥有更多是数据结构,所以支持更多的数据操作,redis允许的value数据结构类型有5种:String(字符串)、List(列表)、Set(集合)、Hash(哈希)、Zset(有序集合)。

性能对比

redis只支持单核,memcached可以使用多核,所以平均每一个核上 redis 在存储小数据时比 memcached 性能更高。但是在大数据存储上的处理比起memcached,redis还是稍微逊色了点。

为什么redis单线程处理效率高?

1. 纯内存操作。
2. 核心是基于非阻塞的 IO 多路复用机制。
3. C 语言实现,一般来说,C 语言实现的程序“距离”操作系统更近,执行速度相对会更快。
4. 单线程反而避免了多线程的频繁上下文切换问题,预防了多线程可能产生的竞争问题。

分布式缓存搭建

pika 分布式缓存搭建

缓存常见问题

代码示例

  • 第一步 需要引入maven jar
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.9.0</version>
</dependency>
  • 第二步 代码

Redis 连接池初始化

public class SharedJedisManager {

	private static Log LOG = LogFactory.getLog(SharedJedisManager.class);

	private static final int DEFAULT_MAX_WAIT = 10000;
	private static final int DEFAULT_MAXTOTAL = 2000;
	private static final int DEFAULT_MAXIDEL = 50;

	private static final SharedJedisManager instance = new SharedJedisManager();

	private ShardedJedisPool pool;

	private ShardedJedis mDefaultJedis;

	public static SharedJedisManager getInstance()
	{
		return instance;
	}

	private SharedJedisManager()
	{
		int maxTotal = DEFAULT_MAXTOTAL;
		int maxIdle = DEFAULT_MAXIDEL;
//		int maxTotalEnv = StringUtils.asInt(System.getProperty("pika.max.total"));
//		if(maxTotalEnv > 0) maxTotal = maxTotalEnv;

		JedisPoolConfig config = new JedisPoolConfig();
		config.setMaxWaitMillis(DEFAULT_MAX_WAIT);
		config.setMaxTotal(DEFAULT_MAXTOTAL);
		config.setMaxIdle(maxIdle);
		config.setTestOnBorrow(false);
		config.setBlockWhenExhausted(false);

		// 线程池异步
//		this.mLoadBalance = new RoundRobinSharedLoadBalance(config);

		MyConfiguration conf = MyConfiguration.getInstance();
		String[] hosts = conf.getStrings("pika.master.server1");
		int port = conf.getInt("pika.master.port", 9221);

		// master
		for(String host : hosts) {
			List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
			JedisShardInfo sharedInfo = new JedisShardInfo(host, port, 3000);
			shards.add(sharedInfo);
			this.pool = new ShardedJedisPool(config, shards);
			LOG.info("my shared client pika server = " + host);
			break;
		}
		this.mDefaultJedis = getResource();
		LOG.info("[pika-config]-[maxTotal=" + maxTotal + "]-[maxIdle=" + maxIdle + "]");
	}

	private ShardedJedis getResource() {
		return pool.getResource();
	}

	// ============================== get ==============================
	public String getString(String key) {
		String rs = null;
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis != null) {
				rs = jedis.get(key);
			} else {
				rs = mDefaultJedis.get(key);
			}
		} catch (Exception e) {
			LOG.error("getString error:", e);
		} finally {
			close(jedis);
		}
		return rs;
	}

	public List<Object> getStringByPipeline(String... keys)
	{
		List<Object> rsList = null;
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis == null) return null;

			ShardedJedisPipeline pipeline = jedis.pipelined();
			for(String key : keys)
			{
				pipeline.get(key);
			}
			rsList = pipeline.syncAndReturnAll();
		} catch (Exception e) {
		} finally {
			close(jedis);
		}
		return rsList;
	}

	public void setStringByPipeline(Map<String, Object> keyValue)
	{
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis == null) return;
			ShardedJedisPipeline pipeline = jedis.pipelined();
			Set<Map.Entry<String, Object>> setMap= keyValue.entrySet();
			boolean first = true;
			boolean isString = false;
			for(Map.Entry<String, Object> entry : setMap)
			{
				if(isString)
				{
					pipeline.set(entry.getKey(), (String)entry.getValue());
				}
				else
				{
					Object value = entry.getValue();
					if(first)
					{
						if(value instanceof String)
						{
							pipeline.set(entry.getKey(), (String)value);
							isString = true;
						}
						first = false;
					}
					else
					{
						String valueString = FastJsonHelper.jsonEncode(value);
						pipeline.set(entry.getKey(), valueString);
					}

				}

			}
			pipeline.sync();
		} catch (Exception e) {
		} finally {
			close(jedis);
		}
	}

	// ============================== set ==============================
	/**
	 * <p>
	 * 设置key value,如果key已经存在则返回0,nx==> not exist
	 * </p>
	 *
	 * @param key
	 * @param value
	 * @return 成功返回1 如果存在 和 发生异常 返回 0
	 */
	public void setString(String key, String value) {
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis != null)
			{
				jedis.set(key, value);
			} else {
				mDefaultJedis.set(key, value);
			}
		} catch (Exception e) {
			LOG.error("setString error:", e);
		} finally {
			close(jedis);
		}
	}

	/**
	 * <p>
	 * 设置key value并制定这个键值的有效期
	 * </p>
	 *
	 * @param key
	 * @param value
	 * @param seconds
	 *            单位:秒
	 * @return 成功返回OK 失败和异常返回null
	 */
	public void setString(String key, String value, int expire) {
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis != null)
			{
				jedis.setex(key, expire, value);
			} else {
				mDefaultJedis.setex(key, expire, value);
			}
		} catch (Exception e) {
			LOG.error("setString error:", e);
		} finally {
			close(jedis);
		}
	}

	// ============================== delete ==============================
	public void delete(String key) {
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis != null)
			{
				jedis.del(key);
			} else {
				mDefaultJedis.del(key);
			}
		} catch (Exception e) {
			LOG.error("delete error:", e);
		} finally {
			close(jedis);
		}
	}


	public boolean exists(String key)
	{
		boolean rs = false;
		ShardedJedis jedis = null;
		try {
			jedis = getResource();
			if(jedis != null)
			{
				rs = jedis.exists(key);
			} else {
				mDefaultJedis.exists(key);
			}
		} catch (Exception e) {
			LOG.error("delete error:", e);
		} finally {
			close(jedis);
		}
		return rs;
	}

	private void close(ShardedJedis redis) {
		try {
			if (redis != null)
				redis.close();
		} catch (Exception e) {
			LOG.error("close error:", e);
		}
	}

	public static void main(String[] args) throws IOException
	{
		SharedJedisManager client = SharedJedisManager.getInstance();
		client.setString("a1", "1");
		client.setString("a2", "2");
		client.setString("a3", "3");

		List<Object> list = client.getStringByPipeline("a1", "a2", "a3");

		for(Object value : list)
		{
			System.out.println(value);
		}

	}
}

定义 Redis 接口层

public interface RedisService {

	/**
     * <p>设置key value并制定这个键值的有效期</p>
     * @param key
     * @param value
     * @param seconds 单位:秒
     */  
    public void setString(String key,String value, int expire);

	public List<Object> getStringByPipeline(String... keys);
	public void setStringByPipeline(Map<String, Object> keyValue);

    public String getString(String key);
    public void delete(String key);
    public boolean exists(String key);

}

实现Redis接口

public class SharedServiceImpl implements RedisService {

	private SharedJedisManager client = SharedJedisManager.getInstance();

	// ============================== get ==============================
	public String getString(String key) {
		return client.getString(key);
	}

	public List<Object> getList(String... keys)
	{
		return client.getStringByPipeline(keys);
	}

	public void setString(String key, String value, int expire) {
		if(expire <= 0)
		{
			client.setString(key, value);
		}
		else
		{
			client.setString(key, value, expire);
		}
	}

	public void delete(String key) {
		client.delete(key);
	}

	public boolean exists(String key)
	{
		return client.exists(key);
	}

	public List<Object> getStringByPipeline(String... keys)
	{
		return client.getStringByPipeline(keys);
	}
	public void setStringByPipeline(Map<String, Object> keyValue)
	{
		client.setStringByPipeline(keyValue);
	}

}

封装Api层

public class PikaManager {

	private static RedisService mRedisService ;

	private interface PikaManagerInternal {
		public PikaManager mgr = new PikaManager();
	}

	public static PikaManager getIntance()
	{
		return PikaManagerInternal.mgr;
	}

	private PikaManager()
	{
		synchronized (RedisService.class) {
			if(mRedisService == null)
			{
				mRedisService = new SharedServiceImpl();
			}
		}
	}

	// ============================== get ==============================
	public String getString(String key)
	{
		return mRedisService.getString(key);
	}

	public String getList(String key)
	{
		return mRedisService.getString(key);
	}

	// ============================== set ==============================
	/**
     * <p>设置key value并制定这个键值的有效期</p>
     * @param key
     * @param value
     * @param seconds 单位:秒
     * @return 成功返回OK 失败和异常返回null
     */  
    public void setString(String key,String value, int expire)
    {  
    	mRedisService.setString(key, value, expire);  
    }  

	// ============================== delete ==============================
	public void delete(String key)
	{
		mRedisService.delete(key);
	}

	public boolean exists(String key)
	{
		return mRedisService.exists(key);
	}

	public List<Object> getStringByPipeline(String... keys)
	{
		return mRedisService.getStringByPipeline(keys);
	}
	public void setStringByPipeline(Map<String, Object> keyValue)
	{
		 mRedisService.setStringByPipeline(keyValue);
	}

	public void test1()
	{
		setString("test", "good", 3600);
		String value = getString("test");
//		if("OK".equalsIgnoreCase(rs)) System.out.println("ok================");
		System.out.println("========> rs = " + ", value = " + value + "  ");
//		System.out.println("delete result = " + delete("test2"));
	}

	public void testExpire()
	{
		try {
			String key = "category_list";
			setString(key, "test3", -1);
		} catch (Exception e) {
			System.out.println("error");
		}
//		
//		String value = getString(key);
//		
//		System.out.println("========> rs = " + null + ", value = " + value + "  ");

//		System.out.println(delete("test3"));
	}

	public static void test()
	{
		PikaManager redisManager = new PikaManager();

		testThread(redisManager);
		testThread(redisManager);
		testThread(redisManager);
		testThread(redisManager);

	}

	private static void testThread(PikaManager redisManager)
	{
		new Thread(new Runnable() {

			@Override
			public void run() {
				boolean start = true;
				while(start)
				{
					redisManager.setString("test", "111", 100);
					System.out.println(redisManager.getString("test"));
					//ThreadUtils.sleep(50);
				}
			}
		}).start();
	}

	public static void main(String[] args) throws InterruptedException
	{
		test();
//		PikaManager redisManager = new PikaManager();
////		redisManager.testExpire();
//		while(true) {
//			try {
//				redisManager.test1();
//			} catch (Exception e) {
//				e.printStackTrace();
//				System.out.println("error");
//			}
//			Thread.sleep(1000);
//		}

	}

}

接口Api说明

  • getString 获取缓存

  • setString 写入缓存

  • setStringByPipeline 通过管道写入缓存

  • delete 删除缓存

  • exists 判断缓存是不是存在