亚洲国产日韩欧美一区二区三区,精品亚洲国产成人av在线,国产99视频精品免视看7,99国产精品久久久久久久成人热,欧美日韩亚洲国产综合乱

搜索

Python怎樣操作消息隊列?pika連接RabbitMQ

蓮花仙者
發(fā)布: 2025-08-04 14:37:01
原創(chuàng)
249人瀏覽過

使用python通過pika操作rabbitmq的核心步驟為:1. 建立連接(blockingconnection);2. 創(chuàng)建通道(channel);3. 聲明持久化隊列(queue_declare,durable=true);4. 發(fā)布消息時設(shè)置消息持久化(delivery_mode=2);5. 消費者手動確認(rèn)消息(auto_ack=false,basic_ack)。選擇rabbitmq因其基于amqp協(xié)議,具備高可靠性、豐富的交換機類型和成熟生態(tài),適合需要復(fù)雜路由與消息不丟失的場景。pika的同步模式(blockingconnection)適用于簡單腳本,邏輯直觀但阻塞線程;異步模式(如selectconnection)適用于高并發(fā)服務(wù),通過事件循環(huán)提升吞吐量,但編程復(fù)雜度更高。消息持久化需同時設(shè)置隊列和消息的durable與delivery_mode=2,確保服務(wù)重啟后消息可恢復(fù);確認(rèn)機制通過關(guān)閉auto_ack并手動調(diào)用basic_ack實現(xiàn),保證消息被成功處理前不會丟失,支持“至少一次”投遞,要求消費者具備冪等性。完整實現(xiàn)包括生產(chǎn)者發(fā)送5條消息并休眠,消費者接收后模擬處理耗時并發(fā)送確認(rèn),確保消息可靠傳遞與處理。

Python怎樣操作消息隊列?pika連接RabbitMQ

Python操作消息隊列,Pika連接RabbitMQ,這組合在很多后端系統(tǒng)里簡直是標(biāo)配。它提供了一種可靠的異步通信機制,讓不同服務(wù)間解耦,處理高并發(fā)任務(wù)變得游刃有余。通過Pika庫,Python應(yīng)用可以輕松地發(fā)布消息到隊列,也能消費隊列中的消息,實現(xiàn)服務(wù)間的有效協(xié)作。

解決方案

要用Python通過Pika操作RabbitMQ,核心步驟圍繞著連接(Connection)、通道(Channel)、聲明隊列/交換機、發(fā)布消息和消費消息。最直接的方式是使用Pika的

BlockingConnection
登錄后復(fù)制
,它簡單易用,適合快速開發(fā)和非高并發(fā)場景。

生產(chǎn)者(發(fā)布消息)示例:

立即學(xué)習(xí)Python免費學(xué)習(xí)筆記(深入)”;

import pika
import time

# 連接RabbitMQ服務(wù)器
# 這里假設(shè)RabbitMQ運行在本地,沒有用戶名密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 聲明一個隊列,如果隊列不存在則創(chuàng)建。durable=True表示隊列持久化
# 即使RabbitMQ重啟,隊列也不會丟失
channel.queue_declare(queue='my_queue', durable=True)

message_count = 0
while message_count < 5:
    message = f"Hello World! Message number {message_count}"
    # 發(fā)布消息到默認(rèn)交換機,路由鍵為隊列名
    # delivery_mode=2表示消息持久化,即使RabbitMQ重啟,消息也不會丟失
    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 使消息持久化
        )
    )
    print(f" [x] Sent '{message}'")
    message_count += 1
    time.sleep(1) # 模擬發(fā)送間隔

connection.close()
登錄后復(fù)制

消費者(消費消息)示例:

import pika
import time

