博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
day 04 Java并发多线程
阅读量:4329 次
发布时间:2019-06-06

本文共 30082 字,大约阅读时间需要 100 分钟。

http://www.cnblogs.com/hellocsl/p/3969768.html?utm_source=tuicool&utm_medium=referral PS:而JVM 每遇到一个线程,就为其分配一个Program Counter Register(程序计数器), VM Stack(虚拟机栈)和Native Method Stack (本地方法栈) 引用别人的博客,关于Java内存管理,博客很好

 并发编程的挑战

 

 

PS:  出现那么多的锁就是为了减少  获得锁和释放锁的 性能消耗; 而且锁只能升级不能降级

 

/** * Alipay.com Inc. * Copyright (c) 2004-2015 All Rights Reserved. */package chapter02;import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;/** * 计数器 *  * @author tengfei.fangtf * @version $Id: Snippet.java, v 0.1 2015-7-31 下午11:32:42 tengfei.fangtf Exp $ */public class Counter {    private AtomicInteger atomicI = new AtomicInteger(0);    private int           i       = 0;    public static void main(String[] args) {        final Counter cas = new Counter();        List
ts = new ArrayList
(600); long start = System.currentTimeMillis(); for (int j = 0; j < 100; j++) { Thread t = new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 10000; i++) { cas.count(); cas.safeCount(); } } }); ts.add(t); } for (Thread t : ts) { t.start(); } // 等待所有线程执行完成 for (Thread t : ts) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(cas.i); System.out.println(cas.atomicI.get()); //获取值 System.out.println(System.currentTimeMillis() - start); } /** * 使用CAS实现线程安全计数器 */ private void safeCount() { for (;;) { int i = atomicI.get(); boolean suc = atomicI.compareAndSet(i, ++i); if (suc) { break; } } } /** * 非线程安全计数器 */ private void count() { i++;//一个cpu加了,但是另一个不一定加 }} PS: 原子性的那个操作一直都不会变

   这一章看得不太明白

 

 

PS: main方法天生就是一个多线程

package chapter04;import java.lang.management.ManagementFactory;import java.lang.management.ThreadInfo;import java.lang.management.ThreadMXBean;/** * 6-1 */public class MultiThread {    public static void main(String[] args) {        // 获取Java线程管理MXBean        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();        // 不需要获取同步的monitor和synchronizer信息,仅仅获取线程和线程堆栈信息        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);        // 遍历线程信息,仅打印线程ID和线程名称信息        for (ThreadInfo threadInfo : threadInfos) {            System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());        }    }}

PS : suspend 、resume 、stop方法 

PS: 优雅的结束线程

package chapter04;import java.util.concurrent.TimeUnit;/** * 6-9 */public class Shutdown {    public static void main(String[] args) throws Exception {        Runner one = new Runner();        Thread countThread = new Thread(one, "CountThread");        countThread.start();        // 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束        TimeUnit.SECONDS.sleep(1);        countThread.interrupt();        Runner two = new Runner();        countThread = new Thread(two, "CountThread");        countThread.start();        // 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束        TimeUnit.SECONDS.sleep(1);        two.cancel();    }    private static class Runner implements Runnable {        private long             i;        private volatile boolean on = true;        @Override        public void run() {            while (on && !Thread.currentThread().isInterrupted()) {                i++;            }            System.out.println("Count i = " + i);        }        public void cancel() {            on = false;        }    }}

-------------------------------------------------------------------------------------------------------------------------

 

 

PS:进程通俗的讲就是一个应用程序,  太会在  内存中分配独立的运行空间    线程:它是位于进程中,负责当前进程中的某个具备独立运行资格的空间。

.1.1.    synchronized

package cn.itcast_01_mythread.thread.testThread;public class MySynchronized {    public static void main(String[] args) {        final MySynchronized mySynchronized = new MySynchronized();//这才是公用的锁        final MySynchronized mySynchronized2 = new MySynchronized();        new Thread("thread1") {            public void run() {                synchronized (mySynchronized) {                try {                    System.out.println(this.getName()+" start");                    int i =1/0;   //如果发生异常,jvm会将锁释放                    Thread.sleep(5000);                    System.out.println(this.getName()+"醒了");                    System.out.println(this.getName()+" end");                } catch (InterruptedException e) {                    e.printStackTrace();                }                }            }        }.start();        new Thread("thread2") {            public void run() {                synchronized (mySynchronized) {         //争抢同一把锁时,线程1没释放之前,线程2只能等待//                    synchronized (mySynchronized2) {    //如果不是一把锁,可以看到两句话同时打印                    System.out.println(this.getName()+" start");                    System.out.println(this.getName()+" end");                                    }            }        }.start();    }}
  • synchronized的缺陷
 

    synchronized是java中的一个关键字,也就是说是Java语言内置的特性。

 

     如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:

 

     1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;

 

      2)线程执行发生异常(他挂了),此时JVM会让线程自动释放锁。

------------------------------------------------------------ PS:也就是   释放锁有两种方式    自己释放;自己挂了。

---------------------------------------------------------------------------------------

PS :因为Synchronoied使用起来不方便,java5以后出现了Lock

PS : CountDownLatch、ReentrantLock和ReentrantReadWriteLock都是同步组件

package chapter05;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 10-20 */public class ConditionUseCase {    Lock      lock      = new ReentrantLock();    Condition condition = lock.newCondition();    public void conditionWait() throws InterruptedException {        lock.lock();        try {            condition.await();        } finally {            lock.unlock();        }    }    public void conditionSignal() throws InterruptedException {        lock.lock();        try {            condition.signal();        } finally {            lock.unlock();        }    }}

 

Answer: 1.因为HashMap会照成环形数据结构,一直有next,然后就死锁了

              2.HashTable使用synchronied并发效率非常低下

 

 

PS: Synchronizd

PS :ReentrantLock是唯一实现Lock的接口

 

 

PS:lock的用法

 
import java.util.ArrayList;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class MyLockTest {    private static ArrayList
arrayList = new ArrayList
(); static Lock lock = new ReentrantLock(); // 注意这个地方,因为Lock是一个借口,通常ReentrantLock进行实现 public static
void main(String[] args) { new Thread() { public void run() { Thread thread = Thread.currentThread(); lock.lock();//获取锁 try { System.out.println(thread.getName() + "得到了锁"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception } finally { System.out.println(thread.getName() + "释放了锁"); lock.unlock();//不释放就死锁了 } }; }.start(); new Thread() { public void run() { Thread thread = Thread.currentThread(); lock.lock(); try { System.out.println(thread.getName() + "得到了锁"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception } finally { System.out.println(thread.getName() + "释放了锁"); lock.unlock(); } }; }.start(); }} PS:每次需要手动关闭锁

 

tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。

 

PS:他会尝试获取锁
 

 

import java.util.ArrayList;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 观察现象:一个线程获得锁后,另一个线程取不到锁,不会一直等待 * @author * */public class MyTryLock {    private static ArrayList
arrayList = new ArrayList
(); static Lock lock = new ReentrantLock(); // 注意这个地方 public static void main(String[] args) { new Thread() { public void run() { Thread thread = Thread.currentThread(); boolean tryLock = lock.tryLock(); System.out.println(thread.getName()+" "+tryLock); if (tryLock) { try { System.out.println(thread.getName() + "得到了锁"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception } finally { System.out.println(thread.getName() + "释放了锁"); lock.unlock(); } } }; }.start(); new Thread() { public void run() { Thread thread = Thread.currentThread(); boolean tryLock = lock.tryLock(); System.out.println(thread.getName()+" "+tryLock); if (tryLock) { try { System.out.println(thread.getName() + "得到了锁"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception } finally { System.out.println(thread.getName() + "释放了锁"); lock.unlock(); } } }; }.start(); }} PS:一旦某个线程tryLock false以后, 其他线程就获取不了 了
 

当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程(并非中断A的操作)

  注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。

  因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。

 
package cn.itcast_01_mythread.thread.lock;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 观察现象:如果thread-0得到了锁,阻塞。。。thread-1尝试获取锁,如果拿不到,则可以被中断等待 * @author * */public class MyInterruptibly {     private Lock lock = new ReentrantLock();               public static void main(String[] args)  {            MyInterruptibly test = new MyInterruptibly();            MyThread thread0 = new MyThread(test);            MyThread thread1 = new MyThread(test);            thread0.start();            thread1.start();                         try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            thread1.interrupt();    //如果线程1 陷入等待,  这可以让他结束等待            System.out.println("=====================");        }                   public void insert(Thread thread) throws InterruptedException{            lock.lockInterruptibly();   //注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出            try {                  System.out.println(thread.getName()+"得到了锁");

//long startTime = System.currentTimeMillis();

//for( ; ;) {
/*if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
break;*/
//插入数据
//}
                 Thread.sleep(5000);

}            finally {                System.out.println(Thread.currentThread().getName()+"执行finally");                lock.unlock();                System.out.println(thread.getName()+"释放了锁");            }          }    }         class MyThread extends Thread {        private MyInterruptibly test = null;        public MyThread(MyInterruptibly test) {            this.test = test;        }        @Override        public void run() {                         try {                test.insert(Thread.currentThread());            } catch (Exception e) {                System.out.println(Thread.currentThread().getName()+"被中断");            }        }}

 PS : ReadWriteLock

package cn.itcast_01_mythread.thread.lock;/** * 一个线程又要读又要写,用synchronize来实现的话,读写操作都只能锁住后一个线程一个线程地进行 * @author * */public class MySynchronizedReadWrite {        public static void main(String[] args)  {        final MySynchronizedReadWrite test = new MySynchronizedReadWrite();                 new Thread(){            public void run() {                test.get(Thread.currentThread());            };        }.start();                 new Thread(){            public void run() {                test.get(Thread.currentThread());            };        }.start();             }           public synchronized void get(Thread thread) {        long start = System.currentTimeMillis();        int i=0;        while(System.currentTimeMillis() - start <= 1) {
//执行时间不操过1s i++; if(i%4==0){ System.out.println(thread.getName()+"正在进行写操作"); }else { System.out.println(thread.getName()+"正在进行读操作"); } } System.out.println(thread.getName()+"读写操作完毕"); }}

Thread-0正在进行读操作

Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0读写操作完毕
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1读写操作完毕

 
读写锁:读的时候,其他线程还可以操作,写的时候绝对不行。   里面有两个方法
 
import java.util.concurrent.locks.ReentrantReadWriteLock;/** * 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程 *  * 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。 * 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。   PS : 多线程就是可以同时读操作,写操作的时候不可以读,读的时候不可以写 * @author * *bee:读的时候多个线程可以同时操作,  写线程不能操作 */
public class MyReentrantReadWriteLock {     private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();             public static void main(String[] args)  {            final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();                         new Thread(){                public void run() {                    test.get(Thread.currentThread());                    test.write(Thread.currentThread());                };            }.start();                         new Thread(){                public void run() {                    test.get(Thread.currentThread());                    test.write(Thread.currentThread());                };            }.start();                     }                  /**         * 读操作,用读锁来锁定         * @param thread         */        public void get(Thread thread) {            rwl.readLock().lock();            try {                long start = System.currentTimeMillis();                                 while(System.currentTimeMillis() - start <= 1) {                    System.out.println(thread.getName()+"正在进行读操作");                }                System.out.println(thread.getName()+"读操作完毕");            } finally {                rwl.readLock().unlock();            }        }        /**         * 写操作,用写锁来锁定         * @param thread         */        public void write(Thread thread) {            rwl.writeLock().lock();;            try {                long start = System.currentTimeMillis();                                 while(System.currentTimeMillis() - start <= 1) {                    System.out.println(thread.getName()+"正在进行写操作");                }                System.out.println(thread.getName()+"写操作完毕");            } finally {                rwl.writeLock().unlock();            }        }}

---------------------------------------------------

1.3 关于volatile的介绍

 

package cn.itcast_01_mythread.volatiletest;public class TestVolatile {	public static volatile int numb = 0;	public static void main(String[] args) throws Exception {		for (int i = 0; i < 100; i++) {			new Thread(new Runnable() {				@Override				public void run() {					for (int i = 0; i < 1000; i++) {						numb++;					}				}			}).start();		}				Thread.sleep(2000);		System.out.println(numb);	}} 
PS: 因为 每当启动一个线程的时候都会创建一个栈内存,他们共享着堆空间的数据。当想要对 堆上某个数据进行操作的时候,  就会复制相应的数据到 自己的栈空间  进行操作,  volatile就是为了各个线程间同步数据的问题。 这和synchronized还不太一样,synchronized是完全把数据上锁了。  volatile还用是提高数据保存的位置。

1.4 并发的执行

 

PS:当抢小米手机的时候,通常会异步解耦。其实就是一个修改库存的过程,成千上万个用户同时访问服务器,服务器这边会使用线程池对线程进行管理,防止创建线程过多,服务器奔溃。 同时,会使用任务消息队列 ,在java中使用 JMS规范的ActiveMQ(为了解决大并发的请求,放入编写好的消息队列中)来解决。比如,有5台手机,有100人发起请求,前五个线程可以获得手机,后面直接提示jj。这样一个解耦
  • PS: 线程是不可以无限增长的,所以用一个线程池进行管理
  •  

package cn.itcast_01_mythread.pool;import java.util.ArrayList;import java.util.HashMap;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry;public class TestPool {    public static void main(String[] args) throws Exception {        Future
submit = null; Random random = new Random(); //创建固定数量线程池 ExecutorService exec = Executors.newFixedThreadPool(4); //创建调度线程池 //ScheduledExecutorService exec = Executors.newScheduledThreadPool(4); //用来记录各线程的返回结果 ArrayList
> results = new ArrayList
>(); for (int i = 0; i < 10; i++) { //fixedPool提交线程,runnable无返回值,callable有返回值 submit = exec.submit(new TaskRunnable(i)); /*submit = exec.submit(new TaskCallable(i));*/ //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致 /*submit = exec.submit(new TaskCallable(i));*/ //对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行 //submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS); //存储线程执行结果 results.add(submit); } //打印结果 for(Future f: results){ boolean done = f.isDone(); System.out.println(done?"已完成":"未完成"); //从结果的打印顺序可以看到,即使未完成,也会阻塞等待 System.out.println("线程返回future结果: " + f.get()); } exec.shutdown(); }}

 

pool-1-thread-1 启动时间:1510802909未完成pool-1-thread-3 启动时间:1510802909pool-1-thread-4 启动时间:1510802909pool-1-thread-2 启动时间:1510802909pool-1-thread-1 is working...0线程返回future结果: null未完成pool-1-thread-1 启动时间:1510802909pool-1-thread-3 is working...2pool-1-thread-3 启动时间:1510802909pool-1-thread-1 is working...4pool-1-thread-1 启动时间:1510802909pool-1-thread-2 is working...1线程返回future结果: nullpool-1-thread-2 启动时间:1510802910已完成线程返回future结果: null未完成pool-1-thread-1 is working...6pool-1-thread-3 is working...5pool-1-thread-1 启动时间:1510802910pool-1-thread-3 启动时间:1510802910pool-1-thread-4 is working...3线程返回future结果: null已完成线程返回future结果: null已完成线程返回future结果: null已完成线程返回future结果: null未完成pool-1-thread-3 is working...9pool-1-thread-1 is working...8pool-1-thread-2 is working...7线程返回future结果: null已完成线程返回future结果: null已完成线程返回future结果: null

 PS :Runnable和Callanble的区别 

package cn.itcast_01_mythread.pool;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPoolWithRunable {        /**     * 通过线程池执行线程     * @param args     */    public static void main(String[] args) {        //创建一个线程池        ExecutorService pool = Executors.newCachedThreadPool();        for(int i = 1; i < 5; i++){            pool.execute(new Runnable() {
//提交任务 @Override public void run() { System.out.println("thread name: " + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } pool.shutdown(); }}

 

提交 Callable,该方法返回一个 Future 实例表示任务的状态

调用submit提交任务, 匿名Callable,重写call方法, 有返回值, 获取返回值会阻塞,一直要等到线程任务返回结果

见代码:ThreadPoolWithcallable

pool-1-thread-1 启动时间:1510803239pool-1-thread-2 启动时间:1510803239pool-1-thread-2 is working...1pool-1-thread-2 启动时间:1510803239pool-1-thread-3 启动时间:1510803239未完成pool-1-thread-4 启动时间:1510803239pool-1-thread-4 is working...3pool-1-thread-4 启动时间:1510803239pool-1-thread-4 is working...5pool-1-thread-4 启动时间:1510803239pool-1-thread-1 is working...0线程返回future结果: 0已完成线程返回future结果: 1未完成pool-1-thread-1 启动时间:1510803240pool-1-thread-3 is working...2线程返回future结果: 2已完成线程返回future结果: 3未完成pool-1-thread-3 启动时间:1510803240pool-1-thread-2 is working...4线程返回future结果: 4pool-1-thread-2 启动时间:1510803241已完成线程返回future结果: 5未完成pool-1-thread-1 is working...7pool-1-thread-4 is working...6线程返回future结果: 6已完成线程返回future结果: 7未完成pool-1-thread-3 is working...8线程返回future结果: 8未完成pool-1-thread-2 is working...9线程返回future结果: 9

  

package cn.itcast_01_mythread.pool;import java.util.ArrayList;import java.util.HashMap;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry;public class TestPool {    public static void main(String[] args) throws Exception {        Future
submit = null; Random random = new Random(); //创建固定数量线程池 ExecutorService exec = Executors.newFixedThreadPool(4); //创建调度线程池 //ScheduledExecutorService exec = Executors.newScheduledThreadPool(4); //用来记录各线程的返回结果 ArrayList
> results = new ArrayList
>(); for (int i = 0; i < 10; i++) { //fixedPool提交线程,runnable无返回值,callable有返回值 //submit = exec.submit(new TaskRunnable(i)); submit = exec.submit(new TaskCallable(i)); //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致 /*submit = exec.submit(new TaskCallable(i));*/ //对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行 //submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS); //存储线程执行结果 results.add(submit); } //打印结果 for(Future f: results){ boolean done = f.isDone(); System.out.println(done?"已完成":"未完成"); //从结果的打印顺序可以看到,即使未完成,也会阻塞等待 System.out.println("线程返回future结果: " + f.get()); } exec.shutdown(); }}

 

package cn.itcast_01_mythread.pool;import java.util.ArrayList;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/** * callable 跟runnable的区别: * runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值 *  * callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果 * */public class ThreadPoolWithcallable {    public static void main(String[] args) throws InterruptedException, ExecutionException {        ExecutorService pool = Executors.newFixedThreadPool(4);                 for(int i = 0; i < 10; i++){            Future
submit = pool.submit(new Callable
(){
//线程是有返回值的 @Override public String call() throws Exception { //System.out.println("a"); Thread.sleep(5000); return "b--"+Thread.currentThread().getName(); } }); //从Future中get结果,这个方法是会被阻塞的,一直要等到线程任务执行完成 才能返回结果!!!!! 所以一般不要使用,他会阻塞主线程,如果必须想要得到结果在使用 System.out.println(submit.get());, } pool.shutdown(); }}

 

package cn.itcast_01_mythread.pool;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;/** * 列出并发包中的各种线程池 * @author * */public class ExecutorDemo {        public static void main(String[] args) {        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();//单线程,任务顺序执行        //线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行,        //如果线程超过60秒内没执行,那么将被终止并从池中删除,        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();        int cpuNums = Runtime.getRuntime().availableProcessors();        System.out.println(cpuNums);//CPU的核数        //在构造函数中的参数4是线程池的大小,你可以随意设置,也可以和cpu的核数量保持        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(cpuNums);        //用来调度即将执行的任务的线程池,可能是不是直接执行, 每隔多久执行一次... 策略型的        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(8);                //只有一个线程,用来调度任务在指定时间执行        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();    }}
PS:上面可以看到Runnable和callable分别是线程中的不同的实现方式。Runnable不会返回结果,而callable会返回结果,但是在拿结果的时候会阻塞。 在线程池中通过固定数量的线程进行相同的操作,有不同 的实现 。

 

.1.     java并发包消息队列及在开源软件中的应用

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。           类似与锁

主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

         插入:

                   1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常,不好

        2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

        3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续, 有阻塞, 放不进去就等待

         读取:

        4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null; 

        5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止; 阻塞, 取不到就一直等

         其他

int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准, 不能保证数据的准确性

boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改    变了返回true

public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true

int drainTo(Collection<? super E> c); //移除此队列中所有可用的元素,并将它们添加到给定 collection 中。取出放到集合中

int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,指定了移   动的数量; 取出指定个数放到集合

 

BlockingQueue有四个具体的实现类,常用的两种实现类为:

 

1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

 

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

         LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

 

LinkedBlockingQueue和ArrayBlockingQueue区别:

LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

 

 

生产者消费者的示例代码:

见代码

package cn.itcast_02_blockingqueue.main;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import cn.itcast_02_blockingqueue.consumer.Consumer;import cn.itcast_02_blockingqueue.producer.Producer;public class Test {    public static void main(String[] args) throws Exception {        BlockingQueue
queue = new LinkedBlockingQueue
(2); // BlockingQueue
queue = new LinkedBlockingQueue
(); // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE // BlockingQueue
queue = new ArrayBlockingQueue
(2); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); for (int i = 0; i < 3; i++) { new Thread(producer, "Producer" + (i + 1)).start();//线程就是这样启动的
}        for (int i = 0; i < 5; i++) {            new Thread(consumer, "Consumer" + (i + 1)).start();        }                new Thread(producer, "Producer" + (5)).start();    }}
package cn.itcast_02_blockingqueue.producer;import java.util.concurrent.BlockingQueue;public class Producer implements Runnable {      BlockingQueue
queue; public Producer(BlockingQueue
queue) { this.queue = queue; } @Override public void run() { try { System.out.println("I have made a product:" + Thread.currentThread().getName()); String temp = "A Product, 生产线程:" + Thread.currentThread().getName(); queue.put(temp);//如果队列是满的话,会阻塞当前线程 } catch (InterruptedException e) { e.printStackTrace(); } } }
package cn.itcast_02_blockingqueue.consumer;import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable{      BlockingQueue
queue; public Consumer(BlockingQueue
queue){ this.queue = queue; } @Override public void run() { try { String consumer = Thread.currentThread().getName(); System.out.println(consumer); String temp = queue.take();//如果队列为空,会阻塞当前线程 System.out.println(consumer+"get a product:"+temp); } catch (InterruptedException e) { e.printStackTrace(); } } }

 

4.kafka 和 redis的应用,Storm中用到了很多的Blockquene

PS: 比如,双十一在大屏上显示实时的消费金额,  在上面我们讲到,kafka她类似于消息队列,spout从kafka中那数据,根据业务把数据分发到各个节点blot取出来处理。把每一个任务再细分拓扑,到redis,然后再显示到屏幕上。  比如你在淘宝上下订单,并不是保存到数据库那么简单的。

 

 

转载于:https://www.cnblogs.com/bee-home/p/7839974.html

你可能感兴趣的文章
前台实现ajax 需注意的地方
查看>>
Jenkins安装配置
查看>>
个人工作总结05(第二阶段)
查看>>
Java clone() 浅拷贝 深拷贝
查看>>
深入理解Java虚拟机&运行时数据区
查看>>
02-环境搭建
查看>>
spring第二冲刺阶段第七天
查看>>
搜索框键盘抬起事件2
查看>>
阿里百川SDK初始化失败 错误码是203
查看>>
透析Java本质-谁创建了对象,this是什么
查看>>
BFS和DFS的java实现
查看>>
关于jquery中prev()和next()的用法
查看>>
一、 kettle开发、上线常见问题以及防错规范步骤
查看>>
eclipse没有server选项
查看>>
CRC码计算及校验原理的最通俗诠释
查看>>
QTcpSocket的连续发送数据和连续接收数据
查看>>
使用Gitbook来编写你的Api文档
查看>>
jquery扩展 $.fn
查看>>
Markdown指南
查看>>
influxDB的安装和简单使用
查看>>