Skip to content

Chapter 04, Synchronizing concurrent operations

Dong Ha. Park edited this page Nov 27, 2015 · 15 revisions

Chap 04 : 병렬적인 연산들의 동기화

Synchronizing concurrent operations에 대한 번역.

현재 번역은 '진행 중'입니다. 후반부에서 appendix C에 대한 참조가 있었습니다. 사진을 추가하면서 코드를 작성해 같이 추가할 예정입니다.


1. 특정 이벤트 또는 조건 대기

Section 4.1 : Waiting for Event

당신이 야간 열차로 여행한다고 가정하자. 역에 하차하는 한가지 방법은 밤새 깨어 있으면서 열차가 멈출때 확인하는 것입니다. 역을 놓치진 않겠지만 도착했을 때 피곤할 것이다. 다른 방법으로는 기차가 도착 예정 시간을 시간표에서 보고 도착하기 전에 알람을 설정하고 잠을 자는 것이다. 열차가 지연되면, 너무 일찍 깰수도 있지만 역을 지나치지 않으니 괜찮다. 또 알람 시계의 베터리가 없어, 역을 지나쳐 깊은 잠을 잘 가능성이 있다. 언제 자면서 갈 수 있고 누군가가 기차가 역에 도착했을 때 깨워준다면 얼마나 이상적 이겠는가.

쓰레드와 어떤 관련이 있는가? 한 쓰레드가 작업이 다른 쓰레드의 작업이 끝나기를 기다린다면, 여러 옵션이 있다. 첫째, 공유된 데이터(뮤텍스로 보호된)의 플레그를 확인하고, 다른 쓰레드는 작업이 완료되었을 때 플레그를 설정한다. 쓰레드가 반복적으로 플레그를 체크하는데 프로세스를 소비하는 것과 대기 쓰레드에 의해 뮤텍스가 잠기는 것은 낭비다. 임의의 다른 쓰레드에 의해 잠길 수 없다. 모두가 대기하고 있는 쓰레드에 대해 동작한다

두번째 옵션은 대기 쓰레드가 체크하는 사이에 std::this_thread::sleep_for()함수를 사용하여 짧은 시간동안 잠기는 것이다.루프안에서 함수는 대기하기 전에 뮤텍스틑 해제하고 잠그는 것을 반복해서 다른 쓰레드가 플레그를 설정할 수 있는 기획를 얻는다. 이것은 쓰레드가 자는 동안 프로세스 시간을 소비하지 않기 때문에 개선이지만 잠든동안에 권한을 얻기 어렵다. 짧은 대기시간은 여전히 프로세스 시간을 소비하고, 긴 대기시간은 작업이 완료되어도 대기하여 지연이 발생된다. 오버슬립이 프로그램 운영에 직접 영향을 주는 것은 드물지만 게임에서 프레임이 버려지거나 실시간 어플리케이션에서 시간 분할의 오버런을 의미한다.

셋째 바람직한 옵션은 C++ 표준 라이브러리의 이벤트 대기 기능을 이용하는 것이다. 다른 쓰레드에 의해 트리거된 이벤트를 기대라는 기본적인 메카리즘은 조건 변수이다. 개념적으로 조건변수는 이벤트, 다른 조건 그리고 조건이 충족하기를 기다릴 수 있는 쓰레드들과 관련이 있다. 쓰레드는 조건이 충족된것을 결정하며, 프로세스를 진행하기 위해 조건 변수를 기다리는다른 쓰레드에 통보를 할 수 있다.

조건변수를 사용해 특정 조건 기다리기

Waiting for a condition with condition variables

C++ 표준 라이브러리는 2개의 조건 변수 std::condition_variable, std::condition_variable_any 를 제공한다. 모두<condition_variable> 라이브러리 헤더에 선언되었다. 모두 적절한 동기화를 제공하기 위해 뮤텍스와 함께 작동할 필요가 있다. 전자는 std::mutex와 작업하는 것으로 제한되는 것에 반하여 후자는 뮤텍스와 같게되는 최소 기준과 작업할 수 있다. 그래서 _any 접미사가 붙는다. 따라서 std::condition_variable_any는 좀더 일반적이고, 규모, 성능, 시스템 리소스에 대해서 추가 비용의 가능성이 있다. 그래서 std::condition_variable는 유연성이 필요하지 않으면 선호된다.

그래서 어떻게 데이터가 처리될때가지 잠긴 상태에서 대기중인 쓰레드 예제를 처리하기 위해 std::condition_variable을 사용하는가? 첫째로, 당신은 두 개의 스레드 사이의 데이터를 전달하는 데 사용될 큐를 가지고있다.데이터가 준비되면 쓰레드는 std::lock_guard을 사용하여 큐를 보호하고 데이를 큐에 삽입한다. 그리고 대기하고 있는 쓰레드에게 알리기 위해 std::condition_variable에 있는 notify_one()을 호출한다.

펜스의 다른쪽에 처리중인 쓰레드가 있습니다. 쓰레드는 첫번째로 뮤텍스를 잠그나, 이번에는 std::lock_guard보다 std::unique_lock이 좋은데 잠시 후 이유를 알 수 있다. 쓰레드는 std::condition_variable 에 있는 wait()를 호출하고 나서, 잠금 객체와 대기를 위한 람다 표현 조건 통과한다. 람다 함수는 다른 표현의 일부로 익명 함수를 작성할 수 있도록 C++11에서 추가되었고 wait()와 같은 표준 라이브러리 함수에 조건을 지정하는데 적합하다. 이경우 단순한 람다함수는 data_queue가 비었는지 체크한다—어떤 데이터가 큐안에서 처리될 준비가 되었다면

람다 함수는 상세한 설명을 appendix A를 참조

wait()의 구현은 조건을 검사하고 (제공된 람다함수를 호출하여) 조건을 만족하면 true를 만족하지 않으면 false를 반환한다. wait()는 뮤텍스를 해제하고 대기상태또는 잠겨있는 쓰레드에 넣는다. 조건변수는 데이터가 준비된 쓰레드에서 notify_one()을 호출하여 통보하면, 잠든 상태(잠기지 않은)에서 쓰레드는 깨어나 뮤택스 잠금을 다시 획득하고, 상태를 다시 체크하고 조건이 만족한다면 잠김 뮤텍스와 wait()에서 반환된다. 조건을 충족하지 않으면 쓰레드는 뮤텍스 잠금을 해제하고, 대기를 다시 시작한다. 이것이 std::lock_guard대신에 std::unique_lock이 필요한지에 대한 이유다. —대기중인 쓰레드는 대기하고 잠기는 동안 다시 뮤텍스 잠금을 해제해야 하고. std::lock_guard은 유연함을 제공하지 않는다. 뮤텍스는 쓰레드가 잠든동안 잠근 상태를 유지한다면 , 데이터 준비 쓰레드는 큐에 아이템을 추가할때 뮤텍스 잠금을 할 수 없을 것이고, 대기중인 쓰레드는 조건을 충족하는 것을 볼 수 없을 것이다.

리스트 4.1에서 큐가 비어있지 않았음을 체크하는 동안 기다리기 위해 단순한 람다 함수를 사용하지만, 어느 함수나 호출된 객체는 절달될 수 있다. 이미 상태를 확인하는 함수가 있다면 이 함수를 직접 전달 할 수 있다. 람다로 포장할 필요가 없다. wait()를 호출하는 동안에 조건 변수는 임의의 회수로 체크할 수 있다. 그러나 항상 뮤텍스로 잠그고 조건 테스트는 true를 반환하는 함수를 반환해야 한다. 대기 쓰레드가 뮤택스 상태 체크가 필요할때 다른 쓰레드로부터 통지를 바로 받지 않았다면 이것을 가짜 깨움이라고 부른다. 번역 이상 그 상태 체크에 사이드 임팩트가 있는 함수는 사용하는 것은 바람직하지 않다. 그래도 사용한다면 여러번 발생되는 사이드 임팩트에 대해 준비해야 한다. std::unique_lock의 가변적인 해제는 wait()를 호출할때 사용되지 않습니다. 이또한 데이터를 처리할때 한번 사용되나 그것을 처리하기 전입니다. 처리데이터는 챕터 3에서 본것같이 시간을 많이 소비하는 작업을 할 수 있다. 이것은 필요 이상으로 뮤택스에 대한 잠금을 유지하는 나쁜 아이디어다. 큐를 사용하여 쓰레드간 데이터를 전송하는 것은 4.1절의 공통된 사니리오다. 이것은 잘된다. 동기화는 큐 자체에 제한을 할 때 동기화 문제와 경쟁 상태에 가능한 수를 최대한 줄일 수 있다. 이런한 관점에서 목록 4.1에서 지금 일반적인 쓰레드에 안전한 큐를 추출하는 작업을 할 수 있다.

조건변수를 이용해 thread에 안전한 큐(queue) 만들기

Building a thread-safe queue with condition variables

일반적인 큐를 설계한다면 세션 3.2.3으로 돌아가 쓰레드에 안전한 스택처럼 요구될 가능성이 있는 것에 대해 몇 분을 소비할 가치가 있다. C++ 표준 라이브러리를 살펴보면 std::queue<> 컨테이너 어뎁터는 다음 목록을 보여준다. 생성자,할당자와 교환 작업을 무시하면 전체 큐의 상태를 조회하는 것과 큐의 요소를 조회하는 것 큐를 수정하는 3개의 오퍼레이션 그룹이 남는다.이것은 세션 3.2.3의 스택과 같이 인터페이스에 내제된 경쟁상태에 관한 동일한 문제가 있다. 따라서 단일 함수를 호출할 때 스택의 top()pop()의 조합처럼 front(), pop()의 조합이 필요하다. 목록 4.1에서 새로운 뉴앙스를 추가했다. 쓰래드간 큐를 사용할 때 수신 쓰레드는 자주 데이터를 기다릴 필요가 있어 2가지의 pop(), try_pop()을 제공한다. try_pop()은 큐의 데이터를 pop할때 바로 반환한다. wait_and_pop은 데이터가 있을 때까지 대기한다. 스택처럼 생성자와 할당자를 제거하여 코드를 줄였다. try_pop()은 값을 반환할 상태면 true 아니면 false를 반환한다. wait_for_pop()은 값을 바로 반환하기 때문에 반환값이 없다.

목록 4.1에서 push()와 wait_and_pop()을 뽑아낼 수 있다. 뮤택스와 조건 변수는 threadsafe_queue 인스턴스에 포함되지 않으며, push()를 호출하기 위해서 별도의 변수들은 더이상 필요하지 않는다. 또한 wait_and_pop은 조건변수를 관리하기 위해 기다린다. wait_and_pop은 목록 3.5.에 stack 예가 그대로 사용된다. empty()는 상수 멤버 함수이지만 다른 파라미터가 있는 복사 생성자를 참조한다.다른 쓰레드는 비 상수 객체를 참조할 수 있는데 이를 돌연변이 멤버함수라고 부른다. 그래서 뮤택스 잠금이 필요하다. 뮤텍스로 잠그는 동안 돌연변이 동작이므로 empty()와 복사생성자를 잠글 수 있게 뮤택스 오브젝트는 변경 가능한 표시를 해야 한다.

조건 변수 또한 한개 이상의 쓰레드가 같은 이벤트를 대기할때 유용하다. 쓰레드가 작업부하를 분할하는데 사용된다면 한나의 쓰레드는 통지에 응답해야 한다.목록4.1와 같이 동일한 구조로 사용할 수 있으며 오직 데이터 처리 프로세스의 여러 인스턴스를 실행한다.새로운 데이터가 준비되면 notify_one()을 호출하여 현재 wait()가 실행중인 쓰레드중의 하나를 트리거하여 상태를 검사하고 wait()는 반환된다.( 단지 data_queue에 아이템을 추가했기 때문에)쓰레드가 통보 받가나 통보를 기다린다는 보장은 없다. 모든 프로세싱 쓰레드는 여전히 데이터를 처리할 것이다.

다른 가능성은 여러 쓰레드가 가은 이벤트를 기다리고 모두 응답이 필요하면 공유데이터를 초기화되는 경우가 발생될 수 있다. 그리고 프로세싱 쓰레드들은 모두 같은 데이터를 사요하지만 초기화를 기다릴 필요가 있다.쓰레드들은 주기적으로 재초기화와 같은 공유 데이터의 업데이트를 기다릴 필요가 있다. 이경우 쓰레드는 notify_one()보다는 notify_all()을 호출할 수 있는 데이터를 준비해야 한다. 대기 쓰레드는 오직 한번 기다린다. 그래서 상태가 true이면 조건 변수를 다시는 기다리지 않는다. 조건 변수는 동기화 메커니즘의 최선의 선택을하지 않을 수 있다. 가용한 데이터의 특징을 기다리는 것이 옳다.

2. future를 사용한 1회성 이벤트 대기

Section 4.2 : Waiting for One-off events with futures
On-work

이런 경우를 생각해보죠. 당신은 비행기를 타고 해외로 휴가를 가는 중입니다. 공항에 도착하면 다양한 탑승수속이(check-in procedures) 있죠. 비행기에 탑승하기 전까지 시간을 어떻게 보내든 아무런 문제도 없습니다. 하지만 비행기는 한번 뿐입니다. 놓치면, 그 다음은 없죠.

C++ 표준 라이브러리는 이런 종류의 1회성 이벤트를 future라는 이름으로 모델화 합니다. 만약 한 thread가 특별한 1회성 이벤트를 기다려야 한다면, 그 스레드는 future라고 불리는 1회성 이벤트를 얻습니다.(obtain) 그 스레드는 짧은 시간동안, 주기적으로 이 이벤트가 발생했는지 검사합니다. (앞선 비행기 예시에서 출발시간표를 확인하는 것과 같습니다.) 다른 일(task)을 진행하면서 말이죠.(비싼 공항카페에서 식사하는 것처럼). 달리 말해(Alternatively), 스레드는 future가 필요할 때 까지 다른일들을 진행하고, 그 future가 발생하면서 ready상태가 될 때까지 기다립니다. future는 스레드가 필요로 하는 데이터가 있겠죠. (가령 비행기를 몇번 게이트에서 탑승하는지...) 또는 그렇지 않을 수도 있습니다. 한번 이벤트가 발생해서 future가 ready가 되면, 그 future는 다시 초기화(reset)될 수 없습니다.

C++ 표준 라이브러리에서 future는 2 종류로 나뉩니다. <future> 라이브러리 헤더에 2개의 템플릿 클래스로 선언되어있죠 : unique futures(std::future<>)와 shared future(std::shared_future<>). 이 future들은 std::unique_ptrstd::shared_ptr을 따라서 만든 것입니다. std::future의 인스턴스는 관련된 이벤트를 표현하는 하나의, 유일한 인스턴스입니다. 반면, std::shared_future는 하나의 같은 이벤트에 대해서, 여러 인스턴스를 표현하는(refer) 것이죠. 이 경우, 지정된 이벤트가 발생하면 shared_future의 모든 인스턴스는 동시에 ready 상태가 되고, 이벤트와 관련된 데이터에 접근하게 됩니다.

In the latter case, all the instances will become ready at the same time, and they may all access any data associated with the event.

이 연관 데이터들이 바로 future가 템플릿인 이유입니다. std::unique_ptr, std::shared_ptr처럼, 템플릿 매개변수(parameter)는 연관 데이터의 타입입니다. std::future<void>std::shared_future<void>는 연관데이터가 없다는 것을 실체화(specialize)한 것이죠. 비록 future들이 thread들 간의 통신에 사용되긴 하지만, future 객체 자신들은 동기화된 접근을 제공하지 않습니다. 만약 다수의 thread들이 하나의 future 객체에 접근해야 한다면, 3장에서 언급한 것처럼 mutex나 다른 동기화 메커니즘을 사용해 future를 보호해야 합니다. 하지만, 다수의 thread들이 각자의 std::shared_future<> 복사본(copy)에 동기화 없이 접근할 수도 있다는 것을 4.2.5에서 보게 될 것입니다.

1회성 이벤트의 기초중의 기초는, 계산(calculation)의 결과는 백그라운드에서 진행된다는 것입니다. 2장 에서 std::thread는 진행한 작업의 결과물을 반환하는 쉬운 방법을 제공하지 않는 다는 것을 봤을 것입니다. 그리고 약속하건대, future를 사용해서 이 문제가 어떻게 해결되는지 4장에서 보게 될 겁니다. 이제 어떻게 하는지 봅시다.

백그라운드 작업으로부터 값을 받기

Returning values from background tasks

이런 가정을 해봅시다. 당장 값이 필요하진 않지만, 긴 시간 후에 유용한 결과를 도출하는 계산(calculation)이 있다고 말입니다. 당신은 더글라스 애덤스가 [삶, 대학, 그리고 모든것에 대한 답]에서 언급한 예시를 반박하는 방법을 찾았을 수도 있겠죠. 당신은 새 thread가 그 계산을 수행하도록 할 수 있지만, 그 방법은 당신이 스레드가 결과를 전송하도록 관리해줘야 한다는 것을 의미합니다. 왜냐하면 std::thread는 그렇게 결과를 전송하는 직접적인 메커니즘을 제공하지 않기 때문이죠. 바로 이 부분에서 std::async라는 함수 템플릿 (마찬가지로 <future> 헤더에 선언되어 있는)이 사용됩니다.

std::async는 당장 값이 필요하지는 않은, 비동기적인 작업을 시작하기 위해 사용됩니다. std::async는 당신에게 std::thread 객체가 아니라, std::future 객체를 반환합니다.이 future 객체는 결과적으로 함수의 반환값을 저장하고 있죠. 값이 필요해지만, 당신은 단순하게 future객체에서 get() 함수를 호출하기만 하면 됩니다. 그리고 이 get() 함수를 수행한 스레드는 future 객체가 ready상태가 되고 값을 반환할 때까지 블록되어 대기하게 되죠. 아래의 코드는 간단한 예시를 보여줍니다.

// [Listing 4.6]
// Using  std::future to get the return value of an asynchronous task
// std::future를 사용해 비동기적인 작업으로부터 반환값 받기.

#include <future>
#include <iostream>

int find_the_answer_to_ltuse();
void do_other_stuff();
int main()
{
     std::future<int> the_answer = std::async(find_ther_answer_to_ltuse);
     do_other_stuff()
     std::cout<<"The answer is "<< the_answer.get() <<std::endl;
}

std::async는 함수를 호출할 때 추가로 인자를 넘겨줄 수 있습니다. std::thread와 같은 방식이지요. 만약 첫 인자(argument)가 멤버 함수에 대한 포인터라면, 2번째 인자는 멤버함수를 적용할 오브젝트여야 합니다. ( 직접적으로 또는 포인터로 또는 std::ref()로 감싸서 넘겨주어야 하죠.) 그리고 남은 인자들은 멤버함수의 인자로 전달됩니다. 그렇지 않다면, 두번째 이후의 인자들은 첫번째 인자의 함수 또는 호출가능한 객체(callable object)로 전달됩니다. std::thread에서와 동일하게, 만약 인자가 r-value라면, moving을 통해서 원본의 복제(copy)가 생성됩니다. 이런 동작방식은 move-only 타입인 함수객체나 인자를 사용할 수 있게 해주죠. 다음 예시를 봅시다.

// [Listing 4.7]
// Passing arguments to a function with std::async
// std::async로 호출하는 함수에 인자 전달하기

#include <string>
#include <future>

struct X
{
     void foo(int, std::string const&);
     std::string bar(std::string const&);
};

X x;

// p->foo(42,"Hello")를 호출한다. 이때 p는 변수 x의 주소이다.
auto f1 = std::async(&X::foo, &x, 42, "Hello");

// tmpx.bar("goodbye")를 호출한다. 이때 tmpx는 x의 복사본(copy)이다.
auto f2 = std::async(&X::bar, x,  "goodbye");

struct Y
{     
     double operator()(double);
};

Y y;


// Y클래스 생성자로부터 이동 생성된 tmpy를 사용해 tmpy(3.141)을 호출한다.
auto f3 = std::async(Y(), 3.141);

// y(2.718)을 호출한다.
auto f4 = std::async(std::ref(y), 2.718);

X baz(X&);

// baz(x)를 호출한다.
std::async(bax,std::ref(x));

class move_only
{
     move_only();
     move_only(move_only&&)
     move_only(move_only const&) = delete;
     move_only& operator=(move_only&&);
     move_only& operator=(move_only const&) = delete;

     void operator()();
};

// std::move(move_only())로부터 생성된 tmp를 사용해 tmp()를 호출한다.
auto f5 = std::async(move_only());

기본적으로, std::async가 새로운 스레드를 시작하도록 하는지, future이 대기중이면 동기적으로 작업을 수행하는지는 구현에 따라서 다릅니다. 대부분의 경우 이것은 당신이 원하는 것일 테지만, 당신이 추가적인 매개변수를 사용해서 std::async()가 함수를 호출하기 전에 명시할수도 있습니다. 이 매개변수의 타입은 std::launch이고, std::launch::deferred가 될수도 있습니다. std::launch::deferred의 경우는 함수의 호출이 get()이나 wait()이 나중에 호출될 때까지 연기되죠. std::launch::async는 함수가 독립된 스레드에서 수행되어야 한다는 것을 지시합니다(indicate). 또는, std::launch::deferred|std::launch::async로 구현에 따라서 선택되도록 지시할 수 있습니다. 가령,

// 새로운 thread를 생성하여 실행
auto f6 = std::async(std::launch::async,Y(), 1.2);

// wait()이나 get()이 호출되면 실행
auto f7 = std::async(std::launch::deferred, baz, std::ref(x));

// 구현에 따라서 선택되도록 지정
auto f8 = std::async(
     std::launch::deferred | std::launch::async,
     baz, std::ref(x)  );

// 구현에 따라서 선택되도록 지정
auto f9 = std::async( baz, std::ref(x) );

// deferred된 함수를 호출
f7.wait();

나중에 이 챕터와 챕터 8 에서 확인할 수 있겠지만, std::async의 사용은 알고리즘들을 병렬적으로 수행가능한 작업들로 나누는것을 쉽게 만들어줍니다. 하지만, 이것이 std::future를 작업과 연관시키는 유일한 방법은 아닙니다; 당신은 작업을 std::packaged_task<> 클래스 템플릿의 인스턴스로 감싸거나, std::promise<> 클래스 템플릿으로 명시적으로 값을 설정하도록 코드를 작성하는 것이 가능합니다. std::packaged_taskstd::promise보다 좀 더 높은 수준의 추상화인데, 지금부터 살펴보죠.

future와 작업 연관시키기

Associating a task with a future

std::packaged_task<> 는 future를 호출가능한 객체나 함수와 묶습니다(tie). std::packaged_task<>객체가 호출되면, 이 객체는 연관된 함수나 호출가능한 객체를 호출한 뒤, 반환값이 저장되면 future객체를 ready상태로 만듭니다. 이런 동작은 스레드 풀(9장을 확인하세요) 또는 다른 작업관리 체계(scheme)를 만드는 기초로 사용될 수 있죠. 예컨대 각 작업을 개별 thread에게 맡기고, 각각의 백그라운드 스레드에서 작업을 순차적으로 진행하는 것이죠. 만약 많은 연산이 작은 작업들로 분할될 수 있다면, 그 각각의 분할된 작업들은 std::packaged_task<> 인스턴스로 감쌀 수 있습니다. 그 다음엔 그 인스턴스를 작업 스케줄러나 스레드 풀에 전달하는 겁니다. 이러면 작업의 구체적인 내용들이 추상화 되죠; 스케줄러는 개별적인 함수들이 아니라 단순하게 std::packaged_task<> 인스턴스만 관리하게 됩니다.

std::packaged_task<> 클래스 템플릿에서 템플릿 파라미터는 함수의 시그니처(function signature)입니다. 매개변수를 받지 않고 반환값도 없는 함수처럼 void()이거나, 비-상수 레퍼런스와 double형 포인터를 전달받고 int값을 반환하는 함수처럼 int(std::string&, double*)일 수있죠. 당신이 std::packaged_task의 인스턴스를 생성할 때, 당신이 넘겨주는 함수나 호출가능한 객체는 반드시 지정된 매개변수를 받아들이고, 지정된 반환 타입으로 형변환가능한(convertible) 타입을 반환해야 합니다. 타입이 정확히 맞을 필요는 없습니다; int매개변수를 받아 float를 반환하는 함수로부터 std::packaged_task<double(double)>를 생성할수도 있습니다. 이 타입들을 묵시적으로 변환가능하니까요.

지정된 함수 시그니처 반환 타입은 get_future() 멤버함수로부터 std::future<> 타입이 반환되도록 합니다. 반면에, 함수 시그니처의 인자들은 packaged task의 함수 호출 연산자의 시그니처를 지정하는데 사용됩니다. 예를 들어 보죠. std::packaged_task<string::string(std::vector*,int)>의 부분적인 정의를 적어보면 이렇습니다.

// [Listing 4.8]
// Partial class definition for a specialization of std::packaged_task<>
// std::packaged_task<> 특수화(실체화)의 부분적인 정의

template<>
class packaged_task<std::string(std::vector<char>*, int)>
{
public:
     template<typename Cllable>
     explicit packaged_task(Callable&& f);
     std::future<std::string> get_future();
     void operator()(std::vector<char>*,int);
}

std::packaged_task 객체는 그러니 호출 가능한 객체이고, std::function 객체로 감쌀 수 있습니다. std::function으로 감쌀 수 있으니 std::thread로 스레드 함수처럼 넘겨줄 수 있죠. 호출 가능한 객체를 요구하는 함수로 넘겨지거나, 직접적으로 호출될수도 있습니다.

이에 관련된 예시로 POSIX 시그널 핸들러와 앞서 언급한 std::async()를 예시로 들 수 있을 것 같습니다.

  • 박 동하

std::packaged_task가 함수 객체로서 호출되면, 인자들은 함수호출 연산자를 통해서 보유중인(contained) 함수로 전달됩니다. 그리고 결과는 std::future에 비동기적으로 저장되어, get_future() 함수를 통해서 획득할 수 있죠. 그러니 당신은 std::packaged_task 로 작업을 감싸놓고, future를 획득하면 됩니다. future를 가져오기 전에 std::packaged_task객체를 어딘가에서 호출되도록 해야하겠죠. 당신이 결과값을 필요로 하면, 당신은 futureready가 될 때까지 대기합니다. 아래의 예시는 이 동작을 보여줍니다.

thread들 간의 작업 전달(이전)

Passing tasks between threads

// [Listing 4.9]
// Running code on a GUI thread using std::packaged_task
// std::packaged_task를 사용한 GUI 스레드에서의 실행 코드

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();

// 1. 
void gui_thread()
{
     // 2. 
     while(!gui_shutdown_message_received() )
     {
          // 3. 
          get_and_process_gui_message();
          std::packaged_task<void()> task;
          {
               std::lock_guard<std::mutex> lk(m);
               // 4. 
               if( tasks.empty() )
                    continues;
               // 5. 
               task = std::move(tasks.front());
               tasks.pop_front();
          }
          // 6.
          task();
     }
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
     // 7.
     std::packaged_task<void()> task(f);
     // 8.
     std::future<void> res = task.get_future();
     std::lock_guard<std::mutex> lk(m);
     // 9.
     tasks.push_back(std::move(task));
     // 10.
     return res;
}

이 코드의 내용은 무척 단순합니다.

(1). GUI 스레드는,
(2). GUI를 끄도록(shut down)하는 메세지를 받을때 까지.
(3). GUI 메세지를 반복적으로 받아서 처리합니다.
(4). 가령 사용자가 클릭이나, 작업 큐(task queue)에 있는 작업들 같은걸 말이죠.
(5). 여튼, 루프를 반복하면서 GUI스레드는 작업큐로부터 작업을 꺼내 수행합니다.
(6). 작업 큐에 대한 잠금을 해제한 이후, 작업을 수행하죠.

작업과 연관된 future는 작업이 종료되는대로 ready 상태가 될 겁니다.
큐의 끝에 새로 작업을 추가하는것 역시 마찬가지로 단순합니다:
(7). 새로운 packaged_task가 주어진 함수로부터 생성됩니다.
(8). 이 task로부터 future를 얻게 되죠.
(9). get_future() 멤버함수를 호출하면서,
(10). 호출자(caller)에게 future가 반환되기 전에 그 작업은 리스트에 추가됩니다.

GUI 스레드로 메세지를 보내는 코드는 그 다음엔 그 메세지에 연관된 future를 기다립니다. 이때 future가 작업이 끝났다는 것을 알 필요가 있는지, 또는 알 필요가 없어서 future를 버려도 괜찮은지를 확인합니다.

The code that posted the message to the GUI thread can then wait for the future if it needs to know that the task has been completed, or it can discard the future if it doesn’t need to know.
메세지를 보낸다고 표현하였으나 (10)에서 작업리스트에 추가하는 것을 의미하는 것으로 보입니다.

  • 박 동하

이 예시는 작업을 위해 std::packaged_task<void()>를 사용합니다. 이 경우에 감싸지는 함수나 호출가능한 객체는 어떤 매개변수도 받지 않고, 반환값도 없죠. (만약 무엇인가를 반환한다면, 그 반환값은 버려집니다.) 이것은 가능한 가장 단순한 작업입니다. 하지만, 앞서 보셨듯이, std::packaged_task는 더 복잡한 상황에서도 사용될 수 있습니다- 다른 함수 시그니처를 템플릿 매개변수로 지정함으로써, 반환 타입을 바꿀수 있습니다. (그리고 그 반환 타입은 future의 연관된 상태에 저장된 데이터의 타입이죠.) 그리고 함수 호출 연산자의 인자 타입들 또한 바꿀 수 있습니다. 이 예시코드는 쉽게 확장될 수 있습니다. GUI 스레드에서 수행되는 작업들이 인자를 받거나, 단순히 완료 지시자(completion indicator)가 아니라 std::future안에 있는 값을 반환할 수 도 있습니다.

단순한 함수로 표현될 수 없거나, 결과가 여러 곳에서 올 수 있는 작업들은 어떨까요? 이러한 경우들은 future를 생성하는 3번째 방법으로 해결할 수 있습니다: std::promise 를 사용해서 명시적으로 값을 정하는(set) 것이죠.

이 부분이 시작하기 전에 std::packaged_task<>std::promise 보다 좀 더 추상화된 방법이라고 언급했었습니다. 좀 더 상세한 컨트을 위해서 std::promise의 사용이 제시됩니다.

  • 박 동하

(std::)promise 사용하기

Making (std::)promises

당신에게 수많은 네트워크 연결들을 관리하는 어플리케이션이 있을때, 각각의 연결들을 개별 스레드로 관리하고 싶은 생각이 들겁니다. 왜냐하면 이 방법은 네트워크 통신을 생각하고, 프로그램으로 만들기도 간단하게 하니까요. 이런 방식은 연결이 적을때는 (그렇기 때문에 적은 스레드로 동작중일 때는) 잘 돌아갑니다. 그러나 불행하게도, 연결이 증가하면 증가할수록, 점점 더 부적합하게 되죠; 동시에 존재하는 수많은 스레드들은 결과적으로 운영체제의 자원을 상당수 잡아먹고, 잠재적으로 성능에 영향을 주는 굉장한 컨텍스트 스위칭(실행흐름 전환)을 유발할 수 있습니다(스레드 수가 하드웨어가 지원하는 병렬성을 초과할 때 말이죠.). 극단적인 경우, 운영체제는 수많은 네트워크 연결로 인해 새로운 스레드를 실행할 자원이 부족할 수 있습니다. 이런 이유로, 매우 많은 수의 네트워크 연결을 가지는 어플리케이션에서는 연결을 관리하는 적은 수의 스레드를 가지고(가능하다면 하나만), 각 스레드는 한번에 다수의 연결을 처리하는 게 일반적입니다.

이 연결을 관리하는 스레드들 중 하나를 생각해봅시다. 데이터 패킷이 다양한 연결들로부터, 랜덤한 순서로 도착할 것입니다. 많은 경우 어플리케이션의 다른 부분들은 데이터가 성공적으로 전송되거나, 특정한 연결로부터 새 데이터 묶음이 성공적으로 수신되는것을 기다리고 있을 겁니다.

std::promise<T>는 (T 타입을 가지는) 값을 저장하는(set) 방법을 제공합니다. 이 저장된 값은 이 promise와 연관된 std::future<T> 객체에서 가져갈 수 있죠. std::promise/std::future 쌍은 앞서 말한 경우에 가능한 메커니즘을 제공합니다; 대기중인 스레드가 미래시점에 블록될 수 있고, 그 데이터를 제공하는 스레드는 promise를 사용해 연관된 값을 저장할 수 있습니다. 그리고 futureready상태가 되죠.

당신은 std::promise에서 get_future() 멤버함수를 통해서 std::future객체를 얻을 수 있습니다. std::packaged_task 와 완전히 똑같죠. promise의 값이 (set_value() 멤버함수를 사용해서) 저장되면, 그에 대응하는 futureready상태가 되고 저장된 값을 가져올 수 있게 됩니다. std::promise를 값 저장 없이 파괴하게 되면, std::future에는 예외가 값 대신 저장됩니다. Section 4.2.4 는 어떻게 예외가 스레드들 너머로 전달되는지를 묘사합니다.

[Listing 4.10]에서는 방금 설명한 연결을 관리하는 한 스레드의 예시코드를 볼 수 있습니다. 당신은 std::promise<bool>/std::future<bool> 쌍을 사용해 성공적인 출력 데이터 블록의 전송을 확인합니다. future에 저장된 값은 단순히 성공/실패 플래그죠. 수신중인 패킷에 대해서는, future와 관련된 데이터는 데이터패킷의 payload 부분입니다.

// [Listing 4.10]
// 단일 스레드에서 promise를 사용해 다수의 연결 다루기

#include <future>
void process_connections(connection_set& connections)
{
     // 1. 
     while(!done(connections))
     {
          // 2. 
          for(connection_iterator
               connection=connections.begin(),end=connections.end();
               connection!=end;
               ++connection)
          {
               // 3. 
               if(connection->has_incoming_data())
               {
                    data_packet data=connection->incoming();
                    std::promise<payload_type>& p=
                         // 4. 
                         connection->get_promise(data.id);
                    p.set_value(data.payload);
               }
               // 5. 
               if(connection->has_outgoing_data())
               {
                    outgoing_packet data=
                         connection->top_of_outgoing_queue();
                    connection->send(data.payload);
                    // 6.
                    data.promise.set_value(true);
               }
          }
     }
}

(1). process_connection() 함수는 done()이 true를 반환할 때까지 루프를 반복합니다.
(2). 매번 루프를 지날 때마다, 이 함수는 각 연결들을 하나씩 확인합니다.
(3). 만약 수신 데이터가 있으면 가져오고,
(5). 송신 데이터가 있으면 전송하죠.
이 코드는 송신 패킷이 어떤 ID와 실제 정보가 담긴 payload를 가지고 있다고 가정합니다.
(4) ID는 std::promise로 맵핑 있습니다. (아마도 관련된 컨테이너에서 찾아낼(lookup) 수 있겠죠.)
그리고 그 값은 수신패킷의 payload에 저장되죠. 송신 패킷의 경우, 패킷은 송신 큐에서 꺼내지고 연결을 통해서 전송됩니다. 한번 전송이 끝나면,
(6) 전송 정보와 연동된 promise는 전송성공을 표현하기 위해 true로 값을 저장합니다.
이 그림이 실제 네트워크 프로토콜에 들어맞는지는 프로토콜에 달려 있습니다; 이 promise/future 형태의 구조는, 비록 일부 운영체제의 비동기 입출력과 유사하긴 하지만, 특정 시나리오에서는 동작하지 않을수도 있습니다.

지금까지의 모든 코드는 예외에 대해서는 전혀 고려하지 않았습니다. 세상이 의도한 대로 잘 돌아가기만 한다면 좋지만, 실제로 그렇진 않죠. 때때로 디스크들이 꽉 찼거나, 찾고있는 것이 그 자리에 없을수도 있습니다. 네트워크가 끊기거나(fail), 데이터베이스가 뻗어버리기도 합니다(goes down). 만약 당신이 결과를 필요로하는 스레드에서 연산을 수행중이라면, 이런 코드들은 예외와 함께 에러가 발생했다고 보고만 할겁니다. 그러니, 당신이 std::packaged_task 또는 std::promise를 사용하기를 원해서 모든것이 정상적으로 동작하기를 요구하는 것은, 불필요한 제약이 될겁니다.

std::future<>를 사용한 예외처리

Saving an exception for the future

이런 짧은 코드를 생각해보죠. 만약 당신이 square_root()함수에 -1을 넘겨주면, 그 함수는 예외를 던지고, 호출루틴(the caller)에 의해서 확인됩니다.

double square_root(double x)
{
     if(x<0)
     {
          throw std::out_of_range(“x<0”);
     }
     return sqrt(x);
}

이제 현재 스레드에서 단순히 square_root()를 호출하는 대신에,

double y=square_root(-1);

이 함수를 비동기로 호출 하는 겁니다.

std::future<double> f=std::async(square_root,-1);
double y=f.get();

예외를 던지는 동작이 완전히 똑같다면 이상적일 것입니다; 두 코드 모두 y가 함수 호출의 결과를 받자마자, f.get()을 호출한 스레드가 예외를 확인할 수 있다면 좋을 것입니다. 단일 스레드 처럼 말이죠.

하지만, 실제 동작은 이렇습니다: std::async의 부분으로 함수가 호출되면, 그 예외는 future에 저장공간에 저장됩니다. 이 futureready상태가 되고, get()을 호출하는 것은 저장된 예외를 던지게 되는 것이죠.
(주의 : 표준에서는 예외 객체의 원본이 던져지는지 복사본이 던져지는지 명시하지 않았습니다; 컴파일러들과 라이브러리들은 이 경우에 제각기 다른 선택을 할 수 있습니다.)
std::packaged_task로 묶었을 때도 같은 일이 일어납니다.-작업이 호출되었을 때, 감싸진 함수가 예외를 던지면, 그 예외는 future에 결과값으로 저장되고, get()의 호출에의해 예외가 던져지는 ready상태가 됩니다.

자연히, std::promise 또한 명시적인 함수 호출에 같은 동작(facility)을 제공합니다. 값보다 예외를 저장하고자 한다면, set_exception() 멤버함수를 set_value()함수 대신 호출하면 됩니다. 이 방식은 알고리즘의 일부로 던져진 예외를 catch 블록에서 받아 처리하는 식으로 사용될 수 있을 겁니다. 받은 예외를 promise에 집어넣으려면(populate),

extern std::promise<double> some_promise;
try{
     some_promise.set_value( calculate_value() );
}
catch(...)
{
     some_promise.set_exception( std::current_exception() );
}

이 코드는 던져진 예외를 받기 위해 std::curent_exeption()을 사용합니다. 대안이 있다면 예외를 던지지 않고 std::copy_exception()을 사용해서 직접적으로 새로운 예외를 저장하는 것이죠.

some_promise.set_exception( std::copy_exception( std::logic_error("foo ") ) );

예외의 타입이 분명하다면 이렇게 하는 것이 try/catch를 블록을 쓰는것보다 훨씬 깔끔하죠. 그리고 우선적으로(in preference) 사용되어야 합니다; 코드를 단순하게 해줄 뿐만 아니라, 컴파일러에게 코드 최적화의 기회를 훨씬 더 제공하기 때문이죠.

future에 예외를 저장하는 또다른 방법은 future와 연동된 std::promise 또는 std::packaged_task를 호출하지 않고 파괴하는 겁니다. 이 두 경우 모두, futureready상태가 아니라면, std::future_errc::broken_promise의 에러 코드가 std::promisestd::packaged_task의 소멸자가 std::future_error예외를 저장할 겁니다; future를 생섬함으로써 당신은 어떤 값이나 예외를 제공한다는 promise를 만들었고, 그 약속이 깨어진 것이죠. 만약 컴파일러가 이러한 경우에 future에 아무것도 저장하지 않는다면, 대기중인 스레드가 영원히 대기할 가능성이 있습니다.

지금까지 std::future에 관한 예시를 모두 살펴보았습니다. 하지만, std::future는 하나의 스레드만 future에 저장되는 결과를 기다릴 수 있게 된다는 한계들이 있습니다. 만약 당신이 같은 이벤트를 하나 이상의 스레드로부터 기다려야 한다면, std::shared_future를 대신 사용해야합니다.

다수의 thread로부터의 접근대기

Waiting from multiple threads

데이터 경쟁을 피하기 위한 다수의 std::shared_future 객체들의 사용

3. 시간 제한을 이용한 대기

Section 4.3 : Waiting with a time limit
교정 대기 중

앞서 언급한 블럭 함수들은 이벤트가 발생할때까지 스레드를 무기한 정지시킵니다. 대부분의 경우 이건 문제가 되지 않죠. 하지만 몇몇 경우에, 얼마나 기다릴 것인가 라는 시간제한을 두고 싶을 수 있을 겁니다. 이런 기능이 있다면 "난 아직 살아있어" 같은 형태의 프로세스 상호간 메세징이나, 유저가 기다리는 것을 포기하고 Cancel버튼을 눌렀을때 종료시키는 것도 가능할 겁니다.

timeout(시간제한)을 두는데는 2가지 방법이 있습니다. 일정한 시간동안 대기하는 duration-based 시간제한과, 또는 특정한 시점까지 대기하는 absolute 시간제한이죠. 대부분의 waiting 함수들은 다양한 형태로 두 시간제한을 다룰 수 있게 해줍니다. _for로 끝나는 함수는 duration-based timeout 이고, _until로 끝나면 absolute timeout이죠.

예컨대, std::condition_variable이 두 가지의 재정의된(overload) 버전을 가지고 있다고 해봅시다. wait()을 오버로드한 wait_for()와, wait_until()로 말이죠. 이 함수들에는 전제(supplied predicate)가 있어서, 기한이 다되거나, 지정된 시점에 도달하거나, 가짜(spurious) wakeup이 발생하면 이 전제를 확인합니다. 전제조건을 검사한 결과가 true일 때만 반환하게 되죠.

timeout을 사용하는 함수들을 자세히 보기에 앞서, C++에서는 시간이 어떻게 명세되어 있는지 확인해봅시다. Clock부터 시작할까요?

  • Clocks(시계)

C++ 표준 라이브러리에서, clock은 시간정보를 표현하는데 사용됩니다. Clock 클래스는 4가지 정보를 가지고 있습니다.

  • 특정시간 : Now
  • clock이 소유하는 Time값을 위한 Type
  • tick period (짧은 시간 주기)
  • 일정하다고(steady) 여겨지는 시간.

현재시각은 정적 멤버함수인 now()를 호출해서 얻을 수 있습니다. 가령, std::chrono::system_clock::now()system_clock의 현재시간을 반환하죠. 이 시점(time point)의 타입은 time_point 멤버로 정해져 있습니다. 그러니 some_clock::now()의 타입은 some_clock::time_point 인거죠.

tick을 문맥상의 의미인 '시간 측정'으로 번역하였습니다. - 박 동하

측정 주기(tick period)는 '초'단위의 '분수'로 정의됩니다. 이 분수는 clock의 period멤버 typedef에 전달되죠. 가령, 초당 25회 측정되는 시계의 주기는 std::ratio<1,25>로 표기됩니다. 반면에 2.5초마다 1회 측정되는 경우는 std::ratio<5,2>로 표시되죠. 만약 측정 주기가 실행시간(runtime)까지 알 수 없거나, 프로그램(Application)의 실행 도중에 달라질 수 있으면, 측정 주기(tick period)는 가능한 가장 짧은 측정 주기의 평균값으로 결정됩니다. 또는 라이브러리의 작성자가 적합하다고 지정한 값을 가지게 되죠. 관측된(observed) 측정 주기가 정의된 주기와 일치한다는 것은 보장할 수 없습니다.

만약 clock이 균일하게 측정된다면, (period와 맞는지는 제쳐두고) 그리고 변경 불가능하다면, 그 clock은 일정(steady) 하다고 할 수 있습니다. is_steady 정적 데이터멤버는 이 경우 true값이며, 일정하지 않은 경우에는 false값을 가집니다. 일반적으로, std::chrono::system_clock은 균일하지 않습니다. 왜냐하면 시계가 조정될 수 있기 때문이죠. (even if sync adjustment is done automatically to take account of local clock drift.) 이런 조정은 now()를 호출하고 이전의 now()호출보다 더 이른시각 값을 반환할 수 있습니다. 이건 균일한 측정 비율(tick rate)라는 요구사항에 위배되죠. Steady clock은 시간제한(timeout) 계산에 중요합니다. 곧 보겠지만, C++ 표준 라이브러리는 std::chrono::steady_clock의 형태로 이것을 제공하고 있죠. 다른 시계는 C++표준 라이브러리의 std::chrono::system_clock(앞서 언급한)으로 제공됩니다. system_clock은 system의 '실제 시간'을 의미하고 시점(time point)을 time_t로 변환하는 함수와 std::chrono::high_resolution_clock (가능한 최소, 그렇기 때문에 최대 정밀도를 지니는 측정 주기(tick period)를 제공하는)을 지원하죠. (high_resolution_clock은 라이브러리가 지원하는 모든 tick period중 가장 작습니다.) 이건 실제로는 그냥 다른 clock의 typedef일수도 있습니다. 이 clock들은 라이브러리 헤더에 정의되어 있죠.

time point에 대한 표현을 짧게 볼 것이지만, 먼저 duration(기간)이 어떻게 표현되는지 확인해봅시다.

  • Duration(기간)

duration은 '시간'을 지원하는 부분들 중 가장 단순합니다. std::chrono::duration<> 클래스 템플릿(Thread 라이브러리에서 사용하는 C++ 시간조작 기능들은 전부 std::chrono 네임스페이스에 있습니다.)에 의해 다뤄지죠. 첫번째 템플릿 파라미터는 표현의 타입입니다. (int, long, double) 그리고 두번째 파라미터는 분수로, 각 기간이 얼마나 많은 초로 이루어져 있는지를 표현하는 수치이죠. 예컨대, 몇 분(minute) 단위를 short자료형으로 저장하려 한다면, 이 duration 형은 std::chrono::duration<short, std::ratio<60,1>>이 됩니다. 1분에는 60초가 있기 때문이죠. 반대로, 밀리초(millisecond)단위를 double에 저장하려 한다면, std::chrono::duration<double, std::ratio<1,1000>> 이 됩니다. 1초 = 1000 밀리초 니까요.

표준 라이브러리는 std::chrono 네임스페이스에 몇가지 미리 정의된 duration들을 제공합니다. 나노초, 마이크로초, 밀리초, 초, 분, 시 들이죠.(nanoseconds, microseconds, milliseconds, seconds, minutes, and hours) 이 기간 단위들은 당신이 원하는 표현을 선택하기에 충분히 다양하죠. (가령 500년 이상의 기간을 표현하기에). SI 비율에 대한 정의도 있는데, std::atto에서 std::exa까지 있습니다. (만약 당신의 플랫폼이 128비트 정수형을 쓴다면!) 이걸 사용해서 std::duration<double,std::centi>처럼 자신만의(custom) 기간 단위를 정의할 수 있죠.

기간들 간의 변환은 값의 버림이 요구되지 않는 한(더 적은 '정밀도'로의 narrowing이 아닌 한)묵시적으로 이루어집니다. 때문에 시간에서 초로 변환하는 것은 괜찮지만, 초에서 시간으로 변환하는 것은 묵시적으로 이루어지지 않죠.) 명시적으로 변환하는 것은 std::duration_cast<>를 통해서 할 수 있습니다.

std::chrono::milliseconds ms(54802);
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds> (ms)
// 밀리 초에서 초 단위로의 변환. 정밀도가 감소하기 때문에 명시적 변환이 필요하다.

이 결과값은 반올림되기 보다는 버려집니다(truncation). 그러니 위 예시에서 s는 (반올림 값인 55가 아니라) 54라는 값을 가지게 되죠.

Duration(기간)은 산술연산도 지원합니다. 그러니 당신이 원한다면 duration 간의 덧셈, 뺄셈이나 상수와의 곱셉, 나눗셈이 가능하죠. 그러니 5*seconds(1)seconds(5) 또는 minutes(1) - seconds(55)와 같습니다. duration에서 단위 수(서수)는 count() 멤버 함수를 사용해서 얻을 수 있습니다. 그러니 std::chrono::milliseconds(1234).count()는 1234인 것이죠.

Duration-based(기간을 사용한) 대기는 std::chrono::duration<> 개체(instance)를 필요로 합니다. 예컨대, 당신이 future가 준비되기까지 최대 35 밀리초간 대기한다면, 이런식으로 표현할 수 있습니다.

std::future<int> f = std::async(some_task);
if( f.wait_for( std::chrono::milliseconds(35) ) == std::future_status::ready )
{
   do_something( f.get() );
}

wait 함수는 timeout이 되었는지 대기중인 이벤트가 발생했는지를 반환합니다. 이 경우, 당신은 future를 기다리고 있을 겁니다.
그러니 wait함수는
timeout이 발생해 std::future_status::timeout을 반환하거나,
future변수가 준비되어서 std::future_status::ready를 반환하거나,
future의 task가 deferred되어서 std::future_status::deferred를 반환합니다.
duration-based 대기에서 duration은 라이브러리 내부의 steady_clock을 이용해서 측정됩니다. 그러니 35밀리초는 elapsed time의 35 밀리초를 의미하죠. system clock이 대기시간동안 조정되었더라도 말이죠.

앞서 system_clock은 steady하지 않을 수 있다는 점을 언급했고, 그렇기 때문에 라이브러리 내에 정의된 steady_clock을 통해서만 duration을 계산한다는 의미로 해석됩니다.

물론, 시스템 스케줄링의 변덕성(vagaries)과 운영체제 시계(OS clock)의 정확성에 따라서, thread가 호출/반환을 하는데 걸리는 시간은 35밀리초보다 길어질 수 있습니다.

자, duration을 배웠으니, time point(시점)으로 이동해보죠.

  • Time Points (시점)

시계(clock)을 위한 시점은 std::chrono::time_point<>의 인스턴스로 표현됩니다. 첫번째 템플릿 파라미터는 어떤 시계를 참조할 것인지, 두번째 파라미터는 시간의 측정단위(std::chrono::duration<>의 상세 자료형)로 구성되죠. 시점의 값은 시간의 길이(명시된 duration의 배수)입니다. 시간에서 임의의 지점(point)은 epoch라고 불리기 떄문이죠. clock에서 epoch는 기본속성이지만, 바로 획득되거나 C++ 표준에서 명세하는 것이 아닙니다.(directly available to query or specified by the c++ standard.) 일반적인 epoch는 1970년 1월 1일 00:00과 application이 부팅 될 때의 time_point인스턴스를 포함하죠. Clock은 epoch를 공유하거나, 독립적이니 epoch를 소유할 수 있습니다. 만약 두 clock이 하나의 epoch를 공유한다면, 한 클래스의 time_point typedef를 다른 clock 타입도 그 time_point와 연관되도록 할 수 있습니다. 비록 epoch가 언제인지 알 수 없더라도, 당신은 임의의 시점(time point)에서 time_since_epoch()를 계산해낼 수 있죠. 이 멤버함수는 특정 시점(우리가 알수 없던 그 epoch)으로부터의 시간길이를 duration 값으로 반환합니다.

예컨대, time_pointstd::chrono::time_point<std::chrono::system_clock, std::chrono::minutes>로 정의했다면, 이 time_point에서는 상대 system clock값을 분 단위로 보유하게 됩니다.(system_clock의 기본 정밀도는 초이거나 더 작기 때문에 상대적으로 덜 정밀하게 되죠.)

std::chrono::time_point<> 인스턴스에는 duration 을 더하거나 뺄 수 있습니다. 그러니 std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(500)은 500 나노초 뒤의 미래시점(time_point)을 반환하겠죠. 이런 특징은 당신이 코드블럭(block of code)의 최대 duration을 알 때, timeout절대값(absolute timeout)을 계산하는데 유용합니다. 하지만 대기 함수(waiting function)을 여러번 호출 하거나, waiting function을 지나치고 진행하는(precede)함수(non-waiting function)가 호출되면 일부 시간을 잡아먹을 순 있습니다.

물론, 하나의 시점에서 다른 시점을 뺄 수도 있습니다. 이 때 결과는 두 시점의 시간 간격을 duration 값으로 반환해주죠. 이 연산은 코드블럭의 시간을 계산하는 데 유용하게 쓰일 수 있습니다. 예를 들어보죠.

auto start = std::chrono::high_resolution_clock::now();
do_something();
auto stop = std::chrono::high_resolution_clock::now();

std::cout << "do_something() took "
   << std::chrono::duration<double, std::chrono::seconds>(stop-start).count() 
   << " seconds" << std::endl;

// Start와 Stop의 시간차를 duration<> 템플릿을 사용해 인스턴스화. 
// double 자료형으로 저장되며, 측정단위는 second(초).

std::chrono::time_point<> 인스턴스의 clock 파라미터는 epoch의 정의 뿐만 아니라 다른 기능도 합니다. absolute timeout을 가지는 wait함수로 time point의 인스턴스를 넘겨주게 되면, 넘겨준 파라미터는 시간을 측정하는데 사용되죠. 이러한 특징은 시간이 바뀔때 중요한 결과로 이어집니다. wait함수는 이 시간 변화를 추적해서 now()함수가 넘겨받은 timeout 시점 보다 늦지 않는한 반환하지 않기 때문이죠. 만약 그 clock이 앞서 조정되었다면, (파라미터로 전달하기 전에 조정되었다면) wait 함수의 총 대기 시간(steady_clock으로 측정했을 때)을 단축시킬 수 있습니다. 그리고 만약 wait 함수에 전달된 이후에 조정되었다면, wait 함수의 총 대기시간을 증가시키게 되죠.

(wait 함수에 파라미터로 넘겨준다는 점으로 미루어) 예상하셨겠지만, time point는 wait함수의 _until 파생형과 같이 사용되기도 합니다. 일반적인 사용예시는 프로그램에서 고정된 시점에서부터 some-clock::now()로부의 시간 차(offset)계산입니다. system time과 연동된 time point는 time_t에서 std::chrono::system_clock::to_time_point() 정적 멤버 함수를 호출하는 것으로 획득할 수 있습니다. 이 값은 사용자에게 가시적인 시간(user-visible time)에 연산을 스케줄링하는데 사용되죠. 예를 들자면, 만약 어떤 condition-variable과 관련된 이벤트의 발생까지 최대 500밀리초 동안 대기한다고 하면, 아마 이런 코드를 작성하게 될 겁니다.

// [Listing 4.11]
#include <condition_variable>
#include <mutex>
#include <chrono>
std::condition_variable cv;
bool done;
std::mutex m;
    
bool wait_loop()
{
   auto const timeout = std::chrono::steady_clock::now()
                        + std::chrono::milliseconds(500);
   std::unique_lock<std::mutex> lk(m);
   while(!done)
   {
      if( cv.wait_until( lk, timeout ) == std::cv_status::timeout )
         break;
   }
   return done;
}

제한된 시간동안 condition variable을 기다린다면 이런 방법을 권장합니다. 만약 당신이 wait함수로 전제조건(predicate)를 넘기지 않는다면 말이죠. 이 방법으로, 루틴(loop)의 전체 길이는 제한됩니다. 4.1.1에서 보았듯이, spurious wakeup을 다루기 위해선, predicate없이 condition variable을 쓸때는 loop를 사용해야 합니다. 만약 wait_for() 함수가 루프 안에서 사용된다면, spurious wake가 발생하기 전까지 거의 최대 길이만큼 대기할 수도 있습니다. 그리고 다음 wait time이 시작되죠. 이렇게 되면 몇번이고 대기를 반복하기 때문에 전체 대기시간은 무기한으로 길어집니다.

timeout을 명세하는 기초적인 방법들을 알았으니, 이제 timeout을 사용하는 함수들로 넘어가 볼까요.

timeout을 사용할 수 있는(accept) 함수

timeout을 사용하는 가장 단순한 사용법은, 특정 thread의 프로세싱에 딜레이를 주는 겁니다. 이렇게 하면 그 thread는 다른 thread들로부터 프로세서를 잡아먹지 않게 되죠. 할게 없으니까요. 당신이 'done'플래그를 루프안에서 poll했던 4.1의 예시에서 봤었죠. 이 역할을 하는 함수가 2개 있는데, std::this_thread::sleep_for()std::this_thread::sleep_until()입니다. 이 두 함수는 마치 단순한 알람시계처럼 동작하죠. thread는 임의의 기간(duration)동안 (sleep_for()) 또는 임의의 시점(time point)까지 (sleep_until()) sleep 상태에 빠지게 되죠. 4.1 부분에서의 예시들은 sleep_for()가 적합하죠. 어떤 일이 주기적으로 진행되어야 하거나, 종료시간(elapsed time)을 고려해야하는 경우들이었습니다. 반면에, sleep_until()은 호출하는 thread가 특정한 시점에 일어나도록 조절할 수 있죠. 새벽에 백업을 시작하도록 한다거나, 오전 6시에 payroll을 출력하도록 한다거나, 또는 비디오를 되감을 때 다음 프레임이 갱신되면 스레드를 종료하도록 하는 것이 가능합니다.

물론, timeout의 기능이 sleeping만 있는 것은 아니죠. 이미 봤었지만 future와 condition variable과 timeout을 같이 쓸 수 있습니다. 심지어 뮤텍스로 락을 거는 것도 (뮤텍스가 지원한다면) timeout을 이용해서 할 수 있었죠. 일반적인 std::mutexstd::recursive_mutex는 잠금시에 timeout을 지원하지 않습니다. 하지만 std::timed_mutex는 지원하죠. std::recursice_timed_mutex도 지원합니다. 이 두 타입들은 try_lock_for()try_lock_until() 멤버함수를 지원하고 임의의 기간이나 시점까지 잠금을 얻으려고 하죠. 표 4.1은 C++ 표준 라이브러리에서 timeout을 사용할 수 있는 함수들을 보여줍니다. duration으로 표시된 파라미터들은 std::duration<>의 인스턴스여야만 합니다. 그리고 time_point로 표시된 파라미터들은 std::time_point<>의 인스턴스여야 하죠.

시간제한(timeout)을 받는 함수들

이제까지 condition variable, future, promises, 그리고 packaged task에 대해서 다루었습니다. 이 기능들을 사용해서 어떻게 thread들 간의 연산을 동기화할 수 있는지 큰 그림을 그려보죠.

4. 코드 단순화를 위한 연산의 동기화

Using synchronization of operations to simplify code

이번 단원에서 그동안 설명했던 동기화 기능들을 사용하는 것은 당신이 동기화가 필요한 연산에 집중할 수 있도록 해줍니다. 코드 단순화를 도울 수 있는 방법 중 하나는, 프로그램의 병렬성에 대해서 코드를 좀 더 함수적(funtional)으로 작성하는(accomodates) 접근법을 취하는 것입니다. Thread간의 데이터를 공유하는 것 보다, 각 연산(task)들에게 필요한 정보가 주어지고, 결과가 다른 thread들에게 future를 사용해서 전파(disseminate)되는 것이죠.

future를 사용한 함수형 프로그래밍

함수형 프로그래밍(FP : Functional Programming)이라는 용어는 함수의 결과가 오직 그 함수에게 전달되는 매개변수(parameter)에 의해서만 결정되고, 함수 외부의 상태와는 분리되어 있는(의존하지 않는) 프로그래밍 스타일을 의미합니다. 이런 방법은 함수의 수학적 개념과 관련되어 있습니다. 그리고 이것의 의미는, 함수를 같은 인자(parameter)로 2번 호출하면, 그 결과는 완전히 똑같다는 것이죠. 이것이 C++ 표준 라이브러리에 있는 sin, cos, 그리고 sqrt와 같은 수학적인 함수들의 속성(property)입니다. 단순한 타입들의 기본연산도 마찬가지 입니다. 3+3, 6*9, 1.3/4.7처럼요. 순수한(pure) 함수의 효과는 온전히 반환 값으로만 제한됩니다.

Side effect를 최소화하는 함수들로 프로그램을 구성해야 한다는 이야기로 보입니다.

  • 박 동하

이렇게 되면 생각하기 쉬워집니다. 특히 동시성(concurrency)이 개입할때 그렇죠. 왜냐하면 공유 메모리와 관련된 (3장에서 다루었던) 대부분의 문제들이 사라지니까요. 공유 데이터에 변경이 없다면, 경쟁 상태(race condition)가 사라지고, 따라서 공유 데이터를 mutex를 사용해서 보호할 필요도 없어집니다. 마치 병렬적인 시스템에서 프로그래밍을 하는데 점차 더 사용되고 있는 Haskell 프로그래밍 언어처럼 강력한 단순화를 보여주죠. Haskell에서 모든 함수들은 기본적으로(by default) 순수(pure)합니다. 대부분이 순수하기 때문에, 순수하지 않은(impure) 함수들은 실제로 공유 상태를 변경합니다. 그러니 어떻게 이 순수하지 않은 함수들이 어플리케이션(application)의 전체구조에 적합하게 사용될지 고려하기(reason) 쉬워지는 것이죠. 함수형 프로그래밍의 이점들은 함수형 프로그래밍 패러다임에만 국한되지는 않습니다. 무엇보다(however), C++는 멀티 패러다임 언어이고, 프로그램을 함수형 프로그래밍 스타일로 작성하는 것이 가능합니다. C++11은 C++98에 비해 이 부분에서도 쉽죠.
람다(lambda) 함수(Ref : appendix A, section A.7),
Boost와 TR1의 std::bind와의 병합(incorporation),
auto 자동 타입 추론기능(Ref : appendix A, section A.7)가 있으니까요.
그리고 future는 C++에서 FP 스타일 동시성을 구현하는 마지막 요소 입니다. future는 thread들간에 전달될 수 있고, 한 계산의 결과를 다른 계산의 결과에 의존하도록 만들 수 있습니다. 공유 데이터에 대한 명시적 접근 없이 말이죠.

FP-스타일 Quick-Sort

FP-스타일 동시성을 위해서 future를 사용하는 것을 보여드리죠. 단순한 Quick-Sort 알고리즘을 통해서 확인해봅시다. 이 알고리즘의 기본 아이디어는 단순한데, 값들의 리스트(선형 집합)가 주어지면, 리스트의 한 원소 기준점(pivot)으로 삼고, 리스트를 기준점보다 작은 부분(partition)과 크거나 같은 부분으로 나눕니다. A sorted copy of the list is obtained by sorting the two sets and returning the sorted list of values less than the pivot, followed by the pivot, followed by the sorted list of values greater than or equal to the pivot. 그림 4.2는 10개의 정수가 이 방법으로 정렬되는 것을 보여줍니다. FP-스타일의 절차적인 구현형태는 아래쪽에 있습니다. 이 구현은 std::sort()처럼 배치상태를 바꾸기 보다는 리스트를 값으로 반환(return by value) 합니다.

// [Listing 4.12 : A sequental implementation of Quicksort]

template <typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
     if( input.empty() ){
          return input;
     }

     std::list<T> result;    

     // 1.
     result.splice( result.begin(), input, input.begin() );
     // 2.
     T const& pivot = *result.begin();

     // 3.
     auto divide_point = std::partition( input.begin(), input.end() ,
                    [&](T const& t) {return t<pivot;}  
               ); 

     // 4.
     std::list<T> lower_part;
     lower_part.splice( 
               lower_part.end(), input, input.begin(), divide_point );

     // 5.
     auto new_lower( 
               sequential_quick_sort( std::move(lower_part) )  );
     // 6.
     auto new_higher(
               sequential_quick_sort( std::move(input) )  ); 

     // 7.
     result.splice(result.end(), new_higher);
     // 8.
     result.splice(result.begin(), new_lower);
     
     return result;
}

비록 FP-스타일이긴 하지만, 이대로라면 많은 복사가 발생하게 됩니다. 그렇기 때문에 내부적으로는 '일반적인' 스타일을 사용할 필요가 있죠.
(1). 당신은 splice()를 사용해서 리스트를 잘라내면서 첫번째 원소를 기준점(pivot)으로 지정합니다.
(2). 이 방법은 잠재적으로 차선의(suboptimal) 정렬(비교나 교환횟수의 측면에서)로 이어집니다만,std::list로 무엇인가 하는 것은 리스트 순회로 인해서 시간을 더 소비할 여지가 있습니다.
(3). 다음에는 std::partition을 사용해서 리스트를 pivot보다 작은 값들과 작지 않은 값들로 분할(divide)합니다.

분할 기준을 명시하는 가장 쉬운방법은 람다(lambda) 함수를 사용하는 것입니다. 참조 캡쳐(reference capture)를 사용해서 pivot을 값으로 복사하는것을 방지해야 하죠. (Ref : appendix A, section A.7)

(4). 이제, FP스타일 형식(interface)으로 최적화되었습니다. 그러니 만약 당신이 절반으로 나누어 재귀를 사용하면, 2개의 리스트들이 필요하죠. 이 부분은 splice()를 사용해, input에서 divide_point까지 잘라내어 lower_part리스트로 집어넣으면 됩니다.
(5).(6). 이렇게 하면 남아있는 값들은 input에 남아있게 되죠. 이 두 리스트(inputlower_part)를 재귀호출로 정렬합니다.
(7). 여기서도 std::move를 사용하면 다시 복사를 막을 수 있죠. 결과값은 묵시적으로 옮겨집니다. 마지막으로, splice()를 다시 쓰면 result를 올바른 순서로 맞추게 됩니다(piece). new_higher 값은 마지막에 위치하게 되죠.
(8). new_higher값은 pivot 뒤에 위치하며, new_lower값은 시작부분, pivot앞쪽에 위치합니다.

함수형 프로그래밍 스타일 재귀 정렬

FP-스타일 병렬(parallel) Quick-Sort

이 프로그램은 이미 함수형 스타일을 사용하고 있기 때문에, future를 사용하면 병렬화된 버전으로 전환하기가 쉽습니다. 아래쪽 코드에 나와있죠. 이 버전은 Quick-Sort 알고리즘을 future와 함수형 스타일을 사용해서 구현합니다.

// [Listing 4.13 : Parallel Quicksort using futures]

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
     if( input.empty() ){
          return input;
     }

     std::list<T> result;    

     result.splice( result.begin(), input, input.begin() ); 
     T const& pivot = *result.begin();

     auto divide_point = std::partition( input.begin(), input.end() ,
                    [&](T const& t) {return t<pivot;}  
               ); 

     std::list<T> lower_part;
     lower_part.splice( lower_part.end(), input, input.begin(),
           divide_point );

     // 1.
     std::future<std::list<T>> new_lower( 
               std::async(&parallel_quick_sort<T>, std::move(lower_part)  ) );
     // 2.
     auto new_higher(
               parallel_quick_sort( std::move(input) )  ); 
     // 3
     result.splice(result.end(), new_higher);.
     // 4.
     result.splice(result.begin(), new_lower.get() );
     
     return result;
}

가장 큰 변화는 현재 스레드에 있는 기준점(pivot)보다 작은 부분을 정렬하기 보다는,

  1. std::async()를 사용해서 다른 스레드에게 넘겨서 정렬한다는 것입니다.
  2. 기준점보다 큰 부분은 직접 재귀(direct recursion)를 통해서 정렬됩니다.
  3. 재귀적으로 parallel_quick_sort()를 호출함으로써, 하드웨어 병렬성의 이점을 얻을 수 있습니다.

만약 std::async()가 매번 새 스레드를 시작하고, 재귀를 3번 한다면, 8개의 스레드가 실행중일 겁니다. 재귀를 10회 진행하면 (1000여개의 원소가 있는 리스트를 정렬하기 위해서) 1024개의 스레드가 진행중이겠죠. 하드웨어가 감당할 수 있다면 말입니다. 만약 라이브러리가 너무 많은 일(task)들이 생성되었다고 판단하면(아마도 하드웨어가 병렬적으로 감당할 수 있는 수준을 넘어섰기 때문이겠죠.), 라이브러리는 동기적으로(synchronously) 일들을 새로 생성하는 상태로 전이할(switch)겁니다. 새로운 스레드를 만드는것 보다, 진행중인 스레드 안에서 get()을 호출하여 실행하게 되겠죠. 왜냐하면 다른 스레드에게 작업(task)를 넘기는 일의 오버헤드는 수행능력(performance)에 도움이 안될테니까, 새로운 스레드 생성을 지양하는(avoiding) 것입니다. std::async가 각 task마다 새 스레드를 시작하도록 하거나, 모든 일이 동기적으로 실행되도록 하는 구현을 따르는 것은 의미가 없습니다. std::launch::deferred가 명시적으로 정의(specified)되거나, std::launch::async가 명시적으로 정의되어있지 않다면 말이죠. 만약 당신이 자동적인 크기 조정을 위해 라이브러리에 의존하고 있다면, 당신의 구현을 위해서 라이브러리 문서에서 어떤 동작을 설명하는지 확인하는게 좋습니다.

std::async()를 사용하기 보다, std::packaged_taskstd::thread를 간단하게 wrapper한 당신만의 spawn_task()를 쓸 수도 있을 겁니다. 4.14에 나와있는 것 처럼 말이죠. 함수를 실행(call)하기 위해 'std::packaged_task'를 만들고, thread위에서 실행하는 것이죠. 그리고 future를 반환하게 합니다.

you’d create a std::packaged_task for the result of the function call, get the future from it, run it on a thread, and return the future.

이렇게 하면 어떤 굉장한 장점이 있는 것은 아닙니다.(실제로도 상당한 Oversubscription을 가져올 가능성이 있죠.) 하지만, 이 방법은 더 복잡한(sophisticated) 구현으로 넘어가기(migrate) 위한 기반을 다져놓을(pave) 수 있습니다. 여기서 복잡한 방법이라고 한다면, task를 큐(queue)에 추가하면서 여러 Worker thread의 풀(pool)에서 돌아가도록 하는 것을 의미합니다. thread pool에 대해서는 9장에서 알아볼 겁니다.

앞서 chapter 2 에서 언급되었듯, oversubscription은 하드웨어가 지원할 수 있는 병렬성(concurrency)보다 더 많은 프로그램 상의 thread가 존재하는 것을 의미합니다.

  • 박 동하

이렇게 하는 것은 thread pool의 생성과 실행을 당신이 정말로 이해하고, 완전히 통제하고 싶을 때만 'std::async'보다 더 낫습니다.(in preference to)

여튼, parallel_quick_sort로 돌아가죠. 당신은 방금 new_higher를 위해서 재귀를 사용했습니다. 앞서 sequential_quick_sort부분과 같이 단순하게 splice할 수도 있죠. 하지만, new_lower는 여기선 std::future<std::list<T>>를 사용합니다. 그러니 splice()를 하기에 앞서 get()를 호출해서 list<T>를 얻어와야 하죠. 이렇게 하면 백그라운드에서 작업이 끝날때 까지 기다린 후, 그 값을 이동(move)시켜서 splice()를 호출합니다; get()은 r-value reference를 반환하기 때문이죠. 그러니 move를 사용하는 것이 가능합니다. (이동 문맥(move semantics)에 대해서는 appendix A, section A.1.1를 참조하세요.)

비록 std::async()가 하드웨어 병렬성이 지원하는 한 최적으로 동작하긴 하지만, 이 방법은 여전히 이상적인 Quicksort의 병렬 구현이 아닙니다. 한 가지, std::partition은 많은 일을 하지만, 이 부분은 여전히 순차적 호출(sequential call)로 구현되어 있습니다. 지금은 충분히 괜찮죠. 만약 당신이 최고속도의 병렬 구현에 관심이 있다면, 학술 문헌(academic literature)를 확인해보세요.

//[Listing 4.14] A sample implementation of spawn_task

template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type>
          spawn_task(F&& f, A&& a)
{
     // 편의를 위한 결과 타입의 재정의
     typedef std::result_of<F(A&&)>>::type result_type;
     
     // packaged_task를 이용한 작업 생성.
     std::packaged_task<result_type(A&&)> task( std::move(f) );

     // 작업의 결과물을 위한 future type.
     std::future<result_type> res(task.get_future());

     // 백그라운드에서 작업 실행.
     std::thread th(std::move(task), std::move(a));
     th.detach();

     // 앞서 정의한 future를 반환. 함수 외부에서는 get()통해 값을 받게 된다.
     return res;
}

함수형 프로그래밍이 유일한 병렬 프로그래밍 패러다임은 아닙니다. 이 패러다임에서는 공유 데이터의 사용을 지양(eschews)하죠. 다른 패러다임은 CSP(Commuting Sequential Process)입니다. 이 패러다임에서는 thread들이 개녀적으로 완전히 분리되어 있죠. 이 thread들은 공유데이터를 사용하지 않고 서로 메시지를 주고받는 통신채널을 사용합니다. 이 패러다임은 프로그래밍 언어 Erlang과, MPI(Message Passing Interface) 환경에서 적용된 방법이고, C와 C++를 사용하는 고성능 계산에 주로 사용됩니다. C++에서 이제는 이 패러다임을 지원한다는게 별로 놀랍진 않으실겁니다. 다음 section에서 한가지 활용법을 배워보죠.

CSP : https://en.wikipedia.org/wiki/Communicating_sequential_processes
Erlang : http://www.erlang.org/
MPI : http://mpi-forum.org/

메세지 전달을 통해서 연산 동기화하기

On-work

CSP의 아이디어는 단순합니다: 만약 공유 데이터가 없다면, 각 thread는 완전히 독립적이고, 순수하게 전달받는 메세지에 의해서만 반응하여 동작하죠. 그러니 각 thread는 사실상 상태머신(state machine)인 겁니다: 메세지를 받으면, 그 thread는 어떤 방법으로 상태를 갱신하고 다른 thread들에게 메세지를 보낼수도 있습니다. 이런 과정들이 초기 상태(initial state)에 따라서 수행되는(performed) 것이죠. 이런 thread들을 작성하는 한 방법은, 이런 thread들을 규칙화(formalize)하고, 유한 상태 머신(Finite State Machine) 모델을 구현하는 것입니다.

유한 상태 기계(Finite State Machine)
: https://ko.wikipedia.org/wiki/%EC%9C%A0%ED%95%9C_%EC%83%81%ED%83%9C_%EA%B8%B0%EA%B3%84

하지만 이 방법만 있는 것은 아닙니다; 상태머신은 어플리케이션의 구조에 내재(implicit)되어 있을수도 있습니다. 이 방법에서는 프로그램의 주어진 상황에서 동작에 대한 명확한 명세(exact behavioral requirements)와 프로그래밍 팀의 기량이 있다면 어떤 시나리오가 주어져도 더 나은 수행을 보여주죠.(works better) 각 thread를 어떻게 구현하든, 독립적인 프로세스로 분리하는 것은 공유데이터의 병렬성으로 인한 복잡함을 굉장히 덜어주고, 따라서 프로그래밍도 쉬워집니다. 버그도 줄어들죠.

진정한 Communicating Sequential Process 는 공유데이터가 없습니다. 모든 통신은 메세지 큐(message queue)를 통해서 이루어지죠. 하지만, C++ 스레드들은 주소공간을 공유하기 때문에, 이 요구사항을 충족시키는 것은 불가능합니다. 여기서 규칙(discipline)이 발생합니다: 어플리케이션이나 라이브러리 작성자 말이죠. 물론 메세지 큐는 thread들의 통신을 위해서 공유되어야 합니다. 하지만 내부사항(the details)은 라이브러리 안쪽으로 숨겨져야 하죠.

지금부터는 ATM을 사용한 예시입니다

  • 박 동하

잠시, ATM을 위한 코드를 구현하고 있다고 상상해봅시다. 이 코드는 돈을 인출하려는 사람과 연결된 은행과의 상호작용을 다뤄야 하죠. 카드 삽입, 메세지 출력, 키 입력, 현금 인출, 카드 반환과 같은 ATM의 물리적인 장치들도 통제해야 하죠.

이 모든 것들을 다루는 방법으로는, 3개의 독립적인 thread로 코드를 분리하는 것을 생각할 수 있습니다. 하나는 ATM 기계(machinery)를, 하나는 ATM Logic을, 하나는 은행과의 통신을 담당하는 것이죠. 이 thread들은 데이터를 공유하기보다 서로 메세지를 전달하는 것으로 통신할 수 있습니다. 예컨대, 기계 담당 thread는 사용자가 카드를 입력하거나 버튼을 누르면, Logic thread로 메세지를 보냅니다. 그리고 logic thread는 기계 thread로 얼마나 많은 돈을 출금할지 메세지를 보내는 것이죠. 이런 방식입니다.

ATM logic을 모델링 하는것은 상태머신(state machine)입니다. 각 상태에서 thread는 적합한 메세지를 기다리고, 그 메세지가 오면 다음 절차로 진행하죠. 이 진행과정은 다른 상태로 전이(transition)하는 것입니다. 그리고 다시 메세지를 기다리는, 순환(cycle)이 반복되죠. 이 상태들은 [Figure 4.3]에 보이는 간단한 구현처럼 만들어질 수 있습니다. 이 단순화된 구현에서, 시스템은 카드가 삽입되기를 기다리고 있습니다. 카드가 삽입되면, ATM 시스템은 사용자가 하나씩 PIN 번호를 하나씩 입력받습니다.
입력이 끝나면, PIN 번호는 인증과정을 거치죠(verified). 만약 PIN이 잘못되어있으면, 그대로 끝입니다. 그러니 카드를 사용자에게 돌려주고 카드를 삽입하길 기다리는 상태로 복원(resume)하게 됩니다. PIN이 인증되면, 진행과정(transaction)을 취소할지 인출금액을 입력할지를 기다립니다. 만약 취소(cancel)를 누르면, 그대로 끝이고, 카드를 반환합니다. 인출금액을 선택하면, 시스템은 현금인출하고 카드를 반환할지, "금액부족"메세지를 출력하고 카드를 돌려줄지 은행으로부터의 확인을 기다립니다. 당연한 이야기지만, 실제 ATM은 더 복잡하겠죠. 하지만 이정도면 CSP의 아이디어를 표현하기엔 충분합니다.

단순한 ATM 상태 머신 모델

ATM logic을 위한 상태 머신을 설계 했으니, 이것을 각 상태를 멤버함수로 표현한 클래스로 구현할 수 있습니다. 각 멤버함수는 정해진 종류의 메세지 입력을 기다리고, 메세지가 도착하면 처리합니다. 다른 상태로 전이하는 것이죠. 각 메세지 타입들은 다른 struct로 표현됩니다. [Listing 4.15]는 이 시스템에서 ATM logic을 단순하게 구현한 부분을 보여줍니다. main루프와 초기상태의 구현(카드 삽입을 기다리는)도 확인할 수 있죠.

보시는 것처럼, 메세지 전달을 위한 모든 필수적인 동기화들은 메세지-전달(message-passing) 라이브러리 안에 완전히 감추어져 있습니다. (Ref : Appendix C)

// [Listing 4.15] A simple implementation of an ATM logic class
struct card_inserted
{
     std::string account;
};

class atm
{
     messaging::receiver incoming;
     messaging::sender bank;
     messaging::sender interface_hardware;

     void (atm::*state)();

     std::string account;
     std::string pin;

     // 1. 
     void waiting_for_card()
     {
          // 2. 
          interface_hardware.send(display_enter_card());
          // 3. 
          incoming.wait()
               .handle<card_inserted>(
                    // 4. 
                    [&](card_inserted const& msg)
                    {
                         account=msg.account;
                         pin="";
                         interface_hardware.send(display_enter_pin());
                         state=&atm::getting_pin;
                    }
                    );
     }
     void getting_pin();

public:
     // 5.
     void run()
     {
          // 6. 
          state=&atm::waiting_for_card;
          try
          {
               for(;;)
               {
                    // 7. 
                    (this->*state)();
               }
          }
          catch(messaging::close_queue const&)
          { }
     }
};

앞서 언급한 것처럼, 여기에 표현된 구현(implementation)은 ATM에서 요구되는 실제 절차(real logic)보다 단순화한 것입니다. 하지만 당신은 메세지-전달 스타일 프로그래밍이 어떤 것인지 알려주고 있죠. 동시성(concurrency)과 동기화(synchronization) 문제(issue)는 생각할 필요가 없습니다. 단순히 어떤 상태에서 어떤 메세지를 전달받고, 어떤 메세지를 전달할 것인가를 고려하면 되죠.

The state machine for this ATM logic runs on a single thread, with other parts of the system such as the interface to the bank and the terminal interface running on separate threads.

이런 스타일의 프로그램 설계는 Actor model이라고 불립니다. (시스템에는 별개의 thread에서 동작하는 여러 actor들이 존재하고, 이 actor들이 다른 actor들에게 메세지를 보내면서 작업을 수행하는 것이죠. message를 통해서 직접 전달되는 것 이외에 공유되는 상태는 없습니다.)

Execution starts with the run() member function (5), which sets the initial state to waiting_for_card (6)
and then repeatedly executes the member function representing the current state (whatever it is) (7).

The state functions are simple member functions of the atm class. The waiting_for_card state function (1) is also simple:
it sends a message to the interface to display a “waiting for card” message (2)
and then waits for a message to handle (3).
The only type of message that can be handled here is a card_inserted message, which you handle with a lambda function (4).
You could pass any function or function object to the handle function, but for a simple case like this, it’s easiest to use a lambda. Note that the handle() function call is chained onto the wait() function; if a message is received that doesn’t match the specified type, it’s discarded, and the thread continues to wait until a matching message is received.

람다 함수 자체는 단순히 카드로부터 계좌번호를 받아(catch) 멤버 변수로 저장합니다. 이 과정에서 현재 PIN 번호를 지워버리고, 사용자로부터 PIN번호를 받기 위해서 하드웨어로 메세지를 보내죠. 그 다음에 getting PIN상태로 전이합니다. 한번 메세지 핸들러가 완료되면, 그 상태함수는 복귀하고, main 루프는 다음 상태 함수를 호출합니다.(7)

getting_pin상태 함수는 [Figure 4.3]에서 표현된 것처럼 3가지 메세지를 다뤄야 하기 때문에 조금 더 복잡합니다. 아래쪽에서 확인해보죠.

// [Listing 4.16] The "getting_pin" state function for the simple ATM implementation

void atm::getting_pin()
{
     incoming.wait()
          // 1. 
          .handle<digit_pressed>(
               [&](digit_pressed const& msg)
               {
                    unsigned const pin_length=4;
                    pin+=msg.digit;
                    if(pin.length()==pin_length)
                    {
                         bank.send(verify_pin(account,pin,incoming));
                         state=&atm::verifying_pin;
                    }
               }
               )
          // 2. 
          .handle<clear_last_pressed>(
               [&](clear_last_pressed const& msg)
               {
                    if(!pin.empty())
                    {
                         pin.resize(pin.length()-1);
                    }
               }
               )
          // 3. 
          .handle<cancel_pressed>(
               [&](cancel_pressed const& msg)
               {
                    state=&atm::done_processing;
               }
               );
}

이 경우, 3가지 메세지 타입들이 진행에 사용될 수 있습니다. 그러니 wait()함수는 3개의 handle()이 연속적으로 호출된 것이죠. 각각의 handle()호출은 템플릿 매개변수(parameter)로 전달받아 람다 함수로 전달합니다. handle()호출이 이렇게 연결되어(chained) 있기 때문에, wait() 구현은 함수가 digit_pressed메세지, clear_last_pressed메세지, 또는 cancel_pressed 메세지를 기다린다는 것을 알수 있습니다. 다른 메세지 타입들은 전부 무시되죠.(discarded)

이 경우에는, 당신은 시스템이 메세지를 받으면 반드시 상태를 바꾸도록 할 필요는 없습니다. 예컨대, digit_pressed메세지를 받으면, 단순히 그 메세지를 pin에 마지막 숫자까지 더하는 것이죠. [Listing 4.15]에 있는 메인 루프에서는 getting_pin()을 다시 호출하고 다음 숫자입력을 기다릴 겁니다.(지움(clear)이거나 취소(cancel)일수도 있죠.)

이러한 구현은 [figure 4.3]에 있는 행동과 대응합니다. 각 상태(state box)는 별개의 멤버함수로 구현이 가능하죠. 멤버함수에서는 관련된 메세지들을 기다리고, 적합한 메세지가 오면 상태를 갱신합니다.

보시는 것처럼, 이 CSP 스타일 프로그래밍은 동시성 시스템을 굉장히 단순하게 만들 수 있습니다. 각 thread가 완전히 독립적으로 다뤄지기(treated) 때문이죠. 그러니 관심사항(concerns)을 분리하기 위해 다수의 thread들을 사용하는 것과, 그렇게 하기 위해서 thread들 간의 작업을 어떻게 분할할 것인지를 명시적으로(explicitly) 결정해야 합니다.

5. 요약(Summary)

스레드 간 연산을 동기화 하는 건 동시성을 사용하는 프로그램을 작성하는데 중요한 요소입니다. 만약 동기화가 없다면, 그 스레드들은 필수적으로 독립적이고 분리된 어플리케이션들로 작성되어야 할 겁니다. 서로 연관된 일을 처리하는 한 그룹 처럼 움직이도록 말이죠. 이번 단원에서는, 연산들을 동기화하는 다양한 방법을 다뤘습니다. 간단한 조건 변수, future 와 promise, 그리고 Packaged tasks가 있었죠. 동기화 문제에 어떻게 접근할 것인가도 다루었습니다. 입력에 따라서만 결과가 나오는 함수형 프로그래밍이나, 스레드간 통신이 비동기 메세징으로 이루어지는 메세지 전달이 있었죠.

많은 C++에서 가능한 고-레벨 기능들을 다뤘지만, 이제 이 모든것들이 가능하게 해주는 깊은-레벨 기능들에 대해서 다룰 차례입니다. 바로 C++ 메모리 모델과 atomic 연산들입니다.