博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java笔记(十七) 异步任务执行服务
阅读量:5327 次
发布时间:2019-06-14

本文共 15793 字,大约阅读时间需要 52 分钟。

异步任务执行服务

一、基本原理和概念

一)基本接口

1)Runnable和Callable:表示要执行的异步任务。

2)Executor和ExecutorService:表示执行服务。

3)Future:表示异步任务的结果。

Executor接口:

public interface Executor {    void execute(Runnable command);}

ExecutorService扩展了Executor:

public interface ExecutorService extends Executor {    
Future
submit(Callable
task);
Future
submit(Runnable task, T result); Future
submit(Runnable task);}

这三个submit都只是表示任务已经提交,不代表已经执行,通过Future可以查询可以

查询异步任务的状态、获取最终结果、取消任务等。

public interface Future
{ //用于取消任务,如果任务还没有开始,则不再运行,如果任务已经在执行,则不一定能 //取消,参数mayInterruptIfRunning表示,如果任务已经在执行,是否调用interrupt //方法中断线程,如果为false就不会,如果为true就会尝试线程中断,但中断也不一定取消 boolean cancel(boolean mayInterruptIfRunning); //返回cancel方法的返回值,任务不一定被终止 boolean isCancelled(); //不管什么方式,只要任务结束,都返回true boolean isDone(); //用于返回异步任务最终的结果,如果任务还未执行,会阻塞等待。 V get() throws InterruptedException, ExecutionException; //限定等待时间,如果超时任务还没有结束,抛出异常TimeoutException V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

Future是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,是其中的纽带,

任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。

二)基本用法

public class BasicDemon {    static class Task implements Callable
{ @Override public Integer call() throws Exception { int sleepSeconds = new Random().nextInt(1000); Thread.sleep(sleepSeconds); return sleepSeconds; } } public static void main(String[] args) { //使用一个线程执行所有服务 ExecutorService executor = Executors.newSingleThreadExecutor(); Future
future = executor.submit(new Task()); //模拟执行其他任务 try { Thread.sleep(1000); System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } //关闭执行任务服务 executor.shutdown(); }}
public interface ExecutorService extends Executor {    //表示不再接收新任务,但已经提交的任务会继续执行,即使任务还未开始    void shutdown();    //不接收新任务,终止已经提交但还尚未执行的任务,    // 对于已经执行的任务,用interrupt方法尝试中断。    //返回已经提交但尚未执行的任务列表    List
shutdownNow(); //shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有的任务都已结束 //不过isShutdown方法会返回true。 boolean isShutdown(); //所有任务都结束返回true boolean isTerminated(); //等待所有任务结束 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //等待所有任务完成,返回Future列表中,每个Future的isDone方法都返回true, //但这并不代表任务执行成功,也可能是被取消了。
List
> invokeAll(Collection
> tasks) throws InterruptedException; //指定等待时间,如果超时后有的任务没完成,就会被取消。
List
> invokeAll(Collection
> tasks, long timeout, TimeUnit unit) throws InterruptedException; //只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务被取消 //如果没有任务能在限时内成功返回,抛出TimeoutException,如果限时内所有的任务 //都完成了,但都发生了异常,抛出ExecutionException.
T invokeAny(Collection
> tasks)throws InterruptedException, ExecutionException;
T invokeAny(Collection
> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

三)基本实现原理

ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,

ExecutorService有一个抽象实现类AbstractExecutorService

1.AbstractExecutorService 

该类提供了submit,invokeAll,invokeAny的默认实现,子类需要实现其他方法。

除了execute,其他方法都与执行服务的生命周期管理有关。submit/invokeAll/invokeAny

最终都会调用execute,我们来简单实现它们:

public void execute(Runnable command) {    new Thread(command).start();}
public 
Future
submit(Callable
task) { if(task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task); execute(ftask); return ftask;}
protected 
RunnableFuture
newTaskFor(Callable
callable) { return new FutureTask
(callable);}

2.FutureTask 

FutureTask实现了RunnableFuture接口。它的成员变量:

private Callable
callable;

整数变量state表示状态:

private volatile int state;

取值为:

NEW = 0; //任务在运行COMPLETING = 1; //临时状态,任务即将结束,在设置结果NORMAL = 2; //任务正常执行完成 EXCEPTIONAL = 3 //任务执行抛出异常结束CANCELLED = 4; //任务被取消INTERRUPTING = 5; //任务在被中断INTERRUPTED = 6; //任务被中断

有一个变量表示最终的执行结果或异常:

private Object outcome;

有个变量表示运行任务的线程:

private volatile Thread runner;

有个单向链表表示等待任务的执行结果的线程:

private volatile WaitNode waiters;

构造方法:

public FutureTask(Runnable runnable, V result) {    //转化为Callable    this.callable = Executors.callable(runnable, result);    this.state = NEW; //ensure visibility of callable}

任务执行服务:

public void run() {    if(state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                    null, Thread.currentThread()))        return;    try {        Callable
c = callable; if(c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if(ran) set(result); } } finally { //runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; //state must be re-read after nulling runner to prevent //leaked interrupts int s = state; if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);}}

其中,set和setException除了设置结果,修改状态外,还会调用finshCompletion,它会

唤醒所有等待结果的线程。

对于任务提交者,它通过get方法获取结果,限时get方法的代码为:

public V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException {    if(unit == null)        throw new NullPointerException();    int s = state;    if(s <= COMPLETING &&            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)        throw new TimeoutException();    return report(s);}
private V report(int s) throws ExecutionException {    Object x = outcome;    if(s == NORMAL)        return (V)x;    if(s >= CANCELLED)        throw new CancellationException();    throw new ExecutionException((Throwable)x);}
public boolean cancel(boolean mayInterruptIfRunning) {    if(state != NEW)        return false;    if(mayInterruptIfRunning) {        if(!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))            return false;        Thread t = runner;        if(t != null)            t.interrupt();        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state    }    else if(!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))        return false;     //唤醒所有等待结果的线程    finishCompletion();    return true;}

二、线程池

线程池是并发程序中一个非常重要的概念和技术。线程池主要由两个概念组成:

一是任务队列,另一个是工作者线程。工作者线程主体就是一个循环,循环从队列

中接受任务并执行,任务队列保存待执行的任务。线程池的优点:

1)可以重用线程,避免线程创建的开销;

2)任务过多时,通过排队避免创建过多线程,减少系统资源和竞争,确保任务有序完成。

Java并发包中线程池的实现类是ThreadPoolExecutor,它继承自AbstracExecutorService,

实现了ExecutorService.

一)理解线程池

主要构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler)

1.线程池大小

corePoolSize:核心线程个数

maximumPoolSize:最大线程个数

keepAliveTime和unit:表示当线程池中线程个数大于corePoolSize时额外空闲线程的存活时间。

如果该值为0,表示所有线程都不会超时终止。

一般情况下,有新任务到来的时候,如果当前线程个数小于corePoolSize,就会创建一个新

线程来执行该任务,需要说明的是即使其他线程是空闲着的,也会创建新线程。不过,如果

线程个数大等于corePoolSize,那就不会立即创建新线程了,它会先尝试排队,需要强调的是

它是尝试排队,而不是阻塞等待入队,如果队列满了或者因为其他原因不能立即入队,它就不

会排队,而是检查线程个数是否达到了maximumPoolSize,如果没有,就会继续创建线程,直到

线程数达到maximumPoolSize。

查看关于线程和任务数的一些动态数字:

//返回当前线程个数public int getPoolSize()//返回线程池曾经达到过的最大线程数public int getLargestPoolSize()//返回线程池创建以来所有已完成的任务数public long getCompletedTaskCount()//返回所有任务数,包括已完成和在排队的public long getTaskCount()

关于任务队列,需要强调的是,如果用的是无界队列,线程个数最多只能达到corePoolSize,

新的任务总会排队,参数maximumPoolSize也就没有意义。

2.任务拒绝策略 

如果任务队列有界,且maximumPoolSize有限,则当队列排满,线程个数

也达到maximumPoolSize,这时,新任务来了就会触发线程池任务拒绝策略。

此时,默认情况下,默认情况下提交任务的方法(executoe/submit/invokeAll等)

会抛出RejectExecutionException。不过该策略可以自定义,ThreadPoolExecutor

实现了4种处理方式:

1)ThreadPoolExecutor.AbortPolicy:默认处理方式,抛异常;

2)ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛异常也不执行;

3)ThreadPoolExecutor.DiscarOldestPolicy:将等待时间最长的任务扔掉,然后自己排队;

4)ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。

他们都实现了RejectedExecutionHandler接口:

public interface RejectedExecutionHandler {    //当线程池不能接受任务时,调用该方法    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

默认的RejectedExecutionHandler:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

3.线程工厂  

ThreadFactory是一个接口:

public interface ThreadFactory {    Thread newThread(Runnable r);}

这个接口根据Runnable创建一个Thread. ThreadPoolExecutor中线程的默认实现就是Execotors类中的静态内部类

DefaultThreadFactory,主要就是创建一个线程,给线程设置一个名称,设置daemon属性为false,设置线程的优先级

为标准默认优先级,线程的名称为:pool-<线程池编号>-thread-<线程编号>。可以自定义,实现该接口。

4.关于核心线程的特殊配置 

当线程池中线程个数小等于corePoolSize时,线程池中的线程是核心线程,默认情况下:

核心线程不会预先创建,只有当有任务时才创建,核心线程不会因为空闲而终止。

ThreadPoolExecutor有如下方法,可以改变这些默认行为:

//预先创建所有核心线程public int prestartAllCoreThreads()//创建一个核心线程,如果所有核心线程都已经创建,则返回falsepublic boolean prestartCoreThread()//如果参数为true,则keepAliveTime参数也适用于核心线程public void allowCoreThreadTimeOut(boolean value)

二)工厂类Executors

该类提供了创建线程池的方法:

public static ExcutorService newSingleThreadExecutor() {    return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,            new LinkedBlockingQueue
());}

注意使用的是无界队列

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads, 0L,            TimeUnit.MILLISECONDS, new LinkedBlockingQueue
());}

创建固定线程个数的线程池,使用无界队列,线程创建后不会超时终止,

由于是无界队列,如果排队任务过多,可能会消耗过多内存。

public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,            TimeUnit.SECONDS, new SynchronousQueue
());}

创建一个线程池,当新线程到来时,如果有空闲线程在等待任务,则其中一个空闲线程接受该任务,

否则就创建一个新线程,线程创建的总个数几乎不受限制,对于任意一个空闲线程,如果60秒内没有新任务,就终止。

思考,应该怎么选择线程池?

三)线程池死锁

自己思考。

三、定时任务的那些陷阱

在Java中主要有两种方式实现定时任务:

1)使用java.util包中的Timer和TimeTask

2)使用java并发包中的ScheduledExecutorService

一)Timer和TimeTask

1.基本用法 

TimerTask表示一个定时任务,它是一个抽象类,实现了Runnable,具体的定时任务需要继承

该类,实现run方法。Timer是一个具体类,它负责定时任务的调度和执行:

//在指定的绝对时间运行taskpublic void schedule(TimerTask task, Date time)//在当前时间延迟delay毫秒后执行public void schedule(TimerTask task, long delay)//固定延时重复执行,第一次计划执行时间为firstTime,//后一次的计划执行时间为前一次的“实际”加上period,如果由于某种原因该次任务延时了, //则本次任务也会延时,即延时时间period始终不变。public void schedule(TimerTask task, Date firstTime, long period)//同样是固定延时重复执行,第一次执行时间为当前时间加上delaypublic void schedule(TimerTask task, long delay, long period)//固定频率重复执行,第一次计划执行时间为firstTime//后一次的计划执行时间为前一次的计划时间加上periodpublic void scheduleAtFixedRate(TimerTask task, Date firstTime, long period)//public void scheduleAtFixedRate(TimerTask task, long delay, long period)

 注意固定延时和固定频率的区别。另外需要注意,如果第一个计划执行的时间firstTime是一个过去时,则任务会

立即执行,对于固定延时的任务,下次任务会基于第一次执行时间计算,而对于固定频率的任务,则会从firstTime

开始计算,有可能加上period还是一个过去时间,从而连续运行很多次,直到时间超过当前时间。

例子:

public class TimerFixedDelay {    static String getNowTime() {        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");        return format.format(new Date());    }    static class LongRunningTask extends TimerTask {        public void run() {            try {                Thread.sleep(5000);            } catch (InterruptedException e) {                e.printStackTrace();            }            Date date = new Date();            System.out.println("Long running task finished! And finish time is " + getNowTime() );        }    }    static class DelayTask extends TimerTask {        public void run() {            System.out.println("Now the time is " + getNowTime());        }    }    public static void main(String[] args) {        Timer timer = new Timer();        timer.schedule(new LongRunningTask(), 10);        timer.schedule(new DelayTask(), 100, 1000);        /*Long running task finished! And finish time is 2018-12-24 04:50:29        Now the time is 2018-12-24 04:50:29        Now the time is 2018-12-24 04:50:30        Now the time is 2018-12-24 04:50:31        Now the time is 2018-12-24 04:50:32        Now the time is 2018-12-24 04:50:33        Now the time is 2018-12-24 04:50:35*/        //        Timer timer1 = new Timer();//        timer1.schedule(new LongRunningTask(), 10);//        timer1.scheduleAtFixedRate(new DelayTask(), 100, 1000);        /*Long running task finished! And finish time is 2018-12-24 04:48:48        Now the time is 2018-12-24 04:48:48          Now the time is 2018-12-24 04:48:48  //补足了之前运行的代码        Now the time is 2018-12-24 04:48:48        Now the time is 2018-12-24 04:48:48        Now the time is 2018-12-24 04:48:48        Now the time is 2018-12-24 04:48:48        Now the time is 2018-12-24 04:48:49        Now the time is 2018-12-24 04:48:50        Now the time is 2018-12-24 04:48:51        Now the time is 2018-12-24 04:48:52        Now the time is 2018-12-24 04:48:53        Now the time is 2018-12-24 04:48:54        Now the time is 2018-12-24 04:48:55        Now the time is 2018-12-24 04:48:56*/     }} 

2.基本原理 

Timer内部主要由任务队列和Timer线程两部分组成。任务队列是一个基于

堆实现的优先级队列,按照下次执行时间排优先级。Timer线程负责执行

所有的定时任务,注意,一个Timer对象只有一个Timer线程,所以对于上面的

例子,任务会被延迟。

Timer线程的主体是一个循环,从队列中获取任务,如果队列中有任务

且计划执行时间小等于当前时间,就执行它,如果队列中没有任务或者

第一个任务延时还没有到,就睡眠。如果睡眠过程中队列上添加新任务

是第一个任务,Timer线程就会被唤醒,重新进行检查。

 在执行任务之前,Timer线程判断任务是否为周期任务,如果是就设置

下次执行时间并添加到优先级队列中,对于固定延时任务,下次执行时间

为当前时间加上period,对于固定频率任务,下次执行时间为上次计划时间加上period。

3.死循环

 定时任务不能耗时太长,更不能是无限循环。

public class EndlessTimer {    static class LoopTask extends TimerTask {        public void run() {            while (true) {                try {                    Thread.sleep(1000);                } catch (Exception e) {                    e.printStackTrace();                }            }        }    }    static class SimpleTask extends TimerTask {        public void run() {            System.out.println("Never happen!"); //永远不会被执行        }    }    public static void main(String[] args) {        Timer timer = new Timer();        timer.schedule(new LoopTask(), 100);        timer.schedule(new SimpleTask(), 100);    }}

4.异常处理

在执行任何一个任务的run方法时,如果run方法抛出异常,Timer线程就会退出,

从而所有的定时任务都会被取消。所以,如果希望各个定时任务互不干扰,一定要在run方法内捕获异常。

二)ScheduledExecutorService

1.基本用法

ScheduledExecutorService是一个接口,其用法为:

public interface ScheduledExecutorService extends ExecutorService {    //单次执行,在指定时间delay后运行command    ScheduledFuture
schedule(Runnable command, long delay, TimeUnit unit) //单次执行,在指定时间delay后运行callable
ScheduledFuture
schedule(Callable
callable, long delay,TimeUnit unit); //固定频率重复执行 ScheduledFuture
scheduleAtFixedRate(Runnable command,long initialDelay, long delay, TimeUnit unit) //固定延时重复执行 ScheduledFuture
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);}

ScheduledExecutorServiced的主要实现类是SchedeuledThreadPoolExecutor,它是线程池

ThreadPoolExecutor的子类,其主要构造方法为:

public ScheduledThreadPoolExecutor(int corePoolSize)

它的任务队列是一个无界优先级队列。工厂类Executors也提供了一些方法,以创建SchedeuledThreadPoolExecutor:

//单线程定时任务public static ScheduledExecutorService newSingleThreadScheduledExecutor()public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)//多线程定时任务public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

与Timer中的任务类似,应该捕获所有异常。

转载于:https://www.cnblogs.com/Shadowplay/p/10109251.html

你可能感兴趣的文章
文件操作其他方法
查看>>
jQuery插件之ajaxFileUpload
查看>>
MMO-SNS类游戏服务器间数据交互策略分享
查看>>
记录:protobuf在网游中的用法
查看>>
【C++】指针的引用及面向对象
查看>>
《设计模式之禅》--摘要
查看>>
poj2976(二分搜索,最大化平均值)
查看>>
24种设计模式
查看>>
[bzoj3532][Sdoi2014]Lis
查看>>
HDU 2546 饭卡 动态规划01背包
查看>>
devexpress TreeList递归及点击获取数据
查看>>
记一次渗透测试某路由器
查看>>
使用gatling做压力测试与负载测试
查看>>
[转载]Linux shell中的竖线(|)——管道符号
查看>>
python 生成器和迭代器
查看>>
集合与数组,集合与集合之间的转换
查看>>
AJAX 数据库实例
查看>>
python-PIL-16bit-灰度图像生成-tiff
查看>>
完全的精确覆盖模板
查看>>
hdu-1312
查看>>