Java 5.0 多线程编程实践

2016-02-19 21:41 4 1 收藏

今天图老师小编给大家介绍下Java 5.0 多线程编程实践,平时喜欢Java 5.0 多线程编程实践的朋友赶紧收藏起来吧!记得点赞哦~

【 tulaoshi.com - 编程语言 】

  Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

  简介

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下:

  1. 建立监听端口,创建线程池。

(本文来源于图老师网站,更多请访问https://www.tulaoshi.com/bianchengyuyan/)

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

(本文来源于图老师网站,更多请访问https://www.tulaoshi.com/bianchengyuyan/)

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。

  ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

  private static final int PORT = 19527;
  serverListenSocket = new ServerSocket(PORT);
  serverListenSocket.setReuseAddress(true);
  serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

  while(true){
  Socket socket = serverListenSocket.accept();
  pool.execute(new ServiceThread(socket));
  }

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

  private static ReentrantLock lock = new ReentrantLock ();
  private static int count = 0;
  private int getCount(){
  int ret = 0;
  try{
  lock.lock();
  ret = count;
  }finally{
  lock.unlock();
  }
  return ret;
  }
  private void increaseCount(){
  try{
  lock.lock();
  ++count;
  }finally{
  lock.unlock();
  }
  }

  服务线程在开始给客户端打印一个欢迎信息,

  increaseCount();
  int curCount = getCount();
  helloString = "hello, id = " + curCount+" ";
  dos = new DataOutputStream(connectedSocket.getOutputStream());
  dos.write(helloString.getBytes());

服务器端的完整实现

  服务器端的完整实现代码如下:

  package com.andrew;

  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.io.Serializable;
  import java.net.ServerSocket;
  import java.net.Socket;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionHandler;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;

  public class Server {
  private static int produceTaskSleepTime = 100;
  private static int consumeTaskSleepTime = 1200;
  private static int produceTaskMaxNumber = 100;
  private static final int CORE_POOL_SIZE = 2;
  private static final int MAX_POOL_SIZE = 100;
  private static final int KEEPALIVE_TIME = 3;
  private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
  private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
  private static final String HOST = "127.0.0.1";
  private static final int PORT = 19527;
  private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
  //private ThreadPoolExecutor serverThreadPool = null;
  private ExecutorService pool = null;
  private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
  private ServerSocket serverListenSocket = null;
  private int times = 5;
  public void start() {
  // You can also init thread pool in this way.
  /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
  MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
  rejectedExecutionHandler);*/
  pool = Executors.newFixedThreadPool(10);
  try {
   serverListenSocket = new ServerSocket(PORT);
   serverListenSocket.setReuseAddress(true);

   System.out.println("I'm listening");
   while (times--  0) {
    Socket socket = serverListenSocket.accept();
    String welcomeString = "hello";
    //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
    pool.execute(new ServiceThread(socket));
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  cleanup();
  }

  public void cleanup() {
  if (null != serverListenSocket) {
   try {
    serverListenSocket.close();
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
  //serverThreadPool.shutdown();
  pool.shutdown();
  }

  public static void main(String args[]) {
  Server server = new Server();
  server.start();
  }
  }

  class ServiceThread implements Runnable, Serializable {
  private static final long serialVersionUID = 0;
  private Socket connectedSocket = null;
  private String helloString = null;
  private static int count = 0;
  private static ReentrantLock lock = new ReentrantLock();

  ServiceThread(Socket socket) {
  connectedSocket = socket;
  }

  public void run() {
  increaseCount();
  int curCount = getCount();
  helloString = "hello, id = " + curCount + "";

  ExecutorService executor = Executors.newSingleThreadExecutor();
  Future future = executor.submit(new TimeConsumingTask());

  DataOutputStream dos = null;
  try {
   dos = new DataOutputStream(connectedSocket.getOutputStream());
   dos.write(helloString.getBytes());
   try {
    dos.write("let's do soemthing other.".getBytes());
    String result = future.get();
    dos.write(result.getBytes());
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (ExecutionException e) {
    e.printStackTrace();
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } finally {
   if (null != connectedSocket) {
    try {
     connectedSocket.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   if (null != dos) {
    try {
     dos.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   executor.shutdown();
  }
  }

  private int getCount() {
  int ret = 0;
  try {
   lock.lock();
   ret = count;
  } finally {
   lock.unlock();
  }
  return ret;
  }

  private void increaseCount() {
  try {
   lock.lock();
   ++count;
  } finally {
   lock.unlock();
  }
  }
  }

  class TimeConsumingTask implements Callable {
  public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
  }

  }

来源:https://www.tulaoshi.com/n/20160219/1626879.html

延伸阅读
我这里可以大概给你介绍一下,但对于每一种编程模型要看具体的示例是什么,而且我不可能给你罗列所有的代码,请谅解。 其实我们编程只要尽量站到比较高的层次,很多道理其实你会发现你已经懂了。 就多线程来说,我们开始设想只有两个线程(2时是不是算数学归纳法?)那么如果两个独立的线程会发生什么呢? 1。当一个线程进...
线程组 线程是被个别创建的,但可以将它们归类到线程组中,以便于调试和监视。只能在创建线程的同时将它与一个线程组相关联。在使用大量线程的程序中,使用线程组组织线程可能很有帮助。可以将它们看作是计算机上的目录和文件结构。 !-- frame contents -- !-- /frame contents -- 线程间发信 ...
import java.io.*;//多线程编程public class MultiThread{public static void main(String args[]){System.out.println("我是主线程!");//下面创建线程实例thread1ThreadUseExtends thread1=new ThreadUseExtends();//创建thread2时以实现了Runnable接口的THhreadUseRunnable类实例为参数Thread thread2=new Thread(new ThreadU...
一 Java 语言的
限制线程优先级和调度 Java 线程模型涉及可以动态更改的线程优先级。本质上,线程的优先级是从 1 到 10 之间的一个数字,数字越大表明任务越紧急。JVM 标准首先调用优先级较高的线程,然后才调用优先级较低的线程。但是,该标准对具有相同优先级的线程的处理是随机的。!-- frame contents -- !-- /frame contents --如何处理这些线...

经验教程

760

收藏

30
微博分享 QQ分享 QQ空间 手机页面 收藏网站 回到头部