【问题标题】:How should I use sql_last_value in logstash?我应该如何在 logstash 中使用 sql_last_value?
【发布时间】:2017-03-14 21:07:38
【问题描述】:

我不太清楚sql_last_value 在我这样发表声明时做了什么:

statement => "SELECT * from mytable where id > :sql_last_value"

我可以稍微理解使用它的原因,它不会浏览整个数据库表以更新字段,而是只更新新添加的记录。如果我错了,请纠正我。

所以我想做的是,使用logstash 创建索引:

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db" 
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_validate_connection => true
        jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        schedule => "* * * * *"
        statement => "SELECT * from mytable where id > :sql_last_value"
        use_column_value => true
        tracking_column => id
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
    }
}

output {
    elasticsearch {
        #protocol => http
        index => "myindex"
        document_type => "message_logs"
        document_id => "%{id}"
        action => index
        hosts => ["http://myhostmachine:9402"]
    }
}

一旦我这样做了,文档就根本不会上传到索引中。我哪里错了?

任何帮助都将不胜感激。

【问题讨论】:

  • 您的表中是否有一个时间戳列在每次记录更新时都会更新?
  • @Val nop 我不知道。我必须有一个才能更新每条记录吗?
  • 这样更容易获取最新更新的记录。更新记录时,id 不会更改,您可能无法获取更新记录。
  • 在您的主文件夹中,您可以尝试删除.logstash_jdbc_last_run 文件,看看是否效果更好。
  • 我将尝试使用时间戳值并返回。你的意思是logstash的主文件夹?

标签: elasticsearch jdbc logstash logstash-configuration elasticsearch-5


【解决方案1】:

如果您的表中有时间戳列(例如last_updated),您最好使用它而不是 ID 列。因此,当记录更新时,您也可以修改该时间戳,jdbc 输入插件将获取该记录(即 ID 列不会更改其值,并且不会获取更新的记录)

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://hostmachine:3306/db" 
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_validate_connection => true
        jdbc_driver_library => "/path/mysql_jar/mysql-connector-java-5.1.39-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        schedule => "* * * * *"
        statement => "SELECT * from mytable where last_updated > :sql_last_value"
    }
}

如果您仍然决定保留 ID 列,则应删除 $HOME/.logstash_jdbc_last_run 文件并重试。

【讨论】:

  • 我添加了一个 varchar 时间戳列,我已经手动将值插入到我的表中以用于测试目的 (2016-09-01 00:00:00) 并尝试创建索引,但是仍然没有任何记录被上传到索引。我也删除了logstash_jdbc_last_run
  • timestamp 列不应该是 varchar,而是 timestamp 或者 date 或 date_time
  • 我用日期时间类型列重新创建了场景。该值可以是这样的(即:2016-09-01 00:00:00)对吗?文档仍然没有上传到索引中。我可以提供 logstash conf 和 index docs count 输出。
  • 你能用--debug命令行开关运行logstash吗?
  • 如您所见,运行的查询是SELECT count(*) AS count FROM (SELECT * from TEST where time > '2016-11-01 17:45:18') AS t1 LIMIT 1,因此您的时间戳需要大于2016-11-01 17:45:18
【解决方案2】:

有几点需要注意:

  1. 如果您之前在没有计划的情况下运行了 Logstash,那么在使用计划运行 Logstash 之前,请删除该文件:

    $HOME/.logstash_jdbc_last_run
    

    在 Windows 中,此文件位于:

    C:\Users\<Username>\.logstash_jdbc_last_run
    
  2. Logstash 配置中的“statement =>”应该有 tracking_column 的“order by”。

  3. tracking_column 应该正确给出。

以下是 Logstash 配置文件的示例:

    input {
jdbc {
    # MySQL DB jdbc connection string to our database, softwaredevelopercentral
    jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
    # The user we wish to execute our statement as
    jdbc_user => "root"
    # The user password
    jdbc_password => ""
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "D:\Programs\MySQLJava\mysql-connector-java-6.0.6.jar"
    # The name of the driver class for MySQL DB
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # our query
    schedule => "* * * * *"
    statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
    use_column_value => true
    tracking_column => "studentid"
}
}
output {
stdout { codec => json_lines }
elasticsearch { 
   hosts => ["localhost:9200"]
   index => "students"
   document_type => "student"
   document_id => "%{studentid}"
   }

}

要查看相同的工作示例,您可以查看我的博客文章: http://softwaredevelopercentral.blogspot.com/2017/10/elasticsearch-logstash-kibana-tutorial.html

【讨论】:

    【解决方案3】:

    简单来说,sql_last_value 允许您将上次运行的 sql 中的数据保存为名称 sugets。

    当您安排查询时,此值特别有用。但为什么 ... ? 因为您可以根据sql_last_value 中存储的值创建您的 sql 语句条件,并且避免检索已经为您的 logstash 输入提取的行或在上次管道执行后更新的行

    使用sql_last_value 时的注意事项

    • 默认情况下,此变量存储上次运行的时间戳。当您需要提取基于 creation_date last_update 等列的数据时很有用。
    • 您可以通过使用特定表的列值跟踪sql_last_value 的值来定义它。当您需要基于自动增量数据摄取时很有用。为此,您需要指定use_column_value =&gt; truetracking_column =&gt; "column_name_to_track"

    下面的例子会将mytable最后一行的id存入:sql_last_value,以便在下一次执行时摄取之前没有被摄取的行,即id大于上一个的行摄取的 ID。

    input {
        jdbc {
            # ...
            schedule => "* * * * *"
            statement => "SELECT * from mytable where id > :sql_last_value"
            use_column_value => true
            tracking_column => id
        }
    }
    
    

    非常重要!!!

    当您在管道中使用多个输入时,每个输入块将覆盖最后一个的 sql_last_value 的值。为了避免这种行为,您可以使用last_run_metadata_path =&gt; "/path/to/sql_last_value/of_your_pipeline.yml" 选项,这意味着每个管道将自己的值存储在不同的文件中。

    【讨论】:

    • 是的,那个极其重要的部分让我头疼,但我认为这就是原因
    • 如何将 use_column_value 设置为 false。它仍然应该允许您增量更新,而是将最后一个检查点存储在元数据文件中......还是我误解了?如果设置为 true,则每次 logstash 将重新启动它会重置 sql_last_value 的值
    猜你喜欢
    • 2017-03-15
    • 2020-04-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-06
    • 2011-09-18
    • 2020-08-18
    相关资源
    最近更新 更多