关键词搜索

源码搜索 ×
×

Java 8并发教程:原子变量和ConcurrentMap

发布2017-05-25浏览2593次

详情内容

原文地址:http://winterbe.com/postshttps://files.jxasp.com/image/2015/05https://files.jxasp.com/image/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/

欢迎阅读我的Java 8中多线程编程教程系列的第三部分。本教程介绍了并发API的两个重要部分:原子变量和并发映射。 在最新的Java 8版本中引入了lambda表达式和功能编程,两者都得到了很大的改进。所有这些新功能都用一大堆易于理解的代码示例进行描述。 请享用!

为了简单起见,本教程的代码示例使用这里定义的两个辅助方法sleep(seconds)stop(executor)

AtomicInteger

java.concurrent.atomic包包含许多有用的类来执行原子操作。当您可以安全地在多个线程上并行执行操作时,操作是原子的,而不使用我以前的教程中所示的synchronized关键字或锁。

在内部,原子类大量使用比较和交换 CAS),这是大多数现代CPU直接支持的原子指令。那些指令通常比同步通过锁快得多。 所以我的建议是更喜欢原子类超过锁,以防你只需要同时更改单个可变变量。

AtomicInteger现在让我们选择一个原子类的几个例子:AtomicInteger

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> executor.submit(atomicInt::incrementAndGet));
  5. stop(executor);
  6. System.out.println(atomicInt.get()); // => 1000
通过使用 AtomicInteger 作为 Integer 的替代,我们可以在线程安全的庄园中同时增加数量,而不需要同步对变量的访问。 方法 incrementAndGet() 是一个原子操作,所以我们可以从多个线程安全地调用这个方法。

 AtomicInteger支持各种原子操作。方法updateAndGet()接受一个lambda表达式,以便对整数执行任意的算术运算:

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> {
  5. Runnable task = () ->
  6. atomicInt.updateAndGet(n -> n + 2);
  7. executor.submit(task);
  8. });
  9. stop(executor);
  10. System.out.println(atomicInt.get()); // => 2000
方法 accumulateAndGet() 接受另一种类型为 IntBinaryOperator lambda IntBinaryOperator 我们使用这种方法在下一个示例中将所有值从 0 1000 并发:

  1. AtomicInteger atomicInt = new AtomicInteger(0);
  2. ExecutorService executor = Executors.newFixedThreadPool(2);
  3. IntStream.range(0, 1000)
  4. .forEach(i -> {
  5. Runnable task = () ->
  6. atomicInt.accumulateAndGet(i, (n, m) -> n + m);
  7. executor.submit(task);
  8. });
  9. stop(executor);
  10. System.out.println(atomicInt.get()); // => 499500

其他有用的原子类是 AtomicBoolean AtomicLong AtomicReference

LongAdder

可以使用LongAdder类作为AtomicLong的替代方法来AtomicLong地向数字添加值。

  1. ExecutorService executor = Executors.newFixedThreadPool(2);
  2. IntStream.range(0, 1000)
  3. .forEach(i -> executor.submit(adder::increment));
  4. stop(executor);
  5. System.out.println(adder.sumThenReset()); // => 1000
LongAdder 提供了方法 add() increment() ,就像原子序列类一样,也是线程安全的。 但是,除了总结单个结果之外,这个类在内部维护一组变量以减少对线程的争用。 实际结果可以通过调用 sum() sumThenReset()

当多线程的更新比读取更常见时,此类通常优于原子序号。 捕获统计数据时通常是这种情况,例如,您想要计算在Web服务器上提供的请求数。LongAdder的缺点是更高的内存消耗,因为一组变量被保存在内存中。

LongAccumulator

 LongAccumulatorLongAdder的更广泛版本。代替执行简单的添加操作,类LongAccumulator构建了LongBinaryOperator类型的lambda表达式,如此代码示例所示:

  1. LongBinaryOperator op = (x, y) -> 2 * x + y;
  2. LongAccumulator accumulator = new LongAccumulator(op, 1L);
  3. ExecutorService executor = Executors.newFixedThreadPool(2);
  4. IntStream.range(0, 10)
  5. .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
  6. stop(executor);
  7. System.out.println(accumulator.getThenReset()); // => 2539
我们创建一个具有函数 2 * x + y 和初始值为 1 LongAccumulator 每次调用 accumulate(i) 当前结果和值 i 都作为参数传递给 lambda 表达式。

LongAccumulator就像LongAdder一样,在LongAdder维护一组变量以减少与线程的争用。

ConcurrentMap

ConcurrentMap扩展了映射接口,并定义了最有用的并发收集类型之一。 Java 8通过向此界面添加新方法来引入功能编程。

在下面的代码段中,我们使用以下示例映射来演示这些新方法:

  1. ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
  2. map.put("foo", "bar");
  3. map.put("han", "solo");
  4. map.put("r2", "d2");
  5. map.put("c3", "p0");

方法forEach() 接受一个类型为 BiConsumer lambda 表达式, BiConsumer 具有作为参数传递的映射的键和值。 它可以用作替代每个循环来遍历并发映射的条目。 迭代在当前线程上顺序执行。

map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));

方法putIfAbsent() 只有在给定键不存在任何值时,才会将新值放入映射。 至少对于 ConcurrentHashMap ,该方法的实现是线程安全的,就像 put() ,所以你不必同步从不同的线程并发访问映射:

  1. String value = map.putIfAbsent("c3", "p1");
  2. System.out.println(value); // p0
getOrDefault() 方法返回给定键的值。 如果此键不存在,则返回传递的默认值:

  1. String value = map.getOrDefault("hi", "there");
  2. System.out.println(value); // there

replaceAll() 方法接受一个类型为 BiFunction lambda BiFunction BiFunctions 需要两个参数并返回一个值。 在这种情况下,使用键和每个映射条目的值调用该函数,并返回要为当前密钥分配的新值:

  1. map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
  2. System.out.println(map.get("r2")); // d3
而不是替换 map 的所有值 compute() 让我们转换单个条目。 该方法接受要计算的密钥和双功能以指定值的转换。

  1. map.compute("foo", (key, value) -> value + value);
  2. System.out.println(map.get("foo")); // barbar

除了 compute() 还有两个变量: computeIfAbsent() computeIfPresent() 这些方法的功能参数只有在键不存在或分别存在的情况下才被调用。

最后,可以使用merge()方法merge()新值与映射中的现有值进行统一。合并接受一个密钥,要合并到现有条目中的新值和一个双功能来指定两个值的合并行为:

  1. map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
  2. System.out.println(map.get("foo")); // boo was foo

ConcurrentHashMap

以上所有这些方法都是ConcurrentMap一部分,从而可用于该接口的所有实现。 此外,最重要的实现ConcurrentHashMap已经通过几种新方法进一步增强,以在地图上执行并行操作。

就像并行流一样,这些方法使用Java 8中的ForkJoinPool.commonPool()可以使用一个特殊的ForkJoinPool。该池使用一个取决于可用内核数量的预设并行度。我的机器上有四个CPU内核可以实现三个并行处理:

System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3
可以通过设置以下 JVM 参数来减小或增加该值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
我们使用相同的示例图来进行演示,但是这次我们通过具体实现 ConcurrentHashMap 来代替 ConcurrentMap ,所以我们可以从这个类访问所有的公共方法:

  1. ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
  2. map.put("foo", "bar");
  3. map.put("han", "solo");
  4. map.put("r2", "d2");
  5. map.put("c3", "p0");
Java 8 引入了三种并行操作: forEach search reduce 这些操作中的每一个都有四种形式接受具有键,值,条目和键值对参数的函数。

所有这些方法都使用一个共同的第一个参数,称为parallelismThreshold该阈值表示并行执行操作时的最小收集大小。例如,如果通过阈值为500,并且地图的实际大小为499,则操作将在单个线程上顺序执行。在下面的例子中,我们使用一个阈值来总是强制执行并行执行来进行演示。

ForEach

方法forEach()能够并行迭代地图的键值对。 使用当前迭代步骤的键和值调用类型BiConsumerlambda表达式。为了可视化并行执行,我们将当前线程名称打印到控制台。 请记住,在我的情况下,底层的ForkJoinPool使用三个线程。

  1. map.forEach(1, (key, value) ->
  2. System.out.printf("key: %s; value: %s; thread: %s\n",
  3. key, value, Thread.currentThread().getName()));
  4. // key: r2; value: d2; thread: main
  5. // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
  6. // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
  7. // key: c3; value: p0; thread: main


Search

方法search()接受返回当前键值对的非空搜索结果的BiFunction,如果当前迭代不符合所需的搜索条件,则null 一旦返回非空结果,进一步处理被抑制。请记住,ConcurrentHashMap是无序的。搜索功能不应取决于地图的实际处理顺序。如果地图的多个条目与给定的搜索函数匹配,则结果可能是非确定性的。

  1. String result = map.search(1, (key, value) -> {
  2. System.out.println(Thread.currentThread().getName());
  3. if ("foo".equals(key)) {
  4. return value;
  5. }
  6. return null;
  7. });
  8. System.out.println("Result: " + result);
  9. // ForkJoinPool.commonPool-worker-2
  10. // main
  11. // ForkJoinPool.commonPool-worker-3
  12. // Result: bar

以下是另一个仅查看地图值的示例:

  1. String result = map.searchValues(1, value -> {
  2. System.out.println(Thread.currentThread().getName());
  3. if (value.length() > 3) {
  4. return value;
  5. }
  6. return null;
  7. });
  8. System.out.println("Result: " + result);
  9. // ForkJoinPool.commonPool-worker-2
  10. // main
  11. // main
  12. // ForkJoinPool.commonPool-worker-1
  13. // Result: solo

Reduce

Java 8 Streams中已知的方法reduce()接受两种类型为BiFunction lambda BiFunction 第一个函数将每个键值对转换为任何类型的单个值。第二个功能将所有这些变换的值组合成一个单独的结果,忽略任何可能的null值。

  1. String result = map.reduce(1,
  2. (key, value) -> {
  3. System.out.println("Transform: " + Thread.currentThread().getName());
  4. return key + "=" + value;
  5. },
  6. (s1, s2) -> {
  7. System.out.println("Reduce: " + Thread.currentThread().getName());
  8. return s1 + ", " + s2;
  9. });
  10. System.out.println("Result: " + result);
  11. // Transform: ForkJoinPool.commonPool-worker-2
  12. // Transform: main
  13. // Transform: ForkJoinPool.commonPool-worker-3
  14. // Reduce: ForkJoinPool.commonPool-worker-3
  15. // Transform: main
  16. // Reduce: main
  17. // Reduce: main
  18. // Result: r2=d2, c3=p0, han=solo, foo=bar


我希望你喜欢阅读我的有关Java 8并发的教程系列的第三部分。 本教程的代码示例与许多其他Java 8代码片段一起托管在GitHub 欢迎您分享回购并自行尝试。

我希望你喜欢这篇文章。如果您有任何其他问题,请在下面的评论中向我发送您的反馈。 你也应该跟随我在Twitter上更多的开发相关的东西!


 


相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载