>

低29位表示当前线程的运行任务数量澳门博发娱乐

- 编辑:澳门博发娱乐官网 -

低29位表示当前线程的运行任务数量澳门博发娱乐

java使用默认线程池踩过的坑(1)

场景

一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会持续特别长的时间,甚至有一直阻塞的可能。我们需要一个manager来管理这些task,当这个task的上一次执行时间距离现在超过5个调度周期的时候,就直接停掉这个线程,然后再重启它,保证两个目标目录下没有待处理的txt文件堆积。

问题

直接使用java默认的线程池调度task1和task2.由于外部txt的种种不可控原因,导致task2线程阻塞。现象就是task1和线程池调度器都正常运行着,但是task2迟迟没有动作。

当然,找到具体的阻塞原因并进行针对性解决是很重要的。但是,这种措施很可能并不能完全、彻底、全面的处理好所有未知情况。我们需要保证任务线程或者调度器的健壮性!

方案计划

线程池调度器并没有原生的针对被调度线程的业务运行状态进行监控处理的API。因为task2是阻塞在我们的业务逻辑里的,所以最好的方式是写一个TaskManager,所有的任务线程在执行任务前全部到这个TaskManager这里来注册自己。这个TaskManager就负责对于每个自己管辖范围内的task进行实时全程监控!

后面的重点就是如何处理超过5个执行周期的task了。

方案如下:

●一旦发现这个task线程,立即中止它,然后再次重启;

●一旦发现这个task线程,直接将整个pool清空并停止,重新放入这两个task ——task明确的情况下】;

方案实施

中止后重启

●Task实现类

