1. การใช้พูลเธรดขั้นพื้นฐาน
1.1. ทำไมคุณถึงต้องการพูลเธรด?
ในธุรกิจประจำวันหากเราต้องการใช้มัลติเธรดเราจะสร้างเธรดก่อนที่ธุรกิจจะเริ่มต้นและทำลายเธรดหลังจากธุรกิจสิ้นสุดลง อย่างไรก็ตามสำหรับธุรกิจการสร้างและการทำลายของหัวข้อไม่มีส่วนเกี่ยวข้องกับธุรกิจของตัวเองและสนใจเฉพาะงานที่ดำเนินการโดยเธรด ดังนั้นฉันหวังว่าจะใช้ซีพียูให้ได้มากที่สุดเท่าที่จะเป็นไปได้ในการทำงานแทนที่จะสร้างและทำลายเธรดที่ไม่เกี่ยวข้องกับธุรกิจ พูลเธรดแก้ปัญหานี้ ฟังก์ชั่นของพูลเธรดคือการนำเธรดกลับมาใช้ใหม่
1.2. JDK ให้การสนับสนุนอะไรสำหรับเรา
ไดอะแกรมคลาสที่เกี่ยวข้องใน JDK จะแสดงในรูปด้านบน
หมวดพิเศษหลายประเภทที่จะกล่าวถึง
คลาส callable นั้นคล้ายกับคลาสที่เรียกใช้งานได้ แต่ความแตกต่างคือ callable มีค่าส่งคืน
Threadpoolexecutor เป็นการนำพูลเธรดสำคัญ
ผู้บริหารเป็นคลาสโรงงาน
1.3. การใช้พูลเธรด
1.3.1. ประเภทของพูลเธรด
Public ExecutorService NewFixedThreadPool (int nthreads) {ส่งคืน ThreadPoolexecutor ใหม่ (nthreads, nthreads, 0l, timeunit.milliseconds, LinkedBlockingQueue ใหม่ <Runnable> ()); threadpoolexecutor (1, 1, 0L, timeUnit.milliseconds, ใหม่ LinkedBlockingQueue <runnable> ()));} executorservice สาธารณะ newCachedThreadpool () {ส่งคืน threadpoolexecutor ใหม่ (0, integer.max_value, 60Lจากมุมมองของวิธีการเป็นที่ชัดเจนว่า recidethreadpool, singlethreathedexecutor และ cachedthreadpool เป็นอินสแตนซ์ที่แตกต่างกันของ threadpoolexecutor แต่พารามิเตอร์แตกต่างกัน
Public Threadpoolexecutor (int corepoolsize, int maximumpoolsize, regalivetime ยาว, หน่วยเวลา, การปิดกั้น <rovnable> workqueue) {this (corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue มาอธิบายสั้น ๆ เกี่ยวกับความหมายของพารามิเตอร์ในตัวสร้าง Threadpoolexecutor
ด้วยวิธีนี้การดูที่ recidethreadpool ที่กล่าวถึงข้างต้นจำนวนคอร์และจำนวนเธรดสูงสุดเท่ากันดังนั้นเธรดจะไม่ถูกสร้างและทำลายในระหว่างการทำงาน เมื่อจำนวนงานมีขนาดใหญ่และเธรดในพูลเธรดไม่สามารถพึงพอใจงานจะถูกบันทึกลงใน LinkedBlockingQueue และขนาดของ LinkedBlockingQueue เป็นจำนวนเต็ม MAX_VALUE ซึ่งหมายความว่าการเพิ่มงานอย่างต่อเนื่องจะทำให้หน่วยความจำใช้มากขึ้นเรื่อย ๆ
Cachedthreadpool แตกต่างกัน หมายเลขเธรดหลักของมันคือ 0 จำนวนที่เก็บสูงสุดคือจำนวนเต็ม max_value และคิวการปิดกั้นของมันคือ synchronousqueue ซึ่งเป็นคิวพิเศษและขนาดของมันคือ 0 เนื่องจากจำนวนเธรดหลักคือ 0 จึงจำเป็นต้องเพิ่มงานลงในแบบซิงโครนัส คิวนี้สามารถประสบความสำเร็จได้ก็ต่อเมื่อเธรดหนึ่งเพิ่มข้อมูลจากมันและเธรดอื่นจะได้รับข้อมูลจากมัน การเพิ่มข้อมูลไปยังคิวนี้เพียงอย่างเดียวจะส่งคืนความล้มเหลว เมื่อการส่งคืนล้มเหลวพูลเธรดจะเริ่มขยายเธรดซึ่งเป็นสาเหตุที่จำนวนเธรดใน CachedThreadPool ไม่ได้รับการแก้ไข เมื่อเธรดไม่ได้ใช้สำหรับ 60s เธรดจะถูกทำลาย
1.4. ตัวอย่างเล็ก ๆ ของการใช้พูลเธรด
1.4.1. พูลเธรดง่ายๆ
นำเข้า java.util.concurrent.executorservice; นำเข้า java.util.concurrent.executors; ชั้นเรียนสาธารณะ Threadpooldemo {คลาสคงที่ mytask ใช้งาน {@override public void run () {system.out.println (system.currenttimemillis () ลอง {thread.sleep (1,000); } catch (exception e) {e.printstacktrace (); }}} โมฆะคงที่สาธารณะหลัก (สตริง [] args) {myTask myTask = new myTask (); ExecutorService ES = Executors.NewFixedThreadPool (5); สำหรับ (int i = 0; i <10; i ++) {es.submit (mytask); - เนื่องจากใช้ Newfixedthreadpool (5) แต่เริ่มต้น 10 เธรด 5 จะถูกดำเนินการครั้งละ 5 และเป็นที่ชัดเจนว่าการใช้ซ้ำของเธรดจะถูกนำมาใช้ใหม่ ThreadID ซ้ำกันนั่นคือ 5 งานแรกและงาน 5 ครั้งสุดท้ายจะถูกดำเนินการโดยเธรดชุดเดียวกัน
สิ่งที่ใช้ที่นี่
es.submit (mytask);
นอกจากนี้ยังมีวิธีการส่ง:
es.execute (mytask);
ความแตกต่างคือการส่งจะส่งคืนวัตถุในอนาคตซึ่งจะมีการแนะนำในภายหลัง
1.4.2.scheduledthreadpool
นำเข้า java.util.concurrent.executors; นำเข้า java.util.concurrent.scheduledexecutorservice; นำเข้า java.util.concurrent.timeUnit; ชั้นเรียนสาธารณะ // หากงานก่อนหน้ายังไม่เสร็จสิ้นการจัดส่งจะไม่เริ่ม SES.SCHEDULEWITHFIXEDDELAY (ใหม่ runnable () {@Override โมฆะสาธารณะ run () {ลอง {thread.sleep (1000); system.out.println (System.currentTimeMillis ()/1000); 0 วินาทีจากนั้นดำเนินการทุก ๆ 2 วินาทีในรอบ}}เอาท์พุท:
1454832514
1454832517
1454832520
1454832523
1454832526
-
เนื่องจากการดำเนินการงานใช้เวลา 1 วินาทีการจัดตารางงานจะต้องรอให้งานก่อนหน้าเสร็จสมบูรณ์ นั่นคือทุก ๆ 2 วินาทีที่นี่หมายความว่างานใหม่จะเริ่มต้น 2 วินาทีหลังจากงานก่อนหน้านี้เสร็จสิ้น
2. ขยายและปรับปรุงพูลเธรด
2.1. อินเตอร์เฟสการโทรกลับ
มี APIs โทรกลับในพูลเธรดเพื่อให้เรามีการดำเนินการเพิ่มเติม
ExecutorService ES = New ThreadPoolexecutor (5, 5, 0L, TimeUnit.Seconds, LinkedBlockingQueue ใหม่ <Runnable> ()) {@Override Void Void beforeexecute (เธรด t, runnable r) } @Override void protected aftExecute (runnable r, throwable t) {system.out.println ("การดำเนินการเสร็จสมบูรณ์"); } @Override Void Protected สิ้นสุด () {System.out.println ("Thread Pool Exit"); -เราสามารถใช้วิธีการ beforeexecute, aftExecute และวิธีการสิ้นสุดของ ThreadPoolexecutor เพื่อใช้การจัดการบันทึกหรือการดำเนินการอื่น ๆ ก่อนและหลังการดำเนินการเธรดออกจากพูลเธรด
2.2. กลยุทธ์การปฏิเสธ
บางครั้งงานหนักมากส่งผลให้ระบบโหลดมากเกินไป ดังที่ได้กล่าวไว้ข้างต้นเมื่อจำนวนงานเพิ่มขึ้นงานทั้งหมดจะถูกวางไว้ในคิวการปิดกั้นของ idceivethreadpool ส่งผลให้เกิดการใช้หน่วยความจำมากเกินไปและในที่สุดหน่วยความจำล้น ควรหลีกเลี่ยงสถานการณ์ดังกล่าว ดังนั้นเมื่อเราพบว่าจำนวนเธรดเกินจำนวนเธรดสูงสุดเราควรเลิกงานบางอย่าง เมื่อทิ้งเราควรเขียนงานแทนที่จะทิ้งมันโดยตรง
มีตัวสร้างอื่นใน Threadpoolexecutor
Public Threadpoolexecutor (int corepoolsize, int maximumpoolsize, jeepalivetime ยาว, หน่วยเวลา, blockkingqueue <runnable> workqueue, ThreadFactory ThreadFactory, Handler HESHIVE <FOLSIZE orderalargumentException (); if (workqueue == null || threadFactory == null || handler == null) โยน nullpointerexception ใหม่ (); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos (keepalivetime); this.threadFactory = ThreadFactory; this.handler = Handler; -
เราจะแนะนำ ThreadFactory ในภายหลัง
ตัวจัดการปฏิเสธการดำเนินการตามนโยบายซึ่งจะบอกเราว่าจะทำอย่างไรถ้างานไม่สามารถดำเนินการได้
มีกลยุทธ์ทั้งหมด 4 กลยุทธ์ข้างต้น
Abortpolicy: หากไม่สามารถยอมรับงานได้ข้อยกเว้นจะถูกโยนลงไป
Callerrunspolicy: หากไม่สามารถยอมรับงานได้ให้เธรดการโทรเสร็จสมบูรณ์
DiscardoldestPolicy: หากไม่สามารถยอมรับงานได้งานที่เก่าแก่ที่สุดจะถูกทิ้งและดูแลโดยคิว
DiscardPolicy: หากไม่สามารถยอมรับงานได้งานจะถูกยกเลิก
ExecutorService ES = New ThreadPoolexecutor (5, 5, 0L, TimeUnit.seconds, LinkedBlockingQueue ใหม่ <Runnable> (), ใหม่ DENCENTEDEXECUTIONHANDLER () {@Override void ปฏิเสธการใช้งาน - แน่นอนว่าเรายังสามารถใช้อินเทอร์เฟซ DensedExecutionHandler ได้ด้วยตนเองเพื่อกำหนดนโยบายการปฏิเสธด้วยตนเอง
2.3. ปรับแต่ง ThreadFactory
ฉันเพิ่งเห็นว่าสามารถระบุ ThreadFactory ในตัวสร้างของ ThreadPoolexecutor
เธรดในพูลเธรดถูกสร้างขึ้นโดยโรงงานเธรดและเราสามารถปรับแต่งโรงงานเธรด
โรงงานเธรดเริ่มต้น:
คลาสสแตติก defaultThreadFactory ใช้ ThreadFactory {Private Static Final Atomicinteger Poolnumber = ใหม่ Atomicinteger (1); กลุ่ม ThreadGroup ภาคสุดท้ายส่วนตัว; Private Atomicinteger ThreadNumber = ใหม่ Atomicinteger (1); Private Final String NamePrefix; DefaultThreadFactory () {SecurityManager S = System.getSecurityManager (); กลุ่ม = (s! = null)? s.getThreadGroup (): tread.currentthread (). getThreadGroup (); nameprefix = "pool-" + poolnumber.getandincrement () + "-thread-"; } เธรดสาธารณะ newThread (runnable r) {เธรด t = เธรดใหม่ (กลุ่ม, r, nameprefix + threadnumber.getandincrement (), 0); if (t.isdaemon ()) t.setdaemon (เท็จ); if (t.getPriority ()! = thread.norm_priority) t.setPriority (thread.norm_priority); กลับ t; -3. Forkjoin
3.1. ความคิด
มันเป็นความคิดในการแบ่งและพิชิต
ส้อม/เข้าร่วมคล้ายกับอัลกอริทึม MapReduce ความแตกต่างระหว่างทั้งสองคือ: ส้อม/เข้าร่วมแบ่งออกเป็นงานเล็ก ๆ เฉพาะเมื่อจำเป็นเช่นถ้างานมีขนาดใหญ่มากในขณะที่ MapReduce เริ่มทำขั้นตอนแรกสำหรับการแบ่งส่วน ดูเหมือนว่า Fork/เข้าร่วมเหมาะสำหรับระดับด้ายภายใน JVM ในขณะที่ MapReduce เหมาะสำหรับระบบกระจาย
4.2. การใช้อินเทอร์เฟซ
RecursiveAction: ไม่มีค่าส่งคืน
Recursivetask: มีค่าส่งคืน
4.3. ตัวอย่างง่ายๆ
นำเข้า java.util.arraylist; นำเข้า java.util.concurrent.forkjoinpool; นำเข้า java.util.concurrent.forkjointask; นำเข้า java.util.concurrent.recursivetask; เริ่มต้นยาวส่วนตัว ปลายยาวส่วนตัว; Public Counttask (Long Start, Long End) {super (); this.start = เริ่ม; this.end = สิ้นสุด; } @Override ป้องกันการคำนวณยาว () {Long sum = 0; บูลีน cancompute = (end - start) <threshold; if (ยกเลิกการคำนวณ) {สำหรับ (long i = start; i <= end; i ++) {sum = sum+i; }} else {// แบ่งออกเป็น 100 งานขนาดเล็กขั้นตอนยาว = (start + end)/100; arrayList <CountTask> subtasks = new ArrayList <CountTask> (); POS ยาว = เริ่ม; สำหรับ (int i = 0; i <100; i ++) {Long Lastone = pos+step; if (lastone> end) {lastone = end; } CountTask subtask = new CountTask (pos, lastone); pos + = ขั้นตอน + 1; Subtasks.add (Subtask); Subtask.fork (); // ผลักดันงานย่อยไปยังพูลเธรด} สำหรับ (CountTask T: Subtasks) {sum += t.join (); // รองานย่อยทั้งหมดสิ้นสุด}} ส่งคืนผลรวม; } โมฆะคงที่สาธารณะหลัก (สตริง [] args) {forkjoinpool forkjoinpool = new forkjoinpool (); CountTask task = new CountTask (0, 200000L); ForkJoinTask <long> result = forkJoinpool.submit (งาน); ลอง {long res = result.get (); System.out.println ("sum =" + res); } catch (exception e) {// todo: จัดการข้อยกเว้น e.printstacktrace (); - ตัวอย่างข้างต้นอธิบายถึงงานสรุป แบ่งงานสะสมออกเป็น 100 งานแต่ละงานจะทำผลรวมของตัวเลขเท่านั้นและหลังจากการเข้าร่วมครั้งสุดท้ายผลรวมที่คำนวณโดยแต่ละงานจะถูกสะสม
4.4. องค์ประกอบการใช้งาน
4.4.1.Workqueue และ CTL
แต่ละเธรดจะมีคิวงาน
Workqueue ชั้นสุดท้ายแบบคงที่
ในคิวการทำงานจะมีชุดของฟิลด์ที่จัดการเธรด
Int EventCount ผันผวน; // การเข้ารหัสนับจำนวนการยับยั้ง; <0 ถ้าไม่ทำงาน
int nextwait; // บันทึกการเข้ารหัสของบริกรเหตุการณ์ถัดไป
int narrows; // จำนวนเหล็กกล้า
คำใบ้ int; // คำแนะนำดัชนีเหล็ก
สระว่ายน้ำสั้น; // ดัชนีของคิวนี้ในพูล
โหมดสั้นสุดท้าย; // 0: lifo,> 0: fifo, <0: แบ่งปัน
ผันผวน int qlock; // 1: ล็อค, -1: สิ้นสุด; อย่างอื่น 0
ฐานข้อมูลที่ผันผวน // ดัชนีของสล็อตถัดไปสำหรับการสำรวจความคิดเห็น
int top; // ดัชนีของสล็อตถัดไปสำหรับการกด
Forkjointask <?> [] อาร์เรย์; // องค์ประกอบ (ไม่ได้ถูกจัดสรรในขั้นต้น)
สระว่ายน้ำ ForkJoinpool สุดท้าย; // พูลที่มีอยู่ (อาจเป็นโมฆะ)
เจ้าของ ForkJoinworkerThread สุดท้าย; // การเป็นเจ้าของเธรดหรือ NULL หากใช้ร่วมกัน
Parker ด้ายผันผวน; // == เจ้าของระหว่างการโทรไปที่จอด; อื่นเป็นโมฆะ
ผันผวน forkjointask <?> currentjoin; // งานที่เข้าร่วมใน AwaitJoin
ForkJoinTask <?> CurrentSteal; // งานที่ไม่ใช่ท้องถิ่นปัจจุบันกำลังดำเนินการ
ควรสังเกตที่นี่ว่ามีความแตกต่างอย่างมากระหว่าง JDK7 และ JDK8 ในการดำเนินการของ ForkJoin สิ่งที่เรากำลังแนะนำที่นี่มาจาก JDK8 ในพูลเธรดบางครั้งไม่ใช่ทุกเธรดที่กำลังดำเนินการบางเธรดจะถูกระงับและเธรดที่ถูกระงับเหล่านั้นจะถูกเก็บไว้ในสแต็ก มันเป็นตัวแทนภายในโดยรายการที่เชื่อมโยง
Nextwait จะชี้ไปที่เธรดรอถัดไป
ดัชนีดัชนีของตัวห้อยในพูลเธรด PoolIndex
EventCount เมื่อเริ่มต้น EventCount เกี่ยวข้องกับ PoolIndex รวม 32 บิตบิตแรกระบุว่าเปิดใช้งานหรือไม่และ 15 บิตระบุจำนวนครั้งที่ถูกระงับ
EventCount ส่วนที่เหลือแสดงถึง PoolIndex ใช้หนึ่งฟิลด์เพื่อแสดงถึงความหมายหลายอย่าง
Workqueue Workqueue แสดงโดย Forkjointask <?> [] อาร์เรย์ ด้านบนและฐานแสดงถึงปลายทั้งสองของคิวและข้อมูลอยู่ระหว่างสองนี้
รักษา CTL (ประเภทยาว 64 บิต) ใน ForkJoinpool
CTL ยาวผันผวน;
* Field CTL เต็มไปด้วย:
* AC: จำนวนคนทำงานที่ใช้งานอยู่ลบเป้าหมายการขนาน (16 บิต)
* TC: จำนวนคนงานทั้งหมดลบเป้าหมายการขนาน (16 บิต)
* ST: จริงถ้าพูลถูกยกเลิก (1 บิต)
* EC: จำนวนการรอคอยของด้ายรอด้านบน (15 บิต)
* ID: PoolIndex ของ Treiber Stack ของบริกร (16 บิต)
AC หมายถึงจำนวนเธรดที่ใช้งานอยู่ลบด้วยระดับการขนาน (อาจเป็นจำนวน CPU)
TC หมายถึงจำนวนเธรดทั้งหมดลบด้วยความเท่าเทียมกัน
ST ระบุว่ามีการเปิดใช้งานพูลเธรดเองหรือไม่
EC แสดงจำนวนเธรดที่ถูกระงับในเวลารอคอย
ID ระบุว่า poolIndex กำลังรอเธรดที่ด้านบน
เห็นได้ชัดว่า ST+EC+ID เป็นสิ่งที่เราเพิ่งเรียกว่า EventCount
แล้วทำไมคุณต้องสังเคราะห์ตัวแปรด้วยตัวแปร 5 ตัว? ในความเป็นจริงความสามารถมีอยู่ประมาณเดียวกันกับ 5 ตัวแปร
ความสามารถในการอ่านของการใช้รหัสตัวแปรจะแย่ลงมาก
เหตุใดจึงต้องใช้ตัวแปร? อันที่จริงนี่เป็นสิ่งที่ฉลาดที่สุดเพราะตัวแปรทั้ง 5 ตัวนี้ทั้งหมด ในมัลติเธรดถ้ามีการใช้ตัวแปร 5 ตัวจากนั้นเมื่อปรับเปลี่ยนตัวแปรใดตัวหนึ่งวิธีการตรวจสอบความสมบูรณ์ของตัวแปร 5 ตัว จากนั้นการใช้ตัวแปรจะช่วยแก้ปัญหานี้ได้ หากแก้ไขด้วยล็อคประสิทธิภาพจะลดลง
การใช้ตัวแปรทำให้มั่นใจได้ถึงความสอดคล้องและความเป็นอะตอมของข้อมูล
การเปลี่ยนแปลงของ ForkJoin Squadron CTL นั้นทำได้โดยใช้การดำเนินงาน CAS ดังที่ได้กล่าวไว้ในบทความชุดก่อนหน้านี้ CAS เป็นการดำเนินการที่ปราศจากล็อคและมีประสิทธิภาพที่ดี
เนื่องจากการดำเนินการ CAS สามารถกำหนดเป้าหมายได้เพียงตัวแปรเดียวเท่านั้นการออกแบบนี้จึงเหมาะสมที่สุด
4.4.2. การโจรกรรม
ต่อไปเราจะแนะนำเวิร์กโฟลว์ของพูลเธรดทั้งหมด
แต่ละเธรดเรียกทำงาน
Void Runworker สุดท้าย (Workqueue W) {W.GrowArray (); // จัดสรรคิวสำหรับ (int r = w.hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}} ฟังก์ชั่นการสแกน () คือการสแกนสำหรับงานที่จะทำ
r เป็นจำนวนที่ค่อนข้างสุ่ม
การสแกน int สุดท้ายส่วนตัว (workqueue w, int r) {workqueue [] ws; int m; ยาว c = ctl; // สำหรับการตรวจสอบความสอดคล้องถ้า ((ws = workqueues)! = null && (m = ws.length - 1)> = 0 && w! = null) {สำหรับ (int j = m + m + 1, ec = w.eventcount ;;) {workqueue q; int b, e; Forkjointask <?> [] a; Forkjointask <?> t; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! = null) {long i = ((a.length - 1) & b) << ashift) + abase; if ((t = ((forkjointask <?>) u.getObjectVolatile (a, i)))! = null) {ถ้า (ec <0) helprelease (c, ws, w, q, b); อื่นถ้า (q.base == b && u.compareandswapobject (a, i, t, null)) {u.putorderedint (q, qbase, b + 1); if ((b + 1) - q.top <0) signalwork (ws, q); w.runtask (t); } } หยุดพัก; } อื่นถ้า (-J <0) {if ((EC | (e = (int) c)) <0) // ไม่ใช้งานหรือยกเลิกการรอการคืนเงิน (W, C, EC); อื่นถ้า (ctl == c) {// พยายามหยุดการทำงานและ enqueue long nc = (ยาว) ec | ((C - AC_UNIT) & (AC_MASK | TC_MASK)); w.nextwait = e; W.EventCount = EC | int_sign; if (! U.Compareandswaplong (นี่, CTL, C, NC)) W.EventCount = EC; // กลับออก} break; }}} return 0; - มาดูวิธีการสแกนกันเถอะ พารามิเตอร์หนึ่งของการสแกนคือ Workqueue ดังที่ได้กล่าวไว้ข้างต้นแต่ละเธรดจะมีการทำงานและการทำงานของหลายเธรดจะถูกบันทึกไว้ในงาน r คือหมายเลขสุ่ม ใช้ R เพื่อค้นหางานและมีงานที่ต้องทำใน Workqueue
จากนั้นผ่านฐาน Workqueue รับการชดเชยฐาน
B = Q.Base
-
Long I = (((A.Length - 1) & b) << Ashift) + Abase;
-
จากนั้นรับงานสุดท้ายผ่านการชดเชยและเรียกใช้งานนี้
t = ((forkjointask <?>) u.getObjectVolatile (a, i))
-
w.runtask (t);
-
จากการวิเคราะห์คร่าวๆนี้เราพบว่าหลังจากเธรดปัจจุบันเรียกใช้วิธีการสแกนมันจะไม่ดำเนินการงานในงานปัจจุบัน แต่จะได้รับงานอื่น ๆ ผ่านหมายเลขสุ่ม r นี่เป็นหนึ่งในกลไกหลักของ ForkJoinpool
เธรดปัจจุบันจะไม่เพียง แต่มุ่งเน้นไปที่งานของตัวเองเท่านั้น แต่ยังให้ความสำคัญกับงานอื่น ๆ สิ่งนี้จะช่วยป้องกันความหิวจากการเกิดขึ้น สิ่งนี้จะช่วยป้องกันไม่ให้บางเธรดไม่สามารถทำงานให้เสร็จได้ทันเวลาเนื่องจากการติดอยู่หรือเหตุผลอื่น ๆ หรือเธรดมีงานจำนวนมาก แต่เธรดอื่น ๆ ไม่มีอะไรทำ
แล้วลองดูที่วิธี Runtask
โมฆะสุดท้าย runtask (forkjointask <?> งาน) {ถ้า ((currentsteal = task)! = null) {forkJoinworkerThread Thread; task.doexec (); ForkJoinTask <?> [] a = อาร์เรย์; int md = โหมด; ++ nsteals; currentsteal = null; ถ้า (md! = 0) pollandexecall (); อื่นถ้า (a! = null) {int s, m = a.length - 1; Forkjointask <?> t; ในขณะที่ ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getandsetObject (a, ((m & s) << ashift) + abase, null))! = null) {top = s; t.doexec (); }} if ((thread = ownow)! = null) // ไม่จำเป็นต้องทำในที่สุด clause thread.aftertoplevelexec (); -มีชื่อที่น่าสนใจ: Currentsteal งานที่ถูกขโมยเป็นสิ่งที่ฉันเพิ่งอธิบาย
task.doexec ();
งานนี้จะเสร็จสมบูรณ์
หลังจากทำงานของคนอื่นเสร็จแล้วคุณจะทำงานของคุณเองให้เสร็จ
รับงานแรกโดยได้รับตำแหน่งสูงสุด
ในขณะที่ ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getandsetObject (a, ((m & s) << ashift) + abase, null))! = null) {top = s; t.doexec ();}ถัดไปใช้กราฟเพื่อสรุปกระบวนการของพูลเธรดในขณะนี้
ตัวอย่างเช่นมีสองเธรด T1 และ T2 T1 จะได้รับภารกิจสุดท้ายของ T2 ผ่านฐานของ T2 (แน่นอนว่าเป็นงานสุดท้ายของเธรดผ่านหมายเลขสุ่ม R) และ T1 จะทำงานเป็นครั้งแรกผ่านด้านบนของตัวเอง ในทางตรงกันข้าม T2 จะทำเช่นเดียวกัน
งานที่คุณทำสำหรับเธรดอื่น ๆ เริ่มต้นจากฐานและงานที่คุณทำเพื่อตัวเองเริ่มต้นจากด้านบน สิ่งนี้จะช่วยลดความขัดแย้ง
หากไม่พบงานอื่น
อย่างอื่นถ้า (-J <0) {ถ้า ((EC | (e = (int) c)) <0) // ไม่ใช้งานหรือยุติการรอการคืนเงิน (W, C, EC); อื่นถ้า (ctl == c) {// พยายามหยุดการทำงานและ enqueue long nc = (ยาว) ec | ((C - AC_UNIT) & (AC_MASK | TC_MASK)); w.nextwait = e; W.EventCount = EC | int_sign; if (! U.Compareandswaplong (นี่, CTL, C, NC)) W.EventCount = EC; // กลับออก} break; - ก่อนอื่นค่าของ CTL จะถูกเปลี่ยนผ่านชุดของการรัน NC จะได้รับและจากนั้นค่าใหม่จะถูกกำหนดด้วย CAS จากนั้นโทรหา Workwork () เพื่อเข้าสู่สถานะการรอคอย (เรียกว่าวิธีการของ Unsafe Park ที่กล่าวถึงในบทความชุดก่อนหน้า)
สิ่งที่เราต้องอธิบายที่นี่คือการเปลี่ยนค่า CTL ที่นี่อันดับแรก AC -1 ใน CTL และ AC ครอบครอง CTL 16 บิตสูงสุดดังนั้นจึงไม่สามารถเป็น -1 ได้โดยตรง แต่แทนที่จะบรรลุผลของการสร้าง CTL -1 สูงสุด 16 บิตของ CTL -1 ผ่าน AC_UNIT (0x100000000000000) ของ CTL 16 บิตแรก
ดังที่ได้กล่าวไว้ก่อนหน้านี้ EventCount จะบันทึก PoolIndex และผ่าน PoolIndex และ Next Wait ใน Workqueue คุณสามารถสำรวจเธรดที่รอคอยทั้งหมด