编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

使用Apache Kafka、Kafka Connect,Debezium和ksqlDB的实时流ETL

wxchong 2024-10-26 16:05:57 开源技术 43 ℃ 0 评论

每日分享最新,最流行的软件开发知识与最新行业趋势,希望大家能够一键三连,多多支持,跪求关注,点赞,留言。

阐明为什么我们需要将数据从一个点传输到另一个点,查看传统方法,并描述如何构建实时流式 ETL 流程。

众所周知,ETL 代表提取-转换-加载,是将数据从一个源系统移动到另一个源系统的过程。首先,我们将阐明为什么需要将数据从一个点传输到另一个点;其次,我们将研究传统方法;最后,我们将描述如何使用 Apache Kafka、Kafka Connect、Debezium 和 ksqlDB 构建实时流 ETL 流程。

当我们构建业务应用程序时,我们会根据应用程序的功能需求来设计数据模型。我们不考虑任何类型的运营或分析报告要求。用于报告需求的数据模型将被非规范化,而用于应用程序操作的数据模型大部分将被规范化。因此,为了报告或任何类型的分析目的,我们需要将我们的数据模型转换为非规范化形式。

为了重塑我们的数据,我们需要将其移动到另一个数据库。有人可能会争辩说,我们可以使用数据库视图或物化视图在同一个数据库中重塑我们的数据,但是报告数据库的配置可能与操作数据库不同,大部分操作数据库配置为 OLTP(事务性),而报告数据库配置为数据库配置为 OLAP(分析)。此外,在可操作的数据库上执行报告流程会减慢业务交易,并导致业务流程变慢,因此您的业务人员会对此不满意。TLDR;如果您需要准备一份报告或想要对您的操作数据库进行分析研究,您应该将您的数据移动到另一个数据库。

在行业中,人们大多从源系统中批量提取数据,在合理的时间段内,主要是每天一次,但也可以是每小时一次,也可以是两三天一次。保持较短的周期可能会导致源系统的资源使用率更高,目标系统的中断频繁;但是,保持较长时间可能会导致目标系统出现最新问题。因此,我们需要一些对源系统性能造成最小影响的东西,并且可以在更短的时间内或实时更新目标系统。

现在让我们看看建议的架构。您可以在我的 GitHub 存储库中找到此演示项目的完整源代码:https ://github.com/dursunkoc/ksqlwithconnect 。我们将使用 Debezium 源连接器从源系统中提取数据更改。

Debezium 不使用 SQL 提取数据。它使用数据库日志文件来跟踪数据库中的更改,因此它对源系统的影响最小。有关 Debezium 的更多信息,请访问他们的网站。

提取数据后,我们需要 Kafka Connect 将其流式传输到 Apache Kafka 中,以便根据需要使用它并对其进行重塑。我们将使用 ksqlDB 来以目标系统所需的方式重塑原始数据。让我们考虑一个简单的订购系统数据库,其中有一个客户表、一个产品表和一个订单表,如下所示。

现在,让我们考虑一下我们需要提交一份关于订单的报告,其中我们看到了购买者的电子邮件,并且在同一行中显示了产品的名称。所以我们需要一个表格,如下图:

客户列将包含位于客户表的电子邮件字段中的客户的电子邮件,而产品列将包含位于产品表的名称字段中的产品名称。

首先,我们需要创建一个源连接器来从源数据库中提取数据。在我们的示例案例中,源数据库是 MySQL 数据库,因此我们将使用 Debezium MySQL Source Connector,如下所示:

CREATE SOURCE CONNECTOR `mysql-connector` WITH( "connector.class"= 'io.debezium.connector.mysql.MySqlConnector', "tasks.max"= '1', "database.hostname"= 'mysql', "database.port"= '3306', "database.user"= 'root', "database.password"= 'debezium', "database.server.id"= '184054', "database.server.name"= 'dbserver1', "database.whitelist"= 'inventory', "table.whitelist"= 'inventory.customers,inventory.products,inventory.orders', "database.history.kafka.bootstrap.servers"= 'kafka:9092', "database.history.kafka.topic"= 'schema-changes.inventory', "transforms"= 'unwrap', "transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState', "key.converter"= 'org.apache.kafka.connect.json.JsonConverter', "key.converter.schemas.enable"= 'false', "value.converter"= 'org.apache.kafka.connect.json.JsonConverter', "value.converter.schemas.enable"= 'false');

现在我们将拥有来自源系统的表、客户、产品和订单的 Kafka 主题。

ksql> show topics; Kafka Topic | Partitions | Partition Replicas----------------------------------------------------------------- dbserver1 | 1 | 1 dbserver1.inventory.customers | 1 | 1 dbserver1.inventory.orders | 1 | 1 dbserver1.inventory.products | 1 | 1 default_ksql_processing_log | 1 | 1 my_connect_configs | 1 | 1 my_connect_offsets | 25 | 1 my_connect_statuses | 5 | 1 schema-changes.inventory | 1 | 1-----------------------------------------------------------------

现在,使用以下脚本,我们将为订单创建一个 ksqlDB 流,该流在订单数据旁边连接客户和产品数据。

CREATE STREAM S_CUSTOMER (ID INT, FIRST_NAME string, LAST_NAME string, EMAIL string) WITH (KAFKA_TOPIC='dbserver1.inventory.customers', VALUE_FORMAT='json'); CREATE TABLE T_CUSTOMERAS SELECT id, latest_by_offset(first_name) as fist_name, latest_by_offset(last_name) as last_name, latest_by_offset(email) as email FROM s_customer GROUP BY id EMIT CHANGES; CREATE STREAM S_PRODUCT (ID INT, NAME string, description string, weight DOUBLE) WITH (KAFKA_TOPIC='dbserver1.inventory.products', VALUE_FORMAT='json'); CREATE TABLE T_PRODUCTAS SELECT id, latest_by_offset(name) as name, latest_by_offset(description) as description, latest_by_offset(weight) as weight FROM s_product GROUP BY id EMIT CHANGES; CREATE STREAM s_order ( order_number integer, order_date timestamp, purchaser integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='json'); CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id from s_order as o left join t_product as p on o.product_id = p.idleft join t_customer as c on o.purchaser = c.idpartition by o.order_numberemit changes;

最后,在 JDBC 接收器连接器的帮助下,我们会将丰富的订单表推送到 PostgreSQL 数据库中。

CREATE SINK CONNECTOR `postgres-sink` WITH( "connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector', "tasks.max"= '1', "dialect.name"= 'PostgreSqlDatabaseDialect', "table.name.format"= 'ENRICHED_ORDER', "topics"= 'SA_ENRICHED_ORDER', "connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw', "auto.create"= 'true', "insert.mode"= 'upsert', "pk.fields"= 'ORDER_NUMBER', "pk.mode"= 'record_key', "key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter', "key.converter.schemas.enable" = 'false', "value.converter"= 'io.confluent.connect.avro.AvroConverter', "value.converter.schemas.enable" = 'true', "value.converter.schema.registry.url"= 'http://schema-registry:8081');

您可以在我的 GitHub 存储库中找到此演示项目的完整源代码:https ://github.com/dursunkoc/ksqlwithconnect 。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表