www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當(dāng)前位置:首頁 > 公眾號精選 > 21ic電子網(wǎng)
[導(dǎo)讀]大家好,我是hub哥,本文主要講解幾種常見并行模式, 具體目錄結(jié)構(gòu)如下圖 單例 單例是最常見的一種設(shè)計模式, 一般用于全局對象管理, 比如xml配置讀寫之類的. 一般分為懶漢式, 餓漢式. 懶漢式: 方法上加synchronized public static synchronized

大家好,我是hub哥,本文主要講解幾種常見并行模式, 具體目錄結(jié)構(gòu)如下圖


一文搞定!Java高并發(fā)之設(shè)計模式

單例

單例是最常見的一種設(shè)計模式, 一般用于全局對象管理, 比如xml配置讀寫之類的.

一般分為懶漢式, 餓漢式.

懶漢式: 方法上加synchronized


public static synchronized Singleton getInstance() {  if (single == null) {  single = new Singleton();  }  return single; }


這種方式, 由于每次獲取示例都要獲取鎖, 不推薦使用, 性能較差

懶漢式: 使用雙檢鎖 + volatile

private volatile Singleton singleton = null; public static Singleton getInstance() { if (singleton == null) { synchronized (Singleton.class) { if (singleton == null) { singleton = new Singleton(); } } } return singleton; }

本方式是對直接在方法上加鎖的一個優(yōu)化, 好處在于只有第一次初始化獲取了鎖.

后續(xù)調(diào)用getInstance已經(jīng)是無鎖狀態(tài). 只是寫法上稍微繁瑣點.

至于為什么要volatile關(guān)鍵字, 主要涉及到j(luò)dk指令重排, 詳見之前的博文: Java內(nèi)存模型與指令重排

懶漢式: 使用靜態(tài)內(nèi)部類

public class Singleton { private static class LazyHolder { private static final Singleton INSTANCE = new Singleton(); } private Singleton (){} public static final Singleton getInstance() { return LazyHolder.INSTANCE; }}

該方式既解決了同步問題, 也解決了寫法繁瑣問題. 推薦使用改寫法.

缺點在于無法響應(yīng)事件來重新初始化INSTANCE.

餓漢式

public class Singleton1 { private Singleton1() {} private static final Singleton1 single = new Singleton1(); public static Singleton1 getInstance() { return single; }}

缺點在于對象在一開始就直接初始化了.

Future模式

該模式的核心思想是異步調(diào)用. 有點類似于異步的ajax請求.

當(dāng)調(diào)用某個方法時, 可能該方法耗時較久, 而在主函數(shù)中也不急于立刻獲取結(jié)果.

因此可以讓調(diào)用者立刻返回一個憑證, 該方法放到另外線程執(zhí)行,后續(xù)主函數(shù)拿憑證再去獲取方法的執(zhí)行結(jié)果即可, 其結(jié)構(gòu)圖如下

一文搞定!Java高并發(fā)之設(shè)計模式

jdk中內(nèi)置了Future模式的支持, 其接口如下:

一文搞定!Java高并發(fā)之設(shè)計模式

通過FutureTask實現(xiàn)

注意其中兩個耗時操作.

  • 如果doOtherThing耗時2s, 則整個函數(shù)耗時2s左右.

  • 如果doOtherThing耗時0.2s, 則整個函數(shù)耗時取決于RealData.costTime, 即1s左右結(jié)束.


public class FutureDemo1 {
public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<String> future = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { return new RealData().costTime(); } }); ExecutorService service = Executors.newCachedThreadPool(); service.submit(future);
System.out.println("RealData方法調(diào)用完畢"); // 模擬主函數(shù)中其他耗時操作 doOtherThing(); // 獲取RealData方法的結(jié)果 System.out.println(future.get()); }
private static void doOtherThing() throws InterruptedException { Thread.sleep(2000L); }}
class RealData {
public String costTime() { try { // 模擬RealData耗時操作 Thread.sleep(1000L); return "result"; } catch (InterruptedException e) { e.printStackTrace(); } return "exception"; }
}

通過Future實現(xiàn)

與上述FutureTask不同的是, RealData需要實現(xiàn)Callable接口.

public class FutureDemo2 {
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newCachedThreadPool(); Future<String> future = service.submit(new RealData2());
System.out.println("RealData2方法調(diào)用完畢"); // 模擬主函數(shù)中其他耗時操作 doOtherThing(); // 獲取RealData2方法的結(jié)果 System.out.println(future.get()); }
private static void doOtherThing() throws InterruptedException { Thread.sleep(2000L); }}
class RealData2 implements Callable<String>{
public String costTime() { try { // 模擬RealData耗時操作 Thread.sleep(1000L); return "result"; } catch (InterruptedException e) { e.printStackTrace(); } return "exception"; }
@Override public String call() throws Exception { return costTime(); }}

另外Future本身還提供了一些額外的簡單控制功能, 其API如下

// 取消任務(wù) boolean cancel(boolean mayInterruptIfRunning); // 是否已經(jīng)取消 boolean isCancelled(); // 是否已經(jīng)完成 boolean isDone(); // 取得返回對象 V get() throws InterruptedException, ExecutionException; // 取得返回對象, 并可以設(shè)置超時時間 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

生產(chǎn)消費者模式

生產(chǎn)者-消費者模式是一個經(jīng)典的多線程設(shè)計模式. 它為多線程間的協(xié)作提供了良好的解決方案。

在生產(chǎn)者-消費者模式中,通常由兩類線程,即若干個生產(chǎn)者線程和若干個消費者線程。

生產(chǎn)者線程負責(zé)提交用戶請求,消費者線程則負責(zé)具體處理生產(chǎn)者提交的任務(wù)。

生產(chǎn)者和消費者之間則通過共享內(nèi)存緩沖區(qū)進行通信, 其結(jié)構(gòu)圖如下

一文搞定!Java高并發(fā)之設(shè)計模式

PCData為我們需要處理的元數(shù)據(jù)模型, 生產(chǎn)者構(gòu)建PCData, 并放入緩沖隊列.

消費者從緩沖隊列中獲取數(shù)據(jù), 并執(zhí)行計算.

生產(chǎn)者核心代碼

while(isRunning) { Thread.sleep(r.nextInt(SLEEP_TIME)); data = new PCData(count.incrementAndGet); // 構(gòu)造任務(wù)數(shù)據(jù) System.out.println(data + " is put into queue"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { // 將數(shù)據(jù)放入隊列緩沖區(qū)中 System.out.println("faild to put data : " + data); } }

消費者核心代碼

while (true) { PCData data = queue.take(); // 提取任務(wù) if (data != null) { // 獲取數(shù)據(jù), 執(zhí)行計算操作 int re = data.getData() * 10; System.out.println("after cal, value is : " + re); Thread.sleep(r.nextInt(SLEEP_TIME));           }       }

生產(chǎn)消費者模式可以有效對數(shù)據(jù)解耦, 優(yōu)化系統(tǒng)結(jié)構(gòu).

降低生產(chǎn)者和消費者線程相互之間的依賴與性能要求.

一般使用BlockingQueue作為數(shù)據(jù)緩沖隊列, 他是通過鎖和阻塞來實現(xiàn)數(shù)據(jù)之間的同步,

如果對緩沖隊列有性能要求, 則可以使用基于CAS無鎖設(shè)計的ConcurrentLinkedQueue.

分而治之

嚴(yán)格來講, 分而治之不算一種模式, 而是一種思想.

它可以將一個大任務(wù)拆解為若干個小任務(wù)并行執(zhí)行, 提高系統(tǒng)吞吐量.

我們主要講兩個場景, Master-Worker模式, ForkJoin線程池.

Master-Worker模式

該模式核心思想是系統(tǒng)由兩類進行協(xié)助工作: Master進程, Worker進程.

Master負責(zé)接收與分配任務(wù), Worker負責(zé)處理任務(wù). 當(dāng)各個Worker處理完成后,

將結(jié)果返回給Master進行歸納與總結(jié).

假設(shè)一個場景, 需要計算100個任務(wù), 并對結(jié)果求和, Master持有10個子進程.

一文搞定!Java高并發(fā)之設(shè)計模式

Master代碼

public class MasterDemo { // 盛裝任務(wù)的集合 private ConcurrentLinkedQueue<TaskDemo> workQueue = new ConcurrentLinkedQueue<TaskDemo>(); // 所有worker private HashMap<String, Thread> workers = new HashMap<>(); // 每一個worker并行執(zhí)行任務(wù)的結(jié)果 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>();
public MasterDemo(WorkerDemo worker, int workerCount) { // 每個worker對象都需要持有queue的引用, 用于領(lǐng)任務(wù)與提交結(jié)果 worker.setResultMap(resultMap); worker.setWorkQueue(workQueue); for (int i = 0; i < workerCount; i++) { workers.put("子節(jié)點: " + i, new Thread(worker)); } }
// 提交任務(wù) public void submit(TaskDemo task) { workQueue.add(task); }
// 啟動所有的子任務(wù) public void execute(){ for (Map.Entry<String, Thread> entry : workers.entrySet()) { entry.getValue().start(); } }
// 判斷所有的任務(wù)是否執(zhí)行結(jié)束 public boolean isComplete() { for (Map.Entry<String, Thread> entry : workers.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } }
return true; }
// 獲取最終匯總的結(jié)果 public int getResult() { int result = 0; for (Map.Entry<String, Object> entry : resultMap.entrySet()) { result += Integer.parseInt(entry.getValue().toString()); }
return result; }
}

Worker代碼

public class WorkerDemo implements Runnable{
private ConcurrentLinkedQueue<TaskDemo> workQueue; private ConcurrentHashMap<String, Object> resultMap;
@Override public void run() { while (true) { TaskDemo input = this.workQueue.poll(); // 所有任務(wù)已經(jīng)執(zhí)行完畢 if (input == null) { break; } // 模擬對task進行處理, 返回結(jié)果 int result = input.getPrice(); this.resultMap.put(input.getId() + "", result); System.out.println("任務(wù)執(zhí)行完畢, 當(dāng)前線程: " + Thread.currentThread().getName()); } }
public ConcurrentLinkedQueue<TaskDemo> getWorkQueue() { return workQueue; }
public void setWorkQueue(ConcurrentLinkedQueue<TaskDemo> workQueue) { this.workQueue = workQueue; }
public ConcurrentHashMap<String, Object> getResultMap() { return resultMap; }
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; }}
public class TaskDemo {
private int id; private String name; private int price;
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getPrice() { return price; }
public void setPrice(int price) { this.price = price; }}

主函數(shù)測試

MasterDemo master = new MasterDemo(new WorkerDemo(), 10); for (int i = 0; i < 100; i++) { TaskDemo task = new TaskDemo(); task.setId(i); task.setName("任務(wù)" + i); task.setPrice(new Random().nextInt(10000)); master.submit(task); }
master.execute();
while (true) { if (master.isComplete()) { System.out.println("執(zhí)行的結(jié)果為: " + master.getResult()); break; } }


ForkJoin線程池

該線程池是jdk7之后引入的一個并行執(zhí)行任務(wù)的框架, 其核心思想也是將任務(wù)分割為子任務(wù),

有可能子任務(wù)還是很大, 還需要進一步拆解, 最終得到足夠小的任務(wù).

將分割出來的子任務(wù)放入雙端隊列中, 然后幾個啟動線程從雙端隊列中獲取任務(wù)執(zhí)行.

子任務(wù)執(zhí)行的結(jié)果放到一個隊列里, 另起線程從隊列中獲取數(shù)據(jù), 合并結(jié)果.

一文搞定!Java高并發(fā)之設(shè)計模式

假設(shè)我們的場景需要計算從0到20000000L的累加求和. CountTask繼承自RecursiveTask, 可以攜帶返回值.

每次分解大任務(wù), 簡單的將任務(wù)劃分為100個等規(guī)模的小任務(wù), 并使用fork()提交子任務(wù).

在子任務(wù)中通過THRESHOLD設(shè)置子任務(wù)分解的閾值, 如果當(dāng)前需要求和的總數(shù)大于THRESHOLD, 則子任務(wù)需要再次分解,如果子任務(wù)可以直接執(zhí)行, 則進行求和操作, 返回結(jié)果. 最終等待所有的子任務(wù)執(zhí)行完畢, 對所有結(jié)果求和.

public class CountTask extends RecursiveTask<Long>{ // 任務(wù)分解的閾值 private static final int THRESHOLD = 10000; private long start; private long end;

public CountTask(long start, long end) { this.start = start; this.end = end; }
public Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; if (canCompute) { for (long i = start; i <= end; i++) { sum += i; } } else { // 分成100個小任務(wù) long step = (start + end) / 100; ArrayList<CountTask> subTasks = new ArrayList<CountTask>(); long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) { lastOne = end; } CountTask subTask = new CountTask(pos, lastOne); pos += step + 1; // 將子任務(wù)推向線程池 subTasks.add(subTask); subTask.fork(); }
for (CountTask task : subTasks) { // 對結(jié)果進行join sum += task.join(); } } return sum; }
public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); // 累加求和 0 -> 20000000L CountTask task = new CountTask(0, 20000000L); ForkJoinTask<Long> result = pool.submit(task); System.out.println("sum result : " + result.get()); }}

ForkJoin線程池使用一個無鎖的棧來管理空閑線程, 如果一個工作線程暫時取不到可用的任務(wù), 則可能被掛起.

掛起的線程將被壓入由線程池維護的棧中, 待將來有任務(wù)可用時, 再從棧中喚醒這些線程.


作者:大道方圓

來源:www.cnblogs.com/xdecode/p/9137793.html


免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

21ic電子網(wǎng)

掃描二維碼,關(guān)注更多精彩內(nèi)容

本站聲明: 本文章由作者或相關(guān)機構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
關(guān)閉
關(guān)閉