日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

AspNetCore&MassTransit Courier實現分布式事務的詳細過程

瀏覽:746日期:2022-06-09 11:41:00
目錄
  • 分布式事務
  • Saga模式
    • 執行過程
    • 恢復策略
    • 協作方式
      • 編排式(Orchestrator)
      • 協同式(Choreography)
  • MassTransit Courier
    • 補償服務
      • 服務建立
        • 服務配置
          • 服務編排
            • 執行請求
              • 執行成功
              • 執行補償
          • 參考文獻

            在之前的一篇博文中,CAP框架可以方便我們實現非實時、異步場景下的最終一致性,而有些用例總是無法避免的需要在實時、同步場景下進行,可以借助Saga事務來解決這一困擾。在一些博文和倉庫中也搜尋到了.Net下實現Saga模式的解決方案MassTransit,這就省得自己再造輪子了。

            分布式事務

            分布式系統中,分布式事務是一個不能避免的問題,如何保證不同節點間的數據一致性。舉個常見的例子,下訂單、減庫存、扣余額,三者在單個節點時,可以借助本地事務,實現要么成功要么失敗。而當三者處于不同節點時,又參雜了如網絡環境、節點自身環境、服務環境等各種因素,使得三個節點想要實現要么成功、要么失敗就增加了許多困難。

            CAP理論和BASE理論很好的詮釋了這一問題,也有了許多的解決分布式事務的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解決方案,面對不同場景、不同要求等可選擇不同的解決方案。

            數據一致性容錯性復雜性性能維護成本2PC強低中低低3PC強低高低中TCC弱高高中高本地消息表弱高低中中MQ事務弱高低高中Saga事務弱高高中高

            在之前提到過一個基于本地消息表的CAP框架,借助最終一致性很方便的解決了異步非實時請求下的分布式事務,而對于大部分場景雖然可以直接或者妥協方式使用著異步非實時,如同步實時場景的下訂單且減庫存變更到異步非實時場景的下訂單后發事件減庫存,但是總有那么一些場景,不得不去考慮同步實時請求下的分布式事務。

            Saga模式

            Saga模式又叫做長時間運行事務(Long-running-transaction), 由普林斯頓大學的 Hector Garcia-Molina和Kenneth Salem 1987年發表的論文《Sagas》。核心思想是將長事務拆分為多個本地短事務,通過保證所有短事務的成功或失敗來決定整體的成功或失敗,由Saga事務協調器協調管理,所有節點執行成功,則成功,如有節點失敗,則反向執行前置節點的補償操作。

            • 每個Saga事務由一系列冪等的有序子事務(sub-transaction) Ti 組成。
            • 每個Ti 都有對應的冪等補償動作Ci,補償動作用于撤銷Ti造成的結果。

            執行過程

            當正常執行時,依照T1、T2、T3三個短事務正常執行下去,直到最后一個Tn事務執行完畢,宣告整個事務的成功。

            而當執行到某個Tj出現故障時,則反向補償之前的Tj-1..T1,每個對應的補償操作Cj-1...C1,其中Tj事務由于在執行階段就已失敗,所以Tj對應的補償動作Cj不需要執行,即也確定了最后一個Tn事務可以不設置補償動作Cn。

            恢復策略

            • 向前恢復(forward recovery):對于Ti事務的執行,部分場景下可能因為數據庫的連接、網絡的波動等導致短暫的失敗,對Ti事務重試執行,以確保整個事務的執行,如執行T1, T2, T3,當執行T3失敗時,不直接宣告失敗,對T3執行重試以排除部分不穩定因素,如在若干次重試無效后,再考慮向后恢復。

            • 向后恢復(backward recovery):按照執行順序方式作為向前的指向,則向后為反向補償,對已執行過的節點順序倒退執行各Ti的補償動作Ci,也就是把走過的路往回走,對執行過的操作執行業務上的反操作,如正向流程執行減庫存則補償操作時執行加庫存。

            協作方式

            對于服務與服務間的協作,我們通常有兩種模式:Orchestration(編排式) 和 Choreography(協同式),在Saga模式中也有著這兩種的實現。

            • 編排式(Orchestrator):把 Saga 的決策和執行順序邏輯集中在一個 Saga 編排器類中。Saga 編排器發出命令式消息給各個 Saga 參與方,指示這些參與方服務完成具體操作(本地事務)。
            • 協同式(Choreography):把 Saga 的決策和執行順序邏輯分布在 Saga 的每個參與方中,它們通過交換事件的方式來進行溝通。

            編排式與協同式的差異僅在于服務之間的協作方式,每個參與服務的接口定義卻沒有任何區別。

            編排式(Orchestrator)

            編排式的 Saga 需要開發人員定義一個編排器類,用于編排一個Saga中多個參與服務執行的流程。如果整個業務流程正常結束,業務就成功完成,一旦這個過程的任何環節出現失敗,Saga編排器類就會以相反的順序調用補償操作,重新進行業務回滾。

            對于每個參與的服務而言,需要做的事情是

            • 訂閱并處理命令消息
            • 執行命令后返回響應消息
            • 設計執行邏輯和補償邏輯

            以提交訂單為例,假設場景是分布式系統下,進程間以消息傳遞進行通信:

            1、事務發起方的主業務邏輯請求預先定義好的Saga編排器類(內部編排了執行順序)。

            2、Saga編排器類向MQ發送減庫存事件,庫存服務訂閱事件、執行處理并返回MQ處理結果。

            3、Saga編排器類向MQ發送減余額事件,支付服務訂閱事件、執行處理并返回MQ處理結果。

            4、Saga編排器類向MQ發送創建訂單命令,訂單服務訂閱事件并按照命令創建訂單。

            5、主業務邏輯接收并處理Saga編排器類處理結果。

            6、整個過程由Saga 編排器類對接收到的回復進行判決,來決定是繼續執行還是懸崖勒馬。

            協同式(Choreography)

            沒有集中式的編排類,而是各參與方間相互訂閱,一個服務訂閱另一個服務的事件。

            先由事務發起方執行邏輯并發布一個事件,該事件被一個或多個服務進行訂閱,這些服務執行本地數據庫操作并發布(或不發布)新的事件,該部分需要保證本地數據庫的操作成功且寫入MQ的消息也成功,可考慮使用本地消息表或是基于MQ事務。當最后一個服務執行本地事務并且不發布任何事件或者發布的事件沒有被任何Saga參與者訂閱意味著事務結束,則整個業務流程的分布式事務完成。如果某一服務出現故障,那么則反向發布事件,執行補償操作,以此回滾。

            以提交訂單為例,假設場景是分布式系統下,進程間以消息傳遞進行通信:

            1、事務發起方執行主業務邏輯發送提交訂單命令。

            2、庫存服務訂閱事件、扣減庫存并發布已扣減事件。

            3、訂單服務訂閱庫存已扣減事件,創建訂單并發布訂單已創建事件。

            4、支付服務訂閱訂單已創建事件,執行支付并發布訂單已支付事件。

            5、主業務邏輯訂閱訂單已支付事件并處理。

            當某服務內執行時如存在異常,則反向發布事件,如訂單創建失敗,則發布OrderCreatedFailed事件,庫存服務訂閱該事件并執行補償操作。

            相比而言,編排式中參與服務無需向協同式中訂閱上游服務的事件,減少了服務間對事件協議的依賴,而只需要關心集權的編排器類發送的消息。

            MassTransit Courier

            補償服務

            當開啟一個事務前,需要做一些準備,準備一個事務Id,記錄整個事務執行情況,各Tj事務執行情況,當前請求上下文參數,入參參數記錄等,以方便執行補償操作時需要用到。如當Tj事務執行失敗時,需要對Cj-1到C1執行補償操作,此時各補償操作需要一些正向執行T1,Tj-1的請求參數或執行結果,因此都需要記錄下來。

            在Courier中,通過Routing Slip來完成這些記錄,創建一個Guid,記錄請求上下文參數信息,可以綁定幾個內置事件,在各階段到來時會發送事件,如有需要可以訂閱。

            var builder = new RoutingSlipBuilder(NewId.NextGuid());builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);builder.AddVariable("RequestId", context.RequestId);builder.AddVariable("ResponseAddress", context.ResponseAddress);builder.AddVariable("FaultAddress", context.FaultAddress);builder.AddVariable("Request", context.Message);//組合一系列Activityvar routingSlip = builder.Build();await context.Execute(routingSlip).ConfigureAwait(false);

            服務建立

            弄了個Demo,建立了三個服務,此處我使用編排式來完成,但無論是選用編排式還是協同式,都借助RabbitMQ實現消息傳遞。

            每個服務都安裝了MassTransit相關的包

            MassTransit.AspNetCoreMassTransit.RabbitMQ

            將Saga編排器類放置在OrderService中了,對于編排器類的放置,個人認為是應該看用例的主服務是誰而放置,想過放在BFF去協調三個服務,但是總是感覺不是BFF的職責范圍。

            服務配置

            在各服務中對MassTransit配置,如下在OrderService中對MassTransit需要使用到的RabbitMQ配置,對需要進行多個服務協作的用例配置Routing Slip,對消息隊列偵聽訂閱需要的事件并配置相應的Activity處理。

            services.AddMassTransit(x =>{    var currentAssembly = Assembly.GetExecutingAssembly();    x.AddActivities(currentAssembly);    x.AddConsumers(currentAssembly);    x.AddRequestClient<createordercommand>();    x.UsingRabbitMq((context, cfg) =>    {// 配置RabbitMQcfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{    h.Username(Configuration["RabbitmqConfig:Username"]);    h.Password(Configuration["RabbitmqConfig:Password"]);});//配置Routing Slipcfg.ReceiveEndpoint("CreateOrderCommand", ep =>{    ep.ConfigureConsumer<createorderrequestproxy>(context);    ep.ConfigureConsumer<createorderresponseproxy>(context);});// 配置訂閱隊列及Handler處理cfg.ReceiveEndpoint("CreateOrder_execute", ep =>{    ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context);});    });});services.AddMassTransitHostedService();

            服務編排

            構建Routing Slip,此處依據用例的需求,對需要協作的服務編排,組合一系列的Activity。

            Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request){    builder.AddActivity("ReduceStock", new Uri("..."), new {});    builder.AddActivity("DeductBalance", new Uri("..."), new {});    builder.AddActivity("CreateOrder", new Uri("..."), new { });    return Task.CompletedTask;}

            執行請求

            當請求進入后,通過RequestClient發送CreateOrderCommand,同步等待執行結果,再由編排器類負責協調預設好的Activity,發送事件到消息隊列,經各Activity訂閱處理最終返回結果。

            [Route("[controller]")]public class OrderController : ControllerBase{    private readonly IRequestClient<createordercommand> _createOrderClient;    public OrderController(IRequestClient<createordercommand> createOrderClient)    {_createOrderClient = createOrderClient;    }    [HttpGet("CreateOrder")]    public async Task<commoncommandresponse<createorderresult>> CreateOrder()    {var result = await _createOrderClient.GetResponse<commoncommandresponse<createorderresult>>(new CreateOrderCommand(){    // ...});return result.Message;    }}

            各服務中對于Activity設置偵聽隊列以及請求信息,調用Execute執行邏輯,當出現異常時返回到MQ通知編排器類,在對之前執行的Activity執行Compensate。如在CreateOrderActivity中執行異常,由編排器類執行補償,ReduceStockActivity調用Compensate,執行增加庫存邏輯

            public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog>{    public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context)    {var argument = context.Arguments;// 扣減庫存await Task.Delay(100);return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 });    }    public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context)    {// 增加庫存await Task.Delay(100);return context.Compensated();    }}

            執行成功

            用例請求執行后,先由Controller發送請求,再由庫存服務扣減庫存,支付服務扣減余額,最后由訂單服務創建訂單,當創建失敗時,執行補償操作,庫存服務增加庫存,支付服務增加余額。

            執行補償

            用例請求執行后,先由Controller發送請求,再由庫存服務扣減庫存,支付服務扣減余額,最后由訂單服務創建訂單,當創建失敗時,執行補償操作,庫存服務增加庫存,支付服務增加余額。

            在整個事務失敗后,先會返回異常,再由編排器執行補償操作,實現最終的數據一致性。MassTransit也提供了重試機制以實現向前恢復,避免因數據庫連接超時、網絡波動等問題造成的失敗。

            參考文獻

            到此這篇關于AspNetCore&amp;MassTransit Courier實現分布式事務的文章就介紹到這了,更多相關AspNetCore分布式事務內容請搜索以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持!

            標簽: ASP.NET
            日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
            四季av一区二区凹凸精品| 激情久久婷婷| 在线 亚洲欧美在线综合一区| 久久不卡国产精品一区二区| 亚洲日本三级| 日韩av资源网| 久久精品一区二区三区中文字幕| 日韩国产一二三区| 国产麻豆一区| 日本一区免费网站| 国产精品一级| а√天堂8资源在线| 欧美精品高清| 国产亚洲精品久久久久婷婷瑜伽| 在线观看免费一区二区| 视频在线观看91| 国产日韩中文在线中文字幕| 精品国产一区二区三区av片| 91精品蜜臀一区二区三区在线| 国产综合亚洲精品一区二| 久久久久久久久久久妇女| 国精品一区二区三区| 亚洲精选91| 国产成年精品| 久久黄色影院| 亚洲人成亚洲精品| 久久中文字幕一区二区| 亚洲天堂黄色| 国产亚洲电影| 欧美精品一卡| 国产精品极品| 欧美在线影院| 欧美国产另类| 国产色综合网| 日本久久精品| 亚洲日产国产精品| 三级精品视频| 国产精品久久久久久久免费软件| 欧美 日韩 国产一区二区在线视频| 亚洲精品极品| 91成人精品| 欧美男人天堂| 国产欧美88| 丝袜美腿亚洲一区二区图片| 精品成人18| 91亚洲无吗| 蜜桃久久久久久久| 亚洲先锋成人| 国产精品原创| 久久只有精品| 久久不卡国产精品一区二区| 在线精品亚洲| 亚洲一级在线| 亚洲国内精品| 不卡专区在线| 精品久久影院| 欧美极品一区二区三区| 欧美亚洲一级| 日韩精品国产精品| 亚洲最新av| 色婷婷成人网| 日韩国产在线一| 日韩精品免费视频一区二区三区| 日韩亚洲精品在线| 国产亚洲福利| 三级欧美韩日大片在线看| 亚洲综合另类| 爽好久久久欧美精品| 亚洲深深色噜噜狠狠爱网站| 999国产精品永久免费视频app| 日韩高清成人| 久久精品99久久无色码中文字幕| 天堂а√在线最新版中文在线| 国产精品久久久久久久久妇女| 手机在线电影一区| 久久亚洲在线| 免费在线欧美视频| 日本不卡视频一二三区| 国产精品美女在线观看直播| 成人污污视频| 亚洲国产成人精品女人| 中文字幕av一区二区三区人| 91精品福利观看| 国产一区二区三区亚洲综合| 国产伊人久久| 不卡在线一区| 日韩不卡一区二区三区| 久久免费视频66| 亚洲v在线看| 综合激情在线| 久久久精品国产**网站| 欧美69视频| 国产欧美另类| 亚洲午夜电影| 欧美国产视频| 91精品99| 国产乱人伦精品一区| 亚洲最新无码中文字幕久久| 老鸭窝亚洲一区二区三区| 亚洲综合小说| 国产成人精品福利| 亚洲毛片视频| 久久久精品日韩| 日本在线视频一区二区| 欧美aa在线观看| 日本伊人久久| 伊人久久亚洲影院| 国产欧洲在线| 国产日韩一区二区三区在线 | 日韩午夜视频在线| 91成人超碰| av日韩中文| 国产极品久久久久久久久波多结野| 伊人精品一区| 美女国产一区二区三区| 日韩国产91| 亚洲丝袜啪啪| 好看不卡的中文字幕| 久久国产中文字幕| 成人自拍av| 日韩在线不卡| 亚洲女同av| 韩国三级一区| av一区在线| 亚洲国产综合在线看不卡| 久久久久久免费视频| 亚洲调教视频在线观看| 国产真实久久| 日本在线啊啊| 久久高清免费| 九色精品91| 午夜一级在线看亚洲| 黄色成人在线网址| 欧美精品一线| 蜜臀av在线播放一区二区三区| 老司机精品久久| 日本v片在线高清不卡在线观看| 亚洲三级国产| 国产精品成人国产| 日韩欧美精品一区| 夜夜嗨一区二区| 综合视频一区| 久久不卡国产精品一区二区| 欧美欧美黄在线二区| 荡女精品导航| 91久久视频| 国产精品三级| 在线视频观看日韩| 午夜在线视频观看日韩17c| 午夜视频一区二区在线观看| 国产精品一页| 秋霞影院一区二区三区| 亚洲欧美日本日韩| 日韩极品在线观看| 91亚洲国产高清| 免费日韩av| 麻豆精品av| 亚洲成人免费| 日韩av中文字幕一区二区| 成人污污视频| 日日夜夜免费精品视频| 国产高清不卡| 91精品国产自产观看在线 | 五月天激情综合网| 国产午夜精品一区在线观看| 成人在线视频中文字幕| 四虎国产精品免费久久| 在线日韩av| 成人高清一区| 日韩一区欧美二区| 国产成人精品一区二区三区免费 | 国产色99精品9i| 婷婷综合网站| 91嫩草亚洲精品| 国产欧美日韩一区二区三区四区| 在线一区免费| 日韩欧美看国产| 麻豆国产一区| 日韩av在线免费观看不卡| 欧美日韩三区| 给我免费播放日韩视频| 日本亚州欧洲精品不卡| 亚洲精品小说| 久久久久国产精品一区二区| 欧美国产日本| 欧美亚洲一区二区三区| 日韩成人精品一区二区三区| 中文字幕中文字幕精品| 免费中文字幕日韩欧美| 美女网站一区| 日本精品影院| 久久久久久久久丰满| 成人国产精品| 香蕉视频亚洲一级| 一区二区精品伦理...| 国产精品蜜月aⅴ在线| 国产精品中文字幕制服诱惑| 国产三级一区| 九九久久国产| 日韩一区欧美|