- N +

java多線程機制(單線程和多線程處理機制)

大家好,關于java多線程機制很多朋友都還不太明白,不過沒關系,因為今天小編就來為大家分享關于單線程和多線程處理機制的知識點,相信應該可以解決大家的一些困惑和問題,如果碰巧可以解決您的問題,還望關注下本站哦,希望對各位有所幫助!

Java多線程同步內部如何實現的

提示

請帶著這些問題繼續后文,會很大程度上幫助你更好的理解相關知識點。@pdai

為什么要有線程池?Java是實現和管理線程池有哪些方式?請簡單舉例如何使用。為什么很多公司不允許使用Executors去創建線程池?那么推薦怎么使用呢?ThreadPoolExecutor有哪些核心的配置參數?請簡要說明ThreadPoolExecutor可以創建哪是哪三種線程池呢?當隊列滿了并且worker的數量達到maxSize的時候,會怎么樣?說說ThreadPoolExecutor有哪些RejectedExecutionHandler策略?默認是什么策略?簡要說下線程池的任務執行機制?execute–>addWorker–>runworker(getTask)線程池中任務是如何提交的?線程池中任務是如何關閉的?在配置線程池的時候需要考慮哪些配置因素?如何監控線程池的狀態?為什么要有線程池

線程池能夠對線程進行統一分配,調優和監控:

降低資源消耗(線程無限制地創建,然后使用完畢后銷毀)提高響應速度(無須創建線程)提高線程的可管理性ThreadPoolExecutor例子

Java是如何實現和管理線程池的?

從JDK5開始,把工作單元與執行機制分離開來,工作單元包括Runnable和Callable,而執行機制由Executor框架提供。

WorkerThread

SimpleThreadPool

程序中我們創建了固定大小為五個工作線程的線程池。然后分配給線程池十個工作,因為線程池大小為五,它將啟動五個工作線程先處理五個工作,其他的工作則處于等待狀態,一旦有工作完成,空閑下來工作線程就會撿取等待隊列里的其他工作進行執行。

這里是以上程序的輸出。

輸出表明線程池中至始至終只有五個名為"pool-1-thread-1"到"pool-1-thread-5"的五個線程,這五個線程不隨著工作的完成而消亡,會一直存在,并負責執行分配給線程池的任務,直到線程池消亡。

Executors類提供了使用了ThreadPoolExecutor的簡單的ExecutorService實現,但是ThreadPoolExecutor提供的功能遠不止于此。我們可以在創建ThreadPoolExecutor實例時指定活動線程的數量,我們也可以限制線程池的大小并且創建我們自己的RejectedExecutionHandler實現來處理不能適應工作隊列的工作。

這里是我們自定義的RejectedExecutionHandler接口的實現。

RejectedExecutionHandlerImpl.java

ThreadPoolExecutor提供了一些方法,我們可以使用這些方法來查詢executor的當前狀態,線程池大小,活動線程數量以及任務數量。因此我是用來一個監控線程在特定的時間間隔內打印executor信息。

MyMonitorThread.java

這里是使用ThreadPoolExecutor的線程池實現例子。

WorkerPool.java

注意在初始化ThreadPoolExecutor時,我們保持初始池大小為2,最大池大小為4而工作隊列大小為2。因此如果已經有四個正在執行的任務而此時分配來更多任務的話,工作隊列將僅僅保留他們(新任務)中的兩個,其他的將會被RejectedExecutionHandlerImpl處理。

上面程序的輸出可以證實以上觀點。

注意executor的活動任務、完成任務以及所有完成任務,這些數量上的變化。我們可以調用shutdown()方法來結束所有提交的任務并終止線程池。

ThreadPoolExecutor使用詳解

其實java線程池的實現原理很簡單,說白了就是一個線程集合workerSet和一個阻塞隊列workQueue。當用戶向線程池提交一個任務(也就是線程)時,線程池會先將任務放入workQueue中。workerSet中的線程會不斷的從workQueue中獲取線程然后執行。當workQueue中沒有任務的時候,worker就會阻塞,直到隊列中有任務了就取出來繼續執行。

Execute原理

當一個任務提交至線程池之后:

線程池首先當前運行的線程數量是否少于corePoolSize。如果是,則創建一個新的工作線程來執行任務。如果都在執行任務,則進入2.判斷BlockingQueue是否已經滿了,倘若還沒有滿,則將線程放入BlockingQueue。否則進入3.如果創建一個新的工作線程將使當前運行的線程數量超過maximumPoolSize,則交給RejectedExecutionHandler來處理任務。

當ThreadPoolExecutor創建新線程時,通過CAS來更新線程池的狀態ctl.

參數corePoolSize線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等于corePoolSize,即使有其他空閑線程能夠執行新來的任務,也會繼續創建線程;如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建并啟動所有核心線程。workQueue用來保存等待被執行的任務的阻塞隊列.在JDK中提供了如下阻塞隊列:具體可以參考JUC集合:BlockQueue詳解ArrayBlockingQueue:基于數組結構的有界阻塞隊列,按FIFO排序任務;LinkedBlockingQueue:基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQueue;SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue;PriorityBlockingQueue:具有優先級的無界阻塞隊列;

LinkedBlockingQueue比ArrayBlockingQueue在插入刪除節點性能方面更優,但是二者在put(),take()任務的時均需要加鎖,SynchronousQueue使用無鎖算法,根據節點的狀態判斷執行,而不需要用到鎖,其核心是Transfer.transfer().

maximumPoolSize線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小于maximumPoolSize;當阻塞隊列是無界隊列,則maximumPoolSize則不起作用,因為無法提交至核心線程池的線程會一直持續地放入workQueue.keepAliveTime線程空閑時的存活時間,即當線程沒有任務執行時,該線程繼續存活的時間;默認情況下,該參數只在線程數大于corePoolSize時才有用,超過這個時間的空閑線程將被終止;unitkeepAliveTime的單位threadFactory創建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的線程名。默認為DefaultThreadFactoryhandler線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:AbortPolicy:直接拋出異常,默認策略;CallerRunsPolicy:用調用者所在的線程來執行任務;DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務;DiscardPolicy:直接丟棄任務;

當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。

三種類型newFixedThreadPool

線程池的線程數量達corePoolSize后,即使線程池沒有可執行任務時,也不會釋放線程。

FixedThreadPool的工作隊列為無界隊列LinkedBlockingQueue(隊列容量為Integer.MAX_VALUE),這會導致以下問題:

線程池里的線程數量不超過corePoolSize,這導致了maximumPoolSize和keepAliveTime將會是個無用參數由于使用了無界隊列,所以FixedThreadPool永遠不會拒絕,即飽和策略失效newSingleThreadExecutor

初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行.

由于使用了無界隊列,所以SingleThreadPool永遠不會拒絕,即飽和策略失效

newCachedThreadPool

線程池的線程數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞隊列;和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷;執行過程與前兩種稍微不同:

主線程調用SynchronousQueue的offer()方法放入task,倘若此時線程池中有空閑的線程嘗試讀取SynchronousQueue的task,即調用了SynchronousQueue的poll(),那么主線程將該task交給空閑線程.否則執行(2)當線程池為空或者沒有空閑的線程,則創建新的線程執行任務.執行完任務的線程倘若在60s內仍空閑,則會被終止.因此長時間空閑的CachedThreadPool不會持有任何線程資源.關閉線程池

遍歷線程池中的所有線程,然后逐個調用線程的interrupt方法來中斷線程.

關閉方式-shutdown

將線程池里的線程狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程.

關閉方式-shutdownNow

將線程池里的線程狀態設置成STOP狀態,然后停止所有正在執行或暫停任務的線程.只要調用這兩個關閉方法中的任意一個,isShutDown()返回true.當所有任務都成功關閉了,isTerminated()返回true.

ThreadPoolExecutor源碼詳解幾個關鍵屬性內部狀態

其中AtomicInteger變量ctl的功能非常強大:利用低29位表示線程池中線程數,通過高3位表示線程池的運行狀態:

RUNNING:-1<<COUNT_BITS,即高3位為111,該狀態的線程池會接收新任務,并處理阻塞隊列中的任務;SHUTDOWN:0<<COUNT_BITS,即高3位為000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;STOP:1<<COUNT_BITS,即高3位為001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;TIDYING:2<<COUNT_BITS,即高3位為010,所有的任務都已經終止;TERMINATED:3<<COUNT_BITS,即高3位為011,terminated()方法已經執行完成任務的執行

execute–>addWorker–>runworker(getTask)

線程池的工作線程通過Woker類實現,在ReentrantLock鎖的保證下,把Woker實例插入到HashSet后,并啟動Woker中的線程。從Woker類的構造方法實現可以發現:線程工廠在創建線程thread時,將Woker實例本身this作為參數傳入,當執行start方法啟動線程thread時,本質是執行了Worker的runWorker方法。firstTask執行完成之后,通過getTask方法從阻塞隊列中獲取等待的任務,如果隊列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;

execute()方法

ThreadPoolExecutor.execute(task)實現了Executor.execute(task)

為什么需要doublecheck線程池的狀態?

在多線程環境下,線程池的狀態時刻在變化,而ctl.get()是非原子操作,很有可能剛獲取了線程池狀態后線程池狀態就改變了。判斷是否將command加入workque是線程池之前的狀態。倘若沒有doublecheck,萬一線程池處于非running狀態(在多線程環境下很有可能發生),那么command永遠不會執行。

addWorker方法

從方法execute的實現可以看出:addWorker主要負責創建新的線程并執行任務線程池創建新線程執行任務時,需要獲取全局鎖:

Worker類的runworker方法繼承了AQS類,可以方便的實現工作線程的中止操作;實現了Runnable接口,可以將自身作為一個任務在工作線程中執行;當前提交的任務firstTask作為參數傳入Worker的構造方法;

一些屬性還有構造方法:

runWorker方法是線程池的核心:

線程啟動之后,通過unlock方法釋放鎖,設置AQS的state為0,表示運行可中斷;Worker執行firstTask或從workQueue中獲取任務:進行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷)檢查線程池狀態,倘若線程池處于中斷狀態,當前線程將中斷。執行beforeExecute執行任務的run方法執行afterExecute方法解鎖操作

通過getTask方法從阻塞隊列中獲取等待的任務,如果隊列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;

getTask方法

下面來看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個方法我們可以看出線程池是怎么讓超過corePoolSize的那部分worker銷毀的。

注意這里一段代碼是keepAliveTime起作用的關鍵:

allowCoreThreadTimeOut為false,線程即使空閑也不會被銷毀;倘若為ture,在keepAliveTime內仍空閑則會被銷毀。

如果線程允許空閑等待而不被銷毀timed==false,workQueue.take任務:如果阻塞隊列為空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務,并執行;

如果線程不允許無休止空閑timed==true,workQueue.poll任務:如果在keepAliveTime時間內,阻塞隊列還是沒有任務,則返回null;

任務的提交submit任務,等待線程池execute執行FutureTask類的get方法時,會把主線程封裝成WaitNode節點并保存在waiters鏈表中,并阻塞等待運行結果;FutureTask任務執行完成后,通過UNSAFE設置waiters相應的waitNode為null,并通過LockSupport類unpark方法喚醒主線程;

在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。

Callable接口類似于Runnable,只是Runnable沒有返回值。Callable任務除了返回正常結果之外,如果發生異常,該異常也會被返回,即Future可以拿到異步執行任務各種結果;Future.get方法會導致主線程阻塞,直到Callable任務執行完成;submit方法

AbstractExecutorService.submit()實現了ExecutorService.submit()可以獲取執行完的返回值,而ThreadPoolExecutor是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor`的方法。

通過submit方法提交的Callable任務會被封裝成了一個FutureTask對象。通過Executor.execute方法提交FutureTask到線程池中等待被執行,最終執行的是FutureTask的run方法;

FutureTask對象

publicclassFutureTask<V>implementsRunnableFuture<V>可以將FutureTask提交至線程池中等待被執行(通過FutureTask的run方法來執行)

內部狀態

內部狀態的修改通過sun.misc.Unsafe修改

get方法

內部通過awaitDone方法對主線程進行阻塞,具體實現如下:

如果主線程被中斷,則拋出中斷異常;

判斷FutureTask當前的state,如果大于COMPLETING,說明任務已經執行完成,則直接返回;如果當前state等于COMPLETING,說明任務已經執行完,這時主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;通過WaitNode類封裝當前線程,并通過UNSAFE添加到waiters鏈表;最終通過LockSupport的park或parkNanos掛起線程;

run方法

FutureTask.run方法是在線程池中被執行的,而非主線程

通過執行Callable任務的call方法;如果call執行成功,則通過set方法保存結果;如果call執行有異常,則通過setException保存異常;任務的關閉

shutdown方法會將線程池的狀態設置為SHUTDOWN,線程池進入這個狀態后,就拒絕再接受任務,然后會將剩余的任務全部執行完

shutdownNow做的比較絕,它先將線程池狀態設置為STOP,然后拒絕所有提交的任務。最后中斷左右正在運行中的worker,然后清空任務隊列。

更深入理解

為什么線程池不允許使用Executors去創建?推薦方式是什么?

線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。說明:Executors各個方法的弊端:

newFixedThreadPool和newSingleThreadExecutor:??主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。newCachedThreadPool和newScheduledThreadPool:??主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。推薦方式1

首先引入:commons-lang3包

推薦方式2

首先引入:com.google.guava包

推薦方式3

spring配置線程池方式:自定義線程工廠bean需要實現ThreadFactory,可參考該接口的其它默認實現類,使用方式直接注入bean調用execute(Runnabletask)方法即可

配置線程池需要考慮因素

從任務的優先級,任務的執行時間長短,任務的性質(CPU密集/IO密集),任務的依賴關系這四個角度來分析。并且近可能地使用有界的工作隊列。

性質不同的任務可用使用不同規模的線程池分開處理:

CPU密集型:盡可能少的線程,Ncpu+1IO密集型:盡可能多的線程,Ncpu*2,比如數據庫連接池混合型:CPU密集型的任務與IO密集型任務的執行時間差別較小,拆分為兩個線程池;否則沒有必要拆分。監控線程池的狀態

可以使用ThreadPoolExecutor以下方法:

getTaskCount()Returnstheapproximatetotalnumberoftasksthathaveeverbeenscheduledforexecution.getCompletedTaskCount()Returnstheapproximatetotalnumberoftasksthathavecompletedexecution.返回結果少于getTaskCount()。getLargestPoolSize()Returnsthelargestnumberofthreadsthathaveeversimultaneouslybeeninthepool.返回結果小于等于maximumPoolSizegetPoolSize()Returnsthecurrentnumberofthreadsinthepool.getActiveCount()Returnstheapproximatenumberofthreadsthatareactivelyexecutingtasks.參考文章《Java并發編程藝術》https://www.jianshu.com/p/87bff5cc8d8chttps://blog.csdn.net/programmer_at/article/details/79799267https://blog.csdn.net/u013332124/article/details/79587436https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice

由于問答代碼塊插入受限,部分代碼未完全展示,若有需要可閱讀原文:戳我閱讀原文

Java:關于多線程與多核,如何將多核都利用上呢

你自己寫個多線程的程序跑起來,把任務管理器打開,打開“性能”選項卡,觀察你就發現了,其實不管你你的是4核,8核,1024核,基本都是在一個格子里有動作的,跟理論上幾個線程就在幾個核里跑不一致的,操作系統自身的設計導致的。核雖然多,縣城雖然多,但是還沒有真正設計成幾個線程就在幾個核里跑的算法模式。至于怎么都利用上不是你說了算,os說了算。別想太多。

cpu多線程和jvm多線程

CPU多線程和JVM多線程是兩個不同的概念,分別涉及到不同層面的多線程處理。

1.CPU多線程(CPUMultithreading):CPU多線程是指CPU在執行任務時同時處理多個線程的能力。現代的多核CPU通常支持硬件多線程技術,例如超線程(Hyper-Threading)技術,它可以將單個物理核心模擬成兩個邏輯核心,使得每個物理核心可以同時執行兩個線程。這樣可以提高CPU的并行處理能力,使得多個線程可以在同一時間片內并行執行,從而提高系統的整體性能。

2.JVM多線程(JVMMultithreading):JVM多線程是指Java虛擬機(JVM)在執行Java程序時支持多線程的能力。Java語言內置了對多線程的支持,可以創建和管理多個線程,并通過JVM的線程調度器在不同的線程之間進行切換。多線程在Java程序中可以實現并發執行,提高程序的性能和響應能力。通過使用Java的并發庫(如java.util.concurrent包),可以更方便地實現線程間的同步、互斥和協作。

總結起來,CPU多線程是指CPU硬件層面上的多線程處理能力,而JVM多線程是指在Java虛擬機上執行的Java程序所具備的多線程能力。CPU多線程是通過硬件技術實現的,而JVM多線程是通過編程語言和虛擬機提供的機制來實現的。在使用Java編寫多線程程序時,可以充分利用CPU的多線程能力,讓程序在多個線程之間并行執行,提高系統的性能和效率。

Java初學有必要深入多線程編程嗎,如何學習

多線程作為JAVA學習的主要難點之一,有開發復雜,出現問題難以復現等特性,但卻是不得不掌握的知識點,因為JAVA中使用多線程的場景實在是太多了。

先看下多線程(所有語言)的發展背景:早期的計算器都是單核CPU,想要獲得更高的性能只能是擴展硬件(摩爾定律),但是很快硬件的發展達到了瓶頸,要提高計算能力只能是橫向擴展(增加計算機CPU核心,現在也沒有單核心的計算機了吧),因為一個CPU在同一個時間點上只能處理一個線程,現在的服務器少則16核,如果使用單線程編程,相當于你只用了1/16的CPU資源,暴殄天物!所以多線程是很有必要學習的。

多線程可以用來在什么場景使用呢?

1,密集型計算:將一個大任務進行拆分,使用多線程進行執行,假設從1加到100萬,你單線程需要8分鐘,然后你分為16個線程做計算(半分鐘),加上匯總的時間和創建銷毀線程的時間,不會超過一分鐘,7分鐘喝咖啡足夠了吧。

下載大文件的時候拆分成幾個小文件,充分利用帶寬!

2,異步調用:多線程和異步不是一個概念,但是異步一定是多線程的,如果是同步調用發生阻塞的時候,CPU資源就浪費了,但如果是異步,可以執行別的線程,提高CPU使用率!

3,web容器技術:一個請求使用一個線程去處理(多數容器已棄用,改用netty架構,一個線程遍歷連接,分發給線程池進行任務處理)

4,線程池:例如數據庫連接池,JAVA中的線程池等,線程池創建多個線程來處理數據,避免頻繁創建線程的開銷!

5,批處理:用于多個batch任務可并行處理,batch任務中的job可并行處理的情況!

可以說多線程代表著高效率的運行程序,所以有很大的理由學習好多線程!

怎么學好多線程呢?

①,明白計算機原理:多核CPU的運行方式,線程執行,什么時候容易阻塞,寄存器,內存(可對照理解JAVA內存模型)等!

②,線程基本操作:線程的創建,實現,開始線程,掌握線程狀態,線程中斷,線程休眠與喚醒等!

③,掌握多線程常用技術:線程池的幾種創建方式,使用synchonize,讀寫鎖等加鎖操作,使用阻塞隊列實現順序執行,使用threadlocal實現線程本地變量,使用future實現異步回調,使用fork-join框架并行處理任務,JAVA8的并行流式處理也是不錯的選擇!

④,學會拋棄多線程:netty使用網絡IO多路復用避免多線程開銷,redis使用單線程才能被作為分布式鎖,全局唯一id生成的線程安全策略!

不是說多線程復雜就不用,也不是說多線程高效就一定用,一切根據場景來定,多線程開發中的實際案例,可隨時交流,更多的技術分享,敬請關注。。。

java多線程知識講解

對于Java編程的多線程知識,我們還是要了解很多的,首先我們要知道。java中的線程分為兩種:守護線程(Daemon)和用戶線程(User)。任何線程都可以設置為守護線程和用戶線程,通過方法Thread.setDaemon(boolon);true則把該線程設置為守護線程,反之則為用戶線程。

Thread.setDaemon()必須在Thread.start()之前調用,否則運行時會拋出異常。

OK,關于java多線程機制和單線程和多線程處理機制的內容到此結束了,希望對大家有所幫助。

返回列表
上一篇:
下一篇: