`

线程同步工具类

阅读更多

同步工具类包括信号量(Semaphore)、栅栏(barrier)、闭锁(CountDownLatch)

 

闭锁(CountDownLatch)

public class RunMain {
	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
		final CountDownLatch startGate = new CountDownLatch(1);
		final CountDownLatch endGate = new CountDownLatch(nThreads);

		for (int i = 0; i < nThreads; i++) {
			Thread t = new Thread() {
				public void run() {
					try {
						startGate.await();
						try {
							task.run();
						} finally {
							endGate.countDown();
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

				};
			};
			t.start();
		}
		long start = System.nanoTime();
		startGate.countDown();
		endGate.await();
		long end = System.nanoTime();
		return end - start;
	}

}

 

FutrureTask

     FutureTask也可以做闭锁,FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于一下3中状态

  • 等待运行
  • 正在运行
  • 运行完成:表示计算的所有可能结束方式包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会永远停止在这个状态上。

    Future.get的行为取决于任务的状态,如果任务已经完成,那么get会立即返回结果,否则将阻塞知道完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

public class RunMain {

	private FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
		public String call() throws Exception {
			System.err.println("数据加载中");
			Thread.sleep(3000);
			return "数据加载完成";
		};
	});

	private Thread thread = new Thread(future);

	public void start() {
		thread.start();
	}

	public String get() throws InterruptedException, ExecutionException {
			return future.get();
	}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException{
		RunMain run  = new RunMain();
		run.start();
		System.out.println(run.get());
	}
}

 

信号量(Semaphore)

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量,计数信号量还可以用来实现某种资源池或者对容器施加边界

public class RunMain<T> {
	private Set<T> set;
	private Semaphore sem;

	public RunMain(int bound) {
		set = Collections.unmodifiableSet(new HashSet<T>());
		sem = new Semaphore(bound);
	}

	public boolean add(T t) throws InterruptedException {
		sem.acquire();
		boolean boo = false;
		try {
			boo = set.add(t);
			return boo;
		} finally {
			if (!boo)
				sem.release();
		}
	}

	public boolean remove(T t) {
		boolean boo = set.remove(t);
		if (boo)
			sem.release();
		return boo;

	}
}

 

栅栏(barrier) 

 所有线程必须同时到达栅栏的位置,才能继续执行,闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置聚集,它在并行迭代算法中非常有用。

代码摘自:http://www.cnblogs.com/dolphin0520/p/3920397.html

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());   
            }
        });
         
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics