本文介紹了以下內(nèi)容:
1.什么是Kafka?
2.為什么我們需要使用Kafka這樣的消息系統(tǒng)及使用它的好處
3.如何將Kafka使用到我們的后端設(shè)計(jì)中。
譯自timber.io:《hello-world-in-kafka-using-python》,有部分刪改。
1.Kafka是什么、為什么我們需要它?
簡(jiǎn)而言之,Kafka是一個(gè)分布式消息系統(tǒng)。這是什么意思呢?
想象一下,你現(xiàn)在有一個(gè)簡(jiǎn)單的Web應(yīng)用,其包含了網(wǎng)頁前端客戶端(Client)、服務(wù)端和數(shù)據(jù)庫(kù):
你需要記錄所有發(fā)生在你的Web應(yīng)用的事件,比如點(diǎn)擊、請(qǐng)求、搜索等,以便后續(xù)進(jìn)行計(jì)算和運(yùn)營(yíng)分析。
假設(shè)每個(gè)事件都由單獨(dú)的APP完成,那么一個(gè)簡(jiǎn)單的解決方案就是將數(shù)據(jù)存儲(chǔ)在數(shù)據(jù)庫(kù)中,所有APP連接到數(shù)據(jù)庫(kù)進(jìn)行存儲(chǔ):
這看起來簡(jiǎn)單,但是其中還會(huì)出現(xiàn)許多問題:
1.點(diǎn)擊、請(qǐng)求、搜索等事件會(huì)產(chǎn)生大量的數(shù)據(jù)到數(shù)據(jù)庫(kù)中,這可能會(huì)導(dǎo)致插入事件存在延遲。
2.如果選擇將高頻數(shù)據(jù)存儲(chǔ)在SQL或MongoDB等數(shù)據(jù)庫(kù)中,很難再原有歷史數(shù)據(jù)的基礎(chǔ)上擴(kuò)展數(shù)據(jù)庫(kù)。
3.如果你需要用這些數(shù)據(jù)進(jìn)行數(shù)據(jù)分析,你可能無法直接對(duì)數(shù)據(jù)庫(kù)進(jìn)行高頻率的讀取操作。
4.每個(gè)APP可以遵循自己的數(shù)據(jù)格式,這就意味著當(dāng)你需要在不同的APP進(jìn)行數(shù)據(jù)交換時(shí),你需要進(jìn)行數(shù)據(jù)格式的轉(zhuǎn)換。
通過使用像Kafka這樣的消息流系統(tǒng),可以很好地解決這些問題,因?yàn)樗麄兛梢詧?zhí)行以下操作:
1.存儲(chǔ)的大量數(shù)據(jù)可以被持久化、校驗(yàn)和復(fù)制,具備容錯(cuò)能力。
2.支持跨系統(tǒng)實(shí)時(shí)處理連續(xù)的數(shù)據(jù)流。
3.允許APP獨(dú)立發(fā)布數(shù)據(jù)或數(shù)據(jù)流,并與使用它的APP無關(guān)。
那么它和傳統(tǒng)數(shù)據(jù)庫(kù)有何不同?
盡管Kafka可以持久化地存儲(chǔ)數(shù)據(jù),但它不是數(shù)據(jù)庫(kù)。
Kafka不僅允許APP存儲(chǔ)或提取連續(xù)的數(shù)據(jù)流,還支持實(shí)時(shí)處理。這與對(duì)被動(dòng)數(shù)據(jù)執(zhí)行CRUD操作或?qū)鹘y(tǒng)數(shù)據(jù)庫(kù)執(zhí)行查詢的方式不同。
聽起來不錯(cuò),那么Kafka是如何解決以上挑戰(zhàn)的?
Kafka是一個(gè)分布式平臺(tái),是為規(guī)模而構(gòu)建的,這意味著它可以處理高頻率的讀寫和存儲(chǔ)大量數(shù)據(jù)。它確保數(shù)據(jù)始終可靠。它還支持從故障中恢復(fù)的強(qiáng)大機(jī)制。
以下是為什么應(yīng)該使用Kafka的一些關(guān)鍵因素:
1.1 簡(jiǎn)化后端架構(gòu)
在Kafka的幫助下,我們前面的結(jié)構(gòu)會(huì)變得簡(jiǎn)單一些:
1.2 通用數(shù)據(jù)管道
如上所示,Kafka充當(dāng)多個(gè)APP和服務(wù)的通用數(shù)據(jù)管道,這給了我們兩個(gè)好處:
1.數(shù)據(jù)是集成的,我們將來自不同系統(tǒng)的數(shù)據(jù)都存在一個(gè)地方,這使得Kafka成為真正的數(shù)據(jù)源。任何APP都可以將數(shù)據(jù)推送到該平臺(tái),然后由另一個(gè)APP提取數(shù)據(jù)。
2.Kafka使得應(yīng)用程序之間交換數(shù)據(jù)變得容易。因?yàn)槲覀兛梢詷?biāo)準(zhǔn)化數(shù)據(jù)格式,減少了數(shù)據(jù)格式的轉(zhuǎn)換。
1.3 通用連接性
盡管Kafka允許你使用標(biāo)準(zhǔn)數(shù)據(jù)格式,但并不意味著你的APP就不需要數(shù)據(jù)轉(zhuǎn)換了,它只是減少了我們轉(zhuǎn)換數(shù)據(jù)的頻率罷了。
此外,Kafka提供了一個(gè)叫 Kafka Connect 的框架允許我們維護(hù)遺留的老系統(tǒng)。
1.4 實(shí)時(shí)數(shù)據(jù)處理
類似于監(jiān)控系統(tǒng)這樣的實(shí)時(shí)APP,往往需要連續(xù)的數(shù)據(jù)流,這些數(shù)據(jù)需要被立即處理或盡量減少延遲處理。
Kafka的流式處理,使得處理引擎可以在很短的時(shí)間內(nèi)(幾毫米到幾分鐘)內(nèi)取數(shù)、分析、以及響應(yīng)。
2.Kafka入門
2.1 安裝
安裝Kafka是一個(gè)相當(dāng)簡(jiǎn)單的過程。只需遵循以下給定步驟:
2.使用以下命令解壓縮下載文件: tar -xzf kafka_2.11-1.1.0.tgz
3.cd到Kafka目錄開始使用它: cd kafka_2.11-1.1.0
2.2 啟動(dòng)服務(wù)器
ZooKeeper是一個(gè)針對(duì)Kafka等分布式環(huán)境的集中管理工具,它為大型分布式系統(tǒng)提供配置服務(wù)、同步服務(wù)及命名注冊(cè)表。
因此,我們需要先啟動(dòng)ZooKeeper服務(wù)器,然后再啟動(dòng)Kafka服務(wù)器。使用以下命令即可:
# Start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka Server
bin/kafka-server-start.sh config/server.properties
2.3 Kafka 基本概念
我們快速介紹一下Kafka體系結(jié)構(gòu)的核心概念:
1.Kafka在一個(gè)或多個(gè)服務(wù)器上作為集群運(yùn)行。
2.Kafka將數(shù)據(jù)流存儲(chǔ)在名為topics的類別中。每條數(shù)據(jù)均由鍵、值、時(shí)間戳組成。
3.Kafka使用發(fā)布-訂閱模式。它允許某些APP充當(dāng)producers(生產(chǎn)者),記錄數(shù)據(jù)并將數(shù)據(jù)發(fā)布到Kafka topic中。
同樣,它允許某些APP充當(dāng)consumer(消費(fèi)者)和訂閱Kafka topic并處理由它產(chǎn)生的數(shù)據(jù)。
4.除了Prodcuer API 和 Consumer API,Kafka還為應(yīng)用提供了一個(gè) Streams API 作為流處理器。通過 Connector API 我們可以將Kafka連接到其他現(xiàn)有的應(yīng)用程序和數(shù)據(jù)系統(tǒng)。
2.4 架構(gòu)
如你所見,每個(gè)Kafka的 Topic 可以分為多個(gè)Partition(分區(qū)),可以使用broker(經(jīng)紀(jì)人)在不同的計(jì)算機(jī)上復(fù)制這些 Topic,從而使消費(fèi)者可以并行讀取 Topic.
kafka的復(fù)制是針對(duì)分區(qū)的:
比如上圖中有4個(gè)broker, 1個(gè)topic, 2個(gè)分區(qū),復(fù)制因子是3。當(dāng)producer發(fā)送一個(gè)消息的時(shí)候,它會(huì)選擇一個(gè)分區(qū),比如topic1-part1
分區(qū),將消息發(fā)送給這個(gè)分區(qū)的leader, broker2、broker3會(huì)拉取這個(gè)消息,一旦消息被拉取過來,slave會(huì)發(fā)送ack給master,這時(shí)候master才commit這個(gè)log。
因此,整個(gè)系統(tǒng)的容錯(cuò)級(jí)別極高。當(dāng)系統(tǒng)正常運(yùn)行時(shí),對(duì)Topic的所有讀取和寫入都將通過leader,且leader會(huì)保證所有其他broker均被更新。
如果Broker失效了,系統(tǒng)會(huì)自動(dòng)重新配置,此時(shí)副本也可以接管成為L(zhǎng)eader.
2.5 創(chuàng)建Kafka Topic
讓我們創(chuàng)建一個(gè)名為 sample,含有一個(gè)partition(分區(qū))和一個(gè)replica(副本)的Kafka Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample
列出所有的Kafka Topics,檢查是否成功創(chuàng)建了sample Topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
describe topics 命令還可以獲得特定Topic的詳細(xì)信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sample
2.6 創(chuàng)建生產(chǎn)者與消費(fèi)者
這里是本章的代碼實(shí)戰(zhàn)部分,利用Kafka-Python實(shí)現(xiàn)簡(jiǎn)單的生產(chǎn)者和消費(fèi)者。
1.首先需要安裝kafka-python:
pip install kafka-python
2.創(chuàng)建消費(fèi)者(consumer.py)
from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
print (message)
3.創(chuàng)建生產(chǎn)者(producer.py)
有一個(gè)消費(fèi)者正在訂閱我們的消息流,因此我們要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者,發(fā)布消息到Kafka:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
現(xiàn)在,你重新運(yùn)行消費(fèi)者(consumer.py),你就會(huì)接收到生產(chǎn)者發(fā)送過來的消息。
-
存儲(chǔ)
+關(guān)注
關(guān)注
13文章
4337瀏覽量
85992 -
數(shù)據(jù)庫(kù)
+關(guān)注
關(guān)注
7文章
3839瀏覽量
64542 -
服務(wù)端
+關(guān)注
關(guān)注
0文章
66瀏覽量
7025 -
Web應(yīng)用
+關(guān)注
關(guān)注
0文章
16瀏覽量
3510 -
kafka
+關(guān)注
關(guān)注
0文章
51瀏覽量
5227
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論