【技术实验】mysql准实时同步数据到Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Elasticsearch作为大数据场景下搜索和分析的引擎,广泛应用于实时数据分析等场景。本文作者梳理了从MySQL准实时同步数据到Elasticsearch的实操步骤,帮助开发者理解和快速上手。

2000元阿里云代金券免费领取,2核4G云服务器仅664元/3年,新老用户都有优惠,立即抢购>>>


阿里云采购季(云主机223元/3年)活动入口:请点击进入>>>,


阿里云学生服务器(9.5元/月)购买入口:请点击进入>>>,

实验背景

Elasticsearch在阿里云商业化已经有一段时间,它作为大数据场景下搜索和分析的引擎,可以用于很多场景。前两天有同学提到需要将MySQL中的数据准实时的同步到ElasticSearch中的需求,由于自己对ES也很感兴趣但一直没有机会实操,恰好趁这个机会学习验证了一下,并把过程记录下来,方便新人尽快上手少走弯路。

本次实操采用Logstash实现将MySQL数据实时同步到ElasticSearch,过程中主要以实操步骤为主,并没有详细介绍这两个产品本身,所以文档适合对Logstash和Elasticsearch有一些基础的人阅读。使用Logstash过程中会用到一些参数,每个参数的详细解释请参考官方文档。

实验步骤

1. 开通阿里云Elasticsearch

开通阿里云Elasticsearch。产品链接

_

ES控制台中修改配置,允许自动创建索引。

_

2. 开通对应的ECS和RDS(MySQL)

开通与Elasticsearch相同Region、相同专有网络下的ECS一台,建议配置2核8G,操作系统:Aliyun Linux 17.1 64位,用于运行Logstash程序。

_

开通RDS(MySQL)

_

3. 在ECS上安装和配置curl

SSH登陆到ESC中,检查ECS是否已经安装curl。

curl是利用URL语法在命令行方式下工作的开源文件传输工具,可以用于测试访问ES服务。(Aliyun Linux 17.1 64位默认是预装curl的,若其他系统没有预装请自行安装)。
_

检查ECS中是否可以通过curl命令访问到ES服务。

curl -u ES用户名:ES密码 -XGET 'http://ES服务IP:9200/?pretty'
红色字体部分要替换相应的用户名、密码和ES服务IP或域名。如下图所以,能返回红框内的内容说明curl是可以访问到ES服务的。
_

4. 安装JDK8、MySQL5.6驱动以及Logstash -6.0.0

ECS中分别安装JDK8、MySQL5.6驱动以及Logstash -6.0.0。如下图:

_

安装Logstash input、output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。

安装插件的命令分别是(在Logstash主目录下运行):
./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearch
_

5. MySQL中创建数据库、测试的数据表

如下图所示

_
建表语句(其中updatetime用于记录数据更新时间戳):

create table jm_es_employee (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );

6. 配置Logstash作业文件

ECS中创建Logstash作业配置文件,文件名为logstash-mysql-es.conf。

配置文件内容:

input{
     jdbc {
         jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name"
         jdbc_user => "db_user"
         jdbc_password => "db_password"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "select * from jm_es_employee where updatetime > :sql_last_value"
         use_column_value => true
         tracking_column => "updatetime"
         last_run_metadata_path => "./logstash_jdbc_last_run"
       } 
} 
output{
      elasticsearch {
         hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"
         user => "elastic"
         password => "es_password"
         index => "employee"
         document_id => "%{id}"
      }
      stdout {
         codec => json_lines
     }
 } 

其中红色字体部分要做相应的替换,input中的 schedule参数用于配置数据刷新频率,schedule => " *"表示每分钟刷新一次,这也是MySQL数据同步的最小频率。Logstash支持丰富的参数配置,详情请参考Elasitc官网文档

7. 同步数据

ECS中指定参数启动Logstash服务,执行命令:

logstash -f logstash-mysql-es.conf

_

之后每分钟会去MySQL中刷新数据

_

RDS中写入几条测试数据,脚本如下:

INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]'); 

由于之前在Logstash配置文件中,output部分既配置了输出到ES,同时也输出到控制台。所以当检测到MySQL中有更新时,数据会输出到控制台中,如下图:

_

此时说明MySQL中的数据更新已经被Logstash推送到ES服务。通过在ECS执行命令检查ES服务中的索引是否被创建。执行命令:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'

_

红框内的employee即我们在配置文件中指定的索引名,说明ES中的索引已经被成功创建。

8. 结果验证

通过关键字检索ES服务,验证写入Mysql的数据是否被成功索引到ES并被检索到,执行命令通过关键字“Smith “来检索数据:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty' 

_

至此,MySQL中的数据已经被成功索引到Elasticsearch,并也可以被准实时的检索到。

作者:奇米 阿里巴巴高级工程师

加入钉钉技术讨论群

dingQR


阿里云Elasticsearch已正式发布啦,Elastic开源官方联合开发,集成5.5.3商业版本XPack功能,欢迎开通使用。
点击了解更多产品信息
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
5天前
|
存储 JSON 数据格式
Elasticsearch 8.X 可以按照数组下标取数据吗?
Elasticsearch 8.X 可以按照数组下标取数据吗?
16 0
|
3天前
|
SQL 关系型数据库 数据库
实时计算 Flink版产品使用合集之将数据写入Elasticsearch时,若Elasticsearch中的字段类型为date,对应的SQL类型应该是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
24 0
|
3天前
|
SQL 监控 API
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
13 0
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
|
3天前
|
JSON 搜索推荐 API
使用Elasticsearch进行全文搜索:技术深度解析
【5月更文挑战第16天】本文深入解析了使用Elasticsearch进行全文搜索的技术细节。Elasticsearch是一个基于Lucene的开源搜索引擎,支持全文、结构化搜索和数据分析,具备优秀的扩展性。文中介绍了其核心功能,包括全文搜索、结构化搜索、分析和可扩展性。详细步骤涉及安装配置、数据准备、创建索引、导入数据、构建查询及结果处理。Elasticsearch凭借其高效性能和灵活性,成为企业全文搜索的首选解决方案。
|
5天前
|
负载均衡 关系型数据库 MySQL
MySQL读写分离技术深度解析
在高并发、大数据量的互联网应用环境中,数据库作为数据存储的核心组件,其性能直接影响着整个系统的运行效率。MySQL作为最常用的开源关系型数据库之一,虽然功能强大,但在处理大量并发读写请求时,单点服务器的性能瓶颈逐渐显现。为了解决这一问题,MySQL读写分离技术应运而生,成为提升数据库性能、实现负载均衡的有效手段。
|
5天前
|
安全 搜索推荐 定位技术
一张图30个知识点,全方位认知 Elasticsearch 技术发展
一张图30个知识点,全方位认知 Elasticsearch 技术发展
22 3
|
5天前
|
SQL 监控 搜索推荐
Elasticsearch 与 OpenSearch:开源搜索技术的演进与选择
Elasticsearch 与 OpenSearch:开源搜索技术的演进与选择
38 2
|
5天前
|
存储 数据处理 索引
Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程
Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程
34 6
|
5天前
|
JSON 测试技术 数据格式
Elasticsearch 8.X 如何生成 TB 级的测试数据 ?
Elasticsearch 8.X 如何生成 TB 级的测试数据 ?
15 0
|
5天前
|
监控 API 索引
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
12 0

相关产品

  • 检索分析服务 Elasticsearch版
  • http://www.vxiaotou.com