From a5af305cb0e59276a9a36971c34c30fbcafc3a08 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 27 Nov 2023 11:58:28 +0900 Subject: [PATCH] Adjust ilm endpoint Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/elasticsearch_compat.rb | 1 + ...lasticsearch_index_lifecycle_management.rb | 22 +++++++++---------- .../plugin/out_elasticsearch_data_stream.rb | 10 ++++----- ...lasticsearch_index_lifecycle_management.rb | 10 +++------ test/plugin/test_out_elasticsearch.rb | 18 +++------------ .../test_out_elasticsearch_data_stream.rb | 6 +---- 6 files changed, 24 insertions(+), 43 deletions(-) diff --git a/lib/fluent/plugin/elasticsearch_compat.rb b/lib/fluent/plugin/elasticsearch_compat.rb index 7c888d7b..0b6cda5e 100644 --- a/lib/fluent/plugin/elasticsearch_compat.rb +++ b/lib/fluent/plugin/elasticsearch_compat.rb @@ -12,6 +12,7 @@ begin require 'elasticsearch/xpack' rescue LoadError + require 'elasticsearch/api' # For elasticsearch-ruby 8 or later end end diff --git a/lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb b/lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb index 1734fee7..d1e52e31 100644 --- a/lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb +++ b/lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb @@ -45,19 +45,19 @@ def xpack_info end def get_ilm_policy - if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0") - client.ilm.get_policy - else - client.enrich.get_policy - end + if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0") + client.ilm.get_lifecycle + else + client.ilm.get_policy + end end def ilm_policy_exists?(policy_id) begin - if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0") - client.ilm.get_policy(policy_id: policy_id) + if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0") + client.ilm.get_lifecycle(policy: policy_id) else - client.enrich.get_policy(name: policy_id) + client.ilm.get_policy(policy_id: policy_id) end true rescue @@ -67,10 +67,10 @@ def ilm_policy_exists?(policy_id) def ilm_policy_put(policy_id, policy) log.info("Installing ILM policy: #{policy}") - if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0") - client.ilm.put_policy(policy_id: policy_id, body: policy) + if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0") + client.ilm.put_lifecycle(policy: policy_id, body: policy) else - client.enrich.put_policy(name: policy_id, body: policy) + client.ilm.put_policy(policy_id: policy_id, body: policy) end end diff --git a/lib/fluent/plugin/out_elasticsearch_data_stream.rb b/lib/fluent/plugin/out_elasticsearch_data_stream.rb index 0fa6de49..649be76a 100644 --- a/lib/fluent/plugin/out_elasticsearch_data_stream.rb +++ b/lib/fluent/plugin/out_elasticsearch_data_stream.rb @@ -102,8 +102,8 @@ def create_ilm_policy(datastream_name, template_name, ilm_name, host = nil) retry_operate(@max_retry_putting_template, @fail_on_putting_template_retry_exceed, @catch_transport_exception_on_retry) do - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - client(host).enrich.put_policy(params.merge(name: ilm_name)) + if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0") + client(host).ilm.put_lifecycle(params.merge(policy: ilm_name)) else client(host).xpack.ilm.put_policy(params.merge(policy_id: ilm_name)) end @@ -159,10 +159,10 @@ def create_data_stream(datastream_name, host = nil) def ilm_policy_exists?(policy_id, host = nil) begin - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - client(host).enrich.get_policy(name: policy_id) + if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0") + client.ilm.get_lifecycle(policy: policy_id) else - client(host).ilm.get_policy(policy_id: policy_id) + client.ilm.get_policy(policy_id: policy_id) end true rescue diff --git a/test/plugin/test_elasticsearch_index_lifecycle_management.rb b/test/plugin/test_elasticsearch_index_lifecycle_management.rb index e398d6c3..b162c813 100644 --- a/test/plugin/test_elasticsearch_index_lifecycle_management.rb +++ b/test/plugin/test_elasticsearch_index_lifecycle_management.rb @@ -38,19 +38,15 @@ def elasticsearch_version end def ilm_existence_endpoint(policy_id) - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - "_enrich/policy/#{policy_id}" + if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0") + "_ilm/policy/#{policy_id}" else "_ilm/policy/%7B:policy_id=%3E%22#{policy_id}%22%7D" end end def ilm_creation_endpoint(policy_id) - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - "_enrich/policy/#{policy_id}" - else - "_ilm/policy/#{policy_id}" - end + "_ilm/policy/#{policy_id}" end def stub_elastic_info(url="http://localhost:9200/", version=elasticsearch_version) diff --git a/test/plugin/test_out_elasticsearch.rb b/test/plugin/test_out_elasticsearch.rb index 3468b00c..ae6a491f 100644 --- a/test/plugin/test_out_elasticsearch.rb +++ b/test/plugin/test_out_elasticsearch.rb @@ -524,11 +524,7 @@ def stub_elastic_info_bad(url="http://localhost:9200/", version="6.4.2") end def ilm_endpoint - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - '_enrich'.freeze - else - '_ilm'.freeze - end + '_ilm'.freeze end data("legacy_template" => [true, "_template"], @@ -1179,11 +1175,7 @@ def setup end def ilm_endpoint - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - '_enrich'.freeze - else - '_ilm'.freeze - end + '_ilm'.freeze end data("legacy_template" => [true, "_template"], @@ -2601,11 +2593,7 @@ def setup def ilm_endpoint - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - '_enrich'.freeze - else - '_ilm'.freeze - end + '_ilm'.freeze end data("legacy_template" => [true, "_template"], diff --git a/test/plugin/test_out_elasticsearch_data_stream.rb b/test/plugin/test_out_elasticsearch_data_stream.rb index 5fd3fc6d..c28f91f1 100644 --- a/test/plugin/test_out_elasticsearch_data_stream.rb +++ b/test/plugin/test_out_elasticsearch_data_stream.rb @@ -31,11 +31,7 @@ def elasticsearch_version end def ilm_endpoint - if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0") - '_enrich'.freeze - else - '_ilm'.freeze - end + '_ilm'.freeze end def driver(conf='', es_version=elasticsearch_version.to_i, client_version=elasticsearch_version)