在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

kafka client在 spring如何實(shí)現(xiàn)

科技綠洲 ? 來(lái)源:了不起 ? 作者:了不起 ? 2023-09-25 11:21 ? 次閱讀

之前寫過(guò)關(guān)于 Apache Pulsar 的簡(jiǎn)單示例,用來(lái)了解如何使用 Pulsar 這個(gè)新生代的消息隊(duì)列中間件,但是如果想要在項(xiàng)目中使用,還會(huì)欠缺很多,最明顯的就是 集成復(fù)雜,如果你用過(guò)其他消息中間件,比如 Kafka、RabbitMq,只需要簡(jiǎn)單的引入 jar,就可以通過(guò)注解+配置快速集成到項(xiàng)目中。

開(kāi)始一個(gè) Pulsar Starter

既然已經(jīng)了解了 Apache Pulsar,又認(rèn)識(shí)了 spring-boot-starter,今天不妨來(lái)看下如何寫一個(gè) pulsar-spring-boot-starter 模塊。

目標(biāo)

寫一個(gè)完整的類似 kafka-spring-boot-starter(springboot 項(xiàng)目已經(jīng)集成到 spring-boot-starter 中),需要考慮到很多 kafka 的特性, 今天我們主要實(shí)現(xiàn)下面幾個(gè)模板

  • 在項(xiàng)目中夠通過(guò)引入 jar 依賴快速集成
  • 提供統(tǒng)一的配置入口
  • 能夠快速發(fā)送消息
  • 能夠基于注解實(shí)現(xiàn)消息的消費(fèi)

定義結(jié)構(gòu)

└── pulsar-starter
    ├── pulsar-spring-boot-starter
    ├── pulsar-spring-boot-autoconfigure
    ├── spring-pulsar
    ├── spring-pulsar-xx
    ├── spring-pulsar-sample
└── README.md

整個(gè)模塊的結(jié)構(gòu)如上其中pulsar-starter作為一個(gè)根模塊,主要控制子模塊依賴的其他 jar 的版本以及使用到的插件版本。類似于 Spring-Bom,這樣我們?cè)诤罄m(xù)升級(jí) 時(shí),就可以解決各個(gè)第三方 jar 的可能存在版本沖突導(dǎo)致的問(wèn)題。

  • pulsar-spring-boot-starter

該模塊作為外部項(xiàng)目集成的直接引用 jar,可以認(rèn)為是 pulsar-spring-boot-starter 組件的入口,里面不需要寫任何代碼,只需要引入需要的依賴(也就是下面的子模塊)即可

  • pulsar-spring-boot-autoconfigure

該模塊主要定義了 spring.factories 以及 AutoConfigure、Properties。也就是自動(dòng)配置的核心(配置項(xiàng)+Bean 配置)

  • spring-pulsar

該模塊是核心模塊,主要的實(shí)現(xiàn)都在這里

  • spring-pulsar-xx

擴(kuò)展模塊,可以對(duì) spring-pulsar 做更細(xì)化的劃分

  • spring-pulsar-sample

starter 的使用示例項(xiàng)目

實(shí)現(xiàn)

上面我們說(shuō)到實(shí)現(xiàn)目標(biāo),現(xiàn)在看下各個(gè)模塊應(yīng)該包含什么內(nèi)容,以及怎么實(shí)現(xiàn)我們的目標(biāo)

  • 入口 pulsar-spring-boot-starter

上面說(shuō)到 starter 主要是引入整個(gè)模塊基礎(chǔ)的依賴即可,里面不用寫代碼。

< dependencies >
    < dependency >
        < groupId >com.sucl< /groupId >
        < artifactId >spring-pulsar< /artifactId >
        < version >${project.version}< /version >
    < /dependency >

    < dependency >
        < groupId >com.sucl< /groupId >
        < artifactId >pulsar-spring-boot-autoconfigure< /artifactId >
        < version >${project.version}< /version >
    < /dependency >
< /dependencies >
  • pulsar-spring-boot-autoconfigure
  1. 添加 spring-boot 基礎(chǔ)的配置
< dependencies >
     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot-starter-logging< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot-configuration-processor< /artifactId >
         < optional >true< /optional >
     < /dependency >
< /dependencies >
  1. 定義自動(dòng)配置類 PulsarAutoConfiguration
    • 引入 Properties ,基于EnableConfigurationPropertiesspring-boot-configuration-processor解析 Properties 生成對(duì)應(yīng)spring-configuration-metadata.json文件,這樣編寫 application.yml 配置時(shí)就可以自動(dòng)提示配置項(xiàng)的屬性和值了。
    • 構(gòu)建一些必須的 Bean,如 PulsarClient、ConsumerFactory、ConsumerFactory 等
    • Import 配置 PulsarAnnotationDrivenConfiguration,這個(gè)主要是一些額外的配置,用來(lái)支持后面的功能
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
@Import({PulsarAnnotationDrivenConfiguration.class})
public class PulsarAutoConfiguration {

    private final PulsarProperties properties;

    public PulsarAutoConfiguration(PulsarProperties properties) {
        this.properties = properties;
    }

    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() {
        ClientBuilder clientBuilder = new ClientBuilderImpl(properties);
        return clientBuilder.build();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory pulsarConsumerFactory() {
        return new DefaultPulsarConsumerFactory(pulsarClient(), properties.getConsumer().buildProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory pulsarProducerFactory() {
        return new DefaultPulsarProducerFactory(pulsarClient(), properties.getProducer().buildProperties());
    }

}
  1. 配置 spring.factory

在目錄src/main/resources/META-INF下創(chuàng)建 spring.factories ,內(nèi)容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  com.sucl.pulsar.autoconfigure.PulsarAutoConfiguration
  • spring-pulsar
  1. 添加 pulsar-client 相關(guān)的依賴
< dependencies >
     < dependency >
         < groupId >org.apache.pulsar< /groupId >
         < artifactId >pulsar-client< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework.boot< /groupId >
         < artifactId >spring-boot-autoconfigure< /artifactId >
     < /dependency >

     < dependency >
         < groupId >org.springframework< /groupId >
         < artifactId >spring-messaging< /artifactId >
     < /dependency >
< /dependencies >
  1. 定義 EnablePulsar,之前說(shuō)到過(guò),@Enable 注解主要是配合 AutoConfigure 來(lái)做功能加強(qiáng),沒(méi)有了自動(dòng)配置,我們依然可以使用這些模塊的功能。這里做了一件事,向 Spring 容器注冊(cè)了兩個(gè) Bean
  • PulsarListenerAnnotationBeanProcessor 在 Spring Bean 生命周期中解析注解自定義注解 PulsarListener、PulsarHandler,
  • PulsarListenerEndpointRegistry 用來(lái)構(gòu)建 Consumer 執(zhí)行環(huán)境以及對(duì) TOPIC 的監(jiān)聽(tīng)、觸發(fā)消費(fèi)回調(diào)等等,可以說(shuō)是最核心的 Bean
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({PulsarListenerConfigurationSelector.class})
public @interface EnablePulsar {

}
  1. 定義注解,參考 RabbitMq,主要針對(duì)需要關(guān)注的類與方法,分別對(duì)應(yīng)注解@PulsarListener、@PulsarHandler,通過(guò)這兩個(gè)注解配合可以讓我們監(jiān)聽(tīng)到關(guān)注的 TOPIC, 當(dāng)有消息產(chǎn)生時(shí),觸發(fā)對(duì)應(yīng)的方法進(jìn)行消費(fèi)。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarListener {

    /**
     *
     * @return TOPIC 支持SPEL
     */
    String[] topics() default {};

    /**
     *
     * @return TAGS 支持SPEL
     */
    String[] tags() default {};
}

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarHandler {

}
  1. 注解@PulsarListener 的處理流程比較復(fù)雜,這里用一張圖描述,或者可以通過(guò)下面 github 的源代碼查看具體實(shí)現(xiàn)

圖片

flow

  • spring-pulsar-sample

按照下面的流程,你會(huì)發(fā)現(xiàn)通過(guò)簡(jiǎn)單的幾行代碼就能夠?qū)崿F(xiàn)消息的生產(chǎn)與消費(fèi),并集成到項(xiàng)目中去。

  1. 簡(jiǎn)單寫一個(gè) SpringBoot 項(xiàng)目,并添加 pulsar-spring-boot-starter
< dependencies >
    < dependency >
        < groupId >com.sucl< /groupId >
        < artifactId >pulsar-spring-boot-starter< /artifactId >
        < version >${project.version}< /version >
    < /dependency >

    < dependency >
        < groupId >org.springframework.boot< /groupId >
        < artifactId >spring-boot-starter-web< /artifactId >
    < /dependency >
< /dependencies >
  1. 添加配置
cycads:
  pulsar:
    service-url: pulsar://localhost:6650
  listener-topics: TOPIC_TEST
  1. 編寫對(duì)應(yīng)消費(fèi)代碼
@Slf4j
@Component
@PulsarListener(topics = "#{'${cycads.listener-topics}'.split(',')}")
public class PulsarDemoListener {

    @PulsarHandler
    public void onConsumer(Message message){
        log.info(" >> > 接收到消息:{}", message.getPayload());
    }

}
  1. 向 Pulsar Broker 發(fā)送消息進(jìn)行測(cè)試
@Slf4j
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {ContextConfig.class})
@Import({PulsarAutoConfiguration.class})
public class ProducerTests {

    @Autowired
    private ProducerFactory producerFactory;

    @Test
    public void sendMessage() {
        Producer producer = producerFactory.createProducer("TOPIC_TEST");
        MessageId messageId = producer.send("this is a test message");
        log.info(" >> >> >> > 消息發(fā)送完成:{}", messageId);
    }

    @Configuration
    @PropertySource(value = "classpath:application-test.properties")
    static class ContextConfig {
        //
    }
}
  1. 控制臺(tái)可以看到這樣的結(jié)果
2023-02-26 19:57:15.572  INFO 26520 --- [pulsar-01] c.s.p.s.listener.PulsarDemoListener : > >> 接收到消息:GenericMessage [payload=this is a test message, headers={id=f861488c-2afb-b2e7-21a1-f15e9759eec5, timestamp=1677412635571}]

知識(shí)點(diǎn)

  • Pulsar Client

基于 pulsar-client 提供的 ConfigurationData 擴(kuò)展 Properties;了解 Pulsar Client 如何連接 Broker 并進(jìn)行消息消費(fèi),包括同步消費(fèi)、異步消費(fèi)等等

  • spring.factories

實(shí)現(xiàn) starter 自動(dòng)配置的關(guān)鍵,基于 SPI 完成配置的自動(dòng)加載

  • Spring Bean 生命周期

通過(guò) Bean 生命周期相關(guān)擴(kuò)展實(shí)現(xiàn)注解的解析與容器的啟動(dòng),比如 BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton, InitializingBean, DisposableBean 等

  • Spring Messaging

基于回調(diào)與 MethodHandler 實(shí)現(xiàn)消息體的封裝、參數(shù)解析以及方法調(diào)用;

源碼示例

https://github.com/sucls/pulsar-starter.git

結(jié)束語(yǔ)

如果你看過(guò) spring-kafka 的源代碼,那么你會(huì)發(fā)現(xiàn)所有代碼基本都是仿造其實(shí)現(xiàn)。一方面能夠閱讀 kafka client 在 spring 具體如何實(shí)現(xiàn);同時(shí)通過(guò)編寫自己的 spring starter 模塊,學(xué)習(xí) 整個(gè) starter 的實(shí)現(xiàn)過(guò)程。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 模塊
    +關(guān)注

    關(guān)注

    7

    文章

    2706

    瀏覽量

    47468
  • 模板
    +關(guān)注

    關(guān)注

    0

    文章

    108

    瀏覽量

    20562
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4788

    瀏覽量

    68603
  • spring
    +關(guān)注

    關(guān)注

    0

    文章

    340

    瀏覽量

    14343
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    51

    瀏覽量

    5221
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    Spring Boot如何實(shí)現(xiàn)異步任務(wù)

    Spring Boot 提供了多種方式來(lái)實(shí)現(xiàn)異步任務(wù),這里介紹三種主要實(shí)現(xiàn)方式。 1、基于注解 @Async @Async 注解是 Spring 提供的一種輕量級(jí)異步方法
    的頭像 發(fā)表于 09-30 10:32 ?1440次閱讀

    Spring Boot Starter需要些什么

    前面我們簡(jiǎn)單介紹了如何使用消息中間件 Apache Pulsar ,但是項(xiàng)目中那樣使用,顯然是不太好的,不管從易用性和擴(kuò)展性來(lái)看,都是遠(yuǎn)遠(yuǎn)不夠, 為了和springboot項(xiàng)目集成,寫一個(gè)
    的頭像 發(fā)表于 09-25 11:35 ?763次閱讀
    <b class='flag-5'>Spring</b> Boot Starter需要些什么

    Spring狀態(tài)機(jī)的實(shí)現(xiàn)原理和使用方法

    說(shuō)起 Spring 狀態(tài)機(jī),大家很容易聯(lián)想到這個(gè)狀態(tài)機(jī)和設(shè)計(jì)模式中狀態(tài)模式的區(qū)別是啥呢?沒(méi)錯(cuò),Spring 狀態(tài)機(jī)就是狀態(tài)模式的一種實(shí)現(xiàn)介紹 S
    的頭像 發(fā)表于 12-26 09:39 ?2005次閱讀
    <b class='flag-5'>Spring</b>狀態(tài)機(jī)的<b class='flag-5'>實(shí)現(xiàn)</b>原理和使用方法

    java spring教程

    java spring教程理解Spring 實(shí)現(xiàn)原理掌握Spring IOC,AOP掌握Spring的基礎(chǔ)配置和用法熟練使用SSH開(kāi)發(fā)項(xiàng)目
    發(fā)表于 09-11 11:09

    什么是java spring

    或多個(gè)模塊聯(lián)合實(shí)現(xiàn)簡(jiǎn)單來(lái)說(shuō),Spring是一個(gè)輕量級(jí)的控制反轉(zhuǎn)(IoC)和面向切面(AOP)的容器框架。■ 輕量——從大小與開(kāi)銷兩方面而言Spring都是輕量的。完整的Spring框架
    發(fā)表于 09-11 11:16

    Spring筆記分享

    Spring實(shí)現(xiàn)了使用簡(jiǎn)單的組件配置組合成一個(gè)復(fù)雜的應(yīng)用。 Spring 中可以使用XML和Java注解組合這些對(duì)象。6) 一站式:I
    發(fā)表于 11-04 07:51

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    Spring認(rèn)證」什么是Spring GraphQL?

    這個(gè)項(xiàng)目建立 Boot 2.x 上,但它應(yīng)該與最新的 Boot2.4.x5 相關(guān)。 要?jiǎng)?chuàng)建項(xiàng)目,請(qǐng)轉(zhuǎn)到start.spring.io并為要使用的GraphQL傳輸選擇啟動(dòng)器: 啟動(dòng)機(jī) 運(yùn)輸 執(zhí)行
    的頭像 發(fā)表于 08-10 14:08 ?825次閱讀
    「<b class='flag-5'>Spring</b>認(rèn)證」什么是<b class='flag-5'>Spring</b> GraphQL?

    Kafka的概念及Kafka的宕機(jī)

    很好奇Kafka的高可用實(shí)現(xiàn)和保障。從 Kafka 部署后,系統(tǒng)內(nèi)部使用的 Kafka 一直運(yùn)行穩(wěn)定,沒(méi)有出現(xiàn)不可用的情況。 但最近系統(tǒng)測(cè)試人員常反饋偶有
    的頭像 發(fā)表于 08-27 11:21 ?2103次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    Spring Boot實(shí)現(xiàn)各種參數(shù)校驗(yàn)

    之前也寫過(guò)一篇關(guān)于Spring Validation使用的文章,不過(guò)自我感覺(jué)還是浮于表面,本次打算徹底搞懂Spring Validation。本文會(huì)詳細(xì)介紹Spring Validation各種場(chǎng)景下的最佳實(shí)踐及其
    的頭像 發(fā)表于 08-14 15:54 ?967次閱讀

    Kafka 的簡(jiǎn)介

    ,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問(wèn)性能 高吞吐率。即使非常廉價(jià)的機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)
    的頭像 發(fā)表于 07-03 11:10 ?613次閱讀
    <b class='flag-5'>Kafka</b> 的簡(jiǎn)介

    物通博聯(lián)5G-kafka工業(yè)網(wǎng)關(guān)實(shí)現(xiàn)kafka協(xié)議對(duì)接到云平臺(tái)

    Kafka協(xié)議是一種基于TCP層的網(wǎng)絡(luò)協(xié)議,用于分布式消息傳遞系統(tǒng)Apache Kafka中發(fā)送和接收消息。Kafka協(xié)議定義了客戶端和服務(wù)器之間的通信方式和數(shù)據(jù)格式,允許客戶端發(fā)送
    的頭像 發(fā)表于 07-11 10:44 ?510次閱讀

    Spring Kafka的各種用法

    Kafka 是不支持消息重試的。但是 Spring Kafka 2.7+ 封裝了 Retry Topic 這個(gè)功能。 1. @RetryableTopic 使用注解的方式啟用 Retry Topic,
    的頭像 發(fā)表于 09-25 17:04 ?1010次閱讀

    Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計(jì)

    Kafka 給自己的定位是事件流平臺(tái)(event stream platform)。因此消息隊(duì)列中經(jīng)常使用的 "消息"一詞, Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?2377次閱讀
    <b class='flag-5'>Kafka</b>架構(gòu)技術(shù):<b class='flag-5'>Kafka</b>的架構(gòu)和客戶端API設(shè)計(jì)

    如何將Kafka使用到我們的后端設(shè)計(jì)中

    本文介紹了以下內(nèi)容: 1.什么是Kafka? 2.為什么我們需要使用Kafka這樣的消息系統(tǒng)及使用它的好處 3.如何將Kafka使用到我們的后端設(shè)計(jì)中。 譯自timber.io
    的頭像 發(fā)表于 10-30 14:30 ?524次閱讀
    如何將<b class='flag-5'>Kafka</b>使用到我們的后端設(shè)計(jì)中
    主站蜘蛛池模板: 狠狠色丁香婷婷综合激情| 手机看片1024欧美| 夜夜夜网| 日本xxxxbbbb| 亚洲啪啪网站| 久久精品视频热| 婷婷丁香社区| 综合亚洲色图| 日韩毛片免费看| 国产性片在线观看| 天天色天| 91操碰| 日本三区四区免费高清不卡| 亚洲va久久久噜噜噜久久狠狠| 日本色黄视频| 丁香午夜| 国产在线观看福利| 亚洲一区毛片| 天天狠狠色噜噜| 老子影院午夜精品欧美视频| 亚洲第一久久| 欧美黑人巨大xxx猛交| 一级特黄aaa大片免色| 婷婷社区五月天| www.xxx.国产| 福利精品| 亚洲综合色婷婷| 成人在线免费网站| 91学院派女神| 三级在线观看视频网站| bt种子搜索在线| 在线视频一区二区三区四区| 深爱激情小说网| www一级毛片| 操操操干干| 亚洲成人高清| 亚洲午夜久久久精品影院| 欧美 亚洲 国产 精品有声| 欧美影院一区二区三区| 天天在线天天看成人免费视频 | 色碰人色碰人视频|