
本文介绍如何使用logstash同步mysql数据库信息到ElasticSearch
对于logstash2.x版本是没有集成logstash-jdbc-input插件的需要我们自己去安装;如果你是2.x版本可以参考:http://blog.csdn.net/yeyuma/article/details/50240595
对于logstash5.x版本他自身已经集成了这个插件,不需要我们去单独安装,直接使用即可
准备工作
检查是否安装jdk:https://bearjun.com/index.php/linux/245.html
检查是否安装es:https://bearjun.com/index.php/java/128.html
找一个和自己数据库一致的驱动包:mysql-connector-java-5.1.20.jar
因为我服务器的数据库版本是5.7版本,所以这边是一个对应5.7的驱动包
下载logstash
logstash官方下载地址:https://www.elastic.co/cn/downloads/past-releases#logstash

注意:一定要和es的版本保持一致,前面讲的es是7.9.3,故logstash也是7.9.3
安装
- 上传logstash包还有驱动jar包到指定的目录
 - 解压logstash包
 
tar -zxvf logstash-7.9.3.tar.gz
解压完成之后,会多了一个目录

先简单看一下目录文件:

其中bin目录下是启动文件
- 创建同步文件信息
 
mkdir sync
我们的同步配置文件都会放在当前的文件夹里面
再创建一个数据同步的配置文件
# 注意后缀是.conf
vim logstash-db-sync.conf
再移动数据库启动到当前文件夹
# 注意mysql驱动的位置
mv /usr/local/mysql-connector-java-5.1.20.jar ./

- 配置
 
# 数据的输入
input {
  jdbc {
    # 驱动包位置
    jdbc_driver_library => "/usr/local/logstash/logstash-5.6.16/mysql-connector-java-5.1.6.jar"
    # 驱动
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库地址
    jdbc_connection_string => "jdbc:mysql://ip:3306/dalaoyang"
    # 数据库连接用户名
    jdbc_user => "root"
    # 数据库连接用户密码
    jdbc_password => "password"
    # 执行sql语句文件位置(为了解耦,写在sql文件中)
    statement_filepath => "filename.sql"
    # 执行sql(如果不用statement_filepath,则把sql写在当前位置)
    # statement => "SELECT * from link_info"
    # 是否分页
    jdbc_paging_enabled => "true"
    # 分页数量
    jdbc_page_size => "50000"
    # 索引类型
    type => "_doc"
    # 执行任务时间间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => "* * * * *"
    # 是否开启记录上次追踪结果,也就是上次更新的时间,这个会记录到last_run-metadate_path的文件
    use_colnum_value => true
    # 记录上一次的追踪结果
    last_run_metadate_path => "/usr/local/logstash-7.9.3/sync/track_time"
    # 如果use_colnum_value为true,皮质本参数,追踪column,可以是自增的id或者时间
    tracking_column => "update_date"
    # tracking_column 对应的字段类型
    tracking_column_type => "timestamp"
    # 是否清除last_run_metadata_path的记录,true则每次都是从头开始查询所有
    clean_run => fasle
    # 数据库字段名称大写转小写
    lowercase_column_names => false
  }
}
 
# 数据的输出
output {
  elasticsearch {
        # es地址,集群配置数组
        hosts => ["ip:端口"]
        # 同步的索引名
        index => "test-mysql"
        # 设置_doc的id和数据库id一致, {id}:数据库的id
        document_id => "%{id}"
  }
  # 日志的输出
  stdout {
     codec => json_lines
  }
}
赋值配置文件,根据自己的需求修改,然后复制到logstash-db-sync.conf里面,
注意需要准备的sql文件
select 
t.id,
t.name,
t.age,
t.xx,
t.xx,
t.xx
from table t
where t.craete_time => :sql_last_value 
- 启动
准备好所有的文件后

进入bin,然后启动 
./logstash -f 配置文件.conf
# ./logstash -f /usr/local/logstash-5.6.16/sync/logstash-db-sync.conf
                      
                      
                    
            
          
            
          
评论区