Thread Synchronization & Asynchrony

Synchronous & Asynchronous

1. Synchronous Waiting

1.1 What Is Guarded Suspension

In concurrent programming, guarded suspension is a software design pattern for managing operations that require both a lock to be acquired and a precondition to be satisfied before the operation can be executed. The guarded suspension pattern is typically applied to method calls in object-oriented programs, and involves suspending the method call, and the calling thread, until the precondition (acting as a guard) is satisfied.

Source: Wikipedia

The guarded suspension pattern makes one thread wait for a result produced by another thread.

As an example, take two threads: Thread1 writes the result and Thread2 reads it.

Synchronization
import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        // shared variable
        AtomicReference<Integer> response = new AtomicReference<>(null);


        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            response.set(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            System.out.println(response.get());
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

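With the code above, the two threads run independently, so whether thread2 sees the value of response is left to chance: depending on scheduling, it prints either 1 or null.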

To control the ordering, the natural choice is to synchronize with wait and notify (ignoring, for now, tools such as join, Future, and CountDownLatch).

import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {
        // shared variable
        AtomicReference<Integer> response = new AtomicReference<>(null);

        Thread thread1 = new Thread(()->{
                System.out.println("thread1 before set .....");
                synchronized(response) {
                    response.set(1);
                    response.notify();
                }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
                System.out.println("thread2 before get .....");
                synchronized(response) {
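                    // If response is already set, read it directly; otherwise block.
                    // Loop instead of a single `if` check: wait() may wake up spuriously.
                    while (response.get() == null){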
                        try {
                            response.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println(response.get());

                }

                System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }


}

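To keep the code concise and easy to use, we encapsulate the synchronization logic in an object, which gives us a simple guarded suspension object: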

Guarded suspension
import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws Exception {

        GuardObject guardObject = new GuardObject();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            guardObject.setObj(1);
            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            try {
                System.out.println(guardObject.getObj());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

   static class GuardObject{
        private Integer response;

        public synchronized int getObj() throws InterruptedException {
            // Loop instead of a single `if` check: wait() may wake up spuriously.
            while(response == null){
                wait();
            }
            return response;
        }

        public synchronized void setObj(Integer value){
            response = value;
            notify();
        }
    }
}

In the JDK, Thread.join and FutureTask are both built on the guarded suspension pattern.
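To make that concrete, here is a minimal sketch in which a FutureTask takes over the role of the hand-written GuardObject: the producer thread runs the task, and the caller blocks in get() until the result is set. The class name FutureTaskGuardDemo and the helper produceAndRead are made up for this illustration, and the 100 ms sleep only simulates work.

```java
import java.util.concurrent.FutureTask;

class FutureTaskGuardDemo {
    // Hypothetical helper: starts a producer thread and returns the value the caller read.
    static int produceAndRead() throws Exception {
        // The FutureTask acts as the guard object: run() stores the result, get() waits for it.
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(100); // simulate some work before the result is ready
            return 1;
        });

        Thread producer = new Thread(task, "thread1");
        producer.start();

        int value = task.get(); // blocks until the task has set its result
        producer.join();        // blocks until thread1 has terminated
        return value;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(produceAndRead()); // prints 1
    }
}
```

Unlike the GuardObject above, get() also reports failures: if the Callable throws, get() rethrows the cause wrapped in an ExecutionException.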

1.2 FutureTask

  1. Entry point: executor.submit(callable)
  1. Create a RunnableFuture and hand it to a Worker. The thread pool wraps the Callable in a RunnableFuture, which is both a Future and a Runnable, and then executes that task via execute(ftask).
// java.util.concurrent.AbstractExecutorService

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException       {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
} 


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
  1. A worker thread calls the run method of the FutureTask.
// java.util.concurrent.ThreadPoolExecutor#runWorker
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
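  1. The caller invokes get on the Future. If no result is available yet (state <= COMPLETING, so the outcome field has not been published), the current thread is wrapped in a WaitNode and pushed onto waiters with UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q) (head insertion); LockSupport.park then blocks the current thread.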
//java.util.concurrent.FutureTask
	public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
	}


    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  1. When the FutureTask's run method finishes, set(result) assigns the result to outcome and wakes up all the blocked threads in waiters.
//java.util.concurrent.FutureTask   
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
 }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
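
The get/set handshake walked through above can be condensed into a small sketch. MiniFuture is a hypothetical, heavily simplified stand-in, not the JDK implementation: it uses an AtomicReference instead of Unsafe, a boolean instead of the state machine, and it assumes set is called exactly once with no cancellation, timeout, or exception handling.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified stand-in for FutureTask (illustration only).
class MiniFuture<V> {
    private static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile WaitNode next;
    }

    private volatile boolean done; // plays the role of state > COMPLETING
    private V outcome;             // written before done, read only after done is seen
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();

    public V get() throws InterruptedException {
        WaitNode q = null;
        boolean queued = false;
        while (!done) {
            if (Thread.interrupted()) {
                if (q != null) q.thread = null; // simplified: the node is not unlinked
                throw new InterruptedException();
            }
            if (q == null) {
                q = new WaitNode();
            } else if (!queued) {
                // Head insertion: point the new node at the current head, then CAS the head.
                WaitNode head = waiters.get();
                q.next = head;
                queued = waiters.compareAndSet(head, q);
            } else {
                LockSupport.park(this); // may return spuriously, hence the loop
            }
        }
        return outcome;
    }

    public void set(V value) {
        outcome = value;
        done = true; // volatile write publishes outcome to readers that see done == true
        // Detach the whole waiter stack in one step, then unpark every waiting thread.
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }
}

class MiniFutureDemo {
    static int demo() throws InterruptedException {
        MiniFuture<Integer> future = new MiniFuture<>();
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            future.set(42);
        }).start();
        return future.get(); // parks until the other thread calls set
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 42
    }
}
```

A waiter always re-checks done after enqueueing and before parking, so a set that slips in between is never missed: either the waiter sees done == true and returns, or set finds its node and unparks it.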

2. Asynchronous Processing

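With FutureTask, one thread waits for another thread to finish (synchronous). But sometimes what we need is an asynchronous operation, so how should that be designed? The answer is a register-and-callback mechanism.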

FutureAsync
import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        FutureAsync<Integer> future = new FutureAsync<>();

        Thread thread1 = new Thread(()->{
            System.out.println("thread1 before set .....");
            try {
                Thread.sleep(1000);
                future.fireSuccess(1);
            } catch (InterruptedException e) {
               future.fireFailure();
            }

            System.out.println("thread1 after set .....");

        });

        Thread thread2 = new Thread(()->{
            System.out.println("thread2 before get .....");
            future.addListener(new FutureCallback<Integer>() {
                   @Override
                   public void onSuccess(Integer value) {
                       System.out.println(String.valueOf(value));
                   }

                   @Override
                   public void onFailure() {
                       System.out.println("error");
                   }
            });

            System.out.println("thread2 after get .....");
        });

        thread1.start();
        thread2.start();

    }

    static class FutureAsync<T>{
       // Guarded by `this`: listeners are registered and fired from different threads.
       private final List<FutureCallback<T>> callbacks = new ArrayList<>();

       synchronized void addListener(FutureCallback<T> callback){
           callbacks.add(callback);
       }

       synchronized void fireSuccess(T value){
           callbacks.forEach(callback -> callback.onSuccess(value));
       }

       synchronized void fireFailure(){
           callbacks.forEach(FutureCallback::onFailure);
       }
    }

    interface FutureCallback<T>{
        void  onSuccess(T value);
        void onFailure();
    }
}
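
One limitation of the FutureAsync example is that a listener registered after fireSuccess has already run is never invoked. The sketch below shows one possible way to handle that by remembering the result and replaying it to late listeners. ReplayingFuture is a hypothetical name, it covers only the success path, and it uses java.util.function.Consumer in place of FutureCallback to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical variant of FutureAsync that remembers its result (success path only).
class ReplayingFuture<T> {
    private final List<Consumer<T>> callbacks = new ArrayList<>();
    private boolean done;
    private T value;

    void addListener(Consumer<T> callback) {
        synchronized (this) {
            if (!done) {
                callbacks.add(callback); // invoked later by fireSuccess
                return;
            }
        }
        callback.accept(value); // already completed: invoke right away
    }

    void fireSuccess(T result) {
        List<Consumer<T>> toNotify;
        synchronized (this) {
            if (done) return; // complete at most once
            done = true;
            value = result;
            toNotify = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        // Run callbacks outside the lock so a slow listener cannot block registration.
        toNotify.forEach(cb -> cb.accept(result));
    }
}

class ReplayingFutureDemo {
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ReplayingFuture<Integer> future = new ReplayingFuture<>();
        future.addListener(v -> log.add("early:" + v)); // registered before completion
        future.fireSuccess(1);
        future.addListener(v -> log.add("late:" + v));  // registered after completion
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [early:1, late:1]
    }
}
```

The JDK's CompletableFuture follows the same idea: callbacks attached with whenComplete or thenAccept run whether they are registered before or after the future completes.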