流数据湖平台Apache Paimon(六)集成Spark之DML插入数据

简介: 流数据湖平台Apache Paimon(六)集成Spark之DML插入数据

4.4. 插入数据

INSERT 语句向表中插入新行。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致。

INSERT INTO table_identifier [ part_spec ] [ column_list ] { value_expr | query }

part_spec

可选,指定分区的键值对列表,多个用逗号分隔。可以使用类型文字(例如,date’2019-01-02’)。

语法: PARTITION (分区列名称 = 分区列值 [ , … ] )

column_list

可选,指定以逗号分隔的字段列表。

语法:(col_name1 [,column_name2, …])

所有指定的列都应该存在于表中,并且不能相互重复。它包括除静态分区列之外的所有列。字段列表的大小应与 VALUES 子句或查询中的数据大小完全相同。

value_expr

指定要插入的值。可以插入显式指定的值或 NULL。必须使用逗号分隔子句中的每个值。可以指定多于一组的值来插入多行。

语法:VALUES ( { 值 | NULL } [ , … ] ) [ , ( … ) ]

注意:将 Nullable 字段写入 Not-null 字段

不能将另一个表的可为空列插入到一个表的非空列中。Spark可以使用nvl函数来处理,比如A表的key1是not null,B表的key2是nullable:

INSERT INTO A key1 SELECT nvl(key2, ) FROM B

案例:

INSERT INTO tests VALUES(1,1,'order','2023-07-01','1'), (2,2,'pay','2023-07-01','2');
INSERT INTO tests_p SELECT * from tests;

4.5. 查询数据

就像所有其他表一样,Paimon 表可以使用 SELECT 语句进行查询。

Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。

4.5.1 时间旅行

可以在查询中使用 VERSION AS OF 和 TIMESTAMP AS OF 来进行时间旅行。

1)读取指定id的快照

SELECT * FROM tests VERSION AS OF 1;
SELECT * FROM tests VERSION AS OF 2;

2)读取指定时间戳的快照

-- 查看快照信息
SELECT * FROM tests&snapshots;
SELECT * FROM tests TIMESTAMP AS OF '2023-07-03 15:34:20.123';
-- 时间戳指定到秒(向上取整)
SELECT * FROM tests TIMESTAMP AS OF 1688369660;

3)读取指定标签

SELECT * FROM tests VERSION AS OF 'my-tag';

4.5.2 增量查询

读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照 3 和快照 5 之间的更改:

spark.read()
.format(“paimon”)
.option(“incremental-between”, “3,5”)
.load(“path/to/table”)

4.6 系统表

系统表包含有关每个表的元数据和信息,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。

4.6.1 快照表 Snapshots Table

通过snapshots表可以查询表的快照历史信息,包括快照中发生的记录数。Spark中使用需要反引号表名$系统表名

SELECT * FROM tests$snapshots;

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

4.6.2 模式表 Schemas Table

通过schemas表可以查询该表的历史schema。

SELECT * FROM tests$schemas;

可以连接快照表和模式表以获取给定快照的字段。

SELECT s.snapshot_id, t.schema_id, t.fields
FROM tests$snapshots s JOIN tests$schemas t
ON s.schema_id=t.schema_id where s.snapshot_id=3;

4.6.3 选项表 Options Table

可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值。

SELECT * FROM tests$options;

4.6.4 审计日志表 Audit log Table

如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。

rowkind 有四个值:

+I:插入操作。

-U:使用更新行的先前内容进行更新操作。

+U:使用更新行的新内容进行更新操作。

-D:删除操作。

SELECT * FROM tests$audit_log;

4.6.5 文件表 Files Table

可以查询特定快照表的文件。

– 查询最新快照的文件

SELECT * FROM tests$files;

4.6.6 标签表 Tags Table

通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。

SELECT * FROM tests$tags;


目录
相关文章
|
6天前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
|
6天前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
|
6天前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
|
8天前
|
消息中间件 JSON Kafka
AutoMQ 生态集成 Apache Doris
Apache Doris 是一个高性能的分析型数据库,以其亚秒级查询响应和对复杂分析的支持而知名。它适合报表分析、即席查询等场景,能从 AutoMQ 通过 Routine Load 导入 Kafka 主题数据。本文详述了如何配置 Doris 环境,创建测试数据,以及设置 Routine Load 作业从 AutoMQ 导入 JSON 数据到 Doris 表的过程。最后,文中展示了验证数据成功导入的方法。Apache Doris 提供了低成本、高弹性的数据处理解决方案,其团队由 Apache RocketMQ 和 Linux LVS 的核心成员组成。
27 0
|
17天前
|
分布式计算 Java 关系型数据库
|
17天前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
17天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
28 0
|
17天前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之在DataWorks中,将数据集成功能将AnalyticDB for MySQL中的数据实时同步到MaxCompute中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
38 0
|
17天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
31 0
|
17天前
|
机器学习/深度学习 算法 数据挖掘
R语言气象模型集成预报:神经网络、回归、svm、决策树用环流因子预测降雨降水数据
R语言气象模型集成预报:神经网络、回归、svm、决策树用环流因子预测降雨降水数据

推荐镜像

更多
http://www.vxiaotou.com