使用python通過pika操作rabbitmq的核心步驟為:1. 建立連接(blockingconnection);2. 創(chuàng)建通道(channel);3. 聲明持久化隊列(queue_declare,durable=true);4. 發(fā)布消息時設(shè)置消息持久化(delivery_mode=2);5. 消費者手動確認(rèn)消息(auto_ack=false,basic_ack)。選擇rabbitmq因其基于amqp協(xié)議,具備高可靠性、豐富的交換機類型和成熟生態(tài),適合需要復(fù)雜路由與消息不丟失的場景。pika的同步模式(blockingconnection)適用于簡單腳本,邏輯直觀但阻塞線程;異步模式(如selectconnection)適用于高并發(fā)服務(wù),通過事件循環(huán)提升吞吐量,但編程復(fù)雜度更高。消息持久化需同時設(shè)置隊列和消息的durable與delivery_mode=2,確保服務(wù)重啟后消息可恢復(fù);確認(rèn)機制通過關(guān)閉auto_ack并手動調(diào)用basic_ack實現(xiàn),保證消息被成功處理前不會丟失,支持“至少一次”投遞,要求消費者具備冪等性。完整實現(xiàn)包括生產(chǎn)者發(fā)送5條消息并休眠,消費者接收后模擬處理耗時并發(fā)送確認(rèn),確保消息可靠傳遞與處理。
Python操作消息隊列,Pika連接RabbitMQ,這組合在很多后端系統(tǒng)里簡直是標(biāo)配。它提供了一種可靠的異步通信機制,讓不同服務(wù)間解耦,處理高并發(fā)任務(wù)變得游刃有余。通過Pika庫,Python應(yīng)用可以輕松地發(fā)布消息到隊列,也能消費隊列中的消息,實現(xiàn)服務(wù)間的有效協(xié)作。
要用Python通過Pika操作RabbitMQ,核心步驟圍繞著連接(Connection)、通道(Channel)、聲明隊列/交換機、發(fā)布消息和消費消息。最直接的方式是使用Pika的
BlockingConnection
生產(chǎn)者(發(fā)布消息)示例:
立即學(xué)習(xí)“Python免費學(xué)習(xí)筆記(深入)”;
import pika import time # 連接RabbitMQ服務(wù)器 # 這里假設(shè)RabbitMQ運行在本地,沒有用戶名密碼 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 聲明一個隊列,如果隊列不存在則創(chuàng)建。durable=True表示隊列持久化 # 即使RabbitMQ重啟,隊列也不會丟失 channel.queue_declare(queue='my_queue', durable=True) message_count = 0 while message_count < 5: message = f"Hello World! Message number {message_count}" # 發(fā)布消息到默認(rèn)交換機,路由鍵為隊列名 # delivery_mode=2表示消息持久化,即使RabbitMQ重啟,消息也不會丟失 channel.basic_publish( exchange='', routing_key='my_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 使消息持久化 ) ) print(f" [x] Sent '{message}'") message_count += 1 time.sleep(1) # 模擬發(fā)送間隔 connection.close()
消費者(消費消息)示例:
import pika import time # 連接RabbitMQ服務(wù)器 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 聲明相同的隊列,確保消費者知道要從哪個隊列取消息 channel.queue_declare(queue='my_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): """ 消息處理回調(diào)函數(shù) ch: channel對象 method: 包含消息的 delivery tag 等信息 properties: 消息屬性 body: 消息體 """ print(f" [x] Received '{body.decode()}'") time.sleep(body.count(b'.')) # 模擬處理消息的耗時 # 消息處理完成后,發(fā)送確認(rèn)回執(zhí) ch.basic_ack(delivery_tag=method.delivery_tag) print(" [x] Done") # 設(shè)置QoS (Quality of Service),每次只分發(fā)一條消息給消費者 # 這樣可以防止一個消費者處理速度慢,導(dǎo)致所有消息堆積在它那里 channel.basic_qos(prefetch_count=1) # 開始消費消息,no_ack=False表示需要手動發(fā)送確認(rèn)回執(zhí) channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False) # 啟動消費循環(huán),會一直阻塞直到連接關(guān)閉 channel.start_consuming()
我個人覺得,RabbitMQ就像是消息隊列界的“老黃?!?,它穩(wěn)定、可靠、功能全面,是很多企業(yè)級應(yīng)用的首選。你可能聽說過Kafka在高吞吐量大數(shù)據(jù)場景的優(yōu)勢,但對于需要復(fù)雜路由、高可靠性、并且消息處理量并非天文數(shù)字的業(yè)務(wù),RabbitMQ的優(yōu)勢就凸顯出來了。
它基于AMQP(Advanced Message Queuing Protocol)協(xié)議,這個協(xié)議本身就為消息的可靠傳輸、事務(wù)性、路由等提供了強大的保障。這意味著,當(dāng)你的系統(tǒng)對消息丟失零容忍時,RabbitMQ能給你足夠的信心。它的交換機(Exchange)類型非常豐富,比如直連(Direct)、扇出(Fanout)、主題(Topic)、頭部(Headers),可以滿足各種復(fù)雜的路由需求。想象一下,你有一個訂單系統(tǒng),新訂單消息可能需要同時通知庫存、物流和客戶服務(wù)部門,通過RabbitMQ的Topic交換機,一條消息就能精準(zhǔn)地分發(fā)給所有相關(guān)方,這可比你手動維護(hù)多個HTTP請求或者數(shù)據(jù)庫觸發(fā)器要優(yōu)雅和高效得多。
而且,RabbitMQ的社區(qū)非常活躍,文檔也相當(dāng)完善,遇到問題很容易找到解決方案。對于Python開發(fā)者來說,Pika庫的支持也很好,雖然Pika的API有時候看起來有點“原生”,需要對AMQP概念有一定理解,但這正是它強大和靈活的體現(xiàn)。它的成熟度,讓它在很多關(guān)鍵業(yè)務(wù)場景下,成為一個讓人放心的選擇。
Pika庫提供了兩種主要的工作模式:同步模式(
BlockingConnection
SelectConnection
TornadoConnection
想象一下,你是個餐廳服務(wù)員。
同步模式 (BlockingConnection
BlockingConnection
BlockingConnection
異步模式 (SelectConnection
TornadoConnection
在生產(chǎn)環(huán)境中,消息的持久化和確認(rèn)機制是確保消息不丟失的關(guān)鍵。這兩點在Pika中都有明確的實現(xiàn)方式,它們共同構(gòu)筑了RabbitMQ“至少一次”消息投遞的可靠性保障。
消息持久化:
消息持久化分為兩個層面:隊列持久化和消息持久化。
隊列持久化: 當(dāng)你聲明隊列時,將
durable
True
channel.queue_declare(queue='my_queue', durable=True)
這樣做是為了防止RabbitMQ服務(wù)器重啟后,你創(chuàng)建的隊列消失。如果隊列是非持久化的,服務(wù)器一重啟,隊列就不在了,即使里面有持久化的消息,也無處可尋了。
消息持久化: 當(dāng)你發(fā)布消息時,通過
pika.BasicProperties
delivery_mode=2
channel.basic_publish( exchange='', routing_key='my_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 2表示消息持久化 ) )
這告訴RabbitMQ,這條消息需要寫入磁盤。這樣,即使在消息到達(dá)消費者并被確認(rèn)之前,RabbitMQ服務(wù)器突然崩潰,重啟后這條消息也能從磁盤中恢復(fù),并重新投遞。
需要注意的是,即使消息和隊列都持久化了,也不能保證100%不丟消息。比如,在消息到達(dá)RabbitMQ并寫入磁盤的極短時間內(nèi),如果服務(wù)器崩潰,消息可能還是會丟失。對于極端高可靠性的場景,你可能還需要結(jié)合發(fā)布者確認(rèn)(Publisher Confirms)機制。
消息確認(rèn)機制(Acknowledgements):
這是消費者端確保消息被成功處理的關(guān)鍵。當(dāng)消費者從隊列中獲取一條消息后,它需要向RabbitMQ發(fā)送一個“確認(rèn)回執(zhí)”(Acknowledgement)。
關(guān)閉自動確認(rèn): 在
basic_consume
auto_ack
False
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)
默認(rèn)情況下,
auto_ack
True
手動發(fā)送確認(rèn)回執(zhí): 在你的消息回調(diào)函數(shù)中,當(dāng)消息被成功處理后,調(diào)用
channel.basic_ack()
def callback(ch, method, properties, body): # ... 處理消息的邏輯 ... ch.basic_ack(delivery_tag=method.delivery_tag) # 確認(rèn)消息
delivery_tag
如果消費者在處理消息過程中崩潰,或者沒有發(fā)送確認(rèn)回執(zhí),RabbitMQ會認(rèn)為這條消息沒有被成功處理,并在消費者重新連接或有其他消費者可用時,將這條消息重新投遞給其他消費者。這保證了消息的“至少一次”投遞:消息可能被投遞多次,但絕不會丟失。當(dāng)然,這也意味著你的消費者需要具備冪等性,即多次處理同一條消息不會產(chǎn)生副作用。
你也可以使用
basic_nack
basic_reject
basic_nack
requeue=True/False
basic_reject
以上就是Python怎樣操作消息隊列?pika連接RabbitMQ的詳細(xì)內(nèi)容,更多請關(guān)注php中文網(wǎng)其它相關(guān)文章!
每個人都需要一臺速度更快、更穩(wěn)定的 PC。隨著時間的推移,垃圾文件、舊注冊表數(shù)據(jù)和不必要的后臺進(jìn)程會占用資源并降低性能。幸運的是,許多工具可以讓 Windows 保持平穩(wěn)運行。
微信掃碼
關(guān)注PHP中文網(wǎng)服務(wù)號
QQ掃碼
加入技術(shù)交流群
Copyright 2014-2025 http://ipnx.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號