Rust에서 RabbitMQ 사용하기 (WorkQueue)

2023. 6. 3. 15:36Language/Rust

오늘은 Rust에서 RabbitMQ를 사용하는 방법에 대해 알아봅니다.

RabbitMQ는 너무 유명한 MessageQueue이므로 따로 설명하지는 않겠습니다.

RabbitMQ에서 가장 기본이 되는 것이 Work Queue입니다.

Work Queue

위와 같이 P는 Producer 즉 메시지를 Queue에 넣는 프로그램이며, C는 Consumer로 메시지를 받아 특정 작업을 하는 프로그램 입니다.

1개의 Queue에 여러 Producer와 여러 Consumer가 붙어서 사용 할 수 있습니다.

Producer

먼저 메시지를 Queue에 넣는 방법에 대해 알아보겠습니다.

Rust에서는 amiquip라는 Crate를 사용하면 됩니다.

$ cargo new producer
$ code producer

Cargo를 통해 프로젝트를 생성하고 Cargo.toml에 아래와 같이 Crate를 추가합니다.

[dependencies]
amiquip = { version = "0.4", default-features = false }

그리고 main.rs에 코드를 작성합니다.

use amiquip::{Connection, Exchange, Publish, QueueDeclareOptions, Result};

const TASK_QUEUE: &str = "task_queue";

fn main() -> Result<()> {

    // Open Connection
    let mut connection = Connection::insecure_open("amqp://test:test@localhost:5672")?;

    // Open a Channel
    let channel = connection.open_channel(None)?;

    // 만약 Queue가 없다면 만들어서 메시지를 넣기 위해 Declare를 작성합니다.
    // 없어도 됩니다.
    let queue = channel.queue_declare(TASK_QUEUE, 
    QueueDeclareOptions{
        durable:true,
        ..QueueDeclareOptions::default()
    });

    // Qos는 Consumer가 메시지를 얼마나 선점하고 가져오는지에 대한 설정입니다.
    channel.qos(0, 1, false)?;

    // Exchange는 나중에 배울텐데 지금은 Queue 1개만 사용하므로 direct로 설정해서 사용합니다.
    let exchange = Exchange::direct(&channel);

    // 메시지를 입력합니다.
    exchange.publish(Publish::new("hellow there".as_bytes(), TASK_QUEUE))?;

    connection.close()
}

위와 같이 RabbitMQ에 메시지가 정상적으로 들어간 것을 볼 수 있습니다.

Consumer

메시지가 Queue에 쌓여 있는것을 가져오기 위해서는 Consumer 프로그램을 작성합니다.

$ cargo new consumer
$ code consumer

Cargo를 통해 프로젝트를 생성하고 Cargo.toml에 아래와 같이 Crate를 추가합니다.

[dependencies]
amiquip = { version = "0.4", default-features = false }

아래와 같이 consumer 코드를 작성합니다.

use amiquip::{Connection, ConsumerMessage, ConsumerOptions, QueueDeclareOptions, Result};

const TASK_QUEUE: &str = "task_queue";

fn main() -> Result<()> {
    // Open Connection
    let mut connection = Connection::insecure_open("amqp://test:test@localhost:5672")?;

    // Open a Channel
    let channel = connection.open_channel(None)?;

    // 만약 Queue가 없다면 큐를 생성합니다.
    let queue = channel.queue_declare(TASK_QUEUE, 
        QueueDeclareOptions{
            durable:true,
            ..QueueDeclareOptions::default()
        })?;

    // Qos는 Consumer가 메시지를 얼마나 선점하고 가져오는지에 대한 설정입니다.
    channel.qos(0, 1, false)?;

    // Start a consumer.
    let consumer = queue.consume(ConsumerOptions::default())?;
    println!("Waiting for messages. Press Ctrl-C to exit.");

    for (i, message) in consumer.receiver().iter().enumerate() {
        match message {
            ConsumerMessage::Delivery(delivery) => {
                let body = String::from_utf8_lossy(&delivery.body); // body는 byte로 되어 있어 utf8로 인코딩합니다.
                println!("({:>3}) Received [{}]", i, body);
                consumer.ack(delivery)?; // Ack를 날려 Queue에서 메시지를 삭제합니다.
            }
            other => {
                println!("Consumer ended: {:?}", other);
                break;
            }
        }
    }

    connection.close()
}
$ cargo run
Waiting for messages. Press Ctrl-C to exit.
(  0) Received [hellow there]

메시지가 정상적으로 소비 된 것을 확인 할 수 있습니다.

 

참고

amiquip - Rust (docs.rs)

RabbitMQ tutorial - Work Queues — RabbitMQ

'Language > Rust' 카테고리의 다른 글

Rust Shuffle  (0) 2023.06.11
Rust fmt 모듈  (0) 2023.06.07
if Let  (1) 2023.06.03
Rust의 Trait  (0) 2023.05.11
Rust 설치 및 VS Code 셋팅(Linux, Debian)  (0) 2023.04.26