博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用ConcurrentHashMap实现高效缓存框架
阅读量:6038 次
发布时间:2019-06-20

本文共 7013 字,大约阅读时间需要 23 分钟。

hot3.png

      在项目中我们有的时候需要使用某种形式的缓存,使用缓存能够重用之前的计算结果,降低系统延迟,提高吞吐量,但是其却会消耗更多的内存。就像许多重复发明的轮子一样,缓存框架的设计看上去简单,但系统的性能将会由缓存框架的伸缩性决定。如下是一段使用HashMap实现的缓存框架:

public interface Computable
{ V compute(A arg) throws InterruptedException;}
import java.math.BigInteger;public class ExpensiveFunction implements Computable
{ @Override public BigInteger compute(String arg) throws InterruptedException { return new BigInteger(arg); }}
import java.util.HashMap;import java.util.Map;public class Memorizer1
implements Computable
{ private final Map
cache = new HashMap<>(); private final Computable
c; public Memorizer1(Computable
c) { this.c = c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (null == result) { result = c.compute(arg); cache.put(arg, result); } return result; }}

      上述代码中,Computable接口定义的是一类用于执行某种类型计算的策略族。ExpensiveFunction实现了Computable接口,该类在概念上是通过传入的参数arg,经过一系列复杂计算而得到结果,这里为了方便起见,只是返回了一个BigInteger对象。Memorizer1类也实现了Computable接口,这里实际上用到了装饰者模式,在构造Memorizer1类时需要传入一个Computable类型对象进来,如ExpensiveFunction,当需要使用ExpensiveFunction类来进行复杂计算时,可以通过Memorizer1类来对其进行装饰,转而调用Memorizer1的compute方法。而在Memorizer1内部,其使用了一个HashMap来对真正的Computable对象(如ExpensiveFunction)的结果进行了缓存,如果传入的参数arg能够在cache中找到结果,那么直接返回,否则调用实际的Computable::compute方法进行计算,通过这种方式达到提高系统新能的目的。

      上述Memorizer1虽然能够实现对计算结果的缓存,但是由于HashMap不是线程安全的,其使用synchronized将整个compute方法包裹起来,当并发量较高时,就会出现多个线程同时竞争执行compute方法的情况,系统的性能可能比不使用缓存更低,此时如何提高Memorizer1的包裹内容的伸缩性将决定了系统的性能。

      观察这段程序会发现,Memorizer1中有两个全局属性,一个是用于缓存的cache,一个是用于计算的c。这里c对象只表示了一组通过传入参数计算结果的策略,其是一个无状态对象(所谓的无状态对象是指其内部没有被多个线程公用的属性);而主要的cache对象才是所有线程需要竞争的对象,因而这里使用锁的时候其实只需要锁住HashMap一个类型的对象即可。在java中对于HashMap,可以使用ConcurrentHashMap来解决多线程竞争map对象的问题,如下是使用ConcurrentHashMap改写Memorizer1的形式:

import java.util.Map;import java.util.concurrent.ConcurrentHashMap;public class Memorizer2
implements Computable
{ private final Map
cache = new ConcurrentHashMap<>(); private final Computable
c; public Memorizer2(Computable
c) { this.c = c; } @Override public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (null == result) { result = c.compute(arg); cache.put(arg, result); } return result; }}

      上述Memorizer2很好的改善了Memorizer1中存在的不足,并且多线程情况下也能够很好的运行,但是其仍存在一些不足,通过分析Memorizer2::compute方法可以发现,使用cache的位置主要有两个:首先获取cache中键为arg的值,如果不存在则调用Computable::compute方法来计算结果,并将结果保存到cache中。其实在调用cache::get和cache::put方法之间进行了一些计算,当一个线程执行了cache::get方法之后得到了空的result,其会执行高额的Computable::compute方法,此时该线程还未执行到cache::put方法,而另一个线程也调用了cache::get方法,其也将得到一个空的result,这样这两个线程还是有可能执行两次Computable::compute方法,并且由于Computable::compute方法耗费较高,因而重复计算的概率还是比较大的。

      在java类库中提供了另外一个类FutureTask,该类有一个get方法,如果有结果,那么其会立即返回结果,如果还未运行完成,那么其会阻塞,直到得到运行结果为止。这里我们可以通过FutureTask改写上述缓存框架,如下是改写后的代码:

import java.util.Map;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;public class Memorizer3
implements Computable
{ private final Map
> cache = new ConcurrentHashMap<>(); private final Computable
c; public Memorizer3(Computable
c) { this.c = c; } @Override public V compute(final A arg) throws InterruptedException { Future
f = cache.get(arg); if (null == f) { Callable
eval = () -> { return c.compute(arg); }; FutureTask
ft = new FutureTask
(eval); f = ft; cache.put(arg, f); ft.run(); } try { return f.get(); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } private InterruptedException launderThrowable(Throwable cause) { if (cause instanceof InterruptedException) { return (InterruptedException) cause; } else { return new InterruptedException(cause.getMessage()); } }}

      这里cache中存储的键还是计算需要的参数,而值则是一个个FutureTask对象,真正的计算放到了一个Callable对象中,在调用FutureTask::run方法时,是实际执行了Computable::compute方法。当一个线程传入一个参数之后,Memorizer3首先在cache中找是否已经有计算结果,如果没有,那么会创建一个FutureTask对象存入到cache中,然后调用FutureTask::run方法计算结果,最后调用FutureTask::get方法返回结果,如果run方法还未计算出结果,那么最后的get方法将会被阻塞,直到产生结果。

      Memorizer3对于缓存的实现几乎是完美的,但是其还是存在重复计算的缺陷,即当一个线程在cache中未找到结果,其创建了FutureTask对象并将其放入cache中,然后执行run方法计算结果的同时,另一个线程也传入同样的参数想要获取计算结果,其也发现cache中没有缓存结果,那么其也会创建一个新的FutureTask对象,并调用FutureTask::run方法计算结果。但是相对于Memorizer2,其出现重复计算的概率已经大大减少了。这里造成这个问题的原因还是在于Map的复合操作“若没有则添加”并不是原子的。在ConcurrentHashMap中提供了一个方法V putIfAbsent(K key, V value),其是一个原子的若不存在则添加方法,并且其会返回当前Map中已经存在的value值,若没有则返回null,如下是通过该方法解决上述重复计算问题的代码:

import java.util.concurrent.Callable;import java.util.concurrent.CancellationException;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;public class Memorizer
implements Computable
{ private final ConcurrentMap
> cache = new ConcurrentHashMap<>(); private final Computable
c; public Memorizer(Computable
c) { this.c = c; } @Override public V compute(final A arg) throws InterruptedException { while (true) { Future
f = cache.get(arg); if (null == f) { Callable
eval = () -> { return c.compute(arg); }; FutureTask
ft = new FutureTask
(eval); f = cache.putIfAbsent(arg, ft); if (null == f) { f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(arg, f); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } } private InterruptedException launderThrowable(Throwable cause) { if (cause instanceof InterruptedException) { return (InterruptedException) cause; } return new InterruptedException(cause.getMessage()); }}

      这里通过调用ConcurrentHashMap::putIfAbsent方法,假设两个线程传入了同样的参数,并且都创建了一个FutureTask对象,一个线程获得了cache的执行权限执行了cache::putIfAbsent()方法,并且返回了一个null到局部变量f中,此时另一个线程也会调用cache::putIfAbsent()方法,由于第一个线程已经将相关键值对存入到cache中了,那么第二个线程将获得第一个线程创建的FutureTask对象,并且将其替换给当前线程中的局部变量f,并且其判断f不为null,那么其会调用f::get()方法,而此时第一个线程正在执行FutureTask::run方法,如果其已经计算完成,那么其会返回结果给第一个线程,而第二个线程是直接执行的FutureTask::get方法,如果第一个线程执行完成,那么第二个线程将直接获取结果返回,如果第一个线程没有执行完成,那么第二个线程将等待第一个线程执行完成后再返回结果。

      这里对compute方法使用while循环的目的是,当某个线程在执行结果的时候,其余的线程需要等待该线程执行完成,如果其余的线程由于某些原因等待被打断,那么通过while循环其会继续进入等待状态,从而得到执行结果。而对于某个执行计算结果的线程而言,如果其计算过程被取消或失败,那么对于缓存而言这就造成了缓存污染,因为存入的是FutureTask对象,而不是真正的结果,如果计算的线程被取消,那么实际上FutureTask::get方法将一直无法获取到值,但是cache中对应键的值也不是null的,这就是缓存污染的根源。上述代码中通过调用FutureTask::get方法时捕获CacellationException来捕捉执行计算的线程计算被取消或失败的情况,当出现这种情况的时候就从cache中将相应的FutureTask对象给移除掉,并且通过while循环也可以是后来进入的线程再次执行run方法从而得到计算结果。

      上述的Memorizer基本上是一个完美的缓存类,但是对于缓存而言,其数据如果存在过期问题,那么将需要另外进行设计,从而实现高性能吞吐的目的,当然,如果只是针对一些复杂的计算,只要传入的值不变,其结果永远不会发生变化,那么使用该框架还是非常合适的。

转载于:https://my.oschina.net/zhangxufeng/blog/898864

你可能感兴趣的文章
Google 翻译的妙用
查看>>
算法导论--python--插入排序
查看>>
Hydra用户手册
查看>>
常用的集合
查看>>
Unity3D工程源码目录
查看>>
杀死进程命令
查看>>
cookie 和session 的区别详解
查看>>
浮点数网络传输
查看>>
Mongodb对集合(表)和数据的CRUD操作
查看>>
面向对象类的解析
查看>>
tomcat如何修改发布目录
查看>>
CentOS 5.5 使用 EPEL 和 RPMForge 软件库
查看>>
Damien Katz弃Apache CouchDB,继以Couchbase Server
查看>>
Target runtime Apache Tomcat is not defined.错误解决方法
查看>>
某机字长为32位,存储容量为64MB,若按字节编址.它的寻址范围是多少?
查看>>
VC++ 监视文件(夹)
查看>>
【转】keyCode对照表及JS监听组合按键
查看>>
[Java开发之路](14)反射机制
查看>>
mac gentoo-prefix安装git svn
查看>>
浅尝异步IO
查看>>