【Posted at】2019-04-26 06:34:58
【Question】:
I need a brief explanation of how to load MySQL data into Elasticsearch using Logstash. Can someone explain the step-by-step process?
【Comments】:
Tags: mysql elasticsearch logstash kibana
This is a broad question, and I don't know how much you already know about MySQL and ES. Suppose you have a table user. You could simply dump it to CSV and load that into your ES, and it would work fine. But if you have dynamic data, where MySQL acts like a pipeline, you need to write a script to handle it. Either way, you can check the following links to build up your basic knowledge before asking how.
Also, you will probably want to know how to convert your CSV into a JSON file, which is the format best suited for ES.
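For the CSV-to-JSON step, here is a minimal sketch in Python. It converts a CSV export into the newline-delimited JSON format that Elasticsearch's bulk API accepts; the file path, index name, and helper name are illustrative, not from the original answer:

```python
import csv
import json

def csv_to_bulk_ndjson(csv_path, index_name):
    """Convert a CSV export to Elasticsearch bulk-API NDJSON.

    Each CSV row becomes two lines: an action line naming the
    target index, followed by the row itself as a JSON document.
    """
    lines = []
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            lines.append(json.dumps({"index": {"_index": index_name}}))
            lines.append(json.dumps(row))
    return "\n".join(lines) + "\n"
```

The resulting text can be POSTed to the `_bulk` endpoint, e.g. with `curl -H "Content-Type: application/x-ndjson" --data-binary @users.ndjson localhost:9200/_bulk` (adjust host and file name to your setup).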
【Comments】:
You can use the jdbc input plugin for Logstash.
Here is a configuration example.
【Comments】:
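Since the question is about MySQL specifically, here is a minimal sketch of such a jdbc-input pipeline. The database name, table, credentials, driver jar path, and index name are placeholders to adapt to your environment; the driver class `com.mysql.cj.jdbc.Driver` is the one shipped with MySQL Connector/J 8:

```conf
input {
  jdbc {
    # Path to the MySQL Connector/J jar you downloaded
    jdbc_driver_library => "./mysql-connector-java-8.0.30.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "pwd"
    # Query whose rows become Elasticsearch documents
    statement => "SELECT * FROM users"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "users"
  }
}
```

This one-shot version reads the whole table each run; the answer below shows the incremental variant using `:sql_last_value` and `tracking_column`.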
Let me give you a high-level set of steps.
input {
  # Get the data from the database; configure fields to fetch data incrementally
  jdbc {
    jdbc_driver_library => "./ojdbc7.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@db:1521:instance"
    jdbc_user => "user"
    jdbc_password => "pwd"
    id => "some_id"
    jdbc_validate_connection => true
    jdbc_validation_timeout => 1800
    connection_retry_attempts => 10
    connection_retry_attempts_wait_time => 10

    # Fetch the db logs using logid
    statement => "select * from customer.table where logid > :sql_last_value order by logid asc"

    # Limit how many results are pre-fetched at a time from the cursor
    # into the client's cache before retrieving more from the result set
    jdbc_fetch_size => 500
    jdbc_default_timezone => "America/New_York"
    use_column_value => true
    tracking_column => "logid"
    tracking_column_type => "numeric"
    record_last_run => true
    schedule => "*/2 * * * *"
    type => "log.customer.table"
    add_field => { "source" => "customer.table" }
    add_field => { "tags" => "customer.table" }
    add_field => { "logLevel" => "ERROR" }
    last_run_metadata_path => "last_run_metadata_path_table.txt"
  }
}

# Massage the data before storing it in the index
filter {
  if [type] == 'log.customer.table' {
    # Assign values from db columns to custom fields of the index
    ruby {
      code => "event.set( 'errorid', event.get('ssoerrorid') );
               event.set( 'msg', event.get('errormessage') );
               event.set( 'logTimeStamp', event.get('date_created'));
               event.set( '@timestamp', event.get('date_created'));
              "
    }
    # Remove the db columns that were mapped to custom fields of the index
    mutate {
      remove_field => ["ssoerrorid", "errormessage", "date_created"]
    }
  } # end of [type] == 'log.customer.table'
} # end of filter

# Insert into the index
output {
  if [type] == 'log.customer.table' {
    amazon_es {
      hosts => ["vpc-xxx-es-yyyyyyyyyyyy.us-east-1.es.amazonaws.com"]
      region => "us-east-1"
      aws_access_key_id => '<access key>'
      aws_secret_access_key => '<secret password>'
      index => "production-logs-table-%{+YYYY.MM.dd}"
    }
  }
}
Run Logstash with the pipeline config (note that despite the .yml name, the file uses Logstash's own configuration syntax, not YAML):
logstash -f config.yml
【Comments】: