Apache Doris-原理实操-2

简介: 数据表sql语法、kafka数据源导入

数据表的创建与查看

2.1 创建数据库

CREATE DATABASE example_db;

2.2 可以通过 SHOW DATABASES; 查看数据库信息。

image.png

information_schema是为了兼容MySQL协议而存在。

2.3 建表使用 CREATE TABLE 命令建立一个表(Table)

首先切换数据库

USE example_db;

Doris支持单分区和复合分区两种建表方式。

在复合分区中:

第一级称为 Partition,即分区。用户可以指定某一维度列作为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。

第二级称为 Distribution,即分桶。用户可以指定一个或多个维度列以及桶数对数据进行 HASH 分布。

以下场景推荐使用复合分区

历史数据删除需求:如有删除历史数据的需求(比如仅保留最近N 天的数据)。使用复合分区,可以通过删除历史分区来达到目的。也可以通过在指定分区内发送 DELETE 语句进行数据删除。

.

单分区建表

CREATE TABLE table1
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

image.png

这个表的 schema 如下:

siteid:类型是INT(4字节), 默认值为10

citycode:类型是SMALLINT(2字节)

username:类型是VARCHAR, 最大长度为32, 默认值为空字符串

pv:类型是BIGINT(8字节), 默认值是0; 这是一个指标列, Doris内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)


复合分区建表

CREATE TABLE table2
(
    event_day DATE,
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(event_day, siteid, citycode, username)
PARTITION BY RANGE(event_day)
(
    PARTITION p201706 VALUES LESS THAN ('2017-07-01'),
    PARTITION p201707 VALUES LESS THAN ('2017-08-01'),
    PARTITION p201708 VALUES LESS THAN ('2017-09-01')
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");


image.png

event_day:类型是DATE,无默认值

siteid:类型是INT(4字节), 默认值为10

citycode:类型是SMALLINT(2字节)

username:类型是VARCHAR, 最大长度为32, 默认值为空字符串

pv:类型是BIGINT(8字节), 默认值是0; 这是一个指标列, Doris 内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)

我们使用 event_day 列作为分区列,建立3个分区: p201706, p201707, p201708

p201706:范围为 [最小值, 2017-07-01)

p201707:范围为 [2017-07-01, 2017-08-01)

p201708:范围为 [2017-08-01, 2017-09-01)

每个分区使用 siteid 进行哈希分桶,通过hash均匀分散在 不同分表

2.4 数据的查询

table1数据
insert into table1  values(1,1,'jim',2),(2,1,'grace',2),(3,2,'tom',2),(4,3,'bush',3),(5,3,'helen',3);
table2数据
insert into table2  values('2017-07-03',1,1,'jim',2),('2017-06-05',2,1,'grace',2),('2017-07-12',3,2,'tom',2),('2017-07-15',4,3,'bush',3),('2017-08-12',5,3,'helen',3);

简单查询

SELECT * FROM table1 LIMIT 3;
SELECT * FROM table1 ORDER BY citycode;

image.png

Join 查询

SELECT SUM(table1.pv) FROM table1 JOIN table2 WHERE table1.siteid = table2.siteid;

image.png

子查询

SELECT SUM(pv) FROM table2 WHERE siteid IN (SELECT siteid FROM table1 WHERE siteid > 2);


image.png

3 数据源导入数据

doris订阅kafka

数据源读取数据,将数据导入到 Doris 中。目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入的数据。

语法:

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
    [load_properties]
    [job_properties]
    FROM data_source
    [data_source_properties]

1. [db.]job_name

.

导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。


2.tbl_name

.

指定需要导入的表的名称。


3.load_properties

.

用于描述导入数据。语法:

[column_separator],
[columns_mapping],
[where_predicates],
[partitions]

3.1 column_separator:

指定列分隔符,如:

COLUMNS TERMINATED BY ","

默认为:\t

3.2 columns_mapping:

指定源数据中列的映射关系,以及定义衍生列的生成方式。

3.2.1映射列:

按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。

假设目的表有三列 k1, k2, v1。源数据有4列,其中第1、2、4列分别对应 k2, k1, v1。则书写如下:

COLUMNS (k2, k1, xxx, v1)

其中 xxx 为不存在的一列,用于跳过源数据中的第三列。

3.2.2 衍生列:

以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。

衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。

接上一个示例,假设目的表还有第4列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

3.2.3 where_predicates

用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:

WHERE k1 > 100 and k2 = 1000

3.2.4. partitions

指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

示例:

PARTITION(p1, p2, p3)

4 job_properties

用于指定例行导入作业的通用参数。

语法:

PROPERTIES (
            "key1" = "val1",
            "key2" = "val2"
        )

目前我们支持以下参数:

4.1 desired_concurrent_number

期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3。

这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。

例:

"desired_concurrent_number" = "3"

4.2 max_batch_interval/max_batch_rows/max_batch_size

这三个参数分别表示:

1)每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为10。

2)每个子任务最多读取的行数。必须大于等于200000。默认是200000。

3)每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB。

这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。

例:

"max_batch_interval" = "20",
              "max_batch_rows" = "300000",
              "max_batch_size" = "209715200"

4.3 max_error_number

采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。

采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行。

4. 4 strict_mode

是否开启严格模式,默认为开启。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为 "strict_mode" = "true"


5. data_source

数据源的类型。当前支持:

KAFKA

6. data_source_properties

指定数据源相关的信息。

语法:

(
            "key1" = "val1",
            "key2" = "val2"
        )

KAFKA 数据源

6.1kafka_broker_list

Kafka 的 broker 连接信息。格式为 ip:host。多个broker之间以逗号分隔。

示例:

"kafka_broker_list" = "broker1:9092,broker2:9092"

6.2. kafka_topic

指定要订阅的 Kafka 的 topic。

示例:

"kafka_topic" = "my_topic"

6.3. kafka_partitions/kafka_offsets

指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset。

offset 可以指定从大于等于 0 的具体 offset,或者:

1) OFFSET_BEGINNING: 从有数据的位置开始订阅。

2) OFFSET_END: 从末尾开始订阅。

如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition。

示例

"kafka_partitions" = "0,1,2,3",
                    "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

实例1:

为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。并且自动默认消费所有分区,且从末尾(OFFSET_END)开始订阅

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
        COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false",
            "format" = "json"  //接受的数据为json格式
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "broker1:9092,broker2:9092",
            "kafka_topic" = "my_topic"
        );

实例2:

导入八卦炉表数据

CREATE ROUTINE LOAD test.job3 ON ba_gua_lu_1003_log COLUMNS
( id, reserve1, reserve2, reserve3, reserve4, reserve5, user_id, channel, registerTime, holdCard, isForge, leftTime, create_time )
PROPERTIES
(
                "desired_concurrent_number" = "3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false",
                "format" = "json"
)
FROM
KAFKA (
        "kafka_broker_list" = "192.168.1.27:9092",
        "kafka_topic" = "kafka-pull",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

查看任务状态

SHOW ROUTINE LOAD;

DORIS个人使用感受


从个人的使用中,我觉得DORIS比较适合实时/离线的业务数据的计算。

Doris兼容sql协议,让它能很好的处理业务库中的关系型数据,然后它的缺点就在于对非关系型数据的处理了,因为doris实际上也是以表作为处理单位的。对服务器的性能要求特别高。

Doris的数据导入问题。Doris支持的五种两类数据导入方式,基本能覆盖kafka、hdfs、mysql等作为数据上游。但是从官方的介绍来看,doris的数据导入是有一个弊端的,就是导入数据的部分异常,可能会导致整个任务的失败。

Doris市面相关资料不多,使用学习和运维成本高、风险大。出了问题,比较难维护需要熟悉源码,才能去调试 bug 。

相关文章
|
5天前
|
SQL Java 数据库连接
Apache Doris 支持 Arrow Flight SQL 协议,数据传输效率实现百倍飞跃
近年来,随着数据科学、数据湖分析等场景的兴起,对数据读取和传输速度提出更高的要求。而 JDBC/ODBC 作为与数据库交互的主流标准,在应对大规模数据读取和传输时显得力不从心,无法满足高性能、低延迟等数据处理需求。为提供更高效的数据传输方案,Apache Doris 在 2.1 版本中基于 Arrow Flight SQL 协议实现了高速数据传输链路,使得数据传输性能实现百倍飞跃。
|
5天前
|
缓存 安全 Java
阿里云数据库 SelectDB 内核 Apache Doris 2.0.6 版本正式发布
阿里云数据库 SelectDB 内核 Apache Doris 2.0.6 版本正式发布
|
5天前
|
SQL 存储 JSON
阿里云数据库 SelectDB 内核 Apache Doris 2.1.0 版本发布:开箱盲测性能大幅优化,复杂查询性能提升 100%
亲爱的社区小伙伴们,Apache Doris 2.1.0 版本已于 2024 年 3 月 8 日正式发布,新版本开箱盲测性能大幅优化,在复杂查询性能方面提升100%,新增Arrow Flight接口加速数据读取千倍,支持半结构化数据类型与分析函数。异步多表物化视图优化查询并助力仓库分层建模。引入自增列、自动分区等存储优化,提升实时写入效率。Workload Group 资源隔离强化及运行时监控功能升级,保障多负载场景下的稳定性。新版本已经上线,欢迎大家下载使用!
阿里云数据库 SelectDB 内核 Apache Doris 2.1.0 版本发布:开箱盲测性能大幅优化,复杂查询性能提升 100%
|
5天前
|
关系型数据库 Apache 流计算
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
|
5天前
|
存储 监控 安全
360 企业安全浏览器基于阿里云数据库 SelectDB 版内核 Apache Doris 的数据架构升级实践
为了提供更好的日志数据服务,360 企业安全浏览器设计了统一运维管理平台,并引入 Apache Doris 替代了 Elasticsearch,实现日志检索与报表分析架构的统一,同时依赖 Doris 优异性能,聚合分析效率呈数量级提升、存储成本下降 60%....为日志数据的可视化和价值发挥提供了坚实的基础。
360 企业安全浏览器基于阿里云数据库 SelectDB 版内核 Apache Doris 的数据架构升级实践
|
4天前
|
SQL 存储 调度
从 Volcano 火山模型到 Pipeline 执行模型,阿里云数据库 SelectDB 内核 Apache Doris 执行模型的迭代
一个合适的执行模型对于提高查询效率和系统性能至关重要。本文全面剖析 Apache Doris Pipeline 执行模型的设计与改造历程,并在 2.1 版本对并发执行模式与调度模式进一步优化,解决了执行并发受限、执行及调度开销大等问题。
从 Volcano 火山模型到 Pipeline 执行模型,阿里云数据库 SelectDB 内核 Apache Doris 执行模型的迭代
|
5天前
|
SQL 大数据 BI
从离线到实时:无锡锡商银行基于 Apache Doris 的数据仓库演进实践
从离线到实时:无锡锡商银行基于 Apache Doris 的数据仓库演进实践
|
5天前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
5天前
|
存储 SQL Apache
阿里云数据库内核 Apache Doris 基于 Workload Group 的负载隔离能力解读
阿里云数据库内核 Apache Doris 基于 Workload Group 的负载隔离能力解读
阿里云数据库内核 Apache Doris 基于 Workload Group 的负载隔离能力解读
|
5天前
|
Kubernetes 关系型数据库 Apache
Apache Doris 2.1.2 版本正式发布!
Apache Doris 2.1.2 版本正式发布!该版本提交了若干改进项以及问题修复,进一步提升了系统的性能及稳定性,欢迎大家下载体验!

热门文章

最新文章

推荐镜像

更多
http://www.vxiaotou.com