Rust에서 RabbitMQ 사용하기 (WorkQueue)
2023. 6. 3. 15:36ㆍLanguage/Rust
오늘은 Rust에서 RabbitMQ를 사용하는 방법에 대해 알아봅니다.
RabbitMQ는 너무 유명한 MessageQueue이므로 따로 설명하지는 않겠습니다.
RabbitMQ에서 가장 기본이 되는 것이 Work Queue입니다.
위와 같이 P는 Producer 즉 메시지를 Queue에 넣는 프로그램이며, C는 Consumer로 메시지를 받아 특정 작업을 하는 프로그램 입니다.
1개의 Queue에 여러 Producer와 여러 Consumer가 붙어서 사용 할 수 있습니다.
Producer
먼저 메시지를 Queue에 넣는 방법에 대해 알아보겠습니다.
Rust에서는 amiquip라는 Crate를 사용하면 됩니다.
$ cargo new producer
$ code producer
Cargo를 통해 프로젝트를 생성하고 Cargo.toml
에 아래와 같이 Crate를 추가합니다.
[dependencies]
amiquip = { version = "0.4", default-features = false }
그리고 main.rs에 코드를 작성합니다.
use amiquip::{Connection, Exchange, Publish, QueueDeclareOptions, Result};
const TASK_QUEUE: &str = "task_queue";
fn main() -> Result<()> {
// Open Connection
let mut connection = Connection::insecure_open("amqp://test:test@localhost:5672")?;
// Open a Channel
let channel = connection.open_channel(None)?;
// 만약 Queue가 없다면 만들어서 메시지를 넣기 위해 Declare를 작성합니다.
// 없어도 됩니다.
let queue = channel.queue_declare(TASK_QUEUE,
QueueDeclareOptions{
durable:true,
..QueueDeclareOptions::default()
});
// Qos는 Consumer가 메시지를 얼마나 선점하고 가져오는지에 대한 설정입니다.
channel.qos(0, 1, false)?;
// Exchange는 나중에 배울텐데 지금은 Queue 1개만 사용하므로 direct로 설정해서 사용합니다.
let exchange = Exchange::direct(&channel);
// 메시지를 입력합니다.
exchange.publish(Publish::new("hellow there".as_bytes(), TASK_QUEUE))?;
connection.close()
}
위와 같이 RabbitMQ에 메시지가 정상적으로 들어간 것을 볼 수 있습니다.
Consumer
메시지가 Queue에 쌓여 있는것을 가져오기 위해서는 Consumer 프로그램을 작성합니다.
$ cargo new consumer
$ code consumer
Cargo를 통해 프로젝트를 생성하고 Cargo.toml
에 아래와 같이 Crate를 추가합니다.
[dependencies]
amiquip = { version = "0.4", default-features = false }
아래와 같이 consumer 코드를 작성합니다.
use amiquip::{Connection, ConsumerMessage, ConsumerOptions, QueueDeclareOptions, Result};
const TASK_QUEUE: &str = "task_queue";
fn main() -> Result<()> {
// Open Connection
let mut connection = Connection::insecure_open("amqp://test:test@localhost:5672")?;
// Open a Channel
let channel = connection.open_channel(None)?;
// 만약 Queue가 없다면 큐를 생성합니다.
let queue = channel.queue_declare(TASK_QUEUE,
QueueDeclareOptions{
durable:true,
..QueueDeclareOptions::default()
})?;
// Qos는 Consumer가 메시지를 얼마나 선점하고 가져오는지에 대한 설정입니다.
channel.qos(0, 1, false)?;
// Start a consumer.
let consumer = queue.consume(ConsumerOptions::default())?;
println!("Waiting for messages. Press Ctrl-C to exit.");
for (i, message) in consumer.receiver().iter().enumerate() {
match message {
ConsumerMessage::Delivery(delivery) => {
let body = String::from_utf8_lossy(&delivery.body); // body는 byte로 되어 있어 utf8로 인코딩합니다.
println!("({:>3}) Received [{}]", i, body);
consumer.ack(delivery)?; // Ack를 날려 Queue에서 메시지를 삭제합니다.
}
other => {
println!("Consumer ended: {:?}", other);
break;
}
}
}
connection.close()
}
$ cargo run
Waiting for messages. Press Ctrl-C to exit.
( 0) Received [hellow there]
메시지가 정상적으로 소비 된 것을 확인 할 수 있습니다.
참고
'Language > Rust' 카테고리의 다른 글
Rust Shuffle (0) | 2023.06.11 |
---|---|
Rust fmt 모듈 (0) | 2023.06.07 |
if Let (1) | 2023.06.03 |
Rust의 Trait (0) | 2023.05.11 |
Rust 설치 및 VS Code 셋팅(Linux, Debian) (0) | 2023.04.26 |