class FileTask extends Thread { private long lastExecTime = 0; protected long interval = 10000; public long getLastExecTime() {     return lastExecTime; } public void setLastExecTime(long lastExecTime) {     this.lastExecTime = lastExecTime; } public long getInterval() {     return interval; } public void setInterval(long interval) {     this.interval = interval; }  public File[] getFiles() {     return null; } 

●Override

public void run() { while (!Thread.currentThread().isInterrupted()) { lastExecTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + " is running -> " + new Date()); try { Thread.sleep(getInterval() * 6 * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace();    // 当线程池shutdown之后,这里就会抛出exception了             }         }     }     } 

●TaskManager

public class TaskManager  implements Runnable { private final static Log logger = LogFactory.getLog(TaskManager .class); public Set<FileTask> runners = new CopyOnWriteArraySet<FileTask>(); ExecutorService pool = Executors.newCachedThreadPool(); public void registerCodeRunnable(FileTask process) { runners.add(process); } public TaskManager (Set<FileTask> runners) { this.runners = runners; } 

@Override

public void run() {        while (!Thread.currentThread().isInterrupted()) {            try {                long current = System.currentTimeMillis();                for (FileTask wrapper : runners) {                    if (current - wrapper.getLastExecTime() > wrapper.getInterval() * 5) {                        wrapper.interrupt();                        for (File file : wrapper.getFiles()) {                            file.delete();                        }                     wrapper.start();                      }                }            } catch (Exception e1) {                logger.error("Error happens when we trying to interrupt and restart a task ");                ExceptionCollector.registerException(e1);            }            try {                Thread.sleep(500);            } catch (InterruptedException e) {            }        }    }     

这段代码会报错 java.lang.Thread IllegalThreadStateException。为什么呢?其实这是一个很基础的问题,您应该不会像我一样马虎。查看Thread.start()的注释, 有这样一段:

It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.

是的,一个线程不能够启动两次。那么它是怎么判断的呢?

public synchronized void start() {         /**          * A zero status value corresponds to state "NEW".    0对应的是state NEW          */ 

if (threadStatus != 0) //如果不是NEW state,就直接抛出异常!


澳门博发娱乐官网 1


) 场景 一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会...

java中通用的线程池实例代码,java线程池实例

复制代码 代码如下:
package com.smart.frame.task.autoTask;

import java.util.Collection;
import java.util.Vector;

/**
 * 任务分发器
 */
public class TaskManage extends Thread
{
    protected Vector<Runnable> tasks = new Vector<Runnable>();
    protected boolean running = false;
    protected boolean stopped = false;
    protected boolean paused = false;
    protected boolean killed = false;
    private ThreadPool pool;

    public TaskManage(ThreadPool pool)
    {
        this.pool = pool;
    }

    public void putTask(Runnable task)
    {
        tasks.add(task);
    }

    public void putTasks(Runnable[] tasks)
    {
        for (int i = 0; i < tasks.length; i++)
            this.tasks.add(tasks[i]);
    }

    public void putTasks(Collection<Runnable> tasks)
    {
        this.tasks.addAll(tasks);
    }

    protected Runnable popTask()
    {
        if (tasks.size() > 0) return (Runnable) tasks.remove(0);
        else return null;
    }

    public boolean isRunning()
    {
        return running;
    }

    public void stopTasks()
    {
        stopped = true;
    }

    public void stopTasksSync()
    {
        stopTasks();
        while (isRunning())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public void pauseTasks()
    {
        paused = true;
    }

    public void pauseTasksSync()
    {
        pauseTasks();
        while (isRunning())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public void kill()
    {
        if (!running) interrupt();
        else killed = true;
    }

    public void killSync()
    {
        kill();
        while (isAlive())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public synchronized void startTasks()
    {
        running = true;
        this.notify();
    }

    public synchronized void run()
    {
        try
        {
            while (true)
            {
                if (!running || tasks.size() == 0)
                {
                    pool.notifyForIdleThread();
                    this.wait();
                }
                else
                {
                    Runnable task;
                    while ((task = popTask()) != null)
                    {
                        task.run();
                        if (stopped)
                        {
                            stopped = false;
                            if (tasks.size() > 0)
                            {
                                tasks.clear();
                                System.out.println(Thread.currentThread().getId() + ": Tasks are stopped");
                                break;
                            }
                        }
                        if (paused)
                        {
                            paused = false;
                            if (tasks.size() > 0)
                            {
                                System.out.println(Thread.currentThread().getId() + ": Tasks are paused");
                                break;
                            }
                        }
                    }
                    running = false;
                }

                if (killed)
                {
                    killed = false;
                    break;
                }
            }
        }
        catch (InterruptedException e)
        {
            TaskException.getResultMessage(e);
            return;
        }
    }
}

复制代码 代码如下:
package com.smart.frame.task.autoTask;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;

/**
 * 线程池
 */
public class ThreadPool
{
    protected int maxPoolSize = TaskConfig.maxPoolSize;
    protected int initPoolSize = TaskConfig.initPoolSize;
    protected Vector<TaskManage> threads = new Vector<TaskManage>();
    protected boolean initialized = false;
    protected boolean hasIdleThread = false;

    public ThreadPool()
    {
        super();
    }

    public ThreadPool(int maxPoolSize, int initPoolSize)
    {
        this.maxPoolSize = maxPoolSize;
        this.initPoolSize = initPoolSize;
    }

    public void init()
    {
        initialized = true;
        for (int i = 0; i < initPoolSize; i++)
        {
            TaskManage thread = new TaskManage(this);
            thread.start();
            threads.add(thread);
        }
    }

    public void setMaxPoolSize(int maxPoolSize)
    {
        this.maxPoolSize = maxPoolSize;
        if (maxPoolSize < getPoolSize()) setPoolSize(maxPoolSize);
    }

    /**
     * 重设当前线程数 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事
     * 务处理完成 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     */
    public void setPoolSize(int size)
    {
        if (!initialized)
        {
            initPoolSize = size;
            return;
        }
        else if (size > getPoolSize())
        {
            for (int i = getPoolSize(); i < size && i < maxPoolSize; i++)
            {
                TaskManage thread = new TaskManage(this);
                thread.start();
                threads.add(thread);
            }
        }
        else if (size < getPoolSize())
        {
            while (getPoolSize() > size)
            {
                TaskManage th = (TaskManage) threads.remove(0);
                th.kill();
            }
        }
    }

    public int getPoolSize()
    {
        return threads.size();
    }

    protected void notifyForIdleThread()
    {
        hasIdleThread = true;
    }

    protected boolean waitForIdleThread()
    {
        hasIdleThread = false;
        while (!hasIdleThread && getPoolSize() >= maxPoolSize)
        {
            try
            {
                Thread.sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
                return false;
            }
        }

        return true;
    }

    public synchronized TaskManage getIdleThread()
    {
        while (true)
        {
            for (Iterator<TaskManage> itr = threads.iterator(); itr.hasNext();)
            {
                TaskManage th = (TaskManage) itr.next();
                if (!th.isRunning()) return th;
            }

            if (getPoolSize() < maxPoolSize)
            {
                TaskManage thread = new TaskManage(this);
                thread.start();
                threads.add(thread);
                return thread;
            }

            if (waitForIdleThread() == false) return null;
        }
    }

    public void processTask(Runnable task)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTask(task);
            th.startTasks();
        }
    }

    public void processTasksInSingleThread(Runnable[] tasks)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTasks(tasks);
            th.startTasks();
        }
    }

    public void processTasksInSingleThread(Collection<Runnable> tasks)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTasks(tasks);
            th.startTasks();
        }
    }

}

复制代码 代码如下:
package com.smart.frame.task.autoTask;

public class TopTask implements Runnable
{

    private ThreadPool pool;

    public TopTask()
    {
        super();
    }

    public TopTask(ThreadPool pool)
    {
        super();
        this.pool = pool;
    }

    @Override
    public void run()
    {
        init();
        start();
    }

    /**
     * 初始化验证权限、参数之类
     */
    public void init()
    {

    }

    /**
     * 开始自动任务
     */
    public void start()
    {
        for (int i = 0; i < 10; i++)
        {
            pool.processTask(new BeginAuto());
        }
    }
}
/**
 * 实现类
 */
class BeginAuto implements Runnable
{
    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getId() + "..................");
    }

}

复制代码 代码如下: package com.smart.frame.task.autoTask; import java.util.Collection; import java.util.Vector;...

  1. //代码清单StartCycleRunTask:容器监听器  
  2. package com.baobaotao.web;  
  3. import java.util.Date;  
  4. import java.util.Timer;  
  5. import java.util.TimerTask;  
  6. import javax.servlet.ServletContextEvent;  
  7. import javax.servlet.ServletContextListener;  
  8. public class StartCycleRunTask implements ServletContextListener ...{  
  9.     private Timer timer;  
  10.     public void contextDestroyed(ServletContextEvent arg0) ...{  
  11.         // ②该方法在Web容器关闭时执行  
  12.         System.out.println("Web应用程序启动关闭...");  
  13.     }  
  14.     public void contextInitialized(ServletContextEvent arg0) ...{  
  15.          //②在Web容器启动时自动执行该方法  
  16.         System.out.println("Web应用程序启动...");  
  17.         timer = new Timer();//②-1:创建一个Timer,Timer内部自动创建一个背景线程  
  18.         TimerTask task = new SimpleTimerTask();  
  19.         timer.schedule(task, 1000L, 5000L); //②-2:注册一个5秒钟运行一次的任务  
  20.     }  
  21. }  
  22. class SimpleTimerTask extends TimerTask ...{//③任务  
  23.     private int count;  
  24.     public void run() ...{  
  25.         System.out.println((++count)+"execute task..."+(new Date()));  
  26.     }  
  27. }  

引用

全面理解Java内存模型
Java多线程系列目录(共43篇)
Android开发——Android中常见的4种线程池(保证你能看懂并理解)
Handler、Thread和Runnable简单分析

关键代码就是那个while循环。如果task不为空执行task否则从getTask()中取任务。在执行完任务后会在finally 块中设置task = null;
顺便介绍一下,在这里,我们可以看到beforeExecute(Thread t, Runnable r)方法和afterExecute(Runnable r, Throwable t)会在任务的执行前后执行,我们可以通过继承线程池的方式来重写这两个方法,这样就能够对任务的执行进行监控啦。
咋一看 好像没什么问题。其实我们可以发现如果执行完一个任务 task 设置为 null。就要调用 getTask()方法 。 点进去查看一下。

容器收到一个Servlet请求,调度线程从线程池中选出一个工作者线程,将请求传递给该工作者线程,然后由该线程来执行Servlet的 service方法。当这个线程正在执行的时候,容器收到另外一个请求,调度线程同样从线程池中选出另一个工作者线程来服务新的请求,容器并不关心这个请求是否访问的是同一个Servlet.当容器同时收到对同一个Servlet的多个请求的时候,那么这个Servlet的service()方法将在多线程中并发执行。
Servlet容器默认采用单实例多线程的方式来处理请求,这样减少产生Servlet实例的开销,提升了对请求的响应时间,对于Tomcat可以在server.xml中通过<Connector>元素设置线程池中线程的数目。
如图: 

线程操作

同步
同步方式:synchronized和lock
同步相关方法:wait()/notify()/notifyAll() sleep()/join()/yield() await()/signal()/signalAll

            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                        if (r != null)
                    return r;

Spring为JDK Timer和Quartz Scheduler所提供的TimerFactoryBean和SchedulerFactoryBean能够和Spring容器的生命周期关联,在 Spring容器启动时启动调度器,而在Spring容器关闭时,停止调度器。所以在Spring中通过这两个FactoryBean配置调度器,再从 Spring IoC中获取调度器引用进行任务调度将不会出现这种Web容器关闭而任务依然运行的问题。而如果你在程序中直接使用Timer或Scheduler,如不 进行额外的处理,将会出现这一问题。 

Handler、Thread和Runnable

Runnable是接口,可避免单继承局限,使用更灵活,且一个实例可以给多个thread共享。
Thread其实是一个对象,thread有thread.run()和thread.start()两种方法,run方法其实没有新建线程,而是在当前线程中直接执行;start才会真正创建一个线程,start带有synchronized同步锁,且一个nativeCreate的native方法去请求CPU,这个函数会把Thread自己的实例传进去,是c++实现的,sleep/interrupt等方法都是在这里实现的。
Handler是用来和Looper中的消息队列交互的,handler通过ThreadLocal获取Looper的MessageQueue,可以向queue中添加message,message的target又指向handler,这样Looper循环处理消息时,会把消息再交给handler去处理。

可不要被迷惑,这里的t是通过work.thread; 得到的。这时候我们需要查看work类中的run方法。
work在ThreadPoolExecutor为一个内部类实现了Runnable接口。只有一个构造方法

如果我们手工使用JDK Timer(Quartz的Scheduler),在Web容器启动时启动Timer,当Web容器关闭时,除非你手工关闭这个Timer,否则Timer中的任务还会继续运行!

常用的线程类

Thread和HandlerThread
在使用场景上,HandlerThread更节省资源:
如果多次调用new Thread(){...},会创造多个匿名线程,销毁资源
使用HandlerThread,是通过Looper缓存任务,重用线程,节省资源开销
Thread、Runnable和Callable
Runnable是个接口,实现更加灵活,而且一个Runnable的实例可以被多个Thread复用

我们通常都是通过执行execute(Runnable command)方法来向线程池提交一个不需要返回结果的任务的(如果你需要返回结果那么就是 <T> Future<T> submit(Callable<T> task)方法),怀着一颗探索的心,敲敲翻开了线程池的源码:

在使用长连接的comet服务端推送技术中,消息推送线程设置为守护线程,服务于ChatServlet的servlet用户线程,在servlet的init启动消息线程,servlet一旦初始化后,一直存在服务器,servlet摧毁后,消息线程自动退出

sleep() wait() join() yield()

sleep阻塞暂停,不释放锁和cpu
wait释放锁和cpu,需要被notify才能唤醒
join是暂停当前线程,先执行join进来的线程
yield是把当前线程设为可执行状态,给同等或更高优先级的其他线程一个执行机会,线程yield之后,很有可能继续恢复执行

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //得到worker对象中我们提交的任务
        Runnable task = w.firstTask;

        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果当前任务为空  那么就从getTask中获得任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //任务执行前调用的方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务结束后调用的方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

[java] view plain copy

线程的定义和状态

创建、就绪、运行、阻塞、停止

今天看到了别人的一个代码,为了实现每小时重启一下MQ拉取消息,他使用的是Thread.sleep(1000*60*60)方法,然后重启MQ。我一看到就非常头疼啊。。为什么要使用这种方式而不使用java的线程池呢?于是我就问他,他说当时为了方便。大家都知道Thread.sleep期间是不会释放共享资源的,会造成死锁现象。然后我就想Thread.sleep可以在睡觉过程中等待被interrupt中断,然后继续工作。那么线程池是怎么保证他的核心线程不释放 而一直等待任务的执行的呢?难道我们一直理解的线程run方法执行完毕线程就销毁是不正确的?而且还有我们为何通过设置allowCoreThreadTimeOut(true) 就能使核心线程销毁的呢?

[html] view plain copy

线程池

线程池可以节约创建和销毁线程的资源开销。

  1. 线程池常见的几个类的用法:
    ThreadPoolExecutor、Executor,Executors,ExecutorService,CompletionService,Future,Callable 等
  2. 线程池四个分类
    newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool和SingleThreadExecutor
  3. 自定义线程池 ThreadPoolExecutor
    线程池工作原理
    核心线程数、等待队列、处理策略等
 public void execute(Runnable command) {
         /*如果提交的任务为null  抛出空指针异常*/
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        /*如果当前的任务数小于等于设置的核心线程大小,那么调用addWorker直接执行该任务*/
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /*如果当前的任务数大于设置的核心线程大小,而且当前的线程池状态时运行状态,那么向阻塞队列中添加任务*/
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*如果向队列中添加失败,那么就新开启一个线程来执行该任务*/
        else if (!addWorker(command, false))
            reject(command);
    }

 

线程优先级

线程优先级的系统规则
线程是具有优先级的,高优先级的线程有更多CPU资源。
继承-线程优先级具有继承性,如果ThreadA启动了ThreadB,B默认具有和A一样的优先级。(this.priority = parent.getPriority();)
设置-优先级可以手动设置。
并行-高优先级的线程能获得更多CPU资源,但是低优先级的线程也能继续工作,高优先级并不会先执行
随机-高优先级的线程并不一定先执行,执行顺序是随机的。
线程优先级的设置
Thread和HandlerThread设置优先级的方式不一样。
Thread中线程优先级范围是1~10,数值越高,优先级越高,默认是5。
java.lang.Thread.setPriority(int i);
HandlerThread中线程优先级范围是-20~19,数值越低,优先级越高,默认为0。
android.os.Process.setThreadPriority(int p);
android.os.Process.setThreadPriority(int tid, int p);
HandlerThread还可以通过new HandlerThread("tname",-3)来设置。
在Thread或Runnable的run方法中,也可以通过Process.setThreadPriority设置优先级。
一般在实际应用中,通过Process设置优先级,对线程调度影响效果更明显,因为Process是android系统特别优化过的,是native的方法。

当线程池中的线程数小于corePoolSize 时,新提交的任务直接新建一个线程执行任务(不管是否有空闲线程)
当线程池中的线程数等于corePoolSize 时,新提交的任务将会进入阻塞队列(workQueue)中,等待线程的调度
当阻塞队列满了以后,如果corePoolSize < maximumPoolSize ,则新提交的任务会新建线程执行任务,直至线程数达到maximumPoolSize
当线程数达到maximumPoolSize 时,新提交的任务会由(饱和策略)管理

转到Tomcat控制台,你将看到虽然Web应用已经关闭,但Timer任务还在我行我素地执行如故——舞台已经拆除,戏子继续表演: 

  Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 重写了run方法  */
        public void run() {
            runWorker(this);
        }

本文由胜博发-运维发布,转载请注明来源:低29位表示当前线程的运行任务数量澳门博发娱乐