课时1:5分钟上手 Flink MySQL 连接器

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 课时1:5分钟上手 Flink MySQL 连接器

Flink-Learning训练营:课时1:5分钟上手 Flink MySQL 连接器

课程地址:/trainingcamp/0bcc1ab57cf841a2af632d6252fecbab

实时数据接入:5分钟上手Fling MySQL连接器

 

内容简介

一、实验准备

二、实验内容

三、实验收获

 

一、实验准备

1)开通阿里云十计算的fling版的免费试用

图片1.png

在这个实验开始之前呢,需要去开通阿里云十计算的fling版的免费试用,然后因为我们需要去连接的是一个云数Mexico实例,所以我们也需要去开通阿里云数据库的rds免费试用。

本实验会flink是计算平台为基础使用flink自带的Mexico connect连接器,去连接rds云数据库的实例,并且一个实时商品销售数据统计的一个例子,尝试去上手connect的数据补货数据写入,,我们可以通过阿里云试用中心:
http://free.aliyun.com/?product=956456o&crowd=enterpise ,去申请实际计算flink跟云数据库IDS的适用,大家可以直接扫码或者点击链接就可以去使用了
图片2.png

实验之前我们需要先在Rds云数据库上面去进行一些简单配置,去建立一些数据库跟数据表,在完成我以上准备骤,之后我们就可以正式的开始我们的实验了

 

二、实验内容

(1).捕获原表数据变化

首先,我们之前已经建立了一张MySQL数据表,如下图

图片3.png

其中包含了实时更新的数据。

图片4.png 

为了将其转化为flink可以处理的流失数据,我们首先进入flink实时计算平台,创建一个flink作业,然后我们可以使用cdc连接器,连接到原数据表,它可以通过分析日志的方式捕获变更数据,并发送到下游系统。之后我们就可以使用table API编写数据处理的逻辑例如我们可以进入到实时计算平台,点击左侧的作业开发页面,点击新建按钮,我们可以新建一个job。之后我们可以创建一张source table的表,这个表跟我们之前创建的MySQL数据表的schema 致,之后我们在这个with里面提供一些连接所需要的参数,例如配置使用MySQL cdc连接器,host name是之前我们建立IDS实例的时候提供的ual, usaname 和pasorde是之前设定的实例登陆用户名和密码,为了验证连接是否成功,我们可以直接在这下面编写类似于cgo的语句。Select from sort table,获取source table中实时更新的所有数据。例如在这里我们看到了实时更新的数据,都来自于我们之前创建的源表,这说明我们的连接器生效了,它能够顺利的捕获变更的数据可以直接在这里添加新的数据库里逻辑,例如这段SQL语句实际上就是将原始输入的数据,以每15秒为一个时间窗口进行,并且按照商品ID以及商品在15秒内的销售量进行统计并展示结果可以看到它正确的执行了计算,并且会随着数据的到来实时进行更新。

(2)接入维度表,打宽数据

我们可以看到在原表中我们使用了good ID
图片5.png来表明每件商品的销售记录,但包括商品名称,商品价格之类的信息则保存在另一张单独的表中。我们同样可以使用类似的语句,建立连接维度表的临时表,并且使用left join语法将它之前计算的数据流结果进行连接。我们可以执行一下,看看结果,我们可以看到这里正常的进行了连接,并且计算出了每件商品的利润,

(3)数据写回会表

我们可以使用跟之前类似的方式建立连接汇表的临时表,并且使用insert into cycle语句数据流写回MySQL数据库。在这里,我们使用think table这张临时表来连接我们的数据库里的汇表,并且加入了一个scan auto commit为TRUE这个参数,表明我们希望这个数据库在发生更动时自动commit到数据库。然后我们只需要在上面加入一行insert into Table语句
图片6.png

表示我们希望把下面这行语句的执行结果插入到会表之中,我们来验证一下执行结果是否正确。可以看到,执行结果正确。因为在阿里云实际计算平台中使用测试,调试执行时数据不会写入下一个会表,所以为了验证这一功能是否正常,我们还需要点击上线功能,将作业部署到flink集群上面进行测试。点击确认然后我们可以进入运维页面,可以看到这个任务,现在是已停止的状态,

图片7.png

我们点击启动使用默认配置启动这个作业。在作业成功启动之后,我们可以进入数据库查看汇表中是否有新增的数据。可以看到这里有新增的数据代表我们的数据顺利写入了会表之中。同时,我们可以在作业总览中看到数据的数据流图,

图片8.png

例如1处是我们的原表获取数据,2处是我们进行窗口聚合的操作,3处是我们获取维度表中的数据,4是我们将原表处理后的数据跟维度表中的数据进行了一个聚合,最后我们将得到的结果写入会表5之中。

 

在我们不需要运行作业之后,我们可以点击停止按钮,停止这个作业

 

三、实验收获

学完该章节可以帮助您去掌握这些技术,包括去使用flink 10计算平台创建并且提交作业编写基于flink table APISQL语句并且使用MySQL Connector对数据库进行读写这方面的能力

这就是五分钟上手fling bicycle连接器的实验内容

 

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之用CTAS从mysql同步数据到hologres,改了字段长度,报错提示需要全部重新同步如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
23 8
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之程序初始化mysql没有完成就报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 4
|
1天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之sqlserver mysql都用的胖包,sqlserver的成功了,mysql报这个错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
16 6
|
1天前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接器换成2.4.2之后,mysql作业一直报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 3
|
4天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
722 5
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 5
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
8 1
|
1天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 3
|
4天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1809 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
4天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1795 2
官宣|Apache Flink 1.19 发布公告

热门文章

最新文章

  • 1
    实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
    8
  • 2
    实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
    7
  • 3
    实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
    9
  • 4
    实时计算 Flink版操作报错合集之报错显示“Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT and DELETE"是什么意思
    8
  • 5
    实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
    7
  • 6
    实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
    6
  • 7
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    8
  • 8
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    9
  • 9
    实时计算 Flink版操作报错合集之查询sqlserver ,全量阶段出现报错如何解决
    8
  • 10
    实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
    8
  • 相关产品

  • 实时计算 Flink版
  • http://www.vxiaotou.com