第3章 多线程通信 前面探讨了多线程的基础知识、多线程之间如何安全地访问共享资源等。这一章着重探讨一下多线程之间是如何交互的。 3.1 wait()与notify() java.lang.Object类中内置了用于线程通信的方法wait()、notify()与notifyAll()。 public class Object { public final void wait() throws InterruptedException {} public final native void wait(long timeout) throws InterruptedException; public final native void notify(); public final native void notifyAll(); } 调用Object对象的wait()方法,会导致当前线程进入WAITING等待状态,直到另外一个线程,调用该对象的notify()或notifyAll()方法,处于WAITING状态的线程才重新转为RUNNABLE状态。 调用对象的wait()方法前,当前线程必须要拥有指定对象的监视器锁。调用wait()方法后,会释放对象的监视器锁,然后当前线程进入WAITING状态。 synchronized(obj) { obj.wait(); } 3.1.1 阻塞当前线程 参见如下代码,在主函数中启动一个线程,当i=5的时候,调用object对象的wait()方法,这会导致当前线程8被阻塞(不是主线程被阻塞)。 public static void main(String[] args) { Object object = new Object(); new Thread(new Runnable() { public void run() { for(int i=0;i<10;i++) { System.out.println(Thread.currentThread().getId() + ",i=" +i); if(i==5) { synchronized(object) { try { System.out.println(Thread.currentThread().getId() + "开始等待..."); object.wait(); } catch (Exception e) { e.printStackTrace(); } } } } } }).start(); } 程序运行结果如下,线程8被阻塞后,进入长期等待状态。 8,i=0 8,i=1 8,i=2 8,i=3 8,i=4 8,i=5 8开始等待... 在主函数中,增加一个新的线程,延迟1秒后,发送notify()通知。 new Thread(new Runnable() { public void run() { System.out.println(Thread.currentThread().getId() + " running..."); try { Thread.sleep(1000); synchronized(object) { System.out.println(Thread.currentThread().getId() + ",发送notify通知..."); object.notify(); } } catch (Exception e) {} } }).start(); 重新运行程序,运行结果如下。线程8进入WAITING状态后,线程9延迟1秒后发出了notify()通知。线程8接到通知后继续运行直至结束。 8,i=0 8,i=1 8,i=2 8,i=3 8,i=4 8,i=5 8开始等待... 9 running... 9,发送notify通知... 8,i=6 8,i=7 8,i=8 8,i=9 注意,不管是调用object的wait()方法,还是调用object的notify()方法,都要提前获得object对象的监视器锁: synchronized(object) { ... } 3.1.2 案例分析:厨师与侍者1 假设有一个小饭店,里面只有一个厨师和一个侍者(服务员)。厨师只有收到服务员的通知才开始做菜,没有工作时厨师就处于等待状态。服务员只有收到顾客的订单,才会通知厨师工作,没有订单时,服务员也处于等待状态。厨师做完菜,会通知服务员取餐。使用多线程,模拟这个小饭店的工作状况。 厨师什么时候做菜,应该做什么菜,由服务员根据订单通知厨师。而服务员什么时候可以取菜,则是看厨师什么时候做完菜,做完后才会通知服务员取菜。 操作步骤如下: (1)新建订单类,模拟最多10个订单。 class Order{ private static int i=0; private int m_count; public Order(){ m_count = i++; if(m_count==10){ System.out.println("没有食物了,结束!"); System.exit(0); } } } (2)新建饭店类,成员变量订单表示饭店当前是否存在订单。 class Restaurant { public Order order; } (3)新建厨师类Chef,Chef启动后,就处于等待waiter通知的状态。饭店当前订单为空时,模拟生成新订单。 class Chef extends Thread{ private Restaurant restaurant; private Waiter waiter; public Chef(Restaurant restaurant,Waiter waiter){ this.restaurant = restaurant; this.waiter = waiter; } public void run(){ while(true){ if(restaurant.order == null){ restaurant.order = new Order(); System.out.println("厨师-" + Thread.currentThread().getId() + ",接到新订单"); synchronized(waiter){ System.out.println("厨师-" + Thread.currentThread().getId() + ",通知waiter取食物"); waiter.notify(); } try { Thread.sleep(1000); } catch (Exception e) { } } } } } (4)新建侍者类Waiter。厨师做完菜后,通知侍者取餐。 class Waiter extends Thread{ private Restaurant restaurant; public Waiter(Restaurant r){ restaurant = r; } public void run() { while(restaurant.order == null){ synchronized(this){ try { System.out.println("Waiter-" + Thread.currentThread().getId() + ",等待中"); wait(); restaurant.order = null; System.out.println("Waiter-" + Thread.currentThread().getId() + ",收到通知,取走订单 "); } catch (Exception e) { } } } } } (5)代码测试。 public static void main(String[] args) { Restaurant restaurant = new Restaurant(); Waiter waiter = new Waiter(restaurant); waiter.start(); Chef chef = new Chef(restaurant,waiter); chef.start(); } 程序运行结果如下: Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 厨师-9,接到新订单 厨师-9,通知waiter取食物 Waiter-8,收到通知,取走订单 Waiter-8,等待中 没有食物了,结束! 3.1.3 案例分析:厨师与侍者2 3.1.2节的厨师与侍者案例引自Thinking in Java,都是在厨师类中模拟创建的订单,然后通知服务员(侍者)取菜。这个流程过于简单了,与实际情况不符。本节在3.1.2节的基础上,做进一步优化。模拟顾客在饭店通过服务员点菜,然后服务员通知厨师做菜;厨师做菜完成后会通知服务员取菜;厨师与服务员无事时,则处于空闲等待状态。 实现步骤如下: (1)定义订单类,设置订单编号和订单内容。 public class Order implements Serializable{ private String dno; private String info; public String getDno() { return dno; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public Order(String dno) { this.dno = dno; } } (2)定义饭店类,模拟顾客进入饭店,随机生成订单。 class Restaurant implements Runnable { private Waiter waiter; public void setWaiter(Waiter waiter) { this.waiter = waiter; } public void run() { while(true) { int rand = (int)(Math.random()*5000); try { System.out.println("饭店等待顾客中---------------"); Thread.sleep(rand); String dno = "d" + System.currentTimeMillis(); Order order = new Order(dno); order.setInfo("宫保鸡丁一份..."); System.out.println("顾客来了,通知服务员点菜,生成订单:" + dno ); synchronized (waiter) { waiter.setOrder(order); //把订单给服务员 waiter.setMsgID(1); waiter.notify(); } } catch (Exception e) { } } } } (3)创建Waiter类,它可以接收新订单通知,也可以接收取菜通知。 class Waiter implements Runnable{ private Order order; private Chef chef; private int msgID = 1; //如果是新订单通知ID=1, 如果是取菜通知ID=2 public void setMsgID(int msgID) { this.msgID = msgID; } public void setChef(Chef chef) { this.chef = chef; } public void setOrder(Order order) { this.order = order; } public void run() { while(true) { synchronized(this) { try { System.out.println("服务员空闲等待中..."); this.wait(); } catch (Exception e) { e.printStackTrace(); } } if(msgID == 1) { //服务员收到新订单通知 System.out.println("waiter收到订单:" + this.order.getDno() + "," + this.order.getInfo()); //通知厨师做菜 synchronized(chef) { System.out.println("waiter通知厨师做菜..."); chef.setOrder(order); chef.notify(); } }else { //服务员收到了取菜通知 System.out.println("waiter取菜给顾客...."); } } } } (4)定义厨师类。厨师收到服务员的做菜通知后开始烹饪,菜做完后通知服务员取菜。 class Chef implements Runnable{ private Order order; private Waiter waiter; public void setOrder(Order order) { this.order = order; } public void setWaiter(Waiter waiter) { this.waiter = waiter; } public void run() { while(true) { synchronized(this) { try { System.out.println("厨师空闲等待中..."); this.wait(); } catch (Exception e) { e.printStackTrace(); } } //厨师收到订单通知 int rand = (int)(Math.random()*800); try { Thread.sleep(rand); } catch (Exception e) { } System.out.println("厨师做菜完成,通知waiter取菜..."); synchronized(this.waiter) { waiter.setMsgID(2); waiter.notify(); } } } } (5)在主函数中,模拟饭店的点菜流程,进行测试。 public static void main(String[] args) { Chef chef = new Chef(); Waiter waiter = new Waiter(); waiter.setChef(chef); chef.setWaiter(waiter); Restaurant rest = new Restaurant(); rest.setWaiter(waiter); new Thread(waiter).start(); new Thread(chef).start(); new Thread(rest).start(); } 程序运行结果如下,本节案例与真实项目环境更加贴近。 厨师空闲等待中... 服务员空闲等待中... 饭店等待顾客中--------------- 顾客来了,通知服务员点菜,生成订单:d1597404292617 waiter收到订单:d1597404292617,宫保鸡丁一份... waiter通知厨师做菜... 服务员空闲等待中... 饭店等待顾客中--------------- 厨师做菜完成,通知waiter取菜... 厨师空闲等待中... waiter取菜给顾客... 服务员空闲等待中... 顾客来了,通知服务员点菜,生成订单:d1597404293698 饭店等待顾客中--------------- waiter收到订单:d1597404293698,宫保鸡丁一份... waiter通知厨师做菜... 服务员空闲等待中... 厨师做菜完成,通知waiter取菜... 厨师空闲等待中... 3.1.4 案例分析:两个线程交替输出信息 本节案例是让两个线程交替输出信息:一个线程输出aa,另一个线程输出bb。 这个案例的关键在于共享对象的状态为开关项,当某个线程锁定对象并读取对象状态时,另一个线程必须等待。 操作步骤如下: (1)新建状态类State,设置布尔值bRet作为开关项。 class State { public boolean bRet = false; } (2)新建任务类PrintA,锁定状态对象,并判断开关项为真时进入等待状态。 class PrintA implements Runnable { private State state; public PrintA(State state){ this.state = state; } public void run(){ while (true) { try { synchronized (state) { if(state.bRet){ state.wait(); } System.out.println("aa..."); Thread.sleep(1000); state.bRet = true; state.notify(); } }catch(Exception e){ e.printStackTrace(); } }}} (3)新建任务类PrintB,锁定状态对象,并判断开关项为假时进入等待状态。 class PrintB implements Runnable { private State state; public PrintB(State state){ this.state = state; } public void run(){ while (true) { try { synchronized(state) { if(!state.bRet){ state.wait(); } System.out.println("bb..."); Thread.sleep(1000); state.bRet = false; state.notify(); } }catch(Exception e){ e.printStackTrace(); } } } } (4)代码测试。 public static void main(String[] args) { State state = new State(); new Thread(new PrintA(state)).start(); new Thread(new PrintB(state)).start(); } 测试结果如下,交替输出aa...和bb...,循环往复。 aa... bb... aa... bb... aa... 3.2 join线程排队 线程A调用线程B对象的join()方法,会导致线程A的运行中断,直到线程B运行完毕或超时,线程A才继续运行。 public class Thread { public final void join() throws InterruptedException { } public final synchronized void join(long millis) throws InterruptedException {} } 需要注意的是,join(long?millis)方法中传入的超时参数不能为负数,否则将抛出IllegalArgumentException异常。如果millis参数为0,则表示永远等待。 3.2.1 加入者与休眠者 分别定义加入者Joiner与休眠者Sleeper。在Sleeper运行过程中,如果Joiner加入Sleeper的运行中,会导致Sleeper的运行被阻塞,直到Joiner运行完毕,Sleeper才会继续运行。 开发步骤如下: (1)新建类Joiner,循环输出10个k值。 class Joiner extends Thread{ public void run() { System.out.println("Joiner 线程id=" + Thread.currentThread().getId()+ " run..."); try { for(int i=0;i<10;i++){ Thread.sleep(100); System.out.println("线程" + Thread.currentThread().getId() + "---k=" + i); } } catch (Exception e) { } System.out.println(Thread.currentThread().getId()+ " end..."); } } (2)新建类Sleeper,循环输出10个i值。当i=5时,joiner加入。 class Sleeper extends Thread{ private Joiner joiner; public void setJoiner(Joiner joiner) { this.joiner = joiner; } public void run() { System.out.println("Sleeper线程id=" + Thread.currentThread().getId() + " run..."); try { for(int i=0;i<10;i++){ if(i==5 && joiner != null) { System.out.println("joiner加入,线程" + Thread.currentThread().getId() + "被阻塞"); joiner.join(); } Thread.sleep(100); System.out.println("线程" + Thread.currentThread().getId() + "---i=" + i); } } catch (Exception e) { } System.out.println(Thread.currentThread().getId()+ " end..."); } } (3)代码测试,同时启动Joiner与Sleeper两个线程。当i=5时,Joiner加入Sleeper,这将导致Sleeper的运行被阻塞。 public static void main(String[] args) { Sleeper sleeper = new Sleeper(); Joiner joiner = new Joiner(); sleeper.setJoiner(joiner); sleeper.start(); joiner.start(); } 代码运行结果如下,在Joiner加入前,Joiner与Sleeper两个线程同时运行。当Joiner加入后,Sleeper被阻塞。直到Joiner运行结束,Sleeper才重新被激活,并运行到结束。 Sleeper线程id=8 run... Joiner 线程id=9 run... 线程8---i=0 线程9---k=0 线程8---i=1 线程9---k=1 线程8---i=2 线程9---k=2 线程8---i=3 线程9---k=3 线程8---i=4 joiner加入,线程8被阻塞 线程9---k=4 线程9---k=5 线程9---k=6 线程9---k=7 线程9---k=8 线程9---k=9 9 end... 线程8---i=5 线程8---i=6 线程8---i=7 线程8---i=8 线程8---i=9 8 end... 3.2.2 案例:紧急任务处理 在工作的过程中经常会遇到这种情况:当你正在处理日常工作时,突然之间来了紧急任务。这时你被迫停下手里的工作优先处理紧急任务,等到紧急任务处理完成后再继续刚才未完成的工作。 操作步骤如下: (1)新建线程Worker代表日常任务的完成。当出现紧急任务时,日常任务被停止,直到紧急任务结束才继续。 class Worker extends Thread { private UrgentTask joiner; public void run() { int i = 0; while(i < 9){ try { if(i == 8){ UrgentTask urgent = joiner; System.out.println("突然接到了紧急工作,需要去完成..."); urgent.start(); urgent.join(); } System.out.println("我正在做日常工作,完成度" + (i * 10) + "%..."); TimeUnit.SECONDS.sleep(1); i += 2; } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("我已经完成了日常工作,完成度100%..."); } public void setJoiner(UrgentTask joiner) { this.joiner = joiner; } } (2)新建类UrgentTask,表示紧急任务。 class UrgentTask extends Thread { public void run() { int i = 0; while(i < 9){ try{ System.out.println("紧急任务处理,完成度" + (i * 10) + "%+++"); TimeUnit.SECONDS.sleep(1); i += 3; }catch(Exception e){ e.printStackTrace(); } } System.out.println("紧急工作完成度100%+++"); } } (3)分别创建日常工作对象和紧急任务对象。当日常工作进行到一半时,紧急任务 加入。 public static void main(String[] args) { try { Worker worker = new Worker(); UrgentTask j = new UrgentTask(); worker.start(); TimeUnit.SECONDS.sleep(3); worker.setJoiner(j); } catch (Exception e) { e.printStackTrace(); } } 程序运行结果如下: 我正在做日常工作,完成度0%... 我正在做日常工作,完成度20%... 我正在做日常工作,完成度40%... 我正在做日常工作,完成度60%... 突然接到了紧急工作,需要去完成... 紧急任务处理,完成度0%+++ 紧急任务处理,完成度30%+++ 紧急任务处理,完成度60%+++ 紧急工作完成度100%+++ 我正在做日常工作,完成度80%... 我已经完成了日常工作,完成度100%... 3.2.3 join限时阻塞 join(long millis)就是指定阻塞时间,超时自动解锁。 修改3.2.1节中Sleeper的代码,原来的加入代码为joiner.join(),现在修改为joiner.join(300),即Joiner只阻塞Sleeper线程300 ms。 其他代码不变,测试结果如下。从测试结果可以看出,Sleeper被阻塞300 ms后继续运行,无须等到Joiner运行完毕。 Sleeper线程id=8 run... Joiner 线程id=9 run... 线程8---i=0 线程9---k=0 线程8---i=1 线程9---k=1 线程8---i=2 线程9---k=2 线程8---i=3 线程9---k=3 线程8---i=4 joiner加入,线程8被阻塞 线程9---k=4 线程9---k=5 线程9---k=6 线程9---k=7 线程8---i=5 线程9---k=8 线程8---i=6 线程9---k=9 9 end... 线程8---i=7 线程8---i=8 线程8---i=9 8 end... 参考join(long millis)方法的源码如下,join()方法的底层实现依赖的是wait()方法。执行新线程join()后,被阻塞的线程进入WAITING等待状态。join()延时结束后,通过notify()通知,会唤醒处于WAITING状态的休眠线程。 public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("超时值为负"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } } 3.3 线程中断 线程类Thread中的停止线程的stop()方法与暂停线程的suspend()方法已被废弃,如果想停止一个正在运行中的线程,可以尝试使用interrupt()方法。 interrupt()方法并不能马上停止线程的运行,它只是给线程设置一个中断状态值,这相当于一个停止线程运行的建议,线程是否能够停止,由操作系统和CPU决定。 isInterrupted()方法用于判断当前线程是否处于中断状态。 public class Thread implements Runnable { public final void stop() {} public final void suspend() {} public void interrupt() {} public boolean isInterrupted() {} } 3.3.1 中断运行态线程 启动一个线程,延迟500 ms后,调用这个线程对象的interrupt()方法。观察这个线程的中断状态,查看这个线程是否能够停止。 public static void main(String[] args) { Thread t = new Thread(new Runnable() { public void run() { for(int i=0;i<10;i++) { if(Thread.currentThread().isInterrupted()) { System.out.println("收到中断通知,结束线程..."); break; }else { System.out.println(Thread.currentThread().getId() + ",i=" + i); try { Thread.sleep(100); } catch (Exception e) { } } } } }); t.start(); try { Thread.sleep(500); } catch (Exception e) { } t.interrupt(); System.out.println(t.getId() + "中断状态:" + t.isInterrupted()); } 反复运行上面的代码,可能得到完全不同的三种结果,分别如下: (1)调用线程的interrupt()方法后,最容易出现的结果就是中断状态为false,也就是说interrupt()方法没有起到任何效果。 8,i=0 8,i=1 8,i=2 8,i=3 8,i=4 8中断状态:false 8,i=5 8,i=6 8,i=7 8,i=8 8,i=9 (2)调用线程的interrupt()方法后,线程中断状态可能被设置为true。但是线程仍然继续运行,并没有被停止。 8,i=0 8,i=1 8,i=2 8,i=3 8,i=4 8中断状态:true 8,i=5 8,i=6 8,i=7 8,i=8 8,i=9 (3)调用线程的interrupt()方法后,线程真的停止运行了。这种情况出现的次数较少,需要反复尝试才可以看到。 8,i=0 8,i=1 8,i=2 8,i=3 8,i=4 8中断状态:true 收到中断通知,结束线程... 3.3.2 中断阻塞态线程 修改3.3.1节中的代码,让线程启动后进入WAITING状态。这时调用线程的interrupt()方法,观察程序运行状态。 public static void main(String[] args) { Object obj = new Object(); Thread t = new Thread(new Runnable() { public void run() { for(int i=0;i<10;i++) { if(Thread.currentThread().isInterrupted()) { System.out.println("收到中断通知,结束线程..."); break; }else { System.out.println(Thread.currentThread().getId() + ",i=" + i); try { Thread.sleep(100); synchronized(obj) { obj.wait(); } } catch (Exception e) { e.printStackTrace(); } } } } }); t.start(); } 没有调用线程对象的interrupt()方法时,程序启动后进入WAITING状态。输出结果如下所示: 8,i=0 8中断状态:false 调用线程对象的interrupt()方法,程序运行结果发生变化。 public static void main(String[] args) {