在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Spring Boot整合分布式消息平臺Pulsar介紹

jf_ro2CN3Fa ? 來源:君哥聊技術 ? 2023-07-12 09:58 ? 次閱讀

作為優秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。

部署 Pulsar

Pulsar 的部署方式主要有 3 種,本地安裝二進制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一個單節點的 Pulsar 集群。實驗環境是 2 核 CPU4G 內存。

部署命令如下:

dockerrun-it-p6650:6650-p8080:8080--mountsource=pulsardata,target=/pulsar/data--mountsource=pulsarconf,target=/pulsar/confapachepulsar/pulsar:2.9.1bin/pulsarstandalone

安裝過程可能會出現下面的錯誤:

unknownflag:--mount
See'dockerrun--help'.

這是因為 docker 版本低,不支持 mount 參數,把 docker 版本升級到 17.06 以上就可以了。

部署過程中可能會因為網絡的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。

2022-01-08T22:27:58,726+0000[main]INFOorg.apache.pulsar.broker.PulsarService-messagingserviceisready,bootstrapserviceport=8080,brokerurl=pulsar://localhost:6650,cluster=standalone

本地單節點集群啟動后,會創建一個 namespace,名字叫 public/default

Pulsar 客戶端

目前 Pulsar 支持多種語言的客戶端,包括:

Java 客戶端

Go 客戶端

Python 客戶端

C++ 客戶端

Node.js 客戶端

WebSocket 客戶端

C# 客戶端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:


org.apache.pulsar
pulsar-client
2.9.1

然后在 properties 文件中添加配置:

# Pulsar 地址
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup

創建 Client

創建客戶端非常簡單,代碼如下:

client=PulsarClient.builder()
.serviceUrl(url)
.build();

上面的 url 就是 properties 文件中定義的 pulsar.url 。

創建 Client 時,即使集群沒有啟成功,程序也不會報錯,因為這時還沒有真正地去連接集群。

創建 Producer

producer=client.newProducer()
.topic(topic)
.compressionType(CompressionType.LZ4)
.sendTimeout(0,TimeUnit.SECONDS)
.enableBatching(true)
.batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS)
.batchingMaxMessages(1000)
.maxPendingMessages(1000)
.blockIfQueueFull(true)
.roundRobinRouterBatchingPartitionSwitchFrequency(10)
.batcherBuilder(BatcherBuilder.DEFAULT)
.create();

創建 Producer,會真正的連接集群,這時如果集群有問題,就會報連接錯誤。

下面解釋一下創建 Producer 的參數:

topic :Producer 要寫入的 topic。

compressionType :壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個策略才會生效。

sendTimeout :超時時間,如果 Producer 在超時時間為收到 ACK,會進行重新發送。

enableBatching :是否開啟消息批量處理,這里默認 true,這個參數只有在異步發送 (sendAsync) 時才能生效,選擇同步發送會失效。

batchingMaxPublishDelay :批量發送消息的時間段,這里定義的是 10ms,需要注意的是,設置了批量時間,就不會受消息數量的影響。批量發送會把要發送的批量消息放在一個網絡包里發送出去,減少網絡 IO 次數,大大提高網卡的發送效率。

batchingMaxMessages :批量發送消息的最大數量。

maxPendingMessages :等待從 broker 接收 ACK 的消息隊列最大長度。如果這個隊列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設置了 blockIfQueueFull 值是 true。

blockIfQueueFull :Producer 發送消息時會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發送。

roundRobinRouterBatchingPartition-SwitchFrequency :如果發送消息時沒有指定 key,那默認采用 round robin 的方式發送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。

創建 Consumer

Pulsar 的消費模型如下圖:

f5c11b2e-2053-11ee-962d-dac502259ad0.png圖片

從圖中可以看到,Consumer 要綁定一個 subscription 才能進行消費。

consumer=client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(60,TimeUnit.SECONDS)
.receiverQueueSize(1000)
.subscribe();

下面解釋一下創建 Consumer 的參數:

topic :Consumer 要訂閱的 topic。

subscriptionNam e:consumer 要關聯的 subscription 名字。

subscriptionType :訂閱類型,Pulsar 支持四種類型訂閱:

Exclusive :獨占模式,同一個 Topic 只能有一個消費者,如果多個消費者,就會出錯。

Failover :災備模式,同一個 Topic 可以有多個消費者,但是只能有一個消費者消費,其他消費者作為故障轉移備用,如果當前消費者出了故障,就從備用消費者中選擇一個進行消費。如下圖:

f5de2fde-2053-11ee-962d-dac502259ad0.png圖片

Shared :共享模式,同一個 Topic 可以由多個消費者訂閱和消費。消息通過 round robin 輪詢機制分發給不同的消費者,并且每個消息僅會被分發給一個消費者。當消費者斷開,如果發送給它消息沒有被消費,這些消息會被重新分發給其它存活的消費者。如下圖:

f6008dcc-2053-11ee-962d-dac502259ad0.png

Key_Shared :消息和消費者都會綁定一個key,消息只會發送給綁定同一個key的消費者。如果有新消費者建立連接或者有消費者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費者并發地消費消息,又能保證同一Key下的消息順序。如下圖:

f6357d02-2053-11ee-962d-dac502259ad0.png

subscriptionInitialPosition :創建新的 subscription 時從哪里開始消費,有兩個選項:

Latest :從最新的消息開始消費

Earliest :從最早的消息開始消費

negativeAckRedeliveryDelay :消費失敗后間隔多久 broker 重新發送。

receiverQueueSize :在調用 receive 方法之前,最多能累積多少條消息。可以設置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設置為 0,可以防止批量消息多發給一個 Consumer 而導致其他 Consumer 空閑。

Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:

Messagemessage=consumer.receive()
CompletableFuturemessage=consumer.receiveAsync();
Messagesmessage=consumer.batchReceive();
CompletableFuturemessage=consumer.batchReceiveAsync();

對于批量接收,也可以設置批量接收的策略,代碼如下:

consumer=client.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024*1024)
.timeout(200,TimeUnit.MILLISECONDS)
.build())
.subscribe();

代碼中的參數說明如下:

maxNumMessages :批量接收的最大消息數量。

maxNumBytes :批量接收消息的大小,這里是 1MB。

測試

首先編寫 Producer 發送消息的代碼,如下:

publicvoidsendMsg(Stringkey,Stringdata){
CompletableFuturefuture=producer.newMessage()
.key(key)
.value(data.getBytes()).sendAsync();
future.handle((v,ex)->{
if(ex==null){
logger.info("發送消息成功,key:{},msg:{}",key,data);
}else{
logger.error("發送消息失敗,key:{},msg:{}",key,data);
}
returnnull;
});
future.join();
logger.info("發送消息完成,key:{},msg:{}",key,data);
}

然后編寫一個 Consumer 消費消息的代碼,如下:

publicvoidstart()throwsException{
while(true){
Messagemessage=consumer.receive();
Stringkey=message.getKey();
Stringdata=newString(message.getData());
Stringtopic=message.getTopicName();
if(StringUtils.isNotEmpty(data)){
try{
logger.info("收到消息,topic:{},key:{},data:{}",topic,key,data);
}catch(Exceptione){
logger.error("接收消息異常,topic:{},key:{},data:{}",topic,key,data,e);
}
}
consumer.acknowledge(message);
}
}

最后編寫一個 Controller 類,調用 Producer 發送消息,代碼如下:

@RequestMapping("/send")
@ResponseBody
publicStringsend(@RequestParamStringkey,@RequestParamStringdata){
logger.info("收到消息發送請求,key:{},value:{}",key,data);
pulsarProducer.sendMsg(key,data);
return"success";
}

調用 Producer 發送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車

可以看到控制臺輸出下面日志:

2022-01-0822:42:33,199[pulsar-client-io-6-1][INFO]boot.pulsar.PulsarProducer-發送消息成功,key:key1,msg:data1
2022-01-0822:42:33,200[http-nio-8083-exec-1][INFO]boot.pulsar.PulsarProducer-發送消息完成,key:key1,msg:data1
2022-01-0822:42:33,232[Thread-22][INFO]boot.pulsar.PulsarConsumer-收到消息,topic//public/default/testTopic,key:key1,data:data1
2022-01-0822:43:14,498[pulsar-timer-5-1][INFO]org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl-[testTopic][topicGroup][7def6]Prefetchedmessages:0---Consumethroughputreceived:0.02msgs/s---0.00Mbit/s---Acksentrate:0.02ack/s---Failedmessages:0---batchmessages:0---Failedacks:0
2022-01-0822:43:14,961[pulsar-timer-9-1][INFO]org.apache.pulsar.client.impl.ProducerStatsRecorderImpl-[testTopic][standalone-9-0]Pendingmessages:0---Publishthroughput:0.02msg/s---0.00Mbit/s---Latency:med:69.000ms-95pct:69.000ms-99pct:69.000ms-99.9pct:69.000ms-max:69.000ms---Ackreceivedrate:0.02ack/s---Failedmessages:0

從日志中看到,這里使用的 namespace 就是創建集群時生成的public/default。

總結

從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。






審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • ACK
    ACK
    +關注

    關注

    0

    文章

    28

    瀏覽量

    11155
  • URL
    URL
    +關注

    關注

    0

    文章

    139

    瀏覽量

    15367
  • python
    +關注

    關注

    56

    文章

    4797

    瀏覽量

    84774

原文標題:Spring Boot 整合分布式消息平臺 Pulsar

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    Spring Boot Starter需要些什么

    pulsar-spring-boot-starter是非常有必要的,在此之前,我們先看看一個starter需要些什么。 Spring Boot Starter spring-boot
    的頭像 發表于 09-25 11:35 ?774次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> Starter需要些什么

    HarmonyOS應用開發-分布式設計

    設計理念HarmonyOS 是面向未來全場景智慧生活方式的分布式操作系統。對消費者而言,HarmonyOS 將生活場景中的各類終端進行能力整合,形成“One Super Device”,以實現
    發表于 09-22 17:11

    啟動Spring Boot項目應用的三種方法

    ,從而使開發人員不再需要定義樣板化的配置。用我的話來理解,就是spring boot其實不是什么新的框架,它默認配置了很多框架的使用方式,就像maven整合了所有的jar包,spring
    發表于 01-14 17:33

    如何高效完成HarmonyOS分布式應用測試?

    及云測平臺接入Portal共5項測試服務,詳見圖2。針對分布式應用測試面臨的挑戰,我們接下來將重點介紹分布式UI測試框架和評分工具。(1)分布式
    發表于 12-13 18:07

    Spring Boot嵌入Web容器原理是什么

    Spring Boot嵌入Web容器原理Spring Boot的目標是構建“非常容易創建、獨立、產品級別的基于
    發表于 12-16 07:57

    Spring Boot Web相關的基礎知識

    上一篇文章我們已經學會了如何通過IDEA快速建立一個Spring Boot項目,還介紹Spring Boot項目的結構,
    的頭像 發表于 03-17 15:03 ?664次閱讀

    kafka client在 spring如何實現

    認識了 spring-boot-starter ,今天不妨來看下如何寫一個 pulsar-spring-boot-starter 模塊。 目標 寫一個完整的類似 kafka-spring-boot-st
    的頭像 發表于 09-25 11:21 ?508次閱讀
    kafka client在 <b class='flag-5'>spring</b>如何實現

    Spring Boot Actuator快速入門

    不知道大家在寫 Spring Boot 項目的過程中,使用過 Spring Boot Actuator 嗎?知道 Spring
    的頭像 發表于 10-09 17:11 ?651次閱讀

    Spring Boot的啟動原理

    可能很多初學者會比較困惑,Spring Boot 是如何做到將應用代碼和所有的依賴打包成一個獨立的 Jar 包,因為傳統的 Java 項目打包成 Jar 包之后,需要通過 -classpath 屬性
    的頭像 發表于 10-13 11:44 ?654次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>的啟動原理

    Spring Boot 的設計目標

    什么是Spring Boot Spring BootSpring 開源組織下的一個子項目,也是 S
    的頭像 發表于 10-13 14:56 ?592次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> 的設計目標

    spring分布式框架有哪些

    Spring分布式框架是一套基于Spring框架的解決方案,用于構建分布式系統。它提供了一系列的組件和模塊,可以幫助開發人員輕松地構建可擴展、高可用、高性能的
    的頭像 發表于 11-16 10:58 ?791次閱讀

    springclould分布式教程

    Spring Cloud是一個基于Spring Boot分布式系統開發工具,它提供了一系列的分布式系統解決方案,可以幫助開發者快速構建和部
    的頭像 發表于 11-16 10:59 ?522次閱讀

    springcloud如何實現分布式

    Spring Cloud是基于Spring Boot開發的一套分布式系統解決方案,它主要包括了多個子項目,如服務注冊與發現、配置中心、負載均衡、斷路器、路由等等。通過使用
    的頭像 發表于 11-16 11:01 ?694次閱讀

    springcloud 分布式事務解決方案實例

    么都執行成功,要么都執行失敗。本文將介紹如何使用Spring Cloud來實現分布式事務。 在分布式系統中,使用數據庫事務來保證數據一致性是常見的做法。
    的頭像 發表于 12-03 16:32 ?1152次閱讀

    基于分布式對象存儲WDS的信托非結構化數據整合平臺

    基于分布式對象存儲WDS的信托非結構化數據整合平臺
    的頭像 發表于 08-28 09:56 ?354次閱讀
    基于<b class='flag-5'>分布式</b>對象存儲WDS的信托非結構化數據<b class='flag-5'>整合</b><b class='flag-5'>平臺</b>
    主站蜘蛛池模板: 亚洲一区二区三区电影| 日本资源在线| 国产亚洲第一| 中文在线三级中文字幕| aaa亚洲| 一级片免费在线| 天堂网最新版www| 三级毛片网| 免费日本黄色片| 国产一级做a爰片久久毛片| 福利社91| 夜夜春夜夜夜夜猛噜噜噜噜噜| 永久看片| 欧美777| 欧美性白人极品1819hd| 亚洲码欧美码一区二区三区| 美欧毛片| 性欧美xxxx视频在线观看| 色综合天天综一个色天天综合网| 日本午夜色| 国产主播在线观看| 一级黄色片a| 久久夜夜视频| 欧美在线视频一区二区三区| xxxxxx性| 亚洲3级| 欧美成人看片一区二区三区| 国产精品一级毛片不收费| 狼人激情网| cum4k在线| 美女被草视频| 中文字幕一区二区三区免费看 | freesex性欧美重口| 日日操夜夜操天天操| 日本人xxxxxxxx6969| se色成人亚洲综合| 亚色视频在线| 四虎国产精品高清在线观看| 精品一级毛片| 国产美女特级嫩嫩嫩bbb| 久久婷婷影院|