這篇文章主要介紹“Flink與數(shù)據(jù)庫集成方法是什么”,在日常操作中,相信很多人在Flink與數(shù)據(jù)庫集成方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink與數(shù)據(jù)庫集成方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
成都創(chuàng)新互聯(lián)是一家專業(yè)提供巴南企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、H5場景定制、小程序制作等業(yè)務(wù)。10年已為巴南眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進行中。
JDBC Connector 在 Flink 1.11 版本發(fā)生了比較大的變化,我們先從以下幾個 Feature 來具體了解一下 Flink 社區(qū)在這個版本上對 JDBC 所做的改進。這個 issue 主要為 DataStream API 新增了 JdbcSink,對于使用 DataStream 編程的用戶會更加方便地把數(shù)據(jù)寫入到 JDBC;并且規(guī)范了一些命名規(guī)則,以前命名使用的是 JDBC 加上連接器名稱,目前命名規(guī)范為 Jdbc+ 連接器名稱這個 issue 將 flink-jdbc 包名重命名為 flink-connector-jdbc,與 Flink 的其他 connector 統(tǒng)一,將所有接口和類從 org.apache.flink.java.io.jdbc(舊包)規(guī)范為新包路徑 org.apache.flink.connector.jdbc(新包),通過這種重命名用戶在對底層源代碼的閱讀上面會更加容易的理解和統(tǒng)一。由于早期數(shù)據(jù)類型系統(tǒng)并不是很完善,導(dǎo)致了比較多的 Connector 在使用上會經(jīng)常報數(shù)據(jù)類型相關(guān)的異常,例如 DECIMAL 精度類型,在以往的 Flink 1.10 版本中有可能出現(xiàn)下圖問題:
基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重構(gòu),目前數(shù)據(jù)精度方面的支持已經(jīng)很完善了。在 Flink 1.11 版本中,我們對 DDL 的 WITH 參數(shù)相對于 1.10 版本做了簡化,從用戶視角看上就是簡化和規(guī)范了參數(shù),如表格所示: | | | |
| |
| |
| |
| |
| |
connector.read.partition.column
| |
connector.read.partition.num
| |
connector.read.partition.lower-bound
| scan.partition.lower-bound
|
connector.read.partition.upper-bound
| scan.partition.upper-bound
|
connector.read.fetch-size
| |
connector.lookup.cache.max-rows
| |
connector.lookup.cache.ttl
| |
connector.lookup.max-retries
| |
connector.write.flush.max-rows
| sink.buffer-flush.max-rows
|
connector.write.flush.interval
| sink.buffer-flush.interval
|
connector.write.max-retries
| |
大家可以看到表格中有 3 個標(biāo)紅的地方,這個是相對于 1.10 有發(fā)生變化比較多的地方。這次 FLIP 希望進一步簡化連接器屬性,以便使屬性更加簡潔和可讀,并更好地與 FLIP-107 協(xié)作。如果需要了解更多的 Connector 參數(shù)可以進一步參考官方文檔和 FLIP-122 中提到的改變,這樣有助于從舊版本遷移到新版本并了解參數(shù)的變化。Flink 1.10 存在某些 Query 無法推斷出主鍵導(dǎo)致無法進行 Upsert 更新操作(如下圖所示錯誤)。所以在 FLIP-87 中為 Flink SQL 引入的 Primary Key 約束。Flink 的主鍵約束遵循 SQL 標(biāo)準(zhǔn),主鍵約束分為 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否對數(shù)據(jù)進行校驗。我們常見數(shù)據(jù)庫的主鍵約束屬于 PRIMARY KEY ENFORCED,會對數(shù)據(jù)進行校驗。因為 Flink 并不持有數(shù)據(jù),因此 Flink 支持的主鍵模式是 PRIMARY KEY NOT ENFORCED, 這意味著 Flink 不會校驗數(shù)據(jù),而是由用戶確保主鍵的完整性。例如 HBase 里面對應(yīng)的主鍵應(yīng)該是 RowKey,在 MySQL 中對應(yīng)的主鍵是在用戶數(shù)據(jù)庫的表中對應(yīng)的主鍵。
目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在關(guān)系數(shù)據(jù)庫中的表,如果要在 Flink 中使用,用戶需要手動寫表的 DDL,一旦表的 Schema 發(fā)生改變,用戶需要手動修改, 這是比較繁瑣的事情。JDBC Catalog 提供了接口用于連接到各種關(guān)系型數(shù)據(jù)庫,使得 Flink 能夠自動檢索表,不用用戶手動輸入和修改。目前 JDBC Catalog 內(nèi)置目前實現(xiàn)了 Postgres Catalog。Postgres catalog 是一個 read-only (只讀)的 Catalog,只支持讀取 Postgres 表,支持的功能比較有限。下面代碼展示了目前 Postgres catalog 支持的 6 個功能:數(shù)據(jù)庫是否存在、數(shù)據(jù)庫列表、獲取數(shù)據(jù)庫、根據(jù)數(shù)據(jù)庫名獲取表列表、獲得表、表是否存在。// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)
如果需要支持其他 DB (如 MySQL),需要用戶根據(jù) FLIP-93 的 JdbcCatalog 接口實現(xiàn)對應(yīng)不同的 JDBC Catalog。Dialect (方言)對各個數(shù)據(jù)庫來說,Dialect 體現(xiàn)各個數(shù)據(jù)庫的特性,比如語法、數(shù)據(jù)類型等。如果需要查看詳細(xì)的差異,可以點擊這里[6]查看詳細(xì)差異。下面通過對比 MySQL 和 Postgres 的一些常見場景舉例: | | | | | | | |
Data Type (數(shù)據(jù)類型)
| | | |
| | | |
在數(shù)據(jù)類型上面,F(xiàn)link SQL 的數(shù)據(jù)類型目前映射規(guī)則如下:Flink 目前支持三種 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于測試,更多的類型映射可以點擊下方鏈接前往官方文檔查看。https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping
如何保證 Dialect Upsert 的冪等性?如果定義了主鍵,JDBC 寫入時是能夠保證 Upsert 語義的, 如果 DB 不支持 Upsert 語法,則會退化成 DELETE + INSERT 語義。Upsert query 是原子執(zhí)行的,可以保證冪等性。這個在官方文檔中也詳細(xì)描述了更新失敗或者存在故障時候如何做出的處理,下面的表格是不同的 DB 對應(yīng)不同的 Upsert 語法: | | | INSERT .. ON DUPLICATE KEY UPDATE ..
|
| INSERT .. ON CONFLICT .. DO UPDATE SET ..
|
目前如果要實現(xiàn)自定義 Dialect (比如 SQL Server、Oracle 等), 需要用戶自己實現(xiàn)對應(yīng) Dialect 修改源碼并重新打包 flink-connector-jdbc。社區(qū)正在討論提供一種插件化 dialect 的機制, 讓用戶可以不用修改源碼打包就能實現(xiàn)自定義 Dialect,這個機制需要把 Dialect 接口暴露給用戶。目前的 Dialect 接口不夠清晰,沒有考慮 DataStream API 的使用場景,也沒有考慮到 一些復(fù)雜的 SQL 場景,所以這個接口目前不太穩(wěn)定(后續(xù)版本會修改) 。社區(qū)目前之所以沒有把這個 API 開放給用戶,是從用戶使用的體驗角度考慮,希望把這種頂級 API 設(shè)計得盡量穩(wěn)定、簡潔后再開放出來給用戶使用,避免用戶在后續(xù) Flink 版本的迭代中多次修改代碼。目前社區(qū)已經(jīng)有相應(yīng)的計劃去做了,大家可以留意 FLINK-16833[7]
提出的 JDBCDialect 插件化設(shè)計。大家看完上述 Flink 1.11 在 JDBC 所做的改動后,大家可以嘗試下面這個關(guān)于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步機制。環(huán)境與版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14- Flink standalone 環(huán)境準(zhǔn)備并在提供的地址下載好對應(yīng)的安裝包和 connector jar。
- 測試數(shù)據(jù)準(zhǔn)備,通過拉起容器運行已經(jīng)打包好的鏡像。其中 Kafka 中的 changelog 數(shù)據(jù)是通過 debezium connector 抓取的 MySQL orders表 的 binlog。
- 通過 SQL Client 編寫 SQL 作業(yè),分別創(chuàng)建 Flink 訂單表,維表,用戶表,產(chǎn)品表,并創(chuàng)建 Function UDF。從 PG Catalog 獲取結(jié)果表信息之后,把作業(yè)提交至集群執(zhí)行運行。
- 測試 CDC 數(shù)據(jù)同步和維表 join,通過新增訂單、修改訂單、刪除訂單、維表數(shù)據(jù)更新等一系列操作驗證 CDC 在 Flink 上如何運行以及寫入結(jié)果表。

上圖為業(yè)務(wù)流程整體圖,項目 Demo 地址:https://github.com/leonardBang/flink-sql-etl
1.Flink SQL Client 上面執(zhí)行的 use default,是使用哪個 catlog 呢?答:Flink 內(nèi)部有一個內(nèi)置 Catlog,它把 meta 數(shù)據(jù)存于內(nèi)存中。在 SQL Client 上沒有顯式指定 Hive catlog 或者 jdbc catlog 時會使用內(nèi)置的 Catalog,剛剛的案例給大家演示的是 Postgres Catalog,里面有結(jié)果表。在內(nèi)置 Catlog 可以看到我們剛剛創(chuàng)建 Kafka 的表,MySQL 的維度表。2.Flink MySQL DDL 連接 8 小時后就會自動斷開的問題是否已經(jīng)解決?這個問題會在 1.12 版本解決此問題,目前 master 分支已經(jīng)合并,具體可以參考以下地址
,描述了相關(guān)問題的討論和解決辦法。3.什么是 CDC?能大概講下目前 Flink 支持的 CDC 嗎?通過 Change Data Capture 機制(CDC)來將外部系統(tǒng)的動態(tài)數(shù)據(jù)(如 Mysql BinLog、Kafka Compacted Topic)導(dǎo)入 Flink,以及將 Flink 的 Update/Retract 流寫出到外部系統(tǒng)中是用戶一直希望的功能。Flink 1.11 實現(xiàn)了對 CDC 數(shù)據(jù)讀取和寫出的支持。目前 Flink 可以支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴開源同步工具) 兩種 CDC 格式。Debezium 在國外用得比較多,Canal 在國內(nèi)用得比較多,兩者格式會有所區(qū)別,詳細(xì)可以參考官方使用文檔。到此,關(guān)于“Flink與數(shù)據(jù)庫集成方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章標(biāo)題:Flink與數(shù)據(jù)庫集成方法是什么
地址分享:http://m.jiaotiyi.com/article/jcedsj.html