在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

實現MySQL與elasticsearch數據同步的方法

OSC開源社區(qū) ? 來源:又拍云 ? 2023-03-17 13:49 ? 次閱讀

MySQL 自身簡單、高效、可靠,是又拍云內部使用最廣泛的數據庫。但是當數據量達到一定程度的時候,對整個 MySQL 的操作會變得非常遲緩。而公司內部 robin/logs 表的數據量已經達到 800w,后續(xù)又有全文檢索的需求。這個需求直接在 MySQL上實施是難以做到的。

原數據庫的同步問題

由于傳統的 mysql 數據庫并不擅長海量數據的檢索,當數據量到達一定規(guī)模時(估算單表兩千萬左右),查詢和插入的耗時會明顯增加。同樣,當需要對這些數據進行模糊查詢或是數據分析時,MySQL作為事務型關系數據庫很難提供良好的性能支持。使用適合的數據庫來實現模糊查詢是解決這個問題的關鍵。 但是,切換數據庫會迎來兩個問題,一是已有的服務對現在的 MySQL重度依賴,二是MySQL的事務能力和軟件生態(tài)仍然不可替代,直接遷移數據庫的成本過大。我們綜合考慮了下,決定同時使用多個數據庫的方案,不同的數據庫應用于不同的使用場景。

而在支持模糊查詢功能的數據庫中,elasticsearch 自然是首選的查詢數據庫。這樣后續(xù)對業(yè)務需求的切換也會非常靈活。 那具體該如何實現呢?在又拍云以往的項目中,也有遇到相似的問題。之前采用的方法是在業(yè)務中編寫代碼,然后同步到 elasticsearch 中。具體是這樣實施的:每個系統編寫特定的代碼,修改 MySQL數據庫后,再將更新的數據直接推送到需要同步的數據庫中,或推送到隊列由消費程序來寫入到數據庫中。 但這個方案有一些明顯的缺點:

系統高耦合,侵入式代碼,使得業(yè)務邏輯復雜度增加

方案不通用,每一套同步都需要額外定制,不僅增加業(yè)務處理時間,還會提升軟件復復雜度

工作量和復雜度增加

在業(yè)務中編寫同步方案,雖然在項目早期比較方便,但隨著數據量和系統的發(fā)展壯大,往往最后會成為業(yè)務的大痛點。

解決思路及方案

調整架構

既然以往的方案有明顯的缺點,那我們如何來解決它呢?優(yōu)秀的解決方案往往是 “通過架構來解決問題“,那么能不能通過架構的思想來解決問題呢? 答案是可以的。我們可以將程序偽裝成 “從數據庫”,主庫的增量變化會傳遞到從庫,那這個偽裝成 “從數據庫” 的程序就能實時獲取到數據變化,然后將增量的變化推送到消息隊列 MQ,后續(xù)消費者消耗 MQ 的數據,然后經過處理之后再推送到各自需要的數據庫。 這個架構的核心是通過監(jiān)聽 MySQL 的 binlog 來同步增量數據,通過基于 query 的查詢舊表來同步舊數據,這就是本文要講的一種異構數據庫同步的實踐。

改進數據庫

經過深度的調研,成功得到了一套異構數據庫同步方案,并且成功將公司生產環(huán)境下的 robin/logs 的表同步到了 elasticsearch 上。 首先對 MySQL 開啟 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生產環(huán)境的數據庫不宜修改。這里請教了海楊前輩,他提供了”從庫聯級“的思路,在從庫中監(jiān)聽 binlog 繞過了操作生產環(huán)境重啟主庫的操作,大大降低了系統風險。 后續(xù)操作比較順利,啟動 maxwell 監(jiān)聽從庫變化,然后將增量變化推送到 kafka ,最后配置 logstash 消費 kafka中的數據變化事件信息,將結果推送到 elasticsearch。配置 logstash需要結合表結構,這是整套方案實施的重點。 這套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 與 kafka已經在生產環(huán)境中有部署,所以無需單獨部署維護。而 logstash 與 maxwell 只需要修改配置文件和啟動命令即可快速上線。整套方案的意義不僅在于成本低,而且可以大規(guī)模使用,公司內有 MySQL 同步到其它數據庫的需求時,都可以上任。

成果展示前后對比

- 使用該方案同步和業(yè)務實現同步的對比

052d642c-c47a-11ed-bfe3-dac502259ad0.png

- 寫入到 elasticsearch 性能對比 (8核4G內存)

項目 logstash 業(yè)務同步
寫入速度 1500 條/s 200 條/s

經過對比測試,800w 數據量全量同步,使用 logstash 寫到 elasticsearch,實際需要大概 3 小時,而舊方案的寫入時間需要 2.5 天。

方案實施細節(jié)

接下來,我們來看看具體是如何實現的。 本方案無需編寫額外代碼,非侵入式的,實現 MySQL數據與 elasticsearch 數據庫的同步。

0549731a-c47a-11ed-bfe3-dac502259ad0.png

下列是本次方案需要使用所有的組件:

MySQL

Kafka

Maxwell(監(jiān)聽 binlog)

Logstash(將數據同步給 elasticsearch)

Elasticsearch

1. MySQL配置

本次使用 MySQL 5.5 作示范,其他版本的配置可能稍許不同需要

首先我們需要增加一個數據庫只讀的用戶,如果已有的可以跳過。

-- 創(chuàng)建一個 用戶名為 maxwell 密碼為 xxxxxx 的用戶
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

開啟數據庫的 `binlog`,修改 `mysql` 配置文件,注意 `maxwell` 需要的 `binlog` 格式必須是`row`。

# /etc/mysql/my.cnf


[mysqld]
# maxwell 需要的 binlog 格式必須是 row
binlog_format=row


# 指定 server_id 此配置關系到主從同步需要按情況設置,
# 由于此mysql沒有開啟主從同步,這邊默認設置為 1
server_id=1


# logbin 輸出的文件名, 按需配置
log-bin=master
重啟 MySQL 并查看配置是否生效:
sudo systemctl restart mysqld
select @@log_bin;
-- 正確結果是 1
select @@binlog_format;
-- 正確結果是 ROW
如果要監(jiān)聽的數據庫開啟了主從同步,并且不是主數據庫,需要再從數據庫開啟 binlog 聯級同步。
# /etc/my.cnf


log_slave_updates = 1

需要被同步到 elasticsearch 的表結構。
-- robin.logs
show create table robin.logs;


-- 表結構
CREATE TABLE `logs` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `content` text NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
  `type` varchar(20) DEFAULT '',
  `meta` text,
  `created_at` bigint(15) NOT NULL,
  `idx_host` varchar(255) DEFAULT '',
  `idx_domain_id` int(11) unsigned DEFAULT NULL,
  `idx_record_value` varchar(255) DEFAULT '',
  `idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
  `idx_orig_record_value` varchar(255) DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8

2.Maxwell 配置

本次使用 maxwell-1.39.2 作示范, 確保機器中包含 java 環(huán)境, 推薦 openjdk11

下載 maxwell 程序

wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
tar zxvf maxwell-1.39.2.tar.gz **&&**  cd maxwell-1.39.2
maxwell 使用了兩個數據庫:

一個是需要被監(jiān)聽binlog的數據庫(只需要讀權限)

另一個是記錄maxwell服務狀態(tài)的數據庫,當前這兩個數據庫可以是同一個

重要參數說明:

host 需要監(jiān)聽binlog的數據庫地址

port 需要監(jiān)聽binlog的數據庫端口

user 需要監(jiān)聽binlog的數據庫用戶名

password 需要監(jiān)聽binlog的密碼

replication_host 記錄maxwell服務的數據庫地址

replication_port 記錄maxwell服務的數據庫端口

replication_user 記錄maxwell服務的數據庫用戶名

filter 用于監(jiān)聽binlog數據時過濾不需要的數據庫數據或指定需要的數據庫

producer 將監(jiān)聽到的增量變化數據提交給的消費者 (如 stdout、kafka)

kafka.bootstrap.servers kafka 服務地址

kafka_version kafka 版本

kafka_topic 推送到kafka的主題

啟動 maxwell

注意,如果 kafka 配置了禁止自動創(chuàng)建主題,需要先自行在 kafka 上創(chuàng)建主題,kafka_version 需要根據情況指定, 此次使用了兩張不同的庫

./bin/maxwell 
        --host=mysql-maxwell.mysql.svc.cluster.fud3 
        --port=3306 
        --user=root 
        --password=password 
        --replication_host=192.168.5.38 
        --replication_port=3306 
        --replication_user=cloner 
        --replication_password=password
        --filter='exclude: *.*, include: robin.logs' 
        --producer=kafka 
        --kafka.bootstrap.servers=192.168.30.10:9092 
        --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1

3. 安裝 Logstash

Logstash 包中已經包含了 openjdk,無需額外安裝。

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
tar zxvf logstash-8.5.0-linux-x86_64.tar.gz
刪除不需要的配置文件。
rm config/logstash.yml
修改logstash配置文件,此處語法參考官方文檔(https://www.elastic.co/guide/en/logstash/current/input-plugins.html)。
# config/logstash-sample.conf


input {
 kafka {
    bootstrap_servers => "192.168.30.10:9092"
    group_id => "main"
    topics => ["maxwell-robinlogs"]
 }
}


filter {
  json {
    source => "message"
  }


  # 將maxwell的事件類型轉化為es的事件類型
  # 如增加 -> index 修改-> update
  translate {
    source => "[type]"
    target => "[action]"
    dictionary => {
      "insert" => "index"
      "bootstrap-insert" => "index"
      "update" => "update"
      "delete" => "delete"
    }
    fallback => "unknown"
  }


  # 過濾無效的數據
  if ([action] == "unknown") {
    drop {}
  }


  # 處理數據格式
  if [data][idx_host] {
    mutate {
      add_field => { "idx_host" => "%{[data][idx_host]}" }
    }
  } else {
    mutate {
      add_field => { "idx_host" => "" }
    }
  }


  if [data][idx_domain_id] {
    mutate {
      add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
    }
  } else {
    mutate {
      add_field => { "idx_domain_id" => "" }
    }
  }


  if [data][idx_record_value] {
    mutate {
      add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
    }
  } else {
    mutate {
      add_field => { "idx_record_value" => "" }
    }
  }
  
   if [data][idx_record_opt] {
    mutate {
      add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
    }
  } else {
    mutate {
      add_field => { "idx_record_opt" => "" }
    }
  }
 
  if [data][idx_orig_record_value] {
    mutate {
      add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
    }
  } else {
    mutate {
      add_field => { "idx_orig_record_value" => "" }
    }
  }
 
  if [data][type] {
    mutate {
      replace => { "type" => "%{[data][type]}" }
    }
  } else {
    mutate {
      replace => { "type" => "" }
    }
  }
 
  mutate {
    add_field => {
      "id" => "%{[data][id]}"
      "content" => "%{[data][content]}"
      "user_id" => "%{[data][user_id]}"
      "status" => "%{[data][status]}"
      "meta" => "%{[data][meta]}"
      "created_at" => "%{[data][created_at]}"
    }
    remove_field => ["data"]
  }


  mutate {
    convert => {
      "id" => "integer"
      "user_id" => "integer"
      "idx_domain_id" => "integer"
      "created_at" => "integer"
    }
  }


  # 只提煉需要的字段
  mutate {
    remove_field => [
      "message",
      "original",
      "@version",
      "@timestamp",
      "event",
      "database",
      "table",
      "ts",
      "xid",
      "commit",
      "tags"
    ]
   }
}


output {
  # 結果寫到es
  elasticsearch {
    hosts => ["http://es-zico2.service.upyun:9500"]
    index => "robin_logs"
    action => "%{action}"
    document_id => "%{id}"
    document_type => "robin_logs"
  }


  # 結果打印到標準輸出
  stdout {
    codec => rubydebug
  }
}
執(zhí)行程序:
# 測試配置文件*
bin/logstash -f config/logstash-sample.conf --config.test_and_exit


# 啟動*
bin/logstash -f config/logstash-sample.conf --config.reload.automatic

4. 全量同步

完成啟動后,后續(xù)的增量數據 maxwell 會自動推送給 logstash 最終推送到 elasticsearch ,而之前的舊數據可以通過 maxwell 的 bootstrap 來同步,往下面表中插入一條任務,那么 maxwell 會自動將所有符合條件的 where_clause 的數據推送更新。

INSERT INTO maxwell.bootstrap 
        ( database_name, table_name, where_clause, client_id ) 
values 
        ( 'robin', 'logs', 'id > 1', 'maxwell' );
后續(xù)可以在 elasticsearch 檢測數據是否同步完成,可以先查看數量是否一致,然后抽樣對比詳細數據。
# 檢測 elasticsearch  中的數據量
GET robin_logs/robin_logs/_count

?





審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規(guī)問題,請聯系本站處理。 舉報投訴
  • 數據庫
    +關注

    關注

    7

    文章

    3827

    瀏覽量

    64515
  • MySQL
    +關注

    關注

    1

    文章

    817

    瀏覽量

    26629
  • Maxwell
    +關注

    關注

    4

    文章

    36

    瀏覽量

    12676

原文標題:如何高效實現MySQL與elasticsearch的數據同步

文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    構建數據庫解決方案,基于華為云 Flexus X 實例容器化 MySQL 主從同步架構

    前言**** 華為云 Flexus X 實例,融合柔性算力與智能調度,為數據庫解決方案帶來全新突破。采用容器化 MySQL 主從同步架構,實現數據
    的頭像 發(fā)表于 01-07 17:22 ?112次閱讀
    構建<b class='flag-5'>數據</b>庫解決方案,基于華為云 Flexus X 實例容器化 <b class='flag-5'>MySQL</b> 主從<b class='flag-5'>同步</b>架構

    構建高效搜索解決方案,Elasticsearch &amp; Kibana 的完美結合

    的流暢運行。部署 Elasticsearch,享受分布式搜索的精準與快速;結合 Kibana,實現數據可視化,決策更直觀。在 828 華為云企業(yè)上云節(jié)不僅降低成本,更提升效率。云端部署,資源按需分配,靈活應對業(yè)務增長。立即體驗,
    的頭像 發(fā)表于 12-27 13:48 ?93次閱讀
    構建高效搜索解決方案,<b class='flag-5'>Elasticsearch</b> &amp; Kibana 的完美結合

    數據數據恢復—Mysql數據庫表記錄丟失的數據恢復流程

    Mysql數據庫故障: Mysql數據庫表記錄丟失。 Mysql數據庫故障表現: 1、
    的頭像 發(fā)表于 12-16 11:05 ?179次閱讀
    <b class='flag-5'>數據</b>庫<b class='flag-5'>數據</b>恢復—<b class='flag-5'>Mysql</b><b class='flag-5'>數據</b>庫表記錄丟失的<b class='flag-5'>數據</b>恢復流程

    數據數據恢復—MYSQL數據庫ibdata1文件損壞的數據恢復案例

    mysql數據庫故障: mysql數據庫文件ibdata1、MYI、MYD損壞。 故障表現:1、數據庫無法進行查詢等操作;2、使用my
    的頭像 發(fā)表于 12-09 11:05 ?180次閱讀

    MySQL還能跟上PostgreSQL的步伐嗎

    Can MySQL Catch Up with PostgreSQL’s Momentum?[2] 譯者:馮若航[3],Vonng,Pigsty[4]?作者,PostgreSQL 大法師,數據庫老司機,云計算泥石流。 MySQL
    的頭像 發(fā)表于 11-18 10:16 ?229次閱讀
    <b class='flag-5'>MySQL</b>還能跟上PostgreSQL的步伐嗎

    香港云服務器怎么部署MySQL數據庫?

    在香港云服務器上部署MySQL數據庫的步驟如下: 步驟 1: 更新軟件包列表 首先,確保軟件包列表是最新的。在終端中執(zhí)行以下命令: sudo apt update 步驟 2: 安裝 MySQL
    的頭像 發(fā)表于 11-14 16:15 ?193次閱讀

    Elasticsearch 再次開源

    Elasticsearch 和 Kibana 又可以被稱為開源了。很難表達這句話讓我有多高興。我激動得簡直要跳起來了。我們 Elastic 的所有人都是如此。開源是我的 DNA。這也是Elastic的DNA。能夠再次將 Elasticsearch 稱為開源,我感到非常高興
    的頭像 發(fā)表于 11-13 12:14 ?153次閱讀
    <b class='flag-5'>Elasticsearch</b> 再次開源

    適用于MySQL的dbForge架構比較

    dbForge Schema Compare for MySQL 是一種工具,用于輕松有效地比較和部署 MySQL 數據庫結構和腳本文件夾差異。該工具提供了 MySQL
    的頭像 發(fā)表于 10-28 09:41 ?227次閱讀
    適用于<b class='flag-5'>MySQL</b>的dbForge架構比較

    MySQL的整體邏輯架構

    支持多種存儲引擎是眾所周知的MySQL特性,也是MySQL架構的關鍵優(yōu)勢之一。如果能夠理解MySQL Server與存儲引擎之間是怎樣通過API交互的,將大大有利于理解MySQL的核心
    的頭像 發(fā)表于 04-30 11:14 ?466次閱讀
    <b class='flag-5'>MySQL</b>的整體邏輯架構

    Redis與MySQL協同升級企業(yè)緩存

    傳統的MySQL數據庫在處理大規(guī)模應用時已經到了瓶頸,RedisEnterprise怎樣助力突破這一瓶頸?RedisEnterprise與MYSQL共同用作企業(yè)級緩存或副本數據庫,會產
    的頭像 發(fā)表于 02-19 13:18 ?388次閱讀
    Redis與<b class='flag-5'>MySQL</b>協同升級企業(yè)緩存

    labview 創(chuàng)建mysql 表時 設置時間 怎么在mysql中是格式是date 而不是datetime?

    選擇 時間日期 但是在mysql中是date而不是datetime類型 ,除了sql語句創(chuàng)建表 ,怎么能實現創(chuàng)建表中數據為datetime類型
    發(fā)表于 02-04 09:46

    如何將MS訪問數據轉換為MySQL

    借助dbForgeStudio for MySQL,您可以輕松地將數據從MicrosoftAccess遷移到MySQL,并保持數據和功能的完整性。這個過程將允許您利用更具可伸縮性和功能
    的頭像 發(fā)表于 01-23 13:47 ?441次閱讀
    如何將MS訪問<b class='flag-5'>數據</b>轉換為<b class='flag-5'>MySQL</b>

    如何實現更高精度的同步測量?如何做好同步數據采集的時間校準?

    如何實現更高精度的同步測量?如何做好同步數據采集的時間校準? 實現更高精度的同步測量是科研和工程
    的頭像 發(fā)表于 01-16 15:10 ?1615次閱讀

    傳送網如何實現頻率同步和時間同步

    、頻率同步 在傳送網中,頻率同步是指網絡中的各個節(jié)點之間的時鐘頻率保持一致,以便實現數據傳輸的精確同步。在頻率
    的頭像 發(fā)表于 01-16 14:42 ?1174次閱讀

    MySQL密碼忘記了怎么辦?MySQL密碼快速重置方法步驟命令示例!

    MySQL密碼忘記了怎么辦?MySQL密碼快速重置方法步驟命令示例! MySQL是一種常用的關系型數據庫管理系統,如果你忘記了
    的頭像 發(fā)表于 01-12 16:06 ?773次閱讀
    主站蜘蛛池模板: 欧美一级片网址| 欧美性受一区二区三区| 国产在播放一区| 久久电影www成人网| 日本www色| 毛片站| 黄视频在线播放| 国产91久久最新观看地址| 成年人黄色大片大全| 永久免费精品影视网站| 天天摸天天做天天爽在线| 女bbbbxxxx毛片视频0| 国产手机在线观看视频| 91在线网| 亚洲 欧美 视频| 亚洲色图图片| 四虎国产精品高清在线观看| 日本黄色免费看| 国模私拍一区二区| 午夜在线看片| 波多久久夜色精品国产| 4hu影院最新地址www| 手机看片福利在线| 深爱五月激情| av网站免费线看| 看全色黄大色大片免费| 在线天堂bt中文www在线| 色多多免费观看在线| 黄色片免费看视频| 五月天在线婷婷| 国产精品久久久久影视不卡| 亚洲精品aaa揭晓| 五月婷婷色视频| 激情开心婷婷| 天天干天天操天天添| 1024人成网色www| 国产午夜精品久久久久免费视| 五月激激| 韩国最新三级网站在线播放| 狼狼色丁香久久女婷婷综合| 夜夜天天|