国产一区二区美女诱惑_国产精品免费播放_91精品国产综合久久香蕉麻豆 _久久精品30_久久综合88_国产精品亚洲成人_黑人极品videos精品欧美裸_亚洲色图欧美激情

原創生活

國內 商業 滾動

基金 金融 股票

期貨金融

科技 行業 房產

銀行 公司 消費

生活滾動

保險 海外 觀察

財經 生活 期貨

當前位置:滾動 >

微服務進階避坑策略 微服務異步架構MQ之RocketMQ方案

文章來源:財金網  發布時間: 2019-04-11 11:42:28  責任編輯:cfenews.com
+|-

【原標題:微服務進階避坑策略 微服務異步架構MQ之RocketMQ方案】“我們大家都知道把一個微服務架構變成一個異步架構只需要加一個MQ,現在市面上有很多MQ的開源框架。到底選擇哪一個MQ的開源框架才合適呢?”

一、什么是MQ?MQ的原理是什么?

MQ就是消息隊列,是Message Queue的縮寫。消息隊列是一種通信方式。消息的本質就是一種數據結構。因為MQ把項目中的消息集中式的處理和存儲,所以MQ主要有解耦,并發,和削峰的功能。

1,解耦:

MQ的消息生產者和消費者互相不關心對方是否存在,通過MQ這個中間件的存在,使整個系統達到解耦的作用。

如果服務之間用RPC通信,當一個服務跟幾百個服務通信時,如果那個服務的通信接口改變,那么幾百個服務的通信接口都的跟著變動,這是非常頭疼的一件事。

但是采用MQ之后,不管是生產者或者消費者都可以單獨改變自己。他們的改變不會影響到別的服務。從而達到解耦的目的。為什么要解耦呢?說白了就是方便,減少不必要的工作量。

2,并發

MQ有生產者集群和消費者集群,所以客戶端是億級用戶時,他們都是并行的。從而大大提升響應速度。

3,削峰

因為MQ能存儲的消息量很大,所以他可以把大量的消息請求先存下了,然后再并發的方式慢慢處理。

如果采用RPC通信,每一次請求用調用RPC接口,當請求量巨大的時候,因為RPC的請求是很耗資源的,所以巨大的請求一定會壓垮服務器。

削峰的目的是用戶體驗變好,并且使整個系統穩定。能承受大量請求消息。

二、現在市面上有什么MQ,

重點介紹RocketMQ

現在市面上的MQ有很多,主要有RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ,Kafka等等,這些都是開源的MQ產品。以前很多人推薦使用RabbitMQ,他也是非常好用的MQ產品,這里不做過多的介紹。Kafka也是高吞吐量的老大,我們這里也不介紹。

我們重點介紹一下RocketMQ,RocketMQ是阿里巴巴在2012年開源的分布式消息中間件,目前已經捐贈給Apache軟件基金會,并于并于2017年9月25日成為 Apache 的頂級項目。

作為經歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用。

功能概覽圖

可以看見RocketMQ支持定時和延時消息,這是RabbitMQ所沒有的能力。

RocketMQ的物理結構

從這里可以看出,RocketMQ涉及到四大集群,producer,Name Server,Consumer,Broker。

Producer集群:

是生產者集群,負責產生消息,向消費者發送由業務應用程序系統生成的消息,RocketMQ提供三種方式發送消息:同步,異步,單向。

一,普通消息

1,同步原理圖

同步消息關鍵代碼

try {        SendResult sendResult = producer.send(msg);        // 同步發送消息,只要不拋異常就是成功        if (sendResult != null) {        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());    }    catch (Exception e) {        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());        e.printStackTrace();    }}

2,異步原理圖

異步消息關鍵代碼

producer.sendAsync(msg, new SendCallback() {@Overridepublic void onSuccess(final SendResult sendResult) {       // 消費發送成功      System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); }@Overridepublic void onException(OnExceptionContext context) {    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());}});

3,單向(Oneway)發送原理圖

單向只發送,不等待返回,所以速度最快,一般在微秒級,但可能丟失

單向(Oneway)發送消息關鍵代碼

producer.sendOneway(msg);

三種發送消息具體代碼請參考文檔:https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.6.566.7e49793fuueSlB[1]

二,定時消息和延時消息

發送定時消息關鍵代碼

try {     // 定時消息,單位毫秒(ms),在指定時間戳(當前時間之后)進行投遞,例如 2016-03-07 16:21:00 投遞。如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。    long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();    msg.setStartDeliverTime(timeStamp);    // 發送消息,只要不拋異常就是成功    SendResult sendResult = producer.send(msg);    System.out.println("MessageId:"+sendResult.getMessageId());}catch (Exception e) {    // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());    e.printStackTrace(); }

發送延時消息關鍵代碼

try {    // 延時消息,單位毫秒(ms),在指定延遲時間(當前時間之后)進行投遞,例如消息在 3 秒后投遞    long delayTime = System.currentTimeMillis() + 3000;    // 設置消息需要被投遞的時間 msg.setStartDeliverTime(delayTime);     SendResult sendResult = producer.send(msg);     // 同步發送消息,只要不拋異常就是成功     if (sendResult != null) {        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());      }} catch (Exception e) {   // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());    e.printStackTrace(); }

注意事項

1,定時和延時消息的 msg.setStartDeliverTime 參數需要設置成當前時間戳之后的某個時刻(單位毫秒)。如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。

2,定時和延時消息的 msg.setStartDeliverTime 參數可設置40天內的任何時刻(單位毫秒),超過40天消息發送將失敗。

3,StartDeliverTime 是服務端開始向消費端投遞的時間。 如果消費者當前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。

4,由于客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在偏差。

5,設置定時和延時消息的投遞時間后,依然受 3 天的消息保存時長限制。例如,設置定時消息 5 天后才能被消費,如果第 5 天后一直沒被消費,那么這條消息將在第8天被刪除。

6,除 Java 語言支持延時消息外,其他語言都不支持延時消息。

發布消息原理圖

三,事務消息

RocketMQ提供類似X/Open XA的分布式事務功能來確保業務發送方和MQ消息的最終一致性,其本質是通過半消息的方式把分布式事務放在MQ端來處理。

原理圖

其中:

1,發送方向消息隊列 RocketMQ 服務端發送消息。

2,服務端將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。

3,發送方開始執行本地事務邏輯。

4,發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。

5,在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。

6,發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。

7,發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半消息進行操作。

RocketMQ的半消息機制的注意事項是

1,根據第六步可以看出他要求發送方提供業務回查接口。

2,不能保證發送方的消息冪等,在ack沒有返回的情況下,可能存在重復消息

3,消費方要做冪等處理。

核心代碼

final BusinessService businessService = new BusinessService(); // 本地業務

TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());try {    SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {    @Override    public TransactionStatus execute(Message msg, Object arg) {        // 消息 ID(有可能消息體一樣,但消息 ID 不一樣,當前消息 ID 在控制臺無法查詢)        String msgId = msg.getMsgID();        // 消息體內容進行 crc32,也可以使用其它的如 MD5        long crc32Id = HashUtil.crc32Code(msg.getBody());        // 消息 ID 和 crc32id 主要是用來防止消息重復        // 如果業務本身是冪等的,可以忽略,否則需要利用 msgId 或 crc32Id 來做冪等        // 如果要求消息絕對不重復,推薦做法是對消息體 body 使用 crc32 或 MD5 來防止重復消息        Object businessServiceArgs = new Object();        TransactionStatus transactionStatus =TransactionStatus.Unknow;        try {        boolean isCommit = businessService.execbusinessService(businessServiceArgs);        if (isCommit) {        // 本地事務成功則提交消息 transactionStatus = TransactionStatus.CommitTransaction;        } else {        // 本地事務失敗則回滾消息 transactionStatus = TransactionStatus.RollbackTransaction;        }        } catch (Exception e) {log.error("Message Id:{}", msgId, e);        }        System.out.println(msg.getMsgID());log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());         return transactionStatus;    }    }, null);    }catch (Exception e) {  // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理   System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());   e.printStackTrace();}

具體代碼參考文檔:https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.570.5d5738a49FJl1t[2]

所有消息發布原理圖

producer完全無狀態,可以集群部署。

Name Server集群:

NameServer是一個幾乎無狀態的節點,可集群部署,節點之間無任何信息同步,NameServer很像注冊中心的功能。

聽說阿里之前的NameServer 是用ZooKeeper做的,可能因為Zookeeper不能滿足大規模并發的要求,所以之后NameServer 是阿里自研的。

NameServer其實就是一個路由表,他管理Producer和Comsumer之間的發現和注冊。

Broker集群:

Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slaver,但是一個Slaver只能對應一個Master,Master與Slaver的對應關系通過指定相同的BrokerName。

不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有的NameServer。

Consumer集群:

訂閱方式

消息隊列 RocketMQ 支持以下兩種訂閱方式:

集群訂閱:同一個 Group ID 所標識的所有 Consumer 平均分攤消費消息。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那么在集群消費模式下每個實例平均分攤,只消費其中的 3 條消息。

// 集群訂閱方式設置(不設置的情況下,默認為集群訂閱方式)properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

廣播訂閱:同一個 Group ID 所標識的所有 Consumer 都會各自消費某條消息一次。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那么在廣播消費模式下每個實例都會各自消費 9 條消息。

// 廣播訂閱方式設置properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BR

OADCASTING);

訂閱消息關鍵代碼:

Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe("TopicTestMQ", "TagA||TagB", **new** MessageListener() { //訂閱多個 Tagpublic Action consume(Message message, ConsumeContext context) {   System.out.println("Receive: " + message);   return Action.CommitMessage;}});//訂閱另外一個 Topicconsumer.subscribe("TopicTestMQ-Other", "*", **new** MessageListener() { //訂閱全部 Tagpublic Action consume(Message message, ConsumeContext context) {    System.out.println("Receive: " + message);    return Action.CommitMessage;}});consumer.start();

專題首頁|財金網首頁

原創
新聞

精彩
互動

獨家
觀察

京ICP備2021034106號-38   營業執照公示信息  財金網  版權所有  cfenews.com  投稿郵箱:362293157@qq.com  業務QQ:362293157立即發帖
国产精品久久久久精k8 | 欧美精品高清视频| 欧美一区二区视频在线观看2020| 亚洲精品99999| 欧洲亚洲精品视频| 欧美三级理伦电影| 久久精品资源| 成人一级毛片| 日本亚洲最大的色成网站www| 国产91丝袜在线观看| 亚洲欧美日韩中文字幕一区二区三区 | 狠狠躁少妇一区二区三区| 欧美极品在线| 在线国产一区二区| 精品中文字幕一区二区小辣椒| 久久久久久久久久久久久久久99| 精品福利在线看| 日韩精品视频在线免费观看| 香港伦理在线| 一区二区亚洲视频| 亚洲国产专区校园欧美| 99视频国产精品| 日本精品视频一区二区| 天堂在线看视频| 大黄网站在线观看| 亚洲午夜久久| 美国毛片一区二区| 亚洲最大成人网4388xx| 日韩av网站电影| www在线免费观看视频| 97久久超碰| 日本中文字幕一区| 亚洲精品国产精华液| 国产视频亚洲精品| av剧情在线观看| 日韩av在线播放网址| 国产成人午夜精品5599| 午夜精品福利在线| 91午夜在线| 欧美亚洲福利| 视频一区视频二区在线观看| 亚洲欧美电影院| 免费在线黄网| 在线免费成人| 青青草一区二区三区| 亚洲国产精品视频| 中文字幕在线视频免费观看| 久久精品嫩草影院| 日韩精品国产欧美| 日韩欧美成人网| 国产精品四虎| 精品久久久亚洲| 成人禁用看黄a在线| 欧美喷水一区二区| 欧美xxxx少妇| 欧美人成网站| 亚洲已满18点击进入久久| 波多野结衣在线| 成人在线超碰| 成人午夜碰碰视频| 亚洲大胆美女视频| 成人午夜亚洲| 国产在线国偷精品免费看| 欧美日韩高清一区二区| 蜜桃视频动漫在线播放| 蘑菇福利视频一区播放| 欧美影院一区二区| 日本一级理论片在线大全| 欧美另类综合| 岛国av午夜精品| 超碰免费公开在线| 最新日韩在线| 欧洲人成人精品| 国产色播av在线| 日韩影院免费视频| 日韩限制级电影在线观看| 三上悠亚亚洲一区| 国产精品一区二区三区四区| 日韩电影中文字幕一区| 一区二区在线视频观看| 久久久久久9999| 国产高潮av| 国产精品手机在线播放| 一区二区久久久| 麻豆影视在线观看_| 亚洲深爱激情| 欧美一区二区三区四区久久| 婷婷久久免费视频| 91小视频免费看| 中文字幕欧美一区二区| 7777久久香蕉成人影院| 在线精品观看国产| 在线观看网站免费入口在线观看国内| 久久精品盗摄| 亚洲欧美国产精品| 区一区二视频| 在线观看一区日韩| 欧美国产日韩电影| 久久久久国产精品麻豆ai换脸| 欧美伦理影视网| 一区二区国产精品| 精品国产百合女同互慰| 亚洲ab电影| 日韩欧美黄色动漫| 日韩一级特黄| 综合网在线视频| 99thz桃花论族在线播放| 不卡一区在线观看| 亚洲人成小说| 99国产精品视频免费观看一公开| 日韩欧美一区二区视频| 亚洲第一福利社区| 在线视频国内自拍亚洲视频| 欧美精品资源| 亚洲免费资源在线播放| 免费亚洲电影| 日本一区二区动态图| heyzo高清中文字幕在线| xfplay精品久久| 91一区二区三区在线| 国产不卡视频一区| 欧洲毛片在线| 久久99这里只有精品| 激情福利在线| 国产福利一区二区三区| 免费在线看黄| 久久综合网色—综合色88| 精品黄色免费中文电影在线播放 | 91影院在线观看| 50度灰在线| 国产欧美日韩三区| 欧美电影免费观看网站| 亚洲黄色性网站| 日韩三级久久| 欧美日韩精品一二三区| 精品视频免费在线观看| 日韩精品一区二区三区第95| 亚洲精品1234| 天堂影院在线| 国产精品一区在线| 伊人福利在线| 亚洲天堂免费看| 2020最新国产精品| 欧美精品一卡二卡| 女同性一区二区三区人了人一| 成人午夜激情| 国产美女久久久久| 羞羞的视频在线观看| 亚洲人成网站色在线观看| 91精品啪在线观看国产爱臀| 日韩一区二区三区视频| 一本色道久久综合亚洲精品不卡| 国产一级网站视频在线| 久久精品一二三| 高清不卡一区| 欧美一级在线免费| 亚洲精品孕妇| 免费在线午夜视频| 亚洲综合一二三区| 欧美日韩黑人| 美女做a视频| 高清不卡一二三区| 美女色狠狠久久| 欧美一区二区美女| 久久精品一区二区三区中文字幕| 麻豆免费在线观看| 亚洲另类一区二区| 成人久久久久| 在线观看av中文| 国产日韩欧美精品一区| 国产精品宾馆| 亚洲欧洲一区二区三区久久| 国产一区二区三区国产| 二区三区不卡| 日韩一区二区三区观看| 免费精品视频最新在线| 欧美成人ⅴideosxxxxx| 在线不卡的av| 麻豆91精品91久久久的内涵| 欧美gv在线观看| 91麻豆精品国产自产在线 | 国产福利一区二区| 国产黄色精品| 日韩高清免费在线| 99视频精品在线| 欧美成人一区在线观看| 日日噜噜夜夜狠狠视频| 国产精品蜜臀在线观看| 欧美一区二区性| 日本在线免费| 欧美日韩中文字幕在线视频| 久久婷婷麻豆| 亚洲三级在线| 中国大陆高清aⅴ毛片| 国产精品久久毛片| 999国产精品| 高清电影在线观看免费| 91精品国产91久久久久久最新毛片| 国产乱色国产精品免费视频| 电影一区二区在线观看|