수업/Network Programming

[Network Programming] MultiTherad 기반 서버

hw-ani 2023. 5. 29. 13:59

Thread(쓰레드)

(예전에 시프 공부할 때도 정리 잘 해뒀으니 이 글초반에 쓰레드 설명하는 부분이랑 같이 보면 좋음.)

 

쓰레드의 등장 배경

    1. 프로세스의 생성은 많은 리소스가 소모된다.

    2. process간 context-switching은 오버헤드가 크다.(쓰레드에 비해...)

    3. 프로세스들은 메모리 공간이 독립적이라 특별한 경로가 아니면 데이터 공유가 안된다.

 

Thread(쓰레드)는 한 Process내의 또 다른 실행 흐름이다. 구체적으로 말하자면, 쓰레드들은 Stack을 제외한 공간을 공유한다.(+ 다른 실행 흐름이니까 당연히 PC값도 다르겠지..)

한 process 내에 존재하므로 프로세스처럼 아예 다른 메모리 공간을 할당받을 필요도 없고, context-switching이 일어나도 오버헤드가 비교적 작다. 또 메모리 공간도 공유하므로 쓰레드간 데이터 공유도 간편하다.(단, 공유자원이니까 주의해서 다뤄야한다. Lock으로 동기화 해야 한다.)

 

좌측이 MultiProcess, 우측이 MultiThread를 표현한 것

 

 

 

 

pthread_create

#include <pthread.h>

pthread_t tid; //생성할 thread id를 받아올 변수 선언 필요

int pthread_create(pthrad_t *restrict thread,
                   const pthread_attr_t *restrict attr,
                   void *(*start_routine)(void*),
                   void *restrict arg
                   );

첫번째 인자로 생성할 쓰레드 ID 저장을 위한 변수의 주소값을 전달한다. thread도 id가 있다.

두번째 인자로 쓰레드 특성 정보를 전달한다. NULL을 전달하면 기본값으로 설정된다.

세번째 인자로 쓰레드의 main 함수 역할을 할 함수의 포인터를 전달한다.(시그널 핸들러랑 비슷) 이 함수가 종료되면 해당 쓰레드도 종료된다. 이때 전달하는 함수는 void * type을 반환하고 void * type 인자를 하나 받아야 한다. (시그널 핸들러는 void 반환하고 int 인자 받았어야했음 헷갈리지말기)

네번째 인자로 세번째 인자로 등록된 함수가 호출될 때 전달할 인자를 담고 있는 변수의 주소 값을 전달한다.

 

 컴파일 시 "-lpthread" 옵션을 추가하여 외부 쓰레드 라이브러리를 link 하도록 지시해야 한다.

 

 

 

pthread_join

#include <pthread.h>

int pthread_join(pthread_t thread, void **status);

Process 자체가 종료되면 thread들이 종료됐든 말든 무조건 같이 종료된다. sleep()으로 thread 종료되는걸 다 기다리고 관리하기엔 어렵기 때문에, pthread_join 함수를 이용해 메인 프로세스는 쓰레드가 종료될 때까지 기다려줘야한다.

 

첫번째 인자로 종료를 기다릴 thread의 ID를 전달한다.

두번째 인자로 해당 thread의 main 함수가 반환하는 값이 저장될 포인터 변수의 주소 값을 전달한다.

해당 쓰레드가 종료될 때까지 이 함수는 반환하지 않는다. 그때까지 이 함수를 호출한 process(or thread)는 대기(블로킹)상태가 된다.

 

성공 시 0, 실패 시 0 이외의 값을 반환한다.

 

 

 

pthread_detach

#include <pthread.h>

int pthread_detach(pthread_t thread);
//성공 시 0, 실패 시 0 이외의 값 반환

쓰레드는 종료돼도 메모리에서 자동으로 소멸되진 않는다.

메모리에서 소멸시키려면 아래 두 함수를 사용해야한다.

    1. pthread_join()

    2. pthread_detach()

1번 함수의 문제점은 위에서 봤듯이 blocking 상태에 놓인다는 것이다. 물론 메인 process가 먼저 종료되지 않기 위해 사용하는 목적이라면 blocking 시키는게 맞을 수도 있다.

하지만 메모리에서의 소멸이 목적이라면 이를 유도하면서 blocking은 발생하지 않는 함수인 pthread_detach() 함수를 이용하면 좋다. 뭐 이 함수를 호출했다고 실행 중인 쓰레드가 갑자기 종료되거나 하진 않는다.

대신 pthread_detach()에 인자로 넣어서 호출해두면, 해당 thread가 종료될때 자동으로 메모리 자원들을 반납하도록 한다.

 

 

 


공유 자원과 임계 영역(Critical Section) 문제

공유자원이 왜 문제가 되는지.. 뭐 이런 얘기는 하도 많이 들었으니 pass

 

Thread-unsafe function : 둘 이상의 쓰레드가 동시에 호출하면 문제가 생기는 함수를 지칭

Thread-safe function     : 둘 이상의 쓰레드가 동시에 호출해도 문제가 없는 함수를 지칭

예를들어 gethostbyname() 함수는 Thread-unsafe function이고, gethostbyname_r() 함수는 Thread-safe function이다. 컴파일 시 "-D_REENTRANT" 옵션으로 _REENTRANT 매크로를 정의하면 Thread-unsafe function 호출문이 Thread-safe function 호출문으로 자동 변경 컴파일된다.(아니면 소스코드 내 헤더파일 include 하기 전에 "#define _REENTRANT" 로 정의해도 된다.)

 

 

임계 영역(Critical Section)이란, 두 thread가 동시에 접근했을 때 문제가 되는 부분을 말한다. 주로 전역 변수 같은 공유 자원에 접근하는 부분이다.

 

공유자원이나 임계영역에 접근하는 문제를 해결하기 위해 동기화(Synchronization)가 필요하다.

 

 

 

동기화(Synchornization)

동기화가 필요한 대표 상황

    1. 공유 자원 동시 접근 발생하는 상황

    2. 공유 자원 접근 순서를 지정해야하는 상황

 

동기화 대표 기법

    1. Mutex 기반 동기화

    2. Semaphore 기반 동기화

 

 

 

Mutex 기반 동기화

#include <pthread.h>

//prhtead_mutex_t type 변수 선언해서 mutex 변수 만들고 아래 함수 써야 함
pthread_mutex_t myMutex;

//mutex 초기화, 소멸
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

//mutex 획득, 반환
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

//성공 시 0, 실패 시 0 이외의 값 반환

네 함수 모두 "pthread_mutex_t" type의 주소값을 받는다. 이걸로 어떤 mutex에 대해 작업할지를 결정한다.

mutex 변수를 초기화하는 pthread_mutex_init() 함수의 두번째 인자로는 mutex의 특성을 설정할 수 있는데, 그냥 NULL로 주면 기본으로 설정된다.(시프 때 정리했던 글 보면 이 함수 안썼는데.. 아다리맞게 잘 된거일 뿐 써야 된다고 하네)

 

(사용 방법)

pthreadt_mutex_t type의 변수를 선언하고, init()으로 초기화한 뒤, 임계 영역에 들어가기 전에 lock()을 하고, 임계 영역을 나오면서 unlock()을 하면 된다.

참고로 pthread_mutex_t type 변수는 보통 전역으로 선언한다. 해당 변수엔 모든 쓰레드가 접근할 수 있어야 동기화가 될테니...

 

 

 

Semaphore 기반 동기화

#include <semaphore.h>

//sem_t type 변수 선언해서 semaphore 변수 만들고 아래 함수 써야 함
sem_t mySem;

//semaphore 초기화, 소멸
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);

//semaphore 획득, 반환
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);

//성공 시 0, 실패 시 0 이외의 값 반환

시프에선 sem_open()으로 named semaphore를 가져와서 프로세스간 동기화를 했었는데, 지금 우린 쓰레드간 동기화가 목표이므로 sem_init()으로도 충분하다.

 

네 함수 모두 "sem_t" type의 주소값을 받는다. 이걸로 어떤 semaphore에 대해 작업할지를 결정한다.

semaphore 변수를 초기화하는 sem_init() 함수의 두번째 인자는 다른 프로세스에서 접근 가능한 세마포어로 만들지를 결정한다. 0을 전달하면 하나의 프로세스 내에서만 접근 가능하고, 0 이외의 값을 전달하면 둘 이상의 프로세스에서 접근 가능하다. 우리는 쓰레드간 동기화를 할거라 그냥 0을 전달하면 된다.

sem_init() 함수의 세번째 인자로 전달한 값이 세마포어의 초기 값이 된다.

 

sem_wait()는 semaphore의 값을 1 감소시키고, sem_post()는 semaphore의 값을 1 증가시킨다.

semaphore의 값이 0보다 커야 sem_wait()를 넘어서 다음 라인으로 진입할 수 있다.

 

(사용 방법)

sem_t type의 변수를 선언하고, init()으로 초기화한 뒤, 임계 영역에 들어가기 전에 wait()을 하고, 임계 영역을 나오면서 post()을 하면 된다.

참고로 sem_t type 변수는 보통 전역으로 선언한다. 해당 변수엔 모든 쓰레드가 접근할 수 있어야 동기화가 될테니...

 

 

Mutex와 달리 Semaphore는 변수 값을 자유롭게 설정할 수 있어서 비교적 활용도가 높다.

예를들어 두 쓰레드 A와 B가 있을 때, A→B→A→... 순으로 critical section에 접근해야 한다고 해보자. 그럼 세마포어 semA, semB 두개를 만들어서 semA는 1, semB는 0으로 설정한다. 각 세마포어 변수는 각 쓰레드가 실행될 수 있는지를 나타낸다고 봐도 된다. 그럼 thread A의 cirical section에 들어가기 전에 sem_wait(&semA); 로 시작하게 하고, 나올 땐 sem_post(&semB);를 해준다. thread B에선 반대로 sem_wait(&semB);로 시작하고, sem_post(&semA);로 마무리하면 된다. 그러면 semA와 semB가 번갈아가며 1의 값을 가지므로 두 쓰레드는 번갈아가며 잘 실행된다.

자세한 코드는 강의자료 nsp-18 p.23 참고

 

 

 


MultiThread 기반 다중 접속 서버 (채팅 서버)

MultiProcess 기반 서버에선 연결 요청이 들어올 때마다 자식 proecss를 만들고 거기서 client에 서비스를 하도록 했다.

MultiPlexing 기반 서버에선 한 프로세스에서 socket descriptor들을 관찰 대상으로 등록해놓고 매번 select() 함수로 확인하며 수신한 데이터가 있으면 따로 처리해줬다.

 

MultiThread 기반 서버에선 MultiProcess 기반 서버와 비슷하게 연결 요청이 들어올 때마다 쓰레드를 새로 만들고 그 쓰레드에서 client에게 서비스를 제공하도록 한다.

단, 이번에 만드는건 채팅 서버라서 좀 다른 점은 있다. 서버가 받은 데이터는 모든 clinet에게 전송해줘야하므로 전역 변수(공유 자원)로 배열을 선언해서 모든 client socket을 저장한다. 따라서 mutex lock도 사용해야한다.

 

 

 

- Server Source Code

/**MultiThread 기반 다중 접속 채팅 Server**/

pthread_mutex_t mutx;         //동기화에 사용할 mutex 변수
int clnt_socks[MAX_CLNT_NUM]; //client 목록
int clnt_cnt = 0;             //등록된 client 개수

//연결된 모든 client에 msg에 담긴 데이터 전송
void send_msg(char * msg, int len) {
    pthread_mutex_lock(&mutx);
    for (int i=0; i < clnt_cnt; i++)
        write(clnt_sock[i], msg, len);
    pthread_mutex_unlock(&mutx);
}

//각 thread들의 메인 함수: client에 service 제공, 끝나면 목록에서 삭제
void * handle_clnt(void * arg) {
    int clnt_sock = *((int*)arg);
    int str_len = 0;
    char msg[BUF_SIZE];
    
    while ((str_len = read(clnt_sock, msg, sizeof(msg)))!=0)
        send_msg(msg, str_len);
    
    pthread_mutex_lock(&mutx); //공유 자원에 접근할거라 동기화
    for (int i=0; i < clnt_cnt; i++) {  //연결 해제할 client는 목록에서 삭제
        if (clnt_sock == clnt_socks[i]) {
            while (i++ < clnt_cnt-1)
                clnt_socks[i] = clnt_socks[i+1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutx); //공유 자원 접근 끝!
    close(clnt_sock);  //연결 종료
    return NULL;
}

int main(int argc, char *argv[]) {
    . . .
    while (1) {
        //연결 요청 처리
        clnt_adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
        
        //연결된 client는 client 배열에 등록
        pthread_mutex_lock(&mutx);
        clnt_socks[clnt_cnt++] = clnt_sock;
        pthread_mutex_unlock(&mutx);
        
        pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
        pthread_detach(t_id); //나중에 메모리 해제 잘 되도록..
        printf("Connected client IP: %s\n", inet_ntoa(clnt_adr.sin_addr));
    } //어차피 무한반복이라 pthread_join()은 굳이 필요 없는 듯
    . . .
    return 0;
}

 

 

 

- Client Source Code

/**MultiThread 기반 다중 접속 채팅 Client**/

//데이터 수신 역할을 맡은 Thread의 메인 함수 : 데이터 수신해서 출력
void * recv_msg(void * arg) {  //매개변수로 소켓의 File descriptor 받아옴
    int sock = *((int*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len;
    while (1) {
        str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
        if (str_len == -1)
            return (void *) -1;
        name_msg[str_len] = 0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

//데이터 송신 역할을 맡은 Thread의 메인 함수 : stdin에서 입력받아 그대로 보냄
void * send_msg(void * arg) {  //매개변수로 소켓의 File descriptor 받아옴
    int sock = *((int*(arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    while (1) {
        fgets(msg, BUF_SIZE, stdin);
        if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n")) {
            close(sock);
            exit(0);
        }
        sprintf(name_msg, "%s %s", name, msg);
        write(sock, name_msg, strlen(name_msg));
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    . . .
    //connect()로 연결 요청
    if ((connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");
    
    //연결 후 송신과 수신을 다른 쓰레드에서 동시에 처리하도록 함.
    pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);
    close(sock);
    . . .
    return 0;
}

 

각 쓰레드에서 한 소켓에 입출력을 동시에 진행하는건 별 문제없이 잘 동작한다. 입출력 버퍼는 어차피 나뉘어져있고, 잘 보내고 받아진다.