# 連接RabbitMQ服務(wù)器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 聲明相同的隊列,確保消費者知道要從哪個隊列取消息
channel.queue_declare(queue='my_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """
    消息處理回調(diào)函數(shù)
    ch: channel對象
    method: 包含消息的 delivery tag 等信息
    properties: 消息屬性
    body: 消息體
    """
    print(f" [x] Received '{body.decode()}'")
    time.sleep(body.count(b'.')) # 模擬處理消息的耗時
    # 消息處理完成后,發(fā)送確認(rèn)回執(zhí)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" [x] Done")

# 設(shè)置QoS (Quality of Service),每次只分發(fā)一條消息給消費者
# 這樣可以防止一個消費者處理速度慢,導(dǎo)致所有消息堆積在它那里
channel.basic_qos(prefetch_count=1)

# 開始消費消息,no_ack=False表示需要手動發(fā)送確認(rèn)回執(zhí)
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

# 啟動消費循環(huán),會一直阻塞直到連接關(guān)閉
channel.start_consuming()
登錄后復(fù)制

為什么選擇RabbitMQ作為消息隊列?

我個人覺得,RabbitMQ就像是消息隊列界的“老黃?!?,它穩(wěn)定、可靠、功能全面,是很多企業(yè)級應(yīng)用的首選。你可能聽說過Kafka在高吞吐量大數(shù)據(jù)場景的優(yōu)勢,但對于需要復(fù)雜路由、高可靠性、并且消息處理量并非天文數(shù)字的業(yè)務(wù),RabbitMQ的優(yōu)勢就凸顯出來了。

它基于AMQP(Advanced Message Queuing Protocol)協(xié)議,這個協(xié)議本身就為消息的可靠傳輸、事務(wù)性、路由等提供了強大的保障。這意味著,當(dāng)你的系統(tǒng)對消息丟失零容忍時,RabbitMQ能給你足夠的信心。它的交換機(Exchange)類型非常豐富,比如直連(Direct)、扇出(Fanout)、主題(Topic)、頭部(Headers),可以滿足各種復(fù)雜的路由需求。想象一下,你有一個訂單系統(tǒng),新訂單消息可能需要同時通知庫存、物流和客戶服務(wù)部門,通過RabbitMQ的Topic交換機,一條消息就能精準(zhǔn)地分發(fā)給所有相關(guān)方,這可比你手動維護(hù)多個HTTP請求或者數(shù)據(jù)庫觸發(fā)器要優(yōu)雅和高效得多。

而且,RabbitMQ的社區(qū)非常活躍,文檔也相當(dāng)完善,遇到問題很容易找到解決方案。對于Python開發(fā)者來說,Pika庫的支持也很好,雖然Pika的API有時候看起來有點“原生”,需要對AMQP概念有一定理解,但這正是它強大和靈活的體現(xiàn)。它的成熟度,讓它在很多關(guān)鍵業(yè)務(wù)場景下,成為一個讓人放心的選擇。

Pika庫的異步與同步模式有何不同?

Pika庫提供了兩種主要的工作模式:同步模式(

BlockingConnection
登錄后復(fù)制
)和異步模式(如
SelectConnection
登錄后復(fù)制
TornadoConnection
登錄后復(fù)制
等)。這兩種模式的選擇,很大程度上取決于你的應(yīng)用場景和對性能、并發(fā)處理的需求。

想象一下,你是個餐廳服務(wù)員。

同步模式 (

BlockingConnection
登錄后復(fù)制
) 就像你一次只服務(wù)一位客人。你接到一個點餐請求,就一直等到菜做好、客人吃完、結(jié)賬,你才去接下一個客人的請求。這種模式簡單直接,邏輯清晰,不容易出錯。對于那些一次只處理少量消息、或者在腳本中一次性發(fā)送一批消息然后退出的場景,
BlockingConnection
登錄后復(fù)制
是完美的。它會阻塞當(dāng)前線程,直到操作完成。比如,一個簡單的日志收集腳本,把日志發(fā)到RabbitMQ,用
BlockingConnection
登錄后復(fù)制
就足夠了,寫起來也很順手。

異步模式 (

SelectConnection
登錄后復(fù)制
,
TornadoConnection
登錄后復(fù)制
等)
則像你同時服務(wù)多位客人。你接到點餐請求后,不是傻等,而是把點餐單交給廚房,然后立刻去接其他客人的請求,或者去處理其他事務(wù)(比如倒水、收拾桌子)。當(dāng)廚房通知你菜好了,你再回來處理之前的點餐。這種模式復(fù)雜一些,因為你需要管理多個并發(fā)的事件,但它的效率非常高,不會因為一個耗時操作而阻塞整個應(yīng)用。對于Web服務(wù)(如Django、Flask應(yīng)用)、長連接服務(wù)、或者需要處理大量并發(fā)消息的消費者來說,異步模式是必不可少的。它能讓你的應(yīng)用在等待I/O(比如網(wǎng)絡(luò)傳輸)的時候,去處理其他事情,大大提升了吞吐量和響應(yīng)速度。當(dāng)然,這也意味著你需要更深入地理解Python的異步編程模型,比如回調(diào)函數(shù)、事件循環(huán)等。雖然學(xué)習(xí)曲線可能陡峭一點,但一旦掌握,它能讓你的Python應(yīng)用在處理消息隊列時如虎添翼。

消息的持久化與確認(rèn)機制在Pika中如何實現(xiàn)?

在生產(chǎn)環(huán)境中,消息的持久化和確認(rèn)機制是確保消息不丟失的關(guān)鍵。這兩點在Pika中都有明確的實現(xiàn)方式,它們共同構(gòu)筑了RabbitMQ“至少一次”消息投遞的可靠性保障。

消息持久化:

消息持久化分為兩個層面:隊列持久化消息持久化

Pika
Pika

Pika.art是一個AI驅(qū)動的多樣化風(fēng)格視頻創(chuàng)作平臺

Pika481
查看詳情 Pika
  1. 隊列持久化: 當(dāng)你聲明隊列時,將

    durable
    登錄后復(fù)制
    參數(shù)設(shè)為
    True
    登錄后復(fù)制

    channel.queue_declare(queue='my_queue', durable=True)
    登錄后復(fù)制

    這樣做是為了防止RabbitMQ服務(wù)器重啟后,你創(chuàng)建的隊列消失。如果隊列是非持久化的,服務(wù)器一重啟,隊列就不在了,即使里面有持久化的消息,也無處可尋了。

  2. 消息持久化: 當(dāng)你發(fā)布消息時,通過

    pika.BasicProperties
    登錄后復(fù)制
    設(shè)置
    delivery_mode=2
    登錄后復(fù)制
    。

    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 2表示消息持久化
        )
    )
    登錄后復(fù)制

    這告訴RabbitMQ,這條消息需要寫入磁盤。這樣,即使在消息到達(dá)消費者并被確認(rèn)之前,RabbitMQ服務(wù)器突然崩潰,重啟后這條消息也能從磁盤中恢復(fù),并重新投遞。

需要注意的是,即使消息和隊列都持久化了,也不能保證100%不丟消息。比如,在消息到達(dá)RabbitMQ并寫入磁盤的極短時間內(nèi),如果服務(wù)器崩潰,消息可能還是會丟失。對于極端高可靠性的場景,你可能還需要結(jié)合發(fā)布者確認(rèn)(Publisher Confirms)機制。

消息確認(rèn)機制(Acknowledgements):

這是消費者端確保消息被成功處理的關(guān)鍵。當(dāng)消費者從隊列中獲取一條消息后,它需要向RabbitMQ發(fā)送一個“確認(rèn)回執(zhí)”(Acknowledgement)。

  1. 關(guān)閉自動確認(rèn):

    basic_consume
    登錄后復(fù)制
    時,將
    auto_ack
    登錄后復(fù)制
    參數(shù)設(shè)為
    False
    登錄后復(fù)制
    。

    channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)
    登錄后復(fù)制

    默認(rèn)情況下,

    auto_ack
    登錄后復(fù)制
    True
    登錄后復(fù)制
    ,這意味著RabbitMQ一旦把消息發(fā)給消費者,就認(rèn)為消息已經(jīng)被成功處理并從隊列中刪除。這顯然是不安全的。

  2. 手動發(fā)送確認(rèn)回執(zhí): 在你的消息回調(diào)函數(shù)中,當(dāng)消息被成功處理后,調(diào)用

    channel.basic_ack()
    登錄后復(fù)制
    。

    def callback(ch, method, properties, body):
        # ... 處理消息的邏輯 ...
        ch.basic_ack(delivery_tag=method.delivery_tag) # 確認(rèn)消息
    登錄后復(fù)制

    delivery_tag
    登錄后復(fù)制
    是RabbitMQ分配給每條消息的唯一標(biāo)識。通過發(fā)送確認(rèn)回執(zhí),RabbitMQ就知道這條消息已經(jīng)被消費者成功處理,可以安全地從隊列中刪除了。

如果消費者在處理消息過程中崩潰,或者沒有發(fā)送確認(rèn)回執(zhí),RabbitMQ會認(rèn)為這條消息沒有被成功處理,并在消費者重新連接或有其他消費者可用時,將這條消息重新投遞給其他消費者。這保證了消息的“至少一次”投遞:消息可能被投遞多次,但絕不會丟失。當(dāng)然,這也意味著你的消費者需要具備冪等性,即多次處理同一條消息不會產(chǎn)生副作用。

你也可以使用

basic_nack
登錄后復(fù)制
(否定確認(rèn))或
basic_reject
登錄后復(fù)制
來拒絕消息。
basic_nack
登錄后復(fù)制
更靈活,可以指定是否將消息重新放回隊列(
requeue=True/False
登錄后復(fù)制
),而
basic_reject
登錄后復(fù)制
通常用于徹底拒絕一條消息,且只能拒絕一條。在實際應(yīng)用中,根據(jù)業(yè)務(wù)邏輯選擇合適的確認(rèn)方式,是構(gòu)建健壯消息系統(tǒng)的關(guān)鍵。

以上就是Python怎樣操作消息隊列?pika連接RabbitMQ的詳細(xì)內(nèi)容,更多請關(guān)注php中文網(wǎng)其它相關(guān)文章!

最佳 Windows 性能的頂級免費優(yōu)化軟件
最佳 Windows 性能的頂級免費優(yōu)化軟件

每個人都需要一臺速度更快、更穩(wěn)定的 PC。隨著時間的推移,垃圾文件、舊注冊表數(shù)據(jù)和不必要的后臺進(jìn)程會占用資源并降低性能。幸運的是,許多工具可以讓 Windows 保持平穩(wěn)運行。

下載
來源:php中文網(wǎng)
本文內(nèi)容由網(wǎng)友自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,本站不承擔(dān)相應(yīng)法律責(zé)任。如您發(fā)現(xiàn)有涉嫌抄襲侵權(quán)的內(nèi)容,請聯(lián)系admin@php.cn
最新問題
開源免費商場系統(tǒng)廣告
最新下載
更多>
網(wǎng)站特效
網(wǎng)站源碼
網(wǎng)站素材
前端模板
關(guān)于我們 免責(zé)申明 意見反饋 講師合作 廣告合作 最新更新
php中文網(wǎng):公益在線php培訓(xùn),幫助PHP學(xué)習(xí)者快速成長!
關(guān)注服務(wù)號 技術(shù)交流群
PHP中文網(wǎng)訂閱號
每天精選資源文章推送
PHP中文網(wǎng)APP
隨時隨地碎片化學(xué)習(xí)
PHP中文網(wǎng)抖音號
發(fā)現(xiàn)有趣的

Copyright 2014-2025 http://ipnx.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號