`
623deyingxiong
  • 浏览: 188363 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

高级并发对象(Councurrency Tutorial 7)

阅读更多
高级并发对象

到目前为止,上几个章节中,我们重点讲了Java框架最原始的低级API。这些API对于完成基本任务来说已经足够了,但是对于更高级的工作我们需要更高级的API。对于那些需要充分利用现代多处理器和多核系统的大规模高并发应用尤其如此。

本节中,我们将看到一些Java5.0新增的高级并发特性。这些特性大部分实现在java.tuil.concurrenct 包中。在Java集合框架中也有一些新的并发数据结构。

锁对象(Lock objects) 提供了可以简化许多并发应用的锁的惯用法(locking idioms)。
  • Executors 为加载和管理线程定义了高级API。Executors提供了适合大型应用的线程池管理。
  • 并发集合(Concurrent collections)简化了对大型数据集合的管理,可以大大降低同步的需要。
  • 原子对象(Atomic variables) 可以帮助缩小同步粒度和避免内存不一致问题(memory consistency errors)。
  • ThreadLocalRandom(JDK 7) 提供高效的多线程生成伪随机数的方法。

锁对象(Lock Objects)

同步方法与同步代码块依赖一个简单的可重入锁,这种锁很容易使用,但是有诸多限制。Java.util.concurrent.locks包中提供了更多复杂的常用锁对象。我们不会详细的描述这个包,但是我们会关注它的最基础的接口Lock。

Lock对象的工作机制类似于同步代码块使用的隐含锁。与隐含锁一样,在某一时刻只允许一个线程拥有锁对象。通过它们关联的Condition对象,锁对象也支持wait/notify机制。

与隐含锁相比,Lock对象最大的优势是它可以撤消获取一个锁对象的尝试。tryLock方法若没办法立即或在预设的超时时间范围内无法获得锁对象,它将会撤消获得锁的尝试(即,撤出锁对象的等待队列)。lockInterruptibly方法在收到另一线程发送的中断请求后会取消获取锁对象的尝试。

让我们使用Lock对象来解决下我们在活性(Liveness)一节中看到的死锁问题。Alphonse和Gaston会注意到对方鞠躬的动作。我们通过强制双方在鞠躬之前必须获取双方的锁对象来改善模型。下面是改善模型后的代码。为了演示普遍原理,我们假设Alphonse和Gaston非常痴迷于新获得的能够安全鞠躬的能力,以至于他们不能停止向对方鞠躬:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;

public class Safelock {
    static class Friend {
        private final String name;
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                if (! (myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            return myLock && yourLock;
        }
            
        public void bow(Friend bower) {
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has"
                        + " bowed to me!%n", 
                        this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                System.out.format("%s: %s started"
                    + " to bow to me, but saw that"
                    + " I was already bowing to"
                    + " him.%n",
                    this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has" +
                " bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }
    
        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {}
                bowee.bow(bower);
            }
        }
    }
            

    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}


Executors

目前为止,在在所有示例中,我们通过定义Runnable对象描述被新线程执行的任务,通过定义Thread对象描述线程本身。两者紧紧联系在一起。这在小型应用中能够很好的工作,但是在大型应用中,把线程的管理和创建与线程的使用分割开来是非常重要的。封装了线程创建和管理的对象被叫做executors.下面的章节将详细描述executors。
  • Executor接口定义了三种executor对象类型
  • 线程池(Thread Pools) 是executor最普遍的实现类型
  • Fork/Join (JDK 7)是一个多进程框架

Executor接口

Java.util.concurrent包定义了三种executor接口:
  • Executor,支持加载新任务的简单接口
  • ExecutorService,Executor的子接口,增加了帮助管理任务和executor本身的生命周期的功能。
  • ScheduledExecutorService,ExecutorService的子接口,支持周期性地执行任务。

要指出的是,Executor对象都声明为这三种接口的引用而不是某个Executor实现类的引用。(译注:Executor的实例,通过工厂(Executors)模式创建,没有具体的实现类)

Executor

Executor接口只定义了一个方法execute,被设计用来代替一般的创建线程惯例,如果r是一个Runnable对象,e是一个Executor对象你可以将:
	(new Thread(r)).start();

替换为:
e.execute(r);


然而,对于execute方法的实现并没有特殊要求。低级的实现只是创建一个新的线程并立即执行。基于Executor的实现,execute可能就是这样做的,但更有可能是使用一个已经存在的工作线程(worker Threads)去执行r,或者将r放在一个执行队列中等待工作线程有空的时候再执行。(我们会在线程池一节中描述工作线程)。

Java.util.concurrent包中的executor实现被设计成能够充分利用更高级的ExecutorService和ScheduledExecutorService接口,它们也和Executor接口一起工作。

ExecutorService

ExecutorService接口提供了另一个相似的submit方法,但比execute更加通用。和execute一样,submit接受Runnable对象,但也接受Callable对象,Callable允许任务执行后返回一个值。Submit方法返回Future对象,Future对象被用来接收Callable返回的值,并管理Callable和Runnable对象所代表的任务。(译注:此处提供一个能够描述submit 及Future 的优势http://623deyingxiong.iteye.com/admin/blogs/1753975)

ExecutorService也提供处理Callable对象(译注:Callable对象类似于Runnable,但其”call”方法可以有返回值)的方法。最后,ExecutorService提供大量的方法管理executor的关闭。为了支持立即停止(immediate shutdown),任务应该正确地处理中断请求(译注:即在线程接收到中断通知后立即终止执行)。

ScheduledExecutorService

ScheduledExecutorService接口为它的父类ExecutorService的行为提供计划,允许在执行Runnable和Callable任务之前停顿一段时间。接口定义了scheduleAtFixedRate和scheduleWithFixedDelay,这两个方法以特定的时间间隔重复地执行特定任务。

线程池

大部分java.util.concurrent中的executor实现类使用了工作线程的(worker threads)线程池。工作线程不属于特定某个Runnable和Callable任务,经常用来执行多个任务。

使用工作线程可以减少创建线程的资源浪费。Thread对象占用了大量的内存,在一个大型应用中,分配和释放线程对象会造成大量的线程管理开支(overhead)。

一种常用的线程池是固定大小线程池(the fixed thread pool)。这种线程池有特定数量的线程在运行;如果一个线程在使用过程中意外停止(译注:如抛出未捕获的异常),它会自动被另一个新线程替代。任务通过一个内部的任务队列提交给线程池执行,此任务队列可以在活动任务数量大于线程池中工作线程个数时,存储多余的活动任务。

“固定大小线程池”的一个好处是”优雅的缓冲”。考虑一个web应用服务,它的每一个线程只处理一个HTTP请求。如果这个应用简单地为每一个新来的请求创建一个新的处理线程,那么,当请求数量足够多时,线程占用的资源总和将超过系统的承受能力,服务器会因此忽然停止对所有请求的应答(译注:常见的内存溢出)。使用固定大小线程池后,即使请求数量超出工作线程能够处理的请求上限,但是新来的HTTP请求会被暂时存放在消息队列中,当出现空闲的工作线程后,这些HTTP请求就会得到及时的处理。

创建一个使用固定线程池的executor的方法是调用 java.util.concurrent.Executors工厂类(译注:它是所有ExecutorService的工厂)的newFixedThreadPool方法 。此类也提供了下面的工厂方法:
  • newCachedThreadPool方法创建了一个使用可扩展线程池的executor。拥有这种线程池的executor适合执行生命周期较短的任务。
  • newSingleThreadExecutor方法创建了一个只有一个工作线程的executor。
  • 还有一些工厂方法提供以上executor的ScheduledExecutorService版本。

如果以上工厂方法都无法满足你的需要, java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ScheduledThreadPoolExecutor 将为你提供更多选择。

Fork/Join

(译者声明:本节没有做过实践,可能有些理解会有问题,个人觉得有点MapReduce的意思)

Java SE 7中的新特性,fork/join框架帮助你创建多进程应用。它被设计用来完成可以分成很多小进程的工作。目的是使用所有可用的进程来提升你的应用的性能。

像任何ExecutorService一样,fork/join框架把任务分发给线程池中工作线程。不同的是,因为fork/join框架使用work-stealing 算法。完成工作的工作线程可以从其他还在忙碌的线程那里偷任务来执行。

fork/join框架的核心是ForkJoinPool类,一个AbstractExecutorService的扩展类。ForkJoinPool实现了核心的work-stealing算法,能够执行ForkJoinTask任务。

基本应用

使用fork/join框架很简单。第一步是写一些代码来执行工作的一部分。你的代码应该看起来像这样:
if (此部分工作足够小)
  直接干活。
else
  把工作分成两部分,
  调用完成这两部分的代码,并等待结果返回。

把以上代码封装成ForkJoinTask子类,特别地作为更具体的类型RecursiveTask(可以返回结果)或 RecursiveAction。

Blurring for Clarity

为了帮助你理解fork/join 框架是如何工作的,考虑一个简单的例子。假设你想对一张图片模糊处理。原图片用一个整数数组表示,每个整数包含了对描述像素的颜色值。模糊化的目标图片也是由相同大小的整数数组表示。

模糊化通过不断重复一次修改一个像素值来完成。每个像素值被修改为周围像素的平均值(红,绿,蓝都被平均),结果被放在目标数组中。下面是可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size, should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Re-assemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
…

现在你实现抽象方法compute(),可以直接做模糊化处理,也可以将它分成小的任务。一个简单的方法是通过数组的长度来判断工作是要直接进行或者分割。
protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

//如果当前的方法在RecursiveAction类的子类中,就可以直接将它放到ForkJoinPool中执行。

//创建一个能代表所有要做的工作的任务。

// 原图片的像素值保存在src中。
// 目标图片的像素值保存在dst中。
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

//创建用来执行任务的ForkJoinPool。
ForkJoinPool pool = new ForkJoinPool();

//执行任务。
pool.invoke(fb);

完整的代码中,还要包含其他的如显示原图片和目标图片的代码,详情请看ForkBlur类。

并发集合(Concurrent Collections)

Java.util.concurrent 包里有很多对Java集合框架(Java Collections Framework)的扩展。下面这些是下面这些是最容易分类的集合接口:
  • BlockingQueue 定义了一个先进先出的数据结构,当你试图将数据添加到一个已满的队列,或者从空队列中取数据时,将会阻塞或者超时退出。
  • ConcurrentMap 是java.util.Map的子接口,定义了很多有用的自动的操作。只有当键值存在时,才能替换或删除一个键值对,或者只有当键值不存在(absent)时才能添加一个键值对。使这些操作自动化帮助避免同步(synchronization)。对ConcurrentMap标准的普适实现是ConcurrentHashMap,它是HashMap的并发衍生类。
  • ConcurrentNavigableMap 是一个ConcurrentMap的子接口,它支持模糊匹配(approximate matches)。标准的对ConcurrentnavigableMap的普适实现是ConcurrentSkipListMap,它是TreeMap并发衍生类。

所有的这些集合通过建立对集合中元素操作(增删改查)的先后关系(happens-before relationship),来帮助我们避免内存一致性错误。

原子变量(Atomic Variables)

Java.util.concurrent.atomic 包中定义了对单个变量的原子操作支持。包中的所有的类的getter和setter,都像对volatile变量操作一样,具有原子性。那意味着,一个set操作与后续的get操作存在绝对的先后关系。

为了看到原子变量是如何使用的,我们回到我们原来用来描述线程干扰的Counter类:
class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }

}


让Counter安全运算不发生线程干扰的方法是像在SynchronizedCounter同步它的方法:

class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }

}

对于这个简单地类,synchronization是一个可以接受的解决方案。但是对于更复杂的类,我们就要避免不必要的synchronization对线程活性的影响。将int型字段替换为AtomicInteger让我们可以不需要借助synchronization就可以避免线程干扰,就像在AtomicCounter中这样:
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }

}


并发随机数

在JDK7中,java.util.concurrent 包含了一个方便应用程序使用的类,ThreadLocalRandom,可以将它用来在多线程或ForJoinTask中获得随机数。

在并发环境中,使用ThreadLocalRandom替换Math.random()可以减少冲突,更可以提高性能。

你所做的只是调用ThreadLocalRandom.curretn(),然后调用它的方法中的一个来获得随机数。这里是一个例子:
	int r = ThreadLocalRandom.current() .nextInt(4, 77);

扩展阅读

  • Concurrent Programming in Java: Design Principles and Pattern (2nd Edition) by Doug Lea. A comprehensive work by a leading expert, who's also the architect of the Java platform's concurrency framework.
  • Java Concurrency in Practice by Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea. A practical guide designed to be accessible to the novice.
  • Effective Java Programming Language Guide by Joshua Bloch. Though this is a general programming guide, its chapter on threads contains essential "best practices" for concurrent programming.
  • Concurrency: State Models & Java Programs (2nd Edition), by Jeff Magee and Jeff Kramer. An introduction to concurrent programming through a combination of modeling and practical examples.
  • Java Concurrent Animated: Animations that show usage of concurrency features.


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics