Flink入坑指南第五章 - 语法糖 view

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。什么是view(视图):视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。

2000元阿里云代金券免费领取,2核4G云服务器仅664元/3年,新老用户都有优惠,立即抢购>>>


阿里云采购季(云主机223元/3年)活动入口:请点击进入>>>,


阿里云学生服务器(9.5元/月)购买入口:请点击进入>>>,

Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。

什么是view(视图):
视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。视图可以包含表中的所有列,或者仅包含选定的列。视图可以创建自一个或者多个表,这取决于创建该视图的 SQL 语句的写法。
视图,一种虚拟的表,允许用户执行以下操作:

  • 以用户或者某些类型的用户感觉自然或者直观的方式来组织数据;
  • 限制对数据的访问,从而使得用户仅能够看到或者修改(某些情况下)他们需要的数据;
  • 从多个表中汇总数据,以产生报表。

(引自:极客学院)

Flink SQL兼容标准SQL,view的作用与标准SQL相同,有几个特点:

  • 在Flink SQL中,view是一种临时表
  • 与标准SQL一样,视图可以创建自一个或多个表/视图
  • 视图的结果不会进行持久化,仅作为计算的中间结果进行传输
  • 视图的数据也可以被输出到结果表中

Flink SQL中,视图的语法非常简单,可参考:view语法。接下来我们通过一些例子来实际感受一下视图的作用。

假设在IoT场景中,要过滤出两个厂房中的传感器的异常数据。两个厂房的数据分别发到了datahub的两个不同topic,需要将两个datahub topic中异常数据过滤出来,再汇总。
原始数据结构如下:

  • date
  • hour
  • ip: device ip
  • event_id: 

DDL -- 定义输入输出数据的数据结构,具体语法请参见 datahub源表/结果表语法,维表相关语法详见Flink SQL维表语法

-- source1 定义厂房1的topic的数据结构
create table fab1(
  `date` int,
  hour int,
  ip varchar,
  event_id BIGINT
) with (
  type='datahub',
  endPoint='xxxxxxxxx',
  project='xxxxxxxxxx',
  topic='topic1',
  accessId='xXXXXXXXX',
  accessKey='XXXXXXXXX'); 
  
 -- source2 定义厂房2的topic的数据结构
  create table fab2(
  `date` int,
  hour int,
  ip varchar,
  event_id BIGINT
) with (
  type='datahub',
  endPoint='xxxxxxxxx',
  project='xxxxxxxxxx',
  topic='topic2',
  accessId='xXXXXXXXX',
  accessKey='XXXXXXXXX');
  
  -- 定义结果表1的数据结构
  create table sink(
  `date` int,
  hour int,
  event_id bigint,
  event_cnt bigint
  ) with (
  type='datahub',
  endPoint='xxxxxxxxx',
  project='xxxxxxxxxx',
  topic='topic2',
  accessId='xXXXXXXXX',
  accessKey='XXXXXXXXX');
  
  -- 定义结果表2的数据结构
  create table sink(
  `date` int,
  hour int,
  event_id bigint,
  event_cnt bigint
  ) with (
  type='rds',
  url='xxxxxx',
  tableName='xxxxxx',
  userName='xxxxxx',
  password='xxxxxx'
);

  -- 维表
  CREATE TABLE device_whitelist (
  ip varchar,
  category varchar,
  PRIMARY KEY (ip),  -- 用作维表时,必须有声明的主键。
  PERIOD FOR SYSTEM_TIME  -- 定义维表的变化周期
) with (
  type = 'rds',
  ...
)

写法一,按照批处理系统/数据库的思维来看,这个需求非常简单:

insert into sink
select e.`ip`,e.`hour`,e.`date`,e.`event_id` from 
(
  select * from fab1
  where event_id='00001'
  union 
  select * from fab2
  where event_id='00001'
) e
JOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON e.`ip` = d.`ip`

写法二,使用view,将各个复杂SQL模块拆开:

-- 
CREATE VIEW view1(`date`,`hour`,`ip`,`event_id`) AS
SELECT * FROM fab1
WHERE event_id='00001'
UNION 
SELECT * FROM fab2
WHERE event_id='00001'

-- 
CREATE VIEW view2(`date`,`hour`,`ip`,`event_id`) AS
SELECT e.`date`,e.`hour`,e.`ip`,e.`event_id` FROM view1 e
JOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON e.`ip` = d.`ip`

-- INSERT INTO sink1
INSERT INTO sink1
SELECT * FROM view2

-- INSERT INTO sink2
INSERT INTO sink2
SELECT * FROM view1

Flink中SQL的数据是不断动态变化的,特别是涉及到一些特殊语法(如window级连/嵌套等),需要分步调试每个SQL模块的结果。如果用写法一,会大大增加SQL调试难度。因此,使用Flink SQL,建议使用第二种写法,用view将各个语法块串联,方便调试和排查问题。写法一和写法二最终生成的作业DAG图都是一样的,没有任何区别。一个Flink SQL作业可以同时定义多个输出表,结果可同时被输出到多种数据源中。

如果在使用实时计算产品过程中有任何问题,欢迎在博客下方回复交流。

相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
相关文章
|
4天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
722 5
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 5
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
8 1
|
1天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 3
|
4天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1809 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
4天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1795 2
官宣|Apache Flink 1.19 发布公告
|
4天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
349 3
|
4天前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
439 0
|
4天前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
269 0
|
4天前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
180 1

热门文章

最新文章

  • 1
    实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
    8
  • 2
    实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
    8
  • 3
    实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
    9
  • 4
    实时计算 Flink版操作报错合集之报错显示“Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT and DELETE"是什么意思
    9
  • 5
    实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
    7
  • 6
    实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
    7
  • 7
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    8
  • 8
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    10
  • 9
    实时计算 Flink版操作报错合集之查询sqlserver ,全量阶段出现报错如何解决
    9
  • 10
    实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
    8
  • 相关产品

  • 实时计算 Flink版
  • http://www.vxiaotou.com