并发的多面性
基本的并发
定义任务
线程可以驱动任务,我们通过实现 Runnable 接口来提供,需要实现 Runnable 接口的 run() 方法。
package concurrency;
/**
* Created by wwh on 16-3-24.
*/
public class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "LiftOff!") + "), ";
}
public void run() {
while (countDown-- > 0) {
System.out.println(status());
/* Thread.yield() 是对线程调度器的一种建议,可以将CPU从一个线程转移给另一个线程 */
Thread.yield();
}
}
}
package concurrency;
/**
* Created by wwh on 16-3-24.
*/
public class MainThread {
public static void main(String[] args) {
LiftOff lauch = new LiftOff();
lauch.run();
}
}
我们也可以通过继承 Thread 类覆盖 run() 方法来实现线程类,但继承Thread类有一个缺点就是单继承,而实现Runnable接口则弥补了它的缺点,可以实现多继承。而且实现 Runnable 接口适合多线程共享资源,继承 Thread 类适合各个线程完成自己的任务,因为继承 Thread 类相当于每个线程有一份各自的资源,而实现 Runnable 还可以让多个线程共享一份代码,具体戳这里。
Thread 类
将 Runnable 对象转变为工作任务的传统方式是将它提交给一个 Thread 构造器。
public class BasicThreads {
public static void main(String[] args) {
for (int i = 0; i < 5; ++i) {
new Thread(new LiftOff()).start();
}
System.out.println("Waiting for LiftOff");
}
}
Thread 构造器只需要一个 Runnable 对象。调用 Thread 对象的 start() 方法为该线程执行必须的初始化操作,然后内部调用 Runnable 的 run() 方法。
使用 Executor
Java SE5 并发包中引入执行器可以为我们管理线程 Thread 对象。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by wwh on 16-3-24.
*/
public class CachedThreadPool {
public static void main(String []args) {
/* ExecutorServive 是具有声明周期的 Executor */
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; ++i) {
exec.execute(new LiftOff());
}
/* shutdown 被用来防止新任务被提交给 Executor */
exec.shutdown();
}
}
ThreadPool 种类很多。如下图:
包括
CacheThreadPool:为每个任务都创建一个线程
FixedThreadPool:一次性预先分配好固定大小的线程
SingleThreadExecutor:线程数唯一,提交多个任务会排队等候
ScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
从任务中产生返回值
如果我们希望任务完成时能够返回一个值,那么可以实现 Callable 接口而不是 Runnable 接口,实现 Callable 接口要求覆盖 Call() 方法。
package concurrency;
import java.util.ArrayList;
import java.util.concurrent.*;
/**
* Created by wwh on 16-3-24.
*/
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() throws Exception {
return "reslut of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String []args) {
ExecutorService exec = Executors.newCachedThreadPool();
/* 定义 Future 对象,在将来获取 */
ArrayList<Future<String>> results =
new ArrayList<Future<String>>();
for (int i = 0; i < 10; ++i) {
/* submit() 方法会产生 Future 对象,可以通过 Future 对象的 isDone() 方法来判断查询 Future 是否已经完成,当完成时,可以使用 get() 方法获取结果 */
results.add(exec.submit(new TaskWithResult(i)));
}
for (Future<String> fs : results) {
try {
System.out.println(fs.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
exec.shutdown();
}
}
}
}
优先级
线程的优先级将该线程的重要性传递给了调度器。调度器会倾向于让优先级高的线程先运行。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by wwh on 16-3-24.
*/
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d;
private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);
while (true) {
/* 这里只有循环次数比较大才能看出优先级的优势 */
for (int i = 0; i < 100000000; ++i) {
d += (Math.PI + Math.E) / (double)i;
if (i % 1000 == 0) {
Thread.yield();
}
}
System.out.println(this);
if (--countDown == 0) {
return;
}
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; ++i) {
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
}
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}
加入一个线程
一个线程可以在其他线程之上调用 join() 方法,其效果是等待一段时间直到第二个线程结束才继续执行。join() 调用时可以携带超时参数。
package concurrency;
/**
* Created by wwh on 16-3-24.
*/
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
System.out.println(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
/* 等待线程 */
sleeper.join();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Interrupted");
}
System.out.println(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
drpey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
}
共享受限资源
多个线程可能出现访问共享资源的情况。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by wwh on 16-4-7.
*/
public class Counter implements Runnable {
private static int counter = 0;
public static int getCounter() {
return counter;
}
public void run() {
for (int i = 0; i < 1000000; ++i) {
counter++;
}
}
public static void test(int n) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < n; ++i) {
executorService.execute(new Counter());
}
executorService.shutdown();
}
public static void main(String[] args) throws InterruptedException {
Counter.test(10);
System.out.println(Counter.getCounter());
}
}
以上代码没有进行同步,多个线程同时增加计数器。所以导致结果不正确。
为了解决共享资源问题,Java 为我们提供了几种方式。
synchronzied:包括两种用法,synchronzied 方法和 synchronized 块。对于有 synchronzied 关键字修饰的类方法或代码块,执行时首先要获取该类实例的锁,执行完毕后释放。在执行过程中要有其它线程等请求该 synchronzied 方法或代码块则被阻塞。
Lock:互斥锁,显示锁对象。Lock 对象必须被显式创建、锁定和释放。
ReentrantLock:可重入锁,允许尝试着去获取锁。
这几种方式后续会详细解释。