DataScience
Published 2023. 5. 27. 10:52
Rust 멀티스레딩 Rust
728x90

모든 컴퓨터는 멀티코어 프로세서를 가지고 있어서 여러개의 프로세스를 병렬적으로 실행할 수 있습니다. 하지만 각 프로세스는 독립적으로 격리되어 있어서 서로간에 데이터를 공유하기 어렵습니다. 

하나의 프로세스로부터 여러개의 스레드를 만들면 한 프로세스 안에 속한 스레드들은 격리되어 있지 않고 메모리를 공유하며 서로 통신할 수 있습니다. 

 

스레드 스폰

프로세스에서 스레드를 만드는 것을 스레드를 스폰(spawn)한다고 말합니다.

싱글 스레드 스폰하기

모든 프로그램은 메인 스레드로부터 시작합니다. 메인 스레드가 main 함수를 실행하고, 다른 스레드들을 실행시킬 수도 있습니다. 스레드 한 개를 스폰하는 방법은 아래와 같습니다.

from threading import Thread

from time import sleep


def func1():
    for i in range(0, 10):
        print(f"Hello, can you hear me?: {i}")
        sleep(0.001)


thread = Thread(target=func1)
thread.start()

for i in range(0, 5):
    print(f"Hello from the main thread: {i}")
    sleep(0.001)

실행 결과

Hello, can you hear me?: 0
Hello from the main thread: 0
Hello from the main thread: 1
Hello from the main thread: 2
Hello from the main thread: 3
Hello from the main thread: 4
Hello, can you hear me?: 1
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello, can you hear me?: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9

메인 스레드와 생성된 스레드가 번갈아 가면서 작업을 수행하는 것을 알 수 있습니다.

러스트에서는 새로운 스레드를 만들 때 표준 라이브러리의 std::thread::spawn 함수를 사용합니다. spawn 함수는 스레드에서 실행시킬 함수를 인자로 받습니다. 만일 전달받은 함수가 종료된다면, 스레드도 종료됩니다. 다음 코드에서는 thread::spawn 을 사용해 스레드를 생성하고, thread::sleep 을 사용해 1ms 만큼 쉬었다가 다음 루프를 실행합니다. 파이썬과 마찬가지로, 메인 스레드와 생성된 스레드가 번갈아가며 실행되는 것을 확인할 수 있습니다.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 0..10 {
            println!("Hello, can you hear me?: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 0..5 {
        println!("Hello from the main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

실행 결과

Hello from the main thread: 0
Hello, can you hear me?: 0
Hello from the main thread: 1
Hello, can you hear me?: 1
Hello from the main thread: 2
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello from the main thread: 3
Hello, can you hear me?: 4
Hello from the main thread: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9

대몬(daemon) 스레드 만들기

스폰할 스레드를 대몬 스레드로 만들 수도 있습니다. 대몬 스레드는 백그라운드에서 실행되며, 메인 스레드가 종료되면 함께 종료되는 스레드입니다.

from threading import Thread

from time import sleep


def func1():
    for i in range(0, 10):
        print(f"Hello, can you hear me?: {i}")
        sleep(0.001)


thread = Thread(target=func1, daemon=True)
thread.start()

for i in range(0, 5):
    print(f"Hello from the main thread: {i}")
    sleep(0.001)

Hello, can you hear me?: 0
Hello from the main thread: 0
Hello, can you hear me?: 1
Hello from the main thread: 1
Hello from the main thread: 2
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello from the main thread: 3
Hello from the main thread: 4
Hello, can you hear me?: 4

thread::spawn  JoinHandle 을 반환합니다. JoinHandle 은 스레드가 종료될 때까지 기다리는 join 메서드를 가지고 있습니다. join 을 호출하면 메인 스레드가 종료되지 않고, 생성된 스레드가 종료될 때까지 기다립니다. 만일 메인 함수에서 join 을 호출하지 않으면, 스폰된 스레드가 실행 중이더라도 메인 스레드가 종료되어 프로그램이 종료됩니다.

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 0..10 {
            println!("Hello, can you hear me?: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 0..5 {
        println!("Hello from the main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
}

실행 결과

Hello from the main thread: 0
Hello, can you hear me?: 0
Hello from the main thread: 1
Hello, can you hear me?: 1
Hello from the main thread: 2
Hello, can you hear me?: 2
Hello from the main thread: 3
Hello, can you hear me?: 3
Hello from the main thread: 4
Hello, can you hear me?: 4
Hello, can you hear me?: 5

실행 결과에서 알 수 있듯이 스폰된 스레드가 10번 출력이 되어야 하는데 6번만 출력되고 프로그램이 종료되었습니다. 이를 해결하기 위해서는 join 을 호출해야 합니다.

join 을 사용해 스레드 기다리기

일반적으로 스레드는 시작된 다음 join 함수를 사용해 스레드 작업이 끝날 때까지 기다려집니다.

from threading import Thread

from time import sleep


def func1():
    for i in range(0, 10):
        print(f"Hello, can you hear me?: {i}")
        sleep(0.001)


thread = Thread(target=func1)
thread.start()
thread.join()

for i in range(0, 5):
    print(f"Hello from the main thread: {i}")
    sleep(0.001)

실행 결과

Hello, can you hear me?: 0
Hello, can you hear me?: 1
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello, can you hear me?: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9
Hello from the main thread: 0
Hello from the main thread: 1
Hello from the main thread: 2
Hello from the main thread: 3
Hello from the main thread: 4

이번에는 join()을 사용해 스폰된 스레드가 끝까지 실행되기를 기다렸다가 메인 스레드를 실행합니다.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 0..10 {
            println!("Hello, can you hear me?: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    handle.join().unwrap();

    for i in 0..5 {
        println!("Hello from the main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
}

실행 결과

Hello, can you hear me?: 0
Hello, can you hear me?: 1
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello, can you hear me?: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9
Hello from the main thread: 0
Hello from the main thread: 1
Hello from the main thread: 2
Hello from the main thread: 3
Hello from the main thread: 4

스레드와 소유권

std::thread::spawn에는 함수 이름을 전달할 수도 있지만, 앞에서처럼 클로저(closure)를 전달하는 경우가 더 많습니다. 클로저를 사용하면 특정 값을 스레드 안으로 이동시키는 것이 가능합니다.

let numbers = vec![1, 2, 3];
thread::spawn(move || {
    for n in numbers {
        println!("{n}");
    }
})
.join()
.unwrap();

변수 numbers의 소유권이 새로 만들어진 스레드로 이동됩니다. 바로 move 키워드가 클로저에 사용되었기 때문입니다. 만일 move를 사용하지 않았다면, 클로저는 numbers를 레퍼런스로 사용하게 되고, 이는 컴파일 에러의 원인이 됩니다. 왜냐하면 소유권을 빌린 변수 numbers보다 스레드의 지속 범위(scope)가 더 넓을 수 있기 때문입니다.

스레드는 프로그램이 종료될 때까지도 계속 실행될 수 있기 때문에, spawn 함수는 인자로 'static 타입을 입력받습니다. 영원히, 즉 프로그램이 종료될 때까지 존재할 수 있는 함수만을 입력받는 것입니다. 만일 클로저가 지역 변수의 레퍼런스를 입력받는 경우, 지역 변수가 범위를 벗어나 메모리에서 지워져 버리는 문제가 발생합니다.

클로저의 리턴값은 스레드로 전달됩니다. 이 값은 join 메소드가 호출될 때 Result로 감싸져서 리턴됩니다.

let numbers = Vec::from_iter(0..=1000);

let t = thread::spawn(move || {
    let len = numbers.len();
    let sum = numbers.into_iter().sum::<usize>();
    sum / len // 1
});

let average = t.join().unwrap(); // 2
println!("average: {average}");

위의 예제에서, 스레드의 클로저(1)에서 리턴된 값은 join 메소드(2)를 통해 메인 스레드로 전달됩니다.

만일 변수 numbers의 길이가 0이라면 스레드는 (1)에서 값을 0으로 나누려고 하다가 패닉을 발생시키게 됩니다. 그러면 join은 패닉 메시지를 리턴하게 되고, unwrap(2)에 의해 메인 스레드 역시 패닉이 발생합니다.

범위 제한(scoped) 스레드

만일 어떤 스레드가 반드시 특정 범위에서만 존재하는 것이 확실할 경우, 이 스레드는 지역 변수의 소유권을 빌려올 수 있습니다. 왜냐하면 스레드 역시 지역변수와 같이 특정 범위에서만 존재하기 때문입니다.

이러한 스레드를 범위 제한 스레드라고 부릅니다. 범위 제한 스레드를 만들기 위해서는 std::thread::scope를 사용하면 됩니다. 이제 지역 변수를 입력받는 클로저와 범위가 동일한 스레드를 만들 수 있습니다.

use std::thread;

fn main() {
    let numbers = vec![1, 2, 3];

    thread::scope(|s| {
        s.spawn(|| {
            println!("length: {}", numbers.len());
        });
        s.spawn(|| {
            for n in &numbers {
                println!("{n}");
            }
        });
    });
}
  1. 먼저 스레드의 범위를 만들어주기 위해 std::thread::scope 함수에 클로저를 전달합니다. 해당 클로저는 즉시 실행되고, 현재 범위를 나타내는 인자 s를 입력으로 받습니다.
  2. 그다음 s를 사용해 스레드를 생성합니다. 스레드에 전달되는 클로저는 지역 변수 numbers를 사용할 수 있습니다.
  3. 범위가 끝날 때, 실행 중인 스레드들이 종료될 때까지 기다립니다.

이러한 패턴을 사용하면 범위 안의 스레드들이 절대 범위 밖으로 나가지 못하는 것이 보장됩니다. 따라서 spawn 함수에 ``static타입이 아닌 인자를 입력받을 수 있게 됩니다. 예를 들어numbers변수의 경우는 해당 스코프s보다 오래 존재하기 때문에 범위 안의 스레드에서 numbers`를 참조할 수 있습니다.

위의 예제에서 두 스레드 모두 numbers 변수에 동시적으로 접근할 수 있습니다. 메인 스레드를 포함해 어느 스레드도 numbers 의 값을 바꾸고 있지 않기 때문에 동시에 접근하는 것 자체는 괜찮습니다. 하지만 코드를 아래와 같이 바꿔서 numbers에 새로운 값을 넣으려고 하면 컴파일 오류가 발생합니다.

use std::thread;

fn main() {
    let mut numbers = vec![1, 2, 3];
    thread::scope(|s| {
        s.spawn(|| {
            numbers.push(1);
        });
        s.spawn(|| {
            numbers.push(2); // Error!
        });
    });
}

컴파일 오류는 다음과 같습니다. 참고로 러스트 컴파일러 버전에 따라서 에러 메시지는 조금씩 다를 수 있습니다.

error[E0499]: cannot borrow `numbers` as mutable more than once at a time
  --> src/main.rs:9:17
   |
5  |       thread::scope(|s| {
   |                      - has type `&'1 Scope<'1, '_>`
6  |           s.spawn(|| {
   |           -       -- first mutable borrow occurs here
   |  _________|
   | |
7  | |             numbers.push(1);
   | |             ------- first borrow occurs due to use of `numbers` in closure
8  | |         });
   | |__________- argument requires that `numbers` is borrowed for `'1`
9  |           s.spawn(|| {
   |                   ^^ second mutable borrow occurs here
10 |               numbers.push(2); // Error!
   |               ------- second borrow occurs due to use of `numbers` in closure

GIL(Global interpreter lock)

GIL은 한 번에 하나의 스레드만 파이썬 바이트코드를 실행하도록 하기 위해 인터프리터에서 사용되는 메커니즘입니다. GIL은 인터프리터가 코드를 실행하기 전에 획득하는 락(lock)입니다. 스레드가 GIL을 획득하면 파이썬 바이트코드를 실행할 수 있지만 다른 모든 스레드는 GIL이 해제될 때까지 파이썬 코드를 실행할 수 없도록 차단됩니다.

GIL의 목적은 여러 스레드의 동시 액세스로부터 인터프리터의 내부 데이터 구조를 보호하는 것입니다. GIL이 없으면 두 개 이상의 스레드가 동일한 데이터 구조를 동시에 수정할 수 있으며, 이로 인해 레이스 컨디션 등의 문제가 발생할 수 있습니다.

GIL의 단점

그러나 GIL에는 몇 가지 단점도 있습니다. 한 번에 하나의 스레드만 바이트코드를 실행할 수 있기 때문에 GIL은 멀티스레드 프로그램, 특히 CPU에 바인딩된 작업을 포함하는 프로그램의 성능에 치명적인 영향을 줍니다. 한 스레드가 CPU에 바인딩된 작업을 실행하는 경우, 다른 스레드가 I/O 또는 기타 CPU에 바인딩되지 않은 작업을 대기 중이더라도 Python 코드를 실행하지 못하고 작업이 끝날 때까지 기다려야 합니다.

GIL의 한계를 극복하기 위해 파이썬은 몇 가지 기능을 지원합니다.

  • 비동기 프로그래밍
  • 멀티스레딩
  • 멀티프로세싱
  • 제한적 GIL 해제

이 중에서 실제 개발자 입장에서 가장 널리 쓰이는 방식은 비동기 프로그래밍과 멀티스레딩입니다. numpy나 pandas와 같이 C를 사용해 GIL을 제한적으로 해제하거나, 15장에서 다룰 방법인 러스트를 사용해 성능을 비약적으로 향상시키는 방식도 가능합니다. 다만 이 방법은 서드파티 패키지를 사용하거나, 별도 패키지를 자체적으로 빌드해서 사용해야하는 만큼 사용 방법이 다소 제한적입니다.

GIL 경합(contention) 문제

아래 코드를 실행해 보면 count 함수를 연속으로 두 번 호출하는 것과, 스레드를 사용하는 것과의 실제 실행 속도 차이가 그리 크지 않습니다. 그 이유는 파이썬은 한 번에 단 하나의 코드만 실행할 수 있기 때문에, 스레드를 여러 개를 만들더라도 빠르게 계산하지 못하기 때문입니다.

import time
import threading

N = 10000000

def count(n):
    for i in range(n):
        pass

start = time.time()
count(N)
count(N)
print(f"Elapsed time(sequential): {(time.time() - start) * 1000:.3f}ms")

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))

t1.start()
t2.start()

t1.join()
t2.join()

print(f"Elapsed time(threaded): {(time.time() - start) * 1000:.3f}ms")

실행 결과

Elapsed time(sequential): 0.4786410331726074
Elapsed time(threaded): 0.4163088798522949

 

메모리 공유

스레드 소유권과 레퍼런스 카운팅

두 스레드가 데이터를 공유하는 상황에서, 두 스레드 모두가 나머지 하나보다 더 오래 존재한다는 사실이 보장되지 않는다면, 어떤 스레드도 데이터의 소유권을 가질 수 없습니다. 공유되는 어떤 데이터도 두 스레드보다 더 오래 존재해야만 합니다.

스태틱(static)

러스트에는 어떠한 스레드에도 소속되지 않는 변수를 만드는 방법이 있는데, 바로 static입니다. static 변수는 프로그램 자체가 소유권을 가지기 때문에 반드시 어떤 스레드보다도 오래 존재할 수 있습니다. 다음 예제에서는 두 스레드 모두 X에 접근할 수 있지만, 두 스레드 모두 X를 소유할 수는 없습니다.

static X: [i32; 3] = [1, 2, 3];
thread::spawn(|| dbg!(&X));
thread::spawn(|| dbg!(&X));

static으로 선언된 변수는 상수 값을 가지며, 프로그램이 시작되기 전에 생성됩니다. 따라서 어떤 스레드도 static 변수로부터 값을 빌려올 수 있게 됩니다.

유출(Leaking)

또 다른 데이터 공유 방법은 값의 할당을 유출시키는 방법입니다. Box::leak 함수를 사용하면 Box의 소유권을 해제하고 절대 이 값이 삭제되지 않게 할 수 있습니다. 이때부터 Box는 프로그램이 종료될 때까지 존재하게 되고 어느 스레드에서도 값을 빌려 갈 수 있게 됩니다.

let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3]));

thread::spawn(move || dbg!(x));
thread::spawn(move || dbg!(x));

여기서 move 클로저가 값의 소유권을 가져가는 것처럼 보이지만, 자세히 살펴보면 x는 단순히 원래 Box의 레퍼런스라는 사실을 알 수 있습니다.

레퍼런스란 정수형이나 불리언 타입처럼 원본 데이터는 그대로 두고 값만 복사해가는 것입니다.

여기서 주의해야 할 점은 'static 으로 선언되었다고 해서 이 값이 프로그램 시작 전에 만들어진다는 것은 아니라는 것입니다. 중요한 점은 이 값이 프로그램이 종료될 때까지 유지된다는 사실입니다.

이렇게 Box를 유출시키게 되면 메모리가 유출되는 단점이 있습니다. 메모리에 어떤 객체를 할당했지만, 객체를 삭제하고 메모리에서 할당 해제하지 않는 것입니다. 전체 프로그램에서 이러한 패턴이 몇 번 존재하지 않는다면 큰 상관이 없지만, 이러한 패턴이 반복되면 프로그램의 메모리가 점차 부족해질 것입니다.

레퍼런스 카운팅

스레드 사이에서 공유된 데이터가 확실히 삭제되고 메모리에서 할당 해제되게 하려면, 해당 데이터의 소유권을 포기해서는 안됩니다. 대신, 소유권을 공유하면 가능합니다. 해당 데이터의 소유자들을 지속적으로 관리함으로써 더 이상 해당 데이터의 소유자가 없을 때 객체를 삭제할 수 있습니다.

이러한 방법을 레퍼런스 카운팅(Reference counting)이라고 하고, 러스트에서는 std::rc::Rc 타입을 사용해 구현이 가능합니다. Box와 비슷하지만, 데이터를 복사하게 되면 새로운 데이터가 메모리에 할당되는 것이 아니고 레퍼런스 카운터의 값이 증가합니다. 결론적으로 원본 데이터와 복사된 데이터 모두 같은 메모리에 할당된 값을 참조합니다. 이러한 원리 때문에 소유권을 공유한다고 말하는 것입니다.

use std::rc::Rc;

let a = Rc::new([1, 2, 3]);
let b = a.clone();

assert_eq!(a.as_ptr(), b.as_ptr()); // Same allocation!

위 예제에서는 Rc타입의 변수 a를 clone() 메소드로 복사해서 b를 만들었습니다. 그리고 두 변수의 메모리 주소를 확인해보면 동일하다는 것을 알 수 있습니다.

Rc가 삭제되면 카운터가 감소됩니다. 가장 마지막으로 존재하는 Rc가 삭제되면, 카운터가 0이 되고 따라서 메모리에서 값이 할당 해제됩니다.

하지만 Rc를 다른 스레드로 보내려고 하면 에러가 발생합니다.

    error[E0277]: `Rc` cannot be sent between threads safely
        |
    8   |     thread::spawn(move || dbg!(b));
        |                   ^^^^^^^^^^^^^^^

결론적으로 Rc는 스레드 안전성이 보장되지 않는 타입입니다. 만일 여러 개의 스레드가 특정 값에 대해 Rc를 사용한다면, 각 스레드에서 레퍼런스 카운터를 동시에 변경할 가능성이 있고 이것은 예측하지 못한 결과를 발생시킵니다.

Arc(Atomic refernce counting)

대신 아토믹(atomically)한 레퍼런스 카운팅을 사용하는 std::sync::Arc을 사용할 수 있습니다. Rc와 동일한 기능을 제공하지만, Arc는 여러 스레드에서 레퍼런스 카운터를 변경하는 것이 허용된다는 점이 다릅니다. 레퍼런스 카운터가 변경되는 작업이 아토믹하게 이루어지기 때문에, 여러 개의 스레드에서 동시에 카운터를 변경하더라도 스레드 안전성이 보장됩니다.

use std::sync::Arc;

let a = Arc::new([1, 2, 3]);
let b = a.clone();

thread::spawn(move || dbg!(a));
thread::spawn(move || dbg!(b));
  • (1)에서 배열을 Arc를 사용해 메모리에 할당합니다. 이때 레퍼런스 카운터는 1이 됩니다.
  • Arc를 클론하면 레퍼런스 카운트는 2가 되고, a와 b 모두 같은 메모리 주소를 사용합니다.
  • 각 스레드마다 고유한 Arc를 전달받았습니다. 즉 배열이 스레드 사이에 공유되었습니다. 각 스레드에서 Arc가 삭제될 때마다 레퍼런스 카운터가 감소하고, 카운터가 0이 되면 배열은 메모리에서 할당 해제됩니다.

뮤텍스(mutex)

뮤텍스는 Mutual exclusion(상호 배제)의 약자로, 뮤텍스는 주어진 시간에 하나의 스레드만 데이터에 액세스할 수 있도록 허용합니다. 뮤텍스의 데이터에 액세스하려면 먼저 스레드가 뮤텍스의 락(lock)을 획득하도록 요청하여 액세스를 원한다는 신호를 보내야 합니다. 락은 뮤텍스의 일부인 데이터 구조로, 어떤 스레드에서 뮤텍스를 참조하고 있는지가 저장되어 있습니다.

뮤텍스를 사용하기 위해서는 두 가지 규칙을 지켜야 합니다.

  • 데이터를 사용하기 전에 반드시 잠금을 해제해야 합니다.
  • 뮤텍스가 보호하는 데이터를 사용한 후에는 다른 스레드가 잠금을 획득할 수 있도록 데이터의 잠금을 해제해야 합니다.

뮤텍스의 API

단일 스레드에서 뮤텍스를 사용하는 방법입니다.

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

실행 결과

m = Mutex { data: 6, poisoned: false, .. }

많은 타입과 마찬가지로, 연관 함수 new를 사용해 Mutex를 생성합니다. 뮤텍스 내부의 데이터에 접근하기 위해 lock 메서드를 사용해 락을 획득합니다. 이 호출은 현재 스레드를 차단하여 다른 스레드에서 락을 가질 차례가 될 때까지 어떤 작업도 할 수 없도록 합니다.

만약 락을 보유한 다른 스레드가 패닉에 빠지면 lock 호출이 실패합니다. 이 경우 아무도 락을 얻을 수 없게 되기 때문에 스레드에서 패닉을 발생시킵니다.

락을 획득한 후에는 반환 값(이 경우 num)을 뮤텍스 내부의 값에 대한 가변 레퍼런스로 취급할 수 있습니다. 락을 해제하고 나서 뮤텍스의 값을 출력하면 내부에 보관하고 있던 정수형 값이 5에서 6으로 변경된 것을 확인할 수 있습니다.

balance가 인출하고자 하는 금액보다 크다고 판단해 잔고를 인출합니다.

import threading
import time

balance = 100

def withdraw(amount):
    global balance
    if balance >= amount:
        time.sleep(0.01)
        balance -= amount
        print(f"Withdrawal successful. Balance: {balance}")
    else:
        print("Insufficient balance.")

def main():
    t1 = threading.Thread(target=withdraw, args=(50,))
    t2 = threading.Thread(target=withdraw, args=(75,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

실행 결과

Withdrawal successful. Balance: 50
Withdrawal successful. Balance: -25

스레드가 lock을 획득했을 때만 데이터에 접근 가능하도록 하면 이러한 문제를 막을 수 있습니다.

import time
import threading

balance = 100
lock = threading.Lock()

def withdraw(amount):
    global balance
    thread_id = threading.get_ident()
    with lock:
        print(f"Thread {thread_id}: Checking balance...")
        if balance >= amount:
            time.sleep(1)
            balance -= amount
            print(f"Thread {thread_id}: Withdrawal successful. Balance: {balance}")
        else:
            print(f"Thread {thread_id}: Insufficient balance.")

def check_lock():
    while lock.locked():
        print("Lock is locked.")
        time.sleep(0.1)

def main():
    t1 = threading.Thread(target=withdraw, args=(50,))
    t2 = threading.Thread(target=withdraw, args=(75,))
    t3 = threading.Thread(target=check_lock)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

if __name__ == '__main__':
    main()

실행 결과

Thread 123145503645696: Checking balance...
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Thread 123145503645696: Withdrawal successful. Balance: 50
Thread 123145520435200: Checking balance...
Thread 123145520435200: Insufficient balance.

다음 코드는 10개의 스레드가 100번씩 1씩 증가하는 값을 더하는 코드입니다.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn withdraw(balance: &mut i32, amount: i32) {
    if *balance >= amount {
        thread::sleep(Duration::from_millis(10));
        *balance -= amount;
        println!("Withdrawal successful. Balance: {balance}");
    } else {
        println!("Insufficient balance.");
    }
}

fn main() {
    let mut balance = Arc::new(100);

    let t1 = thread::spawn(move || {
        withdraw(&mut balance, 50); // 🤯
    });

    let t2 = thread::spawn(move || {
        withdraw(&mut balance, 75);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

가변 레퍼런스로 각 스레드에 값을 전달할 수 없습니다.

뮤텍스를 사용해 데이터를 보호하고 있습니다. 뮤텍스를 사용하면 여러 스레드가 동시에 데이터에 액세스할 수 없습니다. 따라서 데이터가 더해지는 동안 다른 스레드는 데이터에 액세스할 수 없습니다.

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn withdraw(balance: Arc<Mutex<i32>>, amount: i32) {
    let mut balance = balance.lock().unwrap();
    println!("{:?}: Checking balance.", thread::current().id());
    if *balance >= amount {
        thread::sleep(Duration::from_millis(100));
        *balance -= amount;
        println!("Withdrawal successful. Balance: {balance}");
    } else {
        println!("Insufficient balance.");
    }
}

fn check_lock(balance: Arc<Mutex<i32>>) {
    while let Err(_) = balance.try_lock() {
        println!("Lock is locked.");
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let balance = Arc::new(Mutex::new(100));

    let balance1 = Arc::clone(&balance);
    let t1 = thread::spawn(move || {
        withdraw(Arc::clone(&balance1), 50);
    });
    let balance2 = Arc::clone(&balance);
    let t2 = thread::spawn(move || {
        withdraw(Arc::clone(&balance2), 75);
    });
    let balance3 = Arc::clone(&balance);
    let t3 = thread::spawn(move || {
        check_lock(Arc::clone(&balance3));
    });

    t1.join().unwrap();
    t2.join().unwrap();
    t3.join().unwrap();
}

실행 결과

ThreadId(2): Checking balance.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Withdrawal successful. Balance: 50
ThreadId(3): Checking balance.
Insufficient balance.

 

메시지 전달

스레드 간에 데이터를 공유하는 방법 중 하나로 널리 쓰이는 것 중 하나가 바로 MPSC입니다. 다중 생성자-단일 소비자(Multiple Producer Single Consumer)란 뜻으로, 여러 개의 스레드에서 하나의 스레드로 데이터를 보내는 방식입니다.

파이썬에서는 공식적으로 MPSC를 만드는 방법이 없기 때문에, 스레드 안정성이 보장되는 큐 자료형인 Queue 를 사용해 이를 구현해 보겠습니다.

import threading
import time
from queue import Queue

channel = Queue(maxsize=3)


def producer():
    for msg in ["hello", "from", "the", "other", "side"]:
        print(f"Producing {msg}...")
        channel.put(msg)


def consumer():
    while not channel.empty():
        item = channel.get()
        print(f"Consuming {item}...")
        channel.task_done()
        time.sleep(0.01)


producer_thread = threading.Thread(target=producer)
producer_thread.start()

consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

mpsc(multiple producer, single consumer)는 여러 개의 송신자와 하나의 수신자를 가진 채널을 만듭니다. 이 채널은 송신자가 메시지를 전송하면 수신자가 메시지를 수신할 때까지 기다립니다. 이 채널은 Sender와 Receiver로 구성됩니다.

Receiver에는 두 가지 유용한 메서드, 즉 recv와 try_recv가 있습니다. 여기서는 메인 스레드의 실행을 차단하고 값이 채널로 전송될 때까지 기다리는 _receive_의 줄임말인 recv를 사용합니다. 값이 전송되면 recv는 Result<T, E>에 값을 반환합니다. 송신기가 닫히면 recv는 에러를 반환하여 더 이상 값이 오지 않을 것임을 알립니다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for msg in vec!["hello", "from", "the", "other", "side"] {
            let val = String::from(msg);
            println!("Producing {}...", val);
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
    });

    for re in rx {
        println!("Consuming {}...", re);
    }
}

try_recv 를 사용하면 다음과 같이 할 수 있습니다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hello");
        thread::sleep(Duration::from_millis(1000));
        tx.send(val).unwrap();
    });

    loop {
        println!("Waiting for the signal...");
        if let Ok(received) = rx.try_recv() {
            println!("Message: {}", received);
            break;
        }

        thread::sleep(Duration::from_millis(300));
    }
}

rx가 어떻게 앞의 메시지를 다 기다릴 수 있는지? Receiver, 즉 rx는 송신자 tx 가 메시지를 전송할 때까지 기다리려고 시도하며, 해당 채널이 끊어지면 오류를 반환합니다.

이 함수는 사용 가능한 데이터가 없고 더 많은 데이터를 전송할 수 있는 경우(적어도 한 명의 발신자가 여전히 존재할 경우) 항상 현재 스레드를 블럭합니다. 해당 발신자(또는 동기화 발신자)에게 메시지가 전송되면 이 수신자가 깨어나서 해당 메시지를 반환합니다. 즉 수신자가 메시지를 아직 보내지 않았다면 계속해서 메시지를 기다립니다.

해당 발신자가 연결을 끊었거나 이 스레드가 차단되는 동안 연결이 끊어지면, 이 채널에서 더 이상 메시지를 수신할 수 없음을 나타내는 Err을 반환합니다. 그러나 채널이 버퍼링되므로 연결이 끊어지기 전에 보낸 메시지는 계속 정상적으로 수신됩니다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        for msg in vec!["hello", "from", "the", "other", "side"] {
            let val = String::from(msg);
            thread::sleep(Duration::from_millis(100));
            tx1.send(val).unwrap();
        }
    });

    thread::spawn(move || {
        let val = String::from("bye");
        thread::sleep(Duration::from_millis(1000));
        tx.send(val).unwrap();
    });

    for re in rx {
        println!("{}", re);
    }
}

'Rust' 카테고리의 다른 글

Rust 파이썬 바인딩  (56) 2023.05.29
Rust 비동기 프로그래밍  (36) 2023.05.28
Rust 스마트 포인터  (63) 2023.05.14
Rust 예외처리  (36) 2023.05.13
Rust 제네릭  (88) 2023.05.12
profile

DataScience

@Ninestar

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!