Java多线程设计模式之线程池模式

前序:

成都创新互联专业为企业提供揭东网站建设、揭东做网站、揭东网站设计、揭东网站制作等企业网站建设、网页设计与制作、揭东企业网站模板建站服务,十年揭东做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

  Thread-Per-Message Pattern,是一种对于每个命令或请求,都分配一个线程,由这个线程执行工作。它将“委托消息的一端”和“执行消息的一端”用两个不同的线程来实现。该线程模式主要包括三个部分:

  1,Request参与者(委托人),也就是消息发送端或者命令请求端

  2,Host参与者,接受消息的请求,负责为每个消息分配一个工作线程。

  3,Worker参与者,具体执行Request参与者的任务的线程,由Host参与者来启动。

  由于常规调用一个方法后,必须等待该方法完全执行完毕后才能继续执行下一步操作,而利用线程后,就不必等待具体任务执行完毕,就可以马上返回继续执行下一步操作。

  背景:

  由于在Thread-Per-Message Pattern中对于每一个请求都会生成启动一个线程,而线程的启动是很花费时间的工作,所以鉴于此,提出了Worker Thread,重复利用已经启动的线程。

  线程池:

  Worker Thread,也称为工人线程或背景线程,不过一般都称为线程池。该模式主要在于,事先启动一定数目的工作线程。当没有请求工作的时候,所有的工人线程都会等待新的请求过来,一旦有工作到达,就马上从线程池中唤醒某个线程来执行任务,执行完毕后继续在线程池中等待任务池的工作请求的到达。

  任务池:主要是存储接受请求的集合,利用它可以缓冲接受到的请求,可以设置大小来表示同时能够接受最大请求数目。这个任务池主要是供线程池来访问。

  线程池:这个是工作线程所在的集合,可以通过设置它的大小来提供并发处理的工作量。对于线程池的大小,可以事先生成一定数目的线程,根据实际情况来动态增加或者减少线程数目。线程池的大小不是越大越好,线程的切换也会耗时的。

  存放池的数据结构,可以用数组也可以利用集合,在集合类中一般使用Vector,这个是线程安全的。

  Worker Thread的所有参与者:

  1,Client参与者,发送Request的参与者

  2,Channel参与者,负责缓存Request的请求,初始化启动线程,分配工作线程

  3,Worker参与者,具体执行Request的工作线程

  4,Request参与者

  注意:将在Worker线程内部等待任务池非空的方式称为正向等待。

  将在Channel线程提供Worker线程来判断任务池非空的方式称为反向等待。

  线程池实例1:

  利用同步方法来实现,使用数组来作为任务池的存放数据结构。在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用反向等待来判断任务池的非空状态。

  Channel参与者:

 
 
 
  1. package whut.threadpool;
  2. //用到了生产者与消费者模式
  3. //生成线程池,接受客户端线程的请求,找到一个工作线程分配该客户端请求
  4. public class Channel {
  5.     private static final int MAX_REQUEST = 100;// 并发数目,就是同时可以接受多少个客户端请求
  6.     //利用数组来存放请求,每次从数组末尾添加请求,从开头移除请求来处理
  7.     private final Request[] requestQueue;// 存储接受客户线程的数目
  8.     private int tail;//下一次存放Request的位置
  9.     private int head;//下一次获取Request的位置
  10.     private int count;// 当前request数量
  11.     private final WorkerThread[] threadPool;// 存储线程池中的工作线程
  12.     // 运用数组来存储
  13.     public Channel(int threads) {
  14.         this.requestQueue = new Request[MAX_REQUEST];
  15.         this.head = 0;
  16.         this.head = 0;
  17.         this.count = 0;
  18.         threadPool = new WorkerThread[threads];
  19.         // 启动工作线程
  20.         for (int i = 0; i < threadPool.length; i++) {
  21.             threadPool[i] = new WorkerThread("Worker-" + i, this);
  22.         }
  23.     }
  24.     public void startWorkers() {
  25.         for (int i = 0; i < threadPool.length; i++) {
  26.             threadPool[i].start();
  27.         }
  28.     }
  29.     // 接受客户端请求线程
  30.     public synchronized void putRequest(Request request) {
  31.         // 当Request的数量大于或等于同时接受的数目时候,要等待
  32.         while (count >= requestQueue.length)
  33.             try {
  34.                 wait();
  35.             } catch (InterruptedException e) {
  36.             }
  37.         requestQueue[tail] = request;
  38.         tail = (tail + 1) % requestQueue.length;
  39.         count++;
  40.         notifyAll();
  41.     }
  42.     // 处理客户端请求线程
  43.     public synchronized Request takeRequest() {
  44.         while (count <= 0)
  45.             try {
  46.                 wait();
  47.             } catch (InterruptedException e) {
  48.             }
  49.         Request request = requestQueue[head];
  50.         head = (head + 1) % requestQueue.length;
  51.         count--;
  52.         notifyAll();
  53.         return request;
  54.     }
  55. }

客户端请求线程:

 
 
 
  1. package whut.threadpool;
  2. import java.util.Random;
  3. //向Channel发送Request请求的
  4. public class ClientThread extends Thread{
  5.     private final Channel channel;
  6.     private static final Random random=new Random();
  7.                                                                
  8.     public ClientThread(String name,Channel channel)
  9.     {
  10.         super(name);
  11.         this.channel=channel;
  12.     }
  13.     public void run()
  14.     {
  15.         try{
  16.             for(int i=0;true;i++)
  17.             {
  18.                 Request request=new Request(getName(),i);
  19.                 channel.putRequest(request);
  20.                 Thread.sleep(random.nextInt(1000));
  21.             }
  22.         }catch(InterruptedException e)
  23.         {
  24.         }
  25.     }
  26. }

 工作线程:

 
 
 
  1. package whut.threadpool;
  2. //具体工作线程
  3. public class WorkerThread extends Thread{
  4.                                                       
  5.     private final Channel channel;
  6.     public WorkerThread(String name,Channel channel)
  7.     {
  8.       super(name);
  9.       this.channel=channel;
  10.     }
  11.                                                       
  12.     public void run()
  13.     {
  14.         while(true)
  15.         {
  16.             Request request=channel.takeRequest();
  17.             request.execute();
  18.         }
  19.     }
  20. }

#p#

线程池实例2:

  工作线程:

  利用同步块来处理,利用Vector来存储客户端请求。在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用正向等待来判断任务池的非空状态。

  这种实例,可以借鉴到网络ServerSocket处理用户请求的模式中,有很好的扩展性与实用性。

  利用Vector来存储,依旧是每次集合的最后一个位置添加请求,从开始位置移除请求来处理。

  Channel参与者:

 
 
 
  1. package whut.threadpool2;
  2. import java.util.Vector;
  3. /*
  4.  * 这个主要的作用如下
  5.  * 0,缓冲客户请求线程(利用生产者与消费者模式)
  6.  * 1,存储客户端请求的线程
  7.  * 2,初始化启动一定数量的线程
  8.  * 3,主动来唤醒处于任务池中wait set的一些线程来执行任务
  9.  */
  10. public class Channel {
  11.     public final static int THREAD_COUNT=4;
  12.     public static void main(String[] args) {
  13.       //定义两个集合,一个是存放客户端请求的,利用Vector,
  14.       //一个是存储线程的,就是线程池中的线程数目
  15.                              
  16.       //Vector是线程安全的,它实现了Collection和List
  17.       //Vector 类可以实现可增长的对象数组。与数组一样,
  18.       //它包含可以使用整数索引进行访问的组件。但Vector 的大小可以根据需要增大或缩小,
  19.       //以适应创建 Vector 后进行添加或移除项的操作。
  20.       //Collection中主要包括了list相关的集合以及set相关的集合,Queue相关的集合
  21.       //注意:Map不是Collection的子类,都是java.util.*下的同级包
  22.       Vector pool=new Vector();
  23.       //工作线程,初始分配一定限额的数目
  24.       WorkerThread[] workers=new WorkerThread[THREAD_COUNT];
  25.                           
  26.       //初始化启动工作线程
  27.       for(int i=0;i
  28.       {
  29.           workers[i]=new WorkerThread(pool);
  30.           workers[i].start();
  31.       }
  32.                            
  33.       //接受新的任务,并且将其存储在Vector中
  34.       Object task=new Object();//模拟的任务实体类
  35.       //此处省略具体工作
  36.       //在网络编程中,这里就是利用ServerSocket来利用ServerSocket.accept接受一个Socket从而唤醒线程
  37.                            
  38.       //当有具体的请求达到
  39.       synchronized(pool)
  40.       {
  41.           pool.add(pool.size(), task);
  42.           pool.notifyAll();//通知所有在pool wait set中等待的线程,唤醒一个线程进行处理
  43.       }
  44.       //注意上面这步骤添加任务池请求,以及通知线程,都可以放在工作线程内部实现
  45.       //只需要定义该方法为static,在方法体用同步块,且共享的线程池也是static即可
  46.                            
  47.       //下面这步,可以有可以没有根据实际情况
  48.       //取消等待的线程
  49.       for(int i=0;i
  50.       {
  51.           workers[i].interrupt();
  52.       }
  53.     }
  54. }

工作线程:

 
 
 
  1. package whut.threadpool2;
  2. import java.util.List;
  3. public class WorkerThread extends Thread {
  4.     private List pool;//任务请求池
  5.     private static int fileCompressed=0;//所有实例共享的
  6.                      
  7.     public WorkerThread(List pool)
  8.     {
  9.           this.pool=pool; 
  10.     }
  11.                      
  12.     //利用静态synchronized来作为整个synchronized类方法,仅能同时一个操作该类的这个方法
  13.     private static synchronized void incrementFilesCompressed()
  14.     {
  15.         fileCompressed++;
  16.     }
  17.                      
  18.     public void run()
  19.     {
  20.         //保证无限循环等待中
  21.         while(true)
  22.         {
  23.             //共享互斥来访问pool变量
  24.             synchronized(pool)
  25.             {
  26.                 //利用多线程设计模式中的
  27.                 //Guarded Suspension Pattern,警戒条件为pool不为空,否则无限的等待中
  28.                 while(pool.isEmpty())
  29.                 {
  30.                     try{
  31.                         pool.wait();//进入pool的wait set中等待着,释放了pool的锁
  32.                     }catch(InterruptedException e)
  33.                     {
  34.                     }
  35.                 }
  36.                 //当线程被唤醒,需要重新获取pool的锁,
  37.                 //再次继续执行synchronized代码块中其余的工作
  38.                 //当不为空的时候,继续再判断是否为空,如果不为空,则跳出循环
  39.                 //必须先从任务池中移除一个任务来执行,统一用从末尾添加,从开始处移除
  40.                                  
  41.                 pool.remove(0);//获取任务池中的任务,并且要进行转换
  42.             }
  43.             //下面是线程所要处理的具体工作
  44.         }
  45.     }
  46. }

网站名称:Java多线程设计模式之线程池模式
URL标题:http://www.hantingmc.com/qtweb/news47/267947.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联