DataScience
article thumbnail
Published 2023. 5. 28. 09:58
Rust 비동기 프로그래밍 Rust
728x90

비동기 프로그래밍이란?

비동기 모델에서는 여러 가지 일이 동시에 일어날 수 있습니다. 프로그램에서 오래 실행되는 함수를 호출해도 실행 흐름이 차단되지 않고 프로그램이 계속 실행됩니다. 함수가 완료되면 프로그램은 결과를 알고 액세스합니다.

위 그림에서 왼쪽은 동기 함수의 실행 흐름, 오른쪽은 비동기 함수의 실행 흐름을 나타냅니다. 동기 함수의 경우, 요청1에 대한 응답이 주어질 때까지 기다렸다가 요청 2를 처리합니다. 비동기 함수는 요청1을 보낸 다음 응답이 올 때까지 기다리지 않고 바로 요청2를 처리합니다. 그 후, 응답 1과 응답2가 도착하면 결과를 바로 확인합니다.

비동기 함수를 사용하면, 프로그램 외부에서 작업이 끝나길 기다리는 동안에 다른 작업을 수행할 수 있기 때문에 효율적으로 작업을 수행할 수 있다는 장점이 있습니다.

비동기 작동 방식

코루틴은 실행 중에 일시 중지했다가 다시 시작할 수 있는 파이썬의 함수입니다. 코루틴은 여러 함수를 동시에 실행할 수 있도록 주기적으로, 또는 비동기 런타임이 유휴 상태일 때, 각각의 코루틴이 자발적으로 제어권을 넘겨주는 협력적 멀티태스킹(Cooperative multitasking)에 사용됩니다.

따라서 코루틴과 스레드는 매우 비슷한 일을 수행합니다. 다만 스레드의 경우 운영 체제가 스레드 간에 작업을 전환하는 런타임 환경을 가지고 있습니다. 반면 코루틴의 경우 코루틴 전환 시점은 코드에 따라 결정됩니다. 코드상에 설정된 시점에 작업을 일시 중단했다가 다시 시작하는 방식으로 여러 작업을 동시적으로 수행합니다.

러스트 역시 운영체제가 아닌 비동기 런타임에서 관리하는 태스크를 사용해 비동기 함수를 실행합니다. 코루틴과 마찬가지로 협력적 멀티태스킹에 사용됩니다.

협력적 멀티태스킹이란, 멀티태스킹에 참여하는 주체들이 언제든 자신의 실행을 자발적으로 멈추고 다른 주체에게 실행 권한을 넘기는 방식을 의미합니다. 운영체제에서는 스케줄러가 어떤 스레드가 언제 작업을 실행하고 종료할지를 관리하기 때문에 멀티스레딩을 협력적 멀티태스킹이 아닙니다.

 

tokio

앞에서 설명했듯이, 비동기 함수를 설명하려면 프로그램 내부에서 비동기 함수들의 실행을 관리하는 비동기 런타임이 필요합니다. 러스트는 빌트인 async 런타임이 존재하지 않아서 tokio 를 사용해야 합니다. Tokio는 다음 세 가지의 주요 구성 요소를 제공합니다.

  • 비동기 코드 실행을 위한 멀티스레드 런타임.
  • 표준 라이브러리의 비동기 버전.
  • 대규모 라이브러리 에코시스템.

이제 프로젝트에 tokio 를 설치하기 위해 Cargo.toml파일에 tokio를 추가합니다. 이때 features = ["full"] 을 넣어야 전체 기능을 다 사용할 수 있습니다.

[dependencies]
tokio = { version = "1.25.0", features = ["full"] }

비동기 함수 만들어보기

파이썬에는 비동기 함수를 만들기 위해서 asyncio 라는 내장 라이브러리가 있습니다.

비동기 함수를 선언하려면 일반 함수 정의 앞에 async 키워드를 붙여주면 됩니다.

async def func():
    print("async")

비동기 함수는 그냥 호출하게 되면 결과 대신 coroutine 객체가 리턴되고, 경고가 발생합니다.

>>> print(func())

<coroutine object asynch at 0x10fe8e3b0>
/Users/temp/python/main.py:9: RuntimeWarning: coroutine 'func' was never awaited
  print(func())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

비동기 함수를 호출하려면, asyncio.run  await 키워드를 같이 사용해야 합니다.

import asyncio

async def func():
    print("async")

async def main():
    await func()

asyncio.run(main())

러스트에서도 파이썬과 동일하게 비동기 함수를 만드려면 async  await을 모두 사용합니다. 위의 파이썬 코드와 동일한 러스트 코드를 만들어보면 다음과 같습니다. 파이썬에서는 함수의 앞에 await 을 사용하는 반면, 러스트에서는 함수 호출 뒤에 .await 을 붙여줍니다. 또한, 비동기 함수는 비동기 함수에서만 호출이 가능하므로 비동기 함수를 실행하기 위해서 main  async 로 바꾸고, #[tokio::main]  main 함수에 붙여서 비동기 메인 함수를 만들 수 있습니다. 이제 메인 함수가 비동기 함수이기 때문에 내부에서 async와 await 키워드를 사용 가능합니다.

use tokio;

async fn func() {
    println!("async");
}

#[tokio::main]
async fn main() {
    func().await
}

그렇다면 비동기 함수를 기다리지 않고 프린트해보면 어떨까요? 일단 아래 코드는 컴파일되지 않습니다.

use tokio;

async fn func() {
    println!("async");
}

#[tokio::main]
async fn main() {
    println!("{:?}", func()) // 🤯
}

실행 결과

error[E0277]: `impl Future<Output = i32>` doesn't implement `std::fmt::Display`
 --> src/main.rs:9:22
  |
9 |     println!("{:#}", func())
  |                      ^^^^^^ `impl Future<Output = i32>` cannot be formatted with the default formatter

컴파일 에러를 들여다보면, 함수 func의 리턴값이impl Future<Output = i32>라는 것을 알 수 있습니다. 즉 비동기 함수를 기다리지 않으면, 파이썬에서는 coroutine이, 러스트에서는 Future 가 리턴됩니다. 둘 다 await 을 사용해 함수 호출이 끝나기를 기다려줘야 실제 함수의 결과를 얻을 수 있습니다.

자바스크립트나 C#과 같은 언어의 비동기 함수들은 프라미스(Promise) 기반으로 만들어져 있어서 비동기 함수를 호출하는 즉시 실행되지만, 파이썬과 러스트는 실행할 수 있을 때 실행하는 방식(lazy execution)을 사용하고 있습니다.

하지만 단순히 await를 사용하기만 해서는 기존의 동기 함수와 비슷한 결과가 나옵니다.

async fn hello() -> i32 {
    println!("Hello World!");
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
    println!("waited");
    1
}

async fn bye() -> i32 {
    println!("Goodbye!");
    2
}

#[tokio::main]
async fn main() {
    hello().await; // blocking
    bye().await;
}

실행 결과

Hello World!
waited
Goodbye!

리턴값이 있는 비동기 함수

만일 비동기 함수에서 값을 리턴한다면 그 값을 어떻게 받을 수 있을까요? 파이썬과 러스트 모두, 단순히 기존 동기 함수처럼 코드를 작성하고, 함수 정의에 async 키워드만 붙여주면 됩니다. 먼저 파이썬의 경우는 다음과 같습니다. await 키워드를 붙여서 호출하고 나면 함수의 리턴값을 얻을 수 있습니다.

import asyncio


async def func():
    return "async"


async def main():
    result = await func()
    print(result)


asyncio.run(main())

실행 결과

async

러스트에서도 함수의 리턴값은 await 을 사용하고 나면 얻을 수 있습니다.

use tokio;

async fn func<'a>() -> &'a str {
    "async"
}

#[tokio::main]
async fn main() {
    println!("{:?}", func().await) // 🤯
}

실행 결과

"async"

여러 작업 실행하기

asyncio.gather 를 사용하면 여러 개의 비동기 함수를 한꺼번에 실행할 수도 있습니다.

import asyncio

async def give_order(order):
    print(f"Processing {order}...")
    await asyncio.sleep(3 - order)
    print(f"Finished {order}")

async def main():
    await asyncio.gather(give_order(1), give_order(2), give_order(3))

asyncio.run(main())

실행 결과

Processing 1...
Processing 2...
Processing 3...
Finished 3
Finished 2
Finished 1

만일 각 함수에 리턴값이 있다면 값이 모아져서 리턴됩니다.

import asyncio

async def give_order(order):
    print(f"Processing {order}...")
    await asyncio.sleep(3 - order)
    print(f"Finished {order}")
    return order

async def main():
    results = await asyncio.gather(give_order(1), give_order(2), give_order(3))
    print(results)

asyncio.run(main())

실행 결과

Processing 1...
Processing 2...
Processing 3...
Finished 3
Finished 2
Finished 1
[1, 2, 3]

실제 함수 실행 속도가 다르더라도 결과는 gather 에 넣은 순서대로 나오게 됩니다.

러스트에서는 tokio::join!에 기다리고자 하는 함수들을 넣어주면 됩니다. 이게 가능한 이유는, 위에서 만든 hello와 bye 함수 모두 실제로는 Future라고 하는 비동기 객체를 리턴하게 되고, tokio가 Future를 사용해 내부적으로 비동기 관련 처리를 해주기 때문입니다.

async fn give_order(order: u64) -> u64 {
    println!("Processing {order}...");
    tokio::time::sleep(std::time::Duration::from_secs(3 - order)).await;
    println!("Finished {order}");
    order
}

#[tokio::main]
async fn main() {
    let result = tokio::join!(give_order(1), give_order(2), give_order(3));

    println!("{:?}", result);
}

실행 결과

Processing 1...
Processing 2...
Processing 3...
Finished 3
Finished 2
Finished 1
(1, 2, 3)

예제: 빠르게 HTTP 요청 보내보기

파이썬 동기

import requests
from random import randint


# The highest Pokemon id
MAX_POKEMON = 898


def fetch(total):
    urls = [
        f"https://pokeapi.co/api/v2/pokemon/{randint(1, MAX_POKEMON)}"
        for _ in range(total)
    ]
    with requests.Session() as session:
        for url in urls:
            response = session.get(url).json()
            yield response["name"]


def main():
    for name in fetch(10):
        print(name)

    print([name for name in fetch(10)])


main()

Cargo.toml 에 추가

- rand: 이 크레이트는 난수 생성을 위한 유용한 유틸리티를 제공합니다. 여기에는 다양한 유형의 난수값을 생성하고, 이를 유용한 확률 분포로 변환하고, 무작위성 관련 알고리즘을 구현하는 기능이 포함되어 있습니다. 이 크레이트는 다양한 난수 생성기에 대해 구현된 Rng 특성을 통해 사용하기 쉬운 API를 제공합니다. - reqwest: 이 크레이트는 요청을 보내고 응답을 처리하기 위한 사용하기 쉬운 사용자 친화적인 HTTP 클라이언트를 제공합니다. 동기 및 비동기 요청을 모두 지원하며 상위 수준 API부터 고급 사용 사례를 위한 하위 수준 API까지 일반적인 사용 사례를 지원합니다. - serde_json: 이 크레이트는 Rust에서 데이터를 직렬화(serialize) 및 역직렬화(deserialize)하기 위한 프레임워크를 제공하는 serde 프로젝트의 일부입니다. serde_json 크레이트는 특히 데이터를 JSON으로 직렬화하고 JSON에서 데이터를 역직렬화하는 기능을 제공합니다.

rand = "0.8.5"
reqwest = { version="0.11.16", features = ["blocking", "json"] }
serde_json = "1.0.95"

rand::thread_rng() 로 1부터 898까지의 무작위 난수 생성

reqwest::blocking::Client::new() 동기 방식의 HTTP 클라이언트 생성

.json::<serde_json::Value>() 응답 json의 형식을 미리 알 수 없는 경우 사용, 일반적으로는 응답 형식을 알고 있어서 구조체를 사용해 타입을 명시

use rand::Rng;
use reqwest;
use serde_json;

const MAX_POKEMON: u32 = 898;

fn fetch(total: u32) -> Vec<String> {
    let mut urls = Vec::new();
    for _ in 0..total {
        let url = format!(
            "https://pokeapi.co/api/v2/pokemon/{}",
            rand::thread_rng().gen_range(1..=MAX_POKEMON)
        );
        urls.push(url);
    }
    let client = reqwest::blocking::Client::new();
    let mut names = Vec::new();
    for url in urls {
        let response = client
            .get(&url)
            .send()
            .unwrap()
            .json::<serde_json::Value>()
            .unwrap();
        names.push(response["name"].as_str().unwrap().to_string());
    }
    names
}

fn main() {
    for name in fetch(10) {
        println!("{}", name);
    }

    println!("{:?}", fetch(10));
}

실행 결과

grumpig
dartrix
celesteela
piloswine
tangrowth
virizion
glastrier
dewpider
hattrem
glameow
['boltund', 'gourgeist-average', 'shellos', 'shiinotic', 'eevee', 'cranidos', 'celesteela', 'solosis', 'houndour', 'landorus-incarnate']

파이썬 비동기

import asyncio
import aiohttp
from random import randint


# The highest Pokemon id
MAX_POKEMON = 898


async def _fetch(session, url):
    async with session.get(url) as response:
        return await response.json()


async def fetch(total):
    urls = [
        f"https://pokeapi.co/api/v2/pokemon/{randint(1, MAX_POKEMON)}"
        for _ in range(total)
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [_fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            yield response["name"]


async def main():
    async for name in fetch(10):
        print(name)

    print([name async for name in fetch(10)])


asyncio.run(main())

러스트 비동기

use rand::Rng;
use reqwest;

const MAX_POKEMON: u32 = 898;

async fn fetch(total: u32) -> Vec<String> {
    let mut urls = Vec::new();
    for _ in 0..total {
        let url = format!(
            "https://pokeapi.co/api/v2/pokemon/{}",
            rand::thread_rng().gen_range(1..=MAX_POKEMON)
        );
        urls.push(url);
    }
    let client = reqwest::Client::new();
    let mut names = Vec::new();
    for url in urls {
        let response = client
            .get(&url)
            .send()
            .await
            .unwrap()
            .json::<serde_json::Value>()
            .await
            .unwrap();
        names.push(response["name"].as_str().unwrap().to_string());
    }
    names
}

#[tokio::main]
async fn main() {
    for name in fetch(10).await {
        println!("{}", name);
    }

    println!("{:?}", fetch(10).await);
}

 

rayon

비동기 프로그래밍과는 큰 상관이 없지만, tokio 와 자주 비교되는 크레이트인 rayon 에 대해서 살펴보겠습니다.

tokio vs rayon

Tokio와 Rayon은 모두 Rust에서 병렬 및 비동기 프로그래밍을 위한 라이브러리이지만, 초점과 사용 사례는 서로 다릅니다.

  • Tokio는 주로 비동기 프로그래밍, 특히 네트워크 애플리케이션 구축을 위한 비동기 프로그래밍에 중점을 둡니다. Rust에서 효율적이고 고성능이며 확장 가능한 비동기 애플리케이션을 구축하기 위한 도구 세트를 제공합니다. Tokio는 Rust의 퓨처 및 비동기/대기 언어 기능과 함께 작동하도록 설계되었으며, 비동기 작업을 효율적으로 실행할 수 있는 런타임을 제공합니다.
  • 반면 레이온은 데이터 처리 작업을 위한 병렬 처리와 동시성에 중점을 두고 있습니다. 대규모 데이터 컬렉션에 대한 계산을 병렬화하기 위한 간단하고 사용하기 쉬운 인터페이스를 제공합니다. 레이온은 Rust의 이터레이터 특성과 함께 작동하도록 설계되었으며, 데이터를 병렬로 처리하는 데 사용할 수 있는 일련의 병렬 알고리즘을 제공합니다.

요약하자면, Tokio는 비동기 네트워크 애플리케이션을 구축하는 데 이상적이며, Rayon은 대규모 데이터 컬렉션에 대한 계산을 병렬화하는 데 이상적입니다. 두 라이브러리 모두 다양한 사용 사례에 유용하며, 비동기 처리와 병렬 처리가 모두 필요한 경우에 함께 사용할 수 있습니다.

병렬 이터레이터

레이온은 Rust를 위한 데이터 병렬 처리 라이브러리입니다. 매우 가볍고 순차 계산을 병렬 계산으로 쉽게 변환할 수 있습니다. 또한 데이터 레이스가 발생하지 않는 것이 보장됩니다. 레이온은 아래 명령어로 설치 가능합니다.

cargo add rayon

공식 문서에서 권장하는 사용 방법은 prelude 밑에 있는 모든 것을 불러오는 것입니다. 이렇게 하면 병렬 이터레이터와 다른 트레이트를 전부 불러오기 때문에 코드를 훨씬 쉽게 작성할 수 있습니다.

use rayon::prelude::*;

기존의 순차 계산 함수에 병렬성을 더하려면, 단순히 이터레이터를 par_iter로 바꿔주기만 하면 됩니다.

use rayon::prelude::*;
use std::time::SystemTime;

fn sum_of_squares(input: &Vec<i32>) -> i32 {
    input
        .par_iter() // ✨
        .map(|&i| {
            std::thread::sleep(std::time::Duration::from_millis(10));
            i * i
        })
        .sum()
}

fn sum_of_squares_seq(input: &Vec<i32>) -> i32 {
    input
        .iter()
        .map(|&i| {
            std::thread::sleep(std::time::Duration::from_millis(10));
            i * i
        })
        .sum()
}

fn main() {
    let start = SystemTime::now();
    sum_of_squares(&(1..100).collect());
    println!("{}ms", start.elapsed().unwrap().as_millis());
    let start = SystemTime::now();
    sum_of_squares_seq(&(1..100).collect());
    println!("{}ms", start.elapsed().unwrap().as_millis());
}

실행 결과

106ms
1122ms

par_iter_mut 는 각 원소의 가변 레퍼런스를 받는 이터레이터입니다.

use rayon::prelude::*;

use std::time::SystemTime;

fn plus_one(x: &mut i32) {
    *x += 1;
    std::thread::sleep(std::time::Duration::from_millis(10));
}

fn increment_all_seq(input: &mut [i32]) {
    input.iter_mut().for_each(plus_one);
}

fn increment_all(input: &mut [i32]) {
    input.par_iter_mut().for_each(plus_one);
}

fn main() {
    let mut data = vec![1, 2, 3, 4, 5];

    let start = SystemTime::now();
    increment_all(&mut data);
    println!("{:?} - {}ms", data, start.elapsed().unwrap().as_millis());

    let start = SystemTime::now();
    increment_all_seq(&mut data);
    println!("{:?} - {}ms", data, start.elapsed().unwrap().as_millis());
}

실행 결과

[2, 3, 4, 5, 6] - 12ms
[3, 4, 5, 6, 7] - 55ms

par_sort 는 병합 정렬을 응용한 정렬 알고리즘을 사용해 데이터를 병렬적으로 분할해 정렬합니다.

use rand::Rng;
use rayon::prelude::*;

use std::time::SystemTime;

fn main() {
    let mut rng = rand::thread_rng();
    let mut data1: Vec<i32> = (0..1_000_000).map(|_| rng.gen_range(0..=100)).collect();
    let mut data2 = data1.clone();

    let start = SystemTime::now();
    data1.par_sort();
    println!("{}ms", start.elapsed().unwrap().as_millis());

    let start = SystemTime::now();
    data2.sort();
    println!("{}ms", start.elapsed().unwrap().as_millis());

    assert_eq!(data1, data2);
}

실행 결과

68ms
325ms

Rayon 사용 시 주의사항

멀티스레드도 마찬가지지만 스레드 스폰 및 조인에 시간이 소요되기 때문에 주의하세요.

'Rust' 카테고리의 다른 글

Rust testing  (142) 2023.06.15
Rust 파이썬 바인딩  (56) 2023.05.29
Rust 멀티스레딩  (28) 2023.05.27
Rust 스마트 포인터  (63) 2023.05.14
Rust 예외처리  (36) 2023.05.13
profile

DataScience

@Ninestar

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!