1. Penggunaan dasar kumpulan benang
1.1. Mengapa Anda membutuhkan kumpulan utas?
Dalam bisnis harian, jika kami ingin menggunakan multi-threading, kami akan membuat utas sebelum bisnis dimulai, dan menghancurkan utas setelah bisnis berakhir. Namun, untuk bisnis, penciptaan dan penghancuran utas tidak ada hubungannya dengan bisnis itu sendiri, dan hanya peduli dengan tugas yang dilakukan oleh utas. Oleh karena itu, saya berharap untuk menggunakan sebanyak mungkin CPU untuk melakukan tugas, daripada membuat dan menghancurkan utas yang tidak terkait dengan bisnis. Kumpulan utas memecahkan masalah ini. Fungsi kumpulan utas adalah menggunakan kembali utas.
1.2. Dukungan apa yang disediakan JDK untuk kami
Diagram kelas terkait dalam JDK ditunjukkan pada gambar di atas.
Beberapa kategori khusus yang akan disebutkan.
Kelas yang dapat dipanggil mirip dengan kelas yang dapat dijalankan, tetapi perbedaannya adalah bahwa Callable memiliki nilai pengembalian.
ThreadPoolExecutor adalah implementasi penting dari kumpulan utas.
Pelaksana adalah kelas pabrik.
1.3. Penggunaan kumpulan benang
1.3.1. Jenis kumpulan benang
Public Static ExecutorService newfixedThreadPool (int nthreads) {return new ThreadPoolExecutor (nthreads, nthreads, 0L, timeunit.milliseconds, newSExecutor {{newReadExutor () {{newReadExor () {{{publicExecutexorexExecteRexEcle (); ThreadPoolExecutor (1, 1, 0L, timeunit.milliseconds, new LinkedBlockingQueue <Runnable> ()));} Public Static ExecutorService newCachedThreadPool () {return threadPoolExecutor (0, integer.max_value, 60l, timeunit.seadpoolExecutor (0, integer.max_value, 60l, timeunit.seadsque, ne baru, ne baru, ne baru, 60l, timeunit.SeAt.SECRON, NEWONOUSQUE.MAX_VALUE, 60L, timeunit.SeAt.Dari perspektif metode, jelas bahwa FixedThreadPool, SinglethreadExecutor, dan CachedThreadPool adalah contoh yang berbeda dari ThreadPoolExecutor, tetapi parameternya berbeda.
Thread PublicPoolExecutor (int corePoolsize, int maximumpoolsize, Keepalivetime panjang, unit timeunit, blockingqueue <Runnable> workqueue) {this (corePoolsize, maximumpoolsize, stepalivetime, unit, workqueue, executors.defaultThreadFactory (), defauultime, unit, workqueue (executors. Mari kita jelaskan secara singkat arti parameter dalam konstruktor ThreadPoolExecutor.
Dengan cara ini, melihat fixedthreadpool yang disebutkan di atas, jumlah inti dan jumlah maksimum utasnya sama, sehingga utas tidak akan dibuat dan dihancurkan selama bekerja. Ketika jumlah tugas besar dan utas di kumpulan utas tidak dapat dipenuhi, tugas akan disimpan ke LinkedBlockingQueue, dan ukuran LinkedBlockingQueue adalah integer.max_value. Ini berarti bahwa penambahan tugas yang berkelanjutan akan membuat memori lebih banyak lagi.
CachedThreadpool berbeda. Nomor utas intinya adalah 0, jumlah maksimum penyimpanan adalah integer.max_value, dan antrian pemblokirannya adalah synchronousqueue, yang merupakan antrian khusus, dan ukurannya adalah 0. Karena jumlah utas inti adalah 0, perlu untuk menambahkan tugas ke Synchronousqueue. Antrian ini hanya dapat berhasil ketika satu utas menambahkan data darinya dan utas lain mendapatkan data darinya. Menambahkan data ke antrian ini saja akan mengembalikan kegagalan. Ketika pengembalian gagal, kumpulan utas mulai memperluas utas, itulah sebabnya jumlah utas di CachedThreadpool tidak diperbaiki. Ketika utas tidak digunakan untuk 60 -an, utasnya dihancurkan.
1.4. Contoh kecil penggunaan kumpulan benang
1.4.1. Kumpulan benang sederhana
impor java.util.concurrent.executorservice; import java.util.concurrent.executors; kelas publik Threadpooldemo {public static class myTask mengimplementasikan runnable {@Override public run () {System.out.println (System.currentTimeMillis (). coba {thread.sleep (1000); } catch (Exception e) {E.PrintStackTrace (); }}} public static void main (string [] args) {mytask mytask = myTask baru (); ExecutorService ES = Executors.newfixedThreadPool (5); untuk (int i = 0; i <10; i ++) {es.submit (myTask); }}} Karena newfixedthreadpool (5) digunakan, tetapi 10 utas dimulai, 5 dieksekusi pada suatu waktu, dan jelas bahwa penggunaan kembali utas dilihat. ThreadID diulangi, yaitu, 5 tugas pertama dan 5 tugas terakhir dieksekusi oleh batch utas yang sama.
Apa yang digunakan di sini
es.submit (myTask);
Ada juga cara untuk mengirimkan:
es.execute (myTask);
Perbedaannya adalah bahwa pengiriman akan mengembalikan objek di masa depan, yang akan diperkenalkan nanti.
1.4.2.scheduledthreadpool
impor java.util.concurrent.Executors; impor java.util.concurrent.scheduledExecutorService; impor java.util.concurrent.timeunit; Public Class Threadpooldemo {public static Main (] args) {jadwal executorsorsorsservice. // Jika tugas sebelumnya belum selesai, pengiriman tidak akan dimulai. ses.scheduleWithFixedDelay (runnable baru () {@override public void run () {coba {thread.sleep (1000); System.out.println (System.currentTimeMillis ()/1000);} {Exception e) {// todo: handle exception}}}}}} {Exception e) {// todo: handle}}} {{//toDo: handle} {{//toDo: handle} {{//{/1 Mulai 0 detik, dan kemudian jalankan sekali setiap 2 detik dalam siklus}}Keluaran:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Karena eksekusi tugas memakan waktu 1 detik, penjadwalan tugas harus menunggu tugas sebelumnya diselesaikan. Artinya, setiap 2 detik di sini berarti bahwa tugas baru akan dimulai 2 detik setelah tugas sebelumnya selesai.
2. Perpanjang dan tingkatkan kumpulan benang
2.1. Antarmuka panggilan balik
Ada beberapa API panggilan balik di kumpulan utas untuk memberi kami operasi yang diperpanjang.
ExecutorService ES = ThreadPoolExecutor baru (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockingQueue <Runnable> ()) {@Override Void batalExecute yang dilindungi (thread t, runnable r) {System.out.println ("bersiap untuk mengeksekusi"); } @Override Protected void afterexecute (runnable r, throwable t) {System.out.println ("Eksekusi selesai"); } @Override Protected void terminated () {System.out.println ("Thread Pool Exit"); }};Kami dapat mengimplementasikan metode befteexecute, afterexecute, dan yang diakhiri dari ThreadPoolExecutor untuk mengimplementasikan manajemen log atau operasi lain sebelum dan sesudah pelaksanaan utas, keluar dari kumpulan utas.
2.2. Strategi penolakan
Terkadang, tugasnya sangat berat, menghasilkan terlalu banyak beban pada sistem. Seperti disebutkan di atas, ketika jumlah tugas meningkat, semua tugas akan ditempatkan dalam antrian pemblokiran fixedthreadpool, menghasilkan terlalu banyak konsumsi memori dan akhirnya meluap memori. Situasi seperti itu harus dihindari. Jadi ketika kita menemukan bahwa jumlah utas melebihi jumlah maksimum utas, kita harus menyerah beberapa tugas. Saat dibuang, kita harus menuliskan tugas alih -alih membuangnya secara langsung.
Ada konstruktor lain di ThreadPoolExecutor.
Thread Public PublicExecutor (int corepoolsize, int maximumpoolsize, long stexalivetime, timeUnit unit, blockingqueue <runnable> workqueue, threadfactory threadFactory, ditolak executionHandler handler) {if (corepoolsize <0 || maximumpensize <= 0 || IlegalargumentException (); if (workqueue == null || threadFactory == null || handler == null) lempar nullpointerException baru (); this.corepoolsize = corePoolsize; this.maximumpoolSize = maximumpoolSize; this.workqueue = workqueue; this.eepalivetime = unit.tonanos (Keepalivetime); this.threadFactory = threadFactory; this.handler = handler; } Kami akan memperkenalkan ThreadFactory nanti.
Penangan menolak implementasi kebijakan, yang akan memberi tahu kami apa yang harus dilakukan jika tugas tidak dapat dieksekusi.
Ada 4 strategi di atas.
Abortpolicy: Jika tugas tidak dapat diterima, pengecualian dilemparkan.
Callerrunspolicy: Jika tugas tidak dapat diterima, biarkan utas panggilan selesai.
DiscardoldestPolicy: Jika tugas tidak dapat diterima, tugas tertua akan dibuang dan dikelola oleh antrian.
Discardpolicy: Jika tugas tidak dapat diterima, tugas akan dibuang.
ExecutorService es = new ThreadPoolExecutor (5, 5, 0L, TimeUnit.Seconds, baru LinkedBlockingQueue <Runnable> (), new RejectExecutionHandler () {@Override public void {runnable r, threadpoolExecutor executor) {System.out.out.println (r.) (r.) {System.out.out.println (r.) (r.) {System.out.out.println (r.) (r.) {System.out.out.println (roRNable r, roundpoolExecutorororororor) {System.out.out.println (roRnable r, r.) {System.out.out.println (r.) (R.) {System.out.out. Tentu saja, kami juga dapat menerapkan antarmuka RejectExecutionHandler kami sendiri untuk mendefinisikan kebijakan penolakan sendiri.
2.3. Kustomisasi ThreadFactory
Saya baru saja melihat bahwa ThreadFactory dapat ditentukan dalam konstruktor ThreadPoolExecutor.
Utas di kumpulan utas semuanya dibuat oleh pabrik utas, dan kami dapat menyesuaikan pabrik utas.
Pabrik utas default:
kelas statis defaultThreadFactory mengimplementasikan threadFactory {private static atomicinteger poolnumber = atomicinteger baru (1); grup threadgroup terakhir pribadi; Private Final AtomicInteger ThreadNumber = New AtomicInteger (1); nameprefix final private; DefaultThreadFactory () {SecurityManager S = System.GetSecurityManager (); grup = (s! = null)? s.getThreadGroup (): thread.currentThread (). getThreadGroup (); nameprefix = "pool-" + poolnumber.getAndincrement () + "-thread-"; } utas publik newThread (runnable r) {thread t = thread baru (grup, r, namePrefix + threadNumber.getAndIncrement (), 0); if (t.isdaemon ()) t.setdaemon (false); if (t.getPriority ()! = thread.norm_priority) t.setPriority (thread.norm_priority); mengembalikan t; }}3. Forkjoin
3.1. Pikiran
Itu adalah gagasan untuk membagi dan menaklukkan.
FORK/JOW mirip dengan algoritma MapReduce. Perbedaan antara keduanya adalah: garpu/bergabung dibagi menjadi tugas -tugas kecil hanya jika diperlukan, seperti jika tugasnya sangat besar, sementara MapReduce selalu mulai melakukan langkah pertama untuk segmentasi. Tampaknya Fork/Join lebih cocok untuk level utas dalam JVM, sementara MapReduce cocok untuk sistem terdistribusi.
4.2.Menggunakan antarmuka
RecursiveAction: Tidak ada nilai pengembalian
Recursivetask: ada nilai pengembalian
4.3. Contoh sederhana
impor java.util.arraylist; import java.util.concurrent.forkjoinpool; import java.util.concurrent.forkjointask; import java.util.concurrent.recursivetask; counttask kelas publik; awal yang panjang secara pribadi; akhir panjang pribadi; Publik CountTask (Long Start, Long End) {super (); this.start = mulai; this.end = end; } @Override Protected Long Compute () {long sum = 0; boolean canCompute = (end - start) <threshold; if (canCompute) {for (long i = start; i <= end; i ++) {sum = sum+i; }} else {// dibagi menjadi 100 tugas kecil Long Step = (start + end)/100; ArrayList <OuntTask> Subtasks = ArrayList baru <Ounttask> (); Pos Long = Mulai; untuk (int i = 0; i <100; i ++) {long lastOne = pos+step; if (lastOne> end) {lastOne = end; } Counttask subtask = counttask baru (pos, lastOne); POS + = Langkah + 1; Subtasks.Add (Subtask); subtask.fork (); // dorong subtasks ke thread pool} untuk (counttask t: subtasks) {sum += t.join (); // menunggu semua subtuks untuk mengakhiri}} return sum; } public static void main (string [] args) {forkjoinpool forkjoinpool = new forkjoinpool (); Counttask Task = Counttask baru (0, 200000L); Forkjointask <long> hasil = forkjoinpool.submit (tugas); coba {long res = result.get (); System.out.println ("sum =" + res); } catch (Exception e) {// todo: handle Exception E.PrintStackTrace (); }}} Contoh di atas menjelaskan tugas menyimpulkan. Bagilah tugas yang terakumulasi menjadi 100 tugas, masing -masing tugas hanya melakukan jumlah angka, dan setelah gabungan akhir, jumlah yang dihitung oleh setiap tugas kemudian diakumulasikan.
4.4. Elemen implementasi
4.4.1.workqueue dan CTL
Setiap utas akan memiliki antrian kerja
Workqueue kelas akhir statis
Dalam antrian kerja, akan ada serangkaian bidang yang mengelola utas.
Volatile int EventCount; // jumlah inaktivasi yang dikodekan; <0 jika tidak aktif
int nextwait; // Catatan yang dikodekan dari pelayan acara berikutnya
int rarrows; // Jumlah baja
int petunjuk; // petunjuk indeks baja
Poolindex pendek; // indeks antrian ini di kolam renang
mode pendek terakhir; // 0: lifo,> 0: fifo, <0: dibagikan
qlock int volatil; // 1: terkunci, -1: hentikan; lain 0
basis int yang mudah menguap; // indeks slot berikutnya untuk jajak pendapat
int top; // indeks slot berikutnya untuk dorong
Forkjointask <?> [] Array; // Elemen (awalnya tidak dialokasikan)
Final Forkjoinpool Pool; // kolam yang berisi (mungkin nol)
Pemilik Final ForkjoinWorkerThread; // memiliki utas atau nol jika dibagikan
Parker benang yang mudah menguap; // == Pemilik selama panggilan ke parkir; lain null
forkjointask volatile <?> Currentjoin; // Tugas bergabung dalam AWAITJIN
Forkjointask <?> Currentsteal; // tugas non-lokal saat ini sedang dieksekusi
Perlu dicatat di sini bahwa ada perbedaan besar antara JDK7 dan JDK8 dalam implementasi Forkjoin. Apa yang kami perkenalkan di sini adalah dari JDK8. Di kumpulan utas, kadang -kadang tidak semua utas dieksekusi, beberapa utas akan ditangguhkan, dan benang yang ditangguhkan itu akan disimpan dalam tumpukan. Itu diwakili secara internal oleh daftar tertaut.
Nextwait akan menunjuk ke utas menunggu berikutnya.
Indeks Indeks Subskrip di PoolIndex Thread Pool.
EventCount Saat diinisialisasi, EventCount terkait dengan PoolIndex. Sebanyak 32 bit, bit pertama menunjukkan apakah diaktifkan, dan 15 bit menunjukkan berapa kali telah ditangguhkan
EventCount, sisanya mewakili PoolIndex. Gunakan satu bidang untuk mewakili banyak makna.
WorkQueue Workqueue diwakili oleh Forkjointask <?> [] Array. Top dan base mewakili kedua ujung antrian, dan data adalah di antara keduanya.
Pertahankan CTL (Tipe Panjang 64-Bit) di Forkjoinpool
CTL panjang yang mudah menguap;
* Field CTL sudah lama dikemas dengan:
* AC: Jumlah pekerja yang berjalan aktif dikurangi paralelisme target (16 bit)
* TC: Jumlah total pekerja dikurangi paralelisme target (16 bit)
* St: true jika biliar diakhiri (1 bit)
* EC: Hitungan menunggu utas tunggu teratas (15 bit)
* ID: Poolindex dari Top of Treiber Stack of Waiters (16 bit)
AC mewakili jumlah utas aktif dikurangi tingkat paralelisme (mungkin jumlah CPU)
TC berarti jumlah total utas dikurangi paralelisme
ST menunjukkan apakah kumpulan utas itu sendiri diaktifkan
EC mewakili jumlah utas yang ditangguhkan pada waktu tunggu teratas
ID menunjukkan PoolIndex menunggu utas di bagian atas
Jelas bahwa ST+EC+ID adalah apa yang baru saja kami sebut EventCount.
Jadi mengapa Anda harus mensintesis variabel dengan 5 variabel? Bahkan, kapasitas menempati hampir sama dengan 5 variabel.
Keterbacaan menggunakan kode variabel akan jauh lebih buruk.
Jadi mengapa menggunakan variabel? Bahkan, ini adalah hal yang paling pintar, karena 5 variabel ini adalah keseluruhan. Dalam multi-threading, jika 5 variabel digunakan, kemudian ketika memodifikasi salah satu variabel, bagaimana memastikan integritas 5 variabel. Kemudian menggunakan variabel akan menyelesaikan masalah ini. Jika diselesaikan dengan kunci, kinerja akan terdegradasi.
Menggunakan variabel memastikan konsistensi dan atomisitas data.
Perubahan CTL Skuadron Forkjoin semuanya dilakukan dengan menggunakan operasi CAS. Seperti disebutkan dalam seri artikel sebelumnya, CAS adalah operasi bebas kunci dan memiliki kinerja yang baik.
Karena operasi CAS hanya dapat menargetkan satu variabel, desain ini optimal.
4.4.2. Pencurian pekerjaan
Selanjutnya, kami akan memperkenalkan alur kerja seluruh kumpulan utas.
Setiap utas memanggil runworker
final void runworker (workqueue w) {w.growarray (); // Alokasikan antrian untuk (int r = w.hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}} Fungsi pemindaian () adalah untuk memindai tugas -tugas yang harus dilakukan.
R adalah angka yang relatif acak.
private final int scan (workqueue w, int r) {workqueue [] ws; int m; Long C = CTL; // Untuk konsistensi periksa if ((ws = workqueues)! = null && (m = ws.length - 1)> = 0 && w! = null) {for (int j = m + m + 1, ec = w.eventcount ;;) {workqueue q; int b, e; Forkjointask <?> [] A; Forkjointask <?> T; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! = null) {long i = (((a.length - 1) & b) << ashift) + abase; if ((t = ((forkjointask <?>) u.getObjectVolatile (a, i)))! = null) {if (ec <0) helprelease (c, ws, w, q, b); lain if (q.base == b && u.ComeandsWapObject (a, i, t, null)) {u.putorderedInt (q, qbase, b + 1); if ((b + 1) - q.top <0) sinyalwork (ws, q); w.runtask (t); } } merusak; } else if (--j <0) {if ((ec | (e = (int) c)) <0) // tidak aktif atau mengakhiri pengembalian menunggu (w, c, ec); lain if (ctl == c) {// Cobalah untuk menonaktifkan dan enqueue long nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; w.eventcount = ec | Int_sign; if (! u.cpareandswaplong (ini, ctl, c, nc)) w.eventcount = ec; // mundur} break; }}} return 0; } Mari kita lihat metode pemindaian. Salah satu parameter pemindaian adalah workqueue. Seperti disebutkan di atas, setiap utas akan memiliki workqueue, dan workqueue dari beberapa utas akan disimpan dalam workqueues. R adalah angka acak. Gunakan R untuk menemukan workqueue dan memiliki tugas yang harus dilakukan dalam workqueue.
Kemudian, melalui basis workqueue, dapatkan basis offset.
b = q.base
..
long i = (((a.length - 1) & b) << ashift) + abase;
..
Kemudian dapatkan tugas terakhir melalui offset dan jalankan tugas ini
t = ((forkjointask <?>) u.getObjectVolatile (a, i))
..
w.runtask (t);
..
Melalui analisis kasar ini, kami menemukan bahwa setelah utas saat ini memanggil metode pemindaian, itu tidak akan menjalankan tugas dalam workqueue saat ini, tetapi akan mendapatkan tugas kerja lainnya melalui angka acak r. Ini adalah salah satu mekanisme utama Forkjoinpool.
Utas saat ini tidak hanya akan fokus pada tugasnya sendiri, tetapi akan memprioritaskan tugas -tugas lain. Ini mencegah kelaparan terjadi. Ini mencegah beberapa utas tidak dapat menyelesaikan tugas dalam waktu karena macet atau alasan lain, atau utas memiliki banyak tugas, tetapi utas lain tidak ada hubungannya.
Lalu mari kita lihat metode runtask
final void runtask (forkjointask <?> Tugas) {if ((currentSteal = tugas)! = null) {forkjoinWorkerthread thread; task.doexec (); Forkjointask <?> [] A = array; int md = mode; ++ nSteals; CurrentSteal = null; if (md! = 0) pollandexecall (); lain if (a! = null) {int s, m = a.length - 1; Forkjointask <?> T; while ((s = atas - 1) - base> = 0 && (t = (forkjointask <?>) u.getaDsetObject (a, ((m & s) << ashift) + abase, null))! = null) {top = s; t.doexec (); }} if ((thread = pemilik)! = null) // tidak perlu dilakukan di akhirnya klausa thread.aftertoplevelevelexec (); }}Ada nama yang menarik: Currentsteal, tugas yang dicuri memang apa yang baru saja saya jelaskan.
task.doexec ();
Tugas ini akan selesai.
Setelah menyelesaikan tugas orang lain, Anda akan menyelesaikan tugas Anda sendiri.
Dapatkan tugas pertama dengan mendapatkan yang teratas
while ((s = atas - 1) - base> = 0 && (t = (forkjointask <?>) u.getaDsetObject (a, ((m & s) << ashift) + abase, null))! = null) {top = s; t.doexec ();}Selanjutnya, gunakan grafik untuk meringkas proses kumpulan utas sekarang.
Misalnya, ada dua utas T1 dan T2. T1 akan mendapatkan tugas terakhir T2 melalui basis T2 (tentu saja, itu sebenarnya tugas terakhir utas melalui angka acak r), dan T1 juga akan melakukan tugas pertama melalui atasannya sendiri. Sebaliknya, T2 akan melakukan hal yang sama.
Tugas yang Anda ambil untuk utas lain mulai dari pangkalan, dan tugas yang Anda ambil sendiri mulai dari atas. Ini mengurangi konflik
Jika tidak ada tugas lain yang ditemukan
lain if (--j <0) {if ((ec | (e = (int) c)) <0) // tidak aktif atau mengakhiri pengembalian menunggu (w, c, ec); lain if (ctl == c) {// Cobalah untuk menonaktifkan dan enqueue long nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; w.eventcount = ec | Int_sign; if (! u.cpareandswaplong (ini, ctl, c, nc)) w.eventcount = ec; // mundur} break; } Kemudian pertama -tama, nilai CTL akan diubah melalui serangkaian proses, NC akan diperoleh, dan kemudian nilai baru akan ditetapkan dengan CAS. Kemudian hubungi Awaitwork () untuk memasuki keadaan menunggu (disebut metode taman tidak aman yang disebutkan dalam seri artikel sebelumnya).
Yang perlu kita jelaskan di sini adalah mengubah nilai CTL. Di sini, pertama, AC -1 di CTL, dan AC menempati 16 bit teratas CTL, sehingga tidak dapat langsung -1, tetapi sebaliknya mencapai efek dari membuat 16 bit teratas CTL -1 melalui AC_Unit (0x1000000000000) dari 16 bit CTL pertama.
Seperti yang disebutkan sebelumnya, EventCount menyimpan PoolIndex, dan melalui PoolIndex dan Nextwait di Workqueue, Anda dapat melintasi semua utas menunggu.