diff --git a/README.md b/README.md index b032b83..7a0e05a 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ fluent plugin mysql bulk insert is high performance and on duplicate key update ## Note fluent-plugin-mysql-bulk merged this repository. - +This repository now includes the mysql_bulk plugin. [mysql plugin](README_mysql.md) is deprecated. You should use mysql_bulk. v0.1.5 only supports fluentd-0.12.X and v0.2.0 only supports fluentd-0.14.X. @@ -33,6 +33,7 @@ table|bulk insert table (require) on_duplicate_key_update|on duplicate key update enable (true:false) on_duplicate_update_keys|on duplicate key update column, comma separator transaction_isolation_level|set transaction isolation level(default: nil) +column_names_empty_to_null|Columns for which empty values should be converted to nil (null in SQL) ## Configuration Example(bulk insert) diff --git a/lib/fluent/plugin/out_mysql_bulk.rb b/lib/fluent/plugin/out_mysql_bulk.rb index 1539eda..0fe8e2c 100644 --- a/lib/fluent/plugin/out_mysql_bulk.rb +++ b/lib/fluent/plugin/out_mysql_bulk.rb @@ -54,6 +54,9 @@ class MysqlBulkOutput < Output config_param :transaction_isolation_level, :enum, list: [:read_uncommitted, :read_committed, :repeatable_read, :serializable], default: nil, desc: "Set transaction isolation level." + config_param :column_names_empty_to_null, :string, default: nil, + desc: "Columns for which empty values are stored as NULL (no quotes)" + attr_accessor :handler def initialize @@ -100,6 +103,7 @@ def configure(conf) @column_names = @column_names.split(',').collect(&:strip) @key_names = @key_names.nil? ? @column_names : @key_names.split(',').collect(&:strip) + @column_names_empty_to_null = @column_names_empty_to_null.split(',').collect(&:strip) if @column_names_empty_to_null @json_key_names = @json_key_names.split(',') if @json_key_names @unixtimestamp_key_names = @unixtimestamp_key_names.split(',') if @unixtimestamp_key_names end @@ -200,6 +204,12 @@ def format_proc if @unixtimestamp_key_names && @unixtimestamp_key_names.include?(key) value = Time.at(value).strftime('%Y-%m-%d %H:%M:%S') end + + if @column_names_empty_to_null && @column_names_empty_to_null.include?(key) + if value.nil? || value.empty? + value = nil + end + end end values << value end