diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 31478e8e8f65..510a6bebd4e2 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -9,7 +9,7 @@ jobs: stale: runs-on: ubuntu-latest steps: - - uses: actions/stale@v8 + - uses: actions/stale@v9 with: # PAT for GitHub API authentication repo-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/.gitmodules b/.gitmodules index 088ae3b577ce..a5fa46943ca3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -75,3 +75,6 @@ [submodule "src/jaegertracing/opentelemetry-cpp"] path = src/jaegertracing/opentelemetry-cpp url = https://github.com/open-telemetry/opentelemetry-cpp.git +[submodule "src/nvmeof/gateway"] + path = src/nvmeof/gateway + url = https://github.com/ceph/ceph-nvmeof.git diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8aa271a2b5b2..a1be041685f7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -865,6 +865,107 @@ if(WITH_FUSE) install(PROGRAMS mount.fuse.ceph DESTINATION ${CMAKE_INSTALL_SBINDIR}) endif(WITH_FUSE) +# NVMEOF GATEWAY MONITOR CLIENT + +option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" OFF) +if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT) + + # Find Protobuf installation + # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. + option(protobuf_MODULE_COMPATIBLE TRUE) + find_package(Protobuf REQUIRED) + + set(_REFLECTION grpc++_reflection) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + + # Find gRPC installation + # Looks for gRPCConfig.cmake file installed by gRPC's cmake installation. + find_package(gRPC CONFIG REQUIRED) + message(STATUS "Using gRPC ${gRPC_VERSION}") + set(_GRPC_GRPCPP gRPC::grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() + + # Gateway Proto file + get_filename_component(nvmeof_gateway_proto "nvmeof/gateway/control/proto/gateway.proto" ABSOLUTE) + get_filename_component(nvmeof_gateway_proto_path "${nvmeof_gateway_proto}" PATH) + + # Generated sources + set(nvmeof_gateway_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.cc") + set(nvmeof_gateway_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.h") + set(nvmeof_gateway_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.cc") + set(nvmeof_gateway_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_gateway_proto_srcs}" "${nvmeof_gateway_proto_hdrs}" "${nvmeof_gateway_grpc_srcs}" "${nvmeof_gateway_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_gateway_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_gateway_proto}" + DEPENDS "${nvmeof_gateway_proto}") + + + # Monitor Proto file + get_filename_component(nvmeof_monitor_proto "nvmeof/gateway/control/proto/monitor.proto" ABSOLUTE) + get_filename_component(nvmeof_monitor_proto_path "${nvmeof_monitor_proto}" PATH) + + # Generated sources + set(nvmeof_monitor_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.cc") + set(nvmeof_monitor_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.h") + set(nvmeof_monitor_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.cc") + set(nvmeof_monitor_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_monitor_proto_srcs}" "${nvmeof_monitor_proto_hdrs}" "${nvmeof_monitor_grpc_srcs}" "${nvmeof_monitor_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_monitor_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_monitor_proto}" + DEPENDS "${nvmeof_monitor_proto}") + + # Include generated *.pb.h files + include_directories("${CMAKE_CURRENT_BINARY_DIR}") + + set(ceph_nvmeof_srcs + ${nvmeof_gateway_proto_srcs} + ${nvmeof_gateway_proto_hdrs} + ${nvmeof_gateway_grpc_srcs} + ${nvmeof_gateway_grpc_hdrs} + ${nvmeof_monitor_proto_srcs} + ${nvmeof_monitor_proto_hdrs} + ${nvmeof_monitor_grpc_srcs} + ${nvmeof_monitor_grpc_hdrs} + ceph_nvmeof.cc + nvmeof/NVMeofGwClient.cc + nvmeof/NVMeofGwMonitorGroupClient.cc + nvmeof/NVMeofGw.cc) + add_executable(ceph-nvmeof ${ceph_nvmeof_srcs}) + add_dependencies(ceph-nvmeof ceph-common) + target_link_libraries(ceph-nvmeof + client + mon + global-static + ceph-common + ${_REFLECTION} + ${_GRPC_GRPCPP} + ) + install(TARGETS ceph-nvmeof DESTINATION bin) +endif() +# END OF NVMEOF GATEWAY MONITOR CLIENT + if(WITH_DOKAN) add_subdirectory(dokan) endif(WITH_DOKAN) diff --git a/src/ceph_nvmeof.cc b/src/ceph_nvmeof.cc new file mode 100644 index 000000000000..0e1259164842 --- /dev/null +++ b/src/ceph_nvmeof.cc @@ -0,0 +1,73 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat Inc + * + * Author: Alexander Indenbaum + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "include/types.h" +#include "include/compat.h" +#include "common/config.h" +#include "common/ceph_argparse.h" +#include "common/errno.h" +#include "common/pick_address.h" +#include "global/global_init.h" + +#include "nvmeof/NVMeofGw.h" + +static void usage() +{ + std::cout << "usage: ceph-nvmeof -i [flags]\n" + << std::endl; + generic_server_usage(); +} + +/** + * A short main() which just instantiates a Nvme and + * hands over control to that. + */ +int main(int argc, const char **argv) +{ + ceph_pthread_setname(pthread_self(), "ceph-nvmeof"); + + auto args = argv_to_vec(argc, argv); + if (args.empty()) { + std::cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, // maybe later use CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + + pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC); + + global_init_daemonize(g_ceph_context); + global_init_chdir(g_ceph_context); + common_init_finish(g_ceph_context); + + NVMeofGw gw(argc, argv); + int rc = gw.init(); + if (rc != 0) { + std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl; + return rc; + } + + return gw.main(args); +} + diff --git a/src/common/options/mon.yaml.in b/src/common/options/mon.yaml.in index ff8813c982f9..a62999dc63dc 100644 --- a/src/common/options/mon.yaml.in +++ b/src/common/options/mon.yaml.in @@ -63,6 +63,14 @@ options: default: 30 services: - mon +- name: mon_nvmeofgw_beacon_grace + type: secs + level: advanced + desc: Period in seconds from last beacon to monitor marking a manager daemon as + failed + default: 10 + services: + - mon - name: mon_mgr_inactive_grace type: int level: advanced @@ -1338,3 +1346,22 @@ options: with_legacy: true see_also: - osd_heartbeat_use_min_delay_socket +- name: nvmf_mon_log_level + type: int + level: advanced + desc: log level of the nvmeofMon + fmt_desc: Monitor will set the log level. + default: 5 + services: + - mon + with_legacy: true +- name: nvmf_mon_mapdump + type: bool + level: advanced + desc: dump maps of nvmeofMon + fmt_desc: Monitor will dump maps + default: false + services: + - mon + with_legacy: true + diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h new file mode 100644 index 000000000000..0735e57fa149 --- /dev/null +++ b/src/messages/MNVMeofGwBeacon.h @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_NVMEOFGWBEACON_H +#define CEPH_NVMEOFGWBEACON_H + +#include +#include +#include "messages/PaxosServiceMessage.h" +#include "mon/MonCommand.h" +#include "mon/NVMeofGwMap.h" + +#include "include/types.h" +class MNVMeofGwBeacon final : public PaxosServiceMessage { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +protected: + std::string gw_id; + std::string gw_pool; + std::string gw_group; + GwSubsystems subsystems; // gateway susbsystem and their state machine states + GW_AVAILABILITY_E availability; // in absence of beacon heartbeat messages it becomes inavailable + uint32_t version; + +public: + MNVMeofGwBeacon() + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION} + {} + + MNVMeofGwBeacon(const std::string &gw_id_, + const std::string& gw_pool_, + const std::string& gw_group_, + const GwSubsystems& subsystems_, + const GW_AVAILABILITY_E& availability_, + const uint32_t& version_ + ) + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}, + gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_), + availability(availability_), version(version_) + {} + + const std::string& get_gw_id() const { return gw_id; } + const std::string& get_gw_pool() const { return gw_pool; } + const std::string& get_gw_group() const { return gw_group; } + const GW_AVAILABILITY_E& get_availability() const { return availability; } + const uint32_t& get_version() const { return version; } + const GwSubsystems& get_subsystems() const { return subsystems; }; + +private: + ~MNVMeofGwBeacon() final {} + +public: + + std::string_view get_type_name() const override { return "nvmeofgwbeacon"; } + + void encode_payload(uint64_t features) override { + header.version = HEAD_VERSION; + header.compat_version = COMPAT_VERSION; + using ceph::encode; + paxos_encode(); + encode(gw_id, payload); + encode(gw_pool, payload); + encode(gw_group, payload); + encode((int)subsystems.size(), payload); + for (const NqnState& st: subsystems) { + encode(st.nqn, payload); + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + encode((int)st.sm_state[i], payload); + encode(st.opt_ana_gid, payload); + } + encode((int)availability, payload); + encode(version, payload); + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + + paxos_decode(p); + decode(gw_id, p); + decode(gw_pool, p); + decode(gw_group, p); + int n; + int tmp; + decode(n, p); + // Reserve memory for the vector to avoid reallocations + subsystems.clear(); + subsystems.reserve(n); + for (int i = 0; i < n; i++) { + std::string nqn; + decode(nqn, p); + NqnState st(nqn); + for (int j = 0; j < MAX_SUPPORTED_ANA_GROUPS; j++) { + decode(tmp, p); + st.sm_state[j] = static_cast(tmp); + } + decode(st.opt_ana_gid, p); + subsystems.push_back(st); + } + decode(tmp, p); + availability = static_cast(tmp); + decode(version, p); + } + +private: + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); +}; + + +#endif diff --git a/src/messages/MNVMeofGwMap.h b/src/messages/MNVMeofGwMap.h new file mode 100644 index 000000000000..14b1eed5a27f --- /dev/null +++ b/src/messages/MNVMeofGwMap.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MNVMEOFGWMAP_H +#define CEPH_MNVMEOFGWMAP_H + +#include "msg/Message.h" +#include "mon/NVMeofGwMap.h" + +class MNVMeofGwMap final : public Message { +protected: + NVMeofGwMap map; + +public: + const NVMeofGwMap& get_map() {return map;} + +private: + MNVMeofGwMap() : + Message{MSG_MNVMEOF_GW_MAP} {} + MNVMeofGwMap(const NVMeofGwMap &map_) : + Message{MSG_MNVMEOF_GW_MAP}, map(map_) + {} + ~MNVMeofGwMap() final {} + +public: + std::string_view get_type_name() const override { return "nvmeofgwmap"; } + void print(std::ostream& out) const override { + // ../src/messages/MNVMeofGwMap.h:40:39: error: no match for ‘operator<<’ (operand types are ‘std::basic_ostream’ and ‘const NVMeofGwMap’) + out << get_type_name() << "(map " << "should be map instance here" << ")"; + } + + void decode_payload() override { + // ../src/messages/MNVMeofGwMap.h:46:11: error: no matching function for call to ‘decode(NVMeofGwMap&, ceph::buffer::v15_2_0::list::iterator_impl&)’ + auto p = payload.cbegin(); + map.decode( p, false); + } + void encode_payload(uint64_t features) override { + //../src/messages/MNVMeofGwMap.h:51:11: error: no matching function for call to ‘encode(NVMeofGwMap&, ceph::buffer::v15_2_0::list&, uint64_t&)’ + //using ceph::encode; + //encode(map, payload, features); + map.encode(payload, false); + } +private: + using RefCountedObject::put; + using RefCountedObject::get; + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); + template + friend MURef crimson::make_message(Args&&... args); +}; + +#endif diff --git a/src/mon/CMakeLists.txt b/src/mon/CMakeLists.txt index 784b4c3ee0b3..35e27d35e85b 100644 --- a/src/mon/CMakeLists.txt +++ b/src/mon/CMakeLists.txt @@ -21,6 +21,8 @@ set(lib_mon_srcs ConnectionTracker.cc HealthMonitor.cc KVMonitor.cc + NVMeofGwMon.cc + NVMeofGwMap.cc ../mds/MDSAuthCaps.cc ../mgr/mgr_commands.cc ../osd/OSDCap.cc diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 74ef2206c02b..a680e793ecdd 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -1339,8 +1339,19 @@ COMMAND("config generate-minimal-conf", "Generate a minimal ceph.conf file", "config", "r") - - +/* NVMeofGwMon*/ +COMMAND("nvme-gw create" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "create nvmeof gateway id for (pool, group)", + "nvme-gw", "rw") +COMMAND("nvme-gw delete" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "delete nvmeof gateway id for (pool, group)", + "nvme-gw", "rw") // these are tell commands that were implemented as CLI commands in // the broken pre-octopus way that we want to allow to work when a diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 349ca30a8981..9b12c2b0f7d5 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -84,6 +84,7 @@ #include "MgrStatMonitor.h" #include "ConfigMonitor.h" #include "KVMonitor.h" +#include "NVMeofGwMon.h" #include "mon/HealthMonitor.h" #include "common/config.h" #include "common/cmdparse.h" @@ -247,6 +248,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health")); paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config")); paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv")); + paxos_service[PAXOS_NVMEGW].reset(new NVMeofGwMon(*this, *paxos, "nvmeofgw")); bool r = mon_caps.parse("allow *", NULL); ceph_assert(r); @@ -3591,7 +3593,11 @@ void Monitor::handle_command(MonOpRequestRef op) mgrmon()->dispatch(op); return; } - + if (module == "nvme-gw"){ + nvmegwmon()->dispatch(op); + dout(10) << " Dispatching module " << module << " to NVMeofGwMon" << dendl; + return; + } if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); @@ -4420,6 +4426,7 @@ void Monitor::_ms_dispatch(Message *m) } MonOpRequestRef op = op_tracker.create_request(m); + dout(10) << "Received message: " << op->get_req()->get_type() << dendl; bool src_is_mon = op->is_src_mon(); op->mark_event("mon:_ms_dispatch"); MonSession *s = op->get_session(); @@ -4525,6 +4532,9 @@ void Monitor::_ms_dispatch(Message *m) void Monitor::dispatch_op(MonOpRequestRef op) { op->mark_event("mon:dispatch_op"); + + dout(10) << "Received message: " << op->get_req()->get_type() << dendl; + MonSession *s = op->get_session(); ceph_assert(s); if (s->closed) { @@ -4638,6 +4648,11 @@ void Monitor::dispatch_op(MonOpRequestRef op) paxos_service[PAXOS_MGR]->dispatch(op); return; + case MSG_MNVMEOF_GW_BEACON: + paxos_service[PAXOS_NVMEGW]->dispatch(op); + return; + + // MgrStat case MSG_MON_MGR_REPORT: case CEPH_MSG_STATFS: @@ -5325,6 +5340,10 @@ void Monitor::handle_subscribe(MonOpRequestRef op) } else if (p->first.find("kv:") == 0) { kvmon()->check_sub(s->sub_map[p->first]); } + else if (p->first == "NVMeofGw") { + dout(10) << "NVMeofGw->check_sub " << dendl; + nvmegwmon()->check_sub(s->sub_map[p->first]); + } } if (reply) { diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 7f9a16a9a36c..2c4dd6d37b0f 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -712,6 +712,11 @@ class Monitor : public Dispatcher, return (class KVMonitor*) paxos_service[PAXOS_KV].get(); } + class NVMeofGwMon *nvmegwmon() { + return (class NVMeofGwMon*) paxos_service[PAXOS_NVMEGW].get(); + } + + friend class Paxos; friend class OSDMonitor; friend class MDSMonitor; diff --git a/src/mon/NVMeofGwMap.cc b/src/mon/NVMeofGwMap.cc new file mode 100755 index 000000000000..034c1cc835da --- /dev/null +++ b/src/mon/NVMeofGwMap.cc @@ -0,0 +1,484 @@ +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "NVMeofGwMap.h" + +using std::map; +using std::make_pair; +using std::ostream; +using std::ostringstream; +using std::string; + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, this, this) +using namespace TOPNSPC::common; + +static ostream& _prefix(std::ostream *_dout, const NVMeofGwMap *h,//const Monitor &mon, + const NVMeofGwMap *map) { + return *_dout << "gw-mon." << map->mon->name << "@" << map->mon->rank; +} + +static std::string G_gw_avail[] = { + "GW_CREATED", + "GW_AVAILAB", + "GW_UNAVAIL"}; + +static std::string G_gw_ana_states[] = { + "IDLE_STATE ", + "STANDBY_STATE ", + "ACTIVE_STATE ", + "BLOCKED_OWNER ", + "WAIT_FLBACK_RDY" +}; + +int NVMeofGwMap::cfg_add_gw(const GW_ID_T &gw_id, const GROUP_KEY& group_key) { + // Calculate allocated group bitmask + bool allocated[MAX_SUPPORTED_ANA_GROUPS] = {false}; + for (auto& itr: Created_gws[group_key]) { + allocated[itr.second.ana_grp_id] = true; + if(itr.first == gw_id) { + dout(4) << __func__ << " ERROR create GW: already exists in map " << gw_id << dendl; + return -EEXIST ; + } + } + + // Allocate the new group id + for(int i=0; i<=MAX_SUPPORTED_ANA_GROUPS; i++) { + if (allocated[i] == false) { + GW_CREATED_T gw_created(i); + Created_gws[group_key][gw_id] = gw_created; + + dout(4) << __func__ << "Created GW: " << gw_id << " pool " << group_key.first << " group " << group_key.second + << " grpid " << gw_created.ana_grp_id << dendl; + return 0; + } + } + + dout(4) << __func__ << " ERROR create GW: " << gw_id << " ANA groupId was not allocated " << dendl; + return -EINVAL; +} + +int NVMeofGwMap::cfg_delete_gw(const GW_ID_T &gw_id, const GROUP_KEY& group_key) { + int rc = 0; + for (auto& nqn_gws_states: Gmap[group_key]) { + auto& nqn = nqn_gws_states.first; + auto& gw_states = nqn_gws_states.second; + auto gw_id_state_it = gw_states.find(gw_id); + if (gw_id_state_it != gw_states.end()) { + auto& state = gw_id_state_it->second; + for(int i=0; i= 2){//TODO define + fsm_handle_to_expired (gw_id, std::make_pair(pool, group), nqn, i, propose_pending); + } + } + } + } + } +} + + +int NVMeofGwMap::process_gw_map_gw_down(const GW_ID_T &gw_id, const GROUP_KEY& group_key, + const NQN_ID_T& nqn, bool &propose_pending) { + int rc = 0; + auto& nqn_gws_states = Gmap[group_key][nqn]; + auto gw_state = nqn_gws_states.find(gw_id); + if (gw_state != nqn_gws_states.end()) { + dout(4) << "GW down " << gw_id << " nqn " <second; + st.availability = GW_AVAILABILITY_E::GW_UNAVAILABLE; + for (ANA_GRP_ID_T i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i ++) { + fsm_handle_gw_down (gw_id, group_key, nqn, st.sm_state[i], i, propose_pending); + st.standby_state(i); + } + } + else { + dout(4) << __FUNCTION__ << "ERROR GW-id was not found in the map " << gw_id << dendl; + rc = -EINVAL; + } + return rc; +} + + +void NVMeofGwMap::process_gw_map_ka(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn , bool &propose_pending) +{ + +#define FAILBACK_PERSISTENCY_INT_SEC 8 + auto& nqn_gws_states = Gmap[group_key][nqn]; + auto gw_state = nqn_gws_states.find(gw_id); + ceph_assert (gw_state != nqn_gws_states.end()); + auto& st = gw_state->second; + dout(4) << "KA beacon from the GW " << gw_id << " in state " << (int)st.availability << dendl; + + if (st.availability == GW_AVAILABILITY_E::GW_CREATED) { + // first time appears - allow IO traffic for this GW + st.availability = GW_AVAILABILITY_E::GW_AVAILABLE; + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) st.sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + if (st.optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID) { // not a redundand GW + st.sm_state[st.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + } + propose_pending = true; + } + else if (st.availability == GW_AVAILABILITY_E::GW_UNAVAILABLE) { + st.availability = GW_AVAILABILITY_E::GW_AVAILABLE; + if (st.optimized_ana_group_id == REDUNDANT_GW_ANA_GROUP_ID) { + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) st.sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + propose_pending = true; //TODO try to find the 1st GW overloaded by ANA groups and start failback for ANA group that it is not an owner of + } + else { + //========= prepare to Failback to this GW ========= + // find the GW that took over on the group st.optimized_ana_group_id + bool some_found = false; + propose_pending = true; + find_failback_gw(gw_id, group_key, nqn, some_found); + if (!some_found ) { // There is start of single GW so immediately turn its group to GW_ACTIVE_STATE + dout(4) << "Warning - not found the GW responsible for" << st.optimized_ana_group_id << " that took over the GW " << gw_id << "when it was fallen" << dendl; + st.sm_state[st.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + } + } + } +} + + +void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose) +{ + propose = false; + for (auto& group_state: Gmap) { + auto& group_key = group_state.first; + auto& nqn_gws_states = group_state.second; + + for (auto& nqn_gws_state: nqn_gws_states) { + auto& nqn = nqn_gws_state.first; + auto& gws_states = nqn_gws_state.second; + dout(4) << "NQN " << nqn << dendl; + + for (auto& gw_state : gws_states) { // loop for GWs inside nqn group + auto& gw_id = gw_state.first; + GW_STATE_T& state = gw_state.second; + + //1. Failover missed : is there is a GW in unavailable state? if yes, is its ANA group handled by some other GW? + if (state.availability == GW_AVAILABILITY_E::GW_UNAVAILABLE && state.optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID) { + auto found_gw_for_ana_group = false; + for (auto& gw_state2 : gws_states) { + GW_STATE_T& state2 = gw_state2.second; + if (state2.availability == GW_AVAILABILITY_E::GW_AVAILABLE && state2.sm_state[state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + found_gw_for_ana_group = true; // dout(4) << "Found GW " << ptr2.first << " that handles ANA grp " << (int)state->optimized_ana_group_id << dendl; + break; + } + } + if (found_gw_for_ana_group == false) { //choose the GW for handle ana group + dout(4)<< "Was not found the GW " << " that handles ANA grp " << (int)state.optimized_ana_group_id << " find candidate "<< dendl; + + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + find_failover_candidate( gw_id, group_key, nqn, i, propose ); + } + } + + //2. Failback missed: Check this GW is Available and Standby and no other GW is doing Failback to it + else if (state.availability == GW_AVAILABILITY_E::GW_AVAILABLE + && state.optimized_ana_group_id != REDUNDANT_GW_ANA_GROUP_ID && + state.sm_state[state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE) + { + bool found = false; + for (auto& gw_state2 : gws_states) { + auto& state2 = gw_state2.second; + if (state2.sm_state[state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED){ + found = true; + break; + } + } + if (!found) { + dout(4) << __func__ << " GW " << gw_id << " turns to be Active for ANA group " << state.optimized_ana_group_id << dendl; + state.sm_state[state.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + propose = true; + } + } + } + } + } +} + +/* + sync our sybsystems from the beacon. systems subsystems not in beacon are removed. +*/ +void NVMeofGwMap::handle_removed_subsystems (const std::vector ¤t_subsystems, const GROUP_KEY& group_key, bool &propose_pending) +{ + auto& nqn_gws_states = Gmap[group_key]; + for (auto it = nqn_gws_states.begin(); it != nqn_gws_states.end(); ) { + if (std::find(current_subsystems.begin(), current_subsystems.end(), it->first) == current_subsystems.end()) { + // Erase the susbsystem nqn if the nqn is not in the current subsystems + it = nqn_gws_states.erase(it); + } else { + // Move to the next pair + ++it; + } + } +} + +void NVMeofGwMap::set_failover_gw_for_ANA_group(const GW_ID_T &failed_gw_id, const GROUP_KEY& group_key, const GW_ID_T &gw_id, const NQN_ID_T& nqn, ANA_GRP_ID_T ANA_groupid) +{ + GW_STATE_T& gw_state = Gmap[group_key][nqn][gw_id]; + gw_state.sm_state[ANA_groupid] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + gw_state.failover_peer[ANA_groupid] = failed_gw_id; + + dout(4) << "Set failower GW " << gw_id << " for ANA group " << (int)ANA_groupid << dendl; +} + + +void NVMeofGwMap::find_failback_gw(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &some_found) +{ + bool found_some_gw = false; + bool found_candidate = false; + auto& nqn_gws_states = Gmap[group_key][nqn]; + auto& gw_state = Gmap[group_key][nqn][gw_id]; + for (auto& nqn_gw_state: nqn_gws_states) { + auto& found_gw_id = nqn_gw_state.first; + auto& st = nqn_gw_state.second; + if (st.sm_state[gw_state.optimized_ana_group_id] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + ceph_assert(st.failover_peer[gw_state.optimized_ana_group_id] == gw_id); + + dout(4) << "Found GW " << found_gw_id << ", nqn " << nqn << " that took over the ANAGRP " << gw_state.optimized_ana_group_id << " of the available GW " << gw_id << dendl; + st.sm_state[gw_state.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED; + start_timer(found_gw_id, group_key, nqn, gw_state.optimized_ana_group_id);// Add timestamp of start Failback preparation + gw_state.sm_state[gw_state.optimized_ana_group_id] = GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER; + found_candidate = true; + + break; + } + else found_some_gw = true; + } + some_found = found_candidate |found_some_gw; + //TODO cleanup myself (gw_id) from the Block-List +} + + +// TODO When decision to change ANA state of group is prepared, need to consider that last seen FSM state is "approved" - means it was returned in beacon alone with map version +void NVMeofGwMap::find_failover_candidate(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T grpid, bool &propose_pending) +{ + // dout(4) <<__func__<< " process GW down " << gw_id << dendl; + #define ILLEGAL_GW_ID " " + #define MIN_NUM_ANA_GROUPS 0xFFF + int min_num_ana_groups_in_gw = 0; + int current_ana_groups_in_gw = 0; + GW_ID_T min_loaded_gw_id = ILLEGAL_GW_ID; + + auto& nqn_gws_states = Gmap[group_key][nqn]; + + auto gw_state = nqn_gws_states.find(gw_id); + ceph_assert(gw_state != nqn_gws_states.end()); + + // this GW may handle several ANA groups and for each of them need to found the candidate GW + if (gw_state->second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || gw_state->second.optimized_ana_group_id == grpid) { + // Find a GW that takes over the ANA group(s) + min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS; + min_loaded_gw_id = ILLEGAL_GW_ID; + for (auto& found_gw_state: nqn_gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if (st.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { + current_ana_groups_in_gw = 0; + for (int j = 0; j < MAX_SUPPORTED_ANA_GROUPS; j++) { + if (st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER || st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED) { + current_ana_groups_in_gw = 0xFFFF; + break; // dont take into account GWs in the transitive state + } + else if (st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) + //dout(4) << " process GW down " << current_ana_groups_in_gw << dendl; + current_ana_groups_in_gw++; // how many ANA groups are handled by this GW + } + + if (min_num_ana_groups_in_gw > current_ana_groups_in_gw) { + min_num_ana_groups_in_gw = current_ana_groups_in_gw; + min_loaded_gw_id = found_gw_state.first; + dout(4) << "choose: gw-id min_ana_groups " << min_loaded_gw_id << current_ana_groups_in_gw << " min " << min_num_ana_groups_in_gw << dendl; + } + } + } + if (min_loaded_gw_id != ILLEGAL_GW_ID) { + propose_pending = true; + set_failover_gw_for_ANA_group(gw_id, group_key, min_loaded_gw_id, nqn, grpid); + } + else { + if (gw_state->second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE){// not found candidate but map changed. + propose_pending = true; + dout(4) << "gw down no candidate found " << dendl; + } + } + gw_state->second.sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + } +} + + + void NVMeofGwMap::fsm_handle_gw_down(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state, ANA_GRP_ID_T grpid, bool &map_modified) + { + switch (state) + { + case GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE: + case GW_STATES_PER_AGROUP_E::GW_IDLE_STATE: + // nothing to do + break; + + case GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED: + cancel_timer(gw_id, group_key, nqn, grpid); + + for (auto& gw_st: Gmap[group_key][nqn]) { + auto& st = gw_st.second; + if (st.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_BLOCKED_AGROUP_OWNER) { // found GW that was intended for Failback for this ana grp + dout(4) << "Warning: Outgoing Failback when GW is down back - to rollback it" << nqn <<" GW " < +#include +#include "include/encoding.h" +#include "include/utime.h" +#include "common/Formatter.h" +#include "common/ceph_releases.h" +#include "common/version.h" +#include "common/options.h" +#include "common/Clock.h" +#include "PaxosService.h" +#include "msg/Message.h" +#include "common/ceph_time.h" +#include "NVMeofGwTypes.h" + +using ceph::coarse_mono_clock; + +/*-------------------*/ +class NVMeofGwMap +{ +public: + Monitor* mon = NULL; // just for logs in the mon module file + epoch_t epoch = 0; // epoch is for Paxos synchronization mechanizm + bool delay_propose = false; + + // State: GMAP and Created_gws are sent to the clients, while Gmetadata is not + std::map Gmap; + std::map Created_gws; + std::map Gmetadata; + + int cfg_add_gw (const GW_ID_T &gw_id, const GROUP_KEY& group_key); + int cfg_delete_gw (const GW_ID_T &gw_id, const GROUP_KEY& group_key); + void process_gw_map_ka (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &propose_pending); + int process_gw_map_gw_down (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &propose_pending); + void update_active_timers (bool &propose_pending); + void handle_abandoned_ana_groups (bool &propose_pending); + void handle_removed_subsystems (const std::vector ¤t_subsystems, const GROUP_KEY& group_key, bool &propose_pending); + +private: + void fsm_handle_gw_down (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state, ANA_GRP_ID_T grpid, bool &map_modified); + void fsm_handle_gw_delete (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, GW_STATES_PER_AGROUP_E state, ANA_GRP_ID_T grpid, bool &map_modified); + void fsm_handle_to_expired (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T grpid, bool &map_modified); + + void find_failover_candidate(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T grpid, bool &propose_pending); + void find_failback_gw (const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, bool &found); + void set_failover_gw_for_ANA_group (const GW_ID_T &failed_gw_id, const GROUP_KEY& group_key, const GW_ID_T &gw_id, const NQN_ID_T& nqn, + ANA_GRP_ID_T groupid); + + void start_timer(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T anagrpid); + int get_timer(const GW_ID_T &gw_id, GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T anagrpid); + void cancel_timer(const GW_ID_T &gw_id, const GROUP_KEY& group_key, const NQN_ID_T& nqn, ANA_GRP_ID_T anagrpid); + +public: + void debug_encode_decode(){ + ceph::buffer::list bl; + encode(bl); + auto p = bl.cbegin(); + decode(p); + } + + void encode(ceph::buffer::list &bl, bool full_encode = true) const { + using ceph::encode; + __u8 struct_v = 0; + encode(struct_v, bl); // version + encode(epoch, bl);// global map epoch + + encode(Created_gws, bl); //Encode created GWs + encode(Gmap, bl); + if (full_encode) { + encode(Gmetadata, bl); + } + } + + void decode(ceph::buffer::list::const_iterator &bl, bool full_decode = true) { + using ceph::decode; + __u8 struct_v; + decode(struct_v, bl); + ceph_assert(struct_v == 0); + decode(epoch, bl); + + decode(Created_gws, bl); + decode(Gmap, bl); + if (full_decode) { + decode(Gmetadata, bl); + } + } +}; + +#include "NVMeofGwSerialize.h" + +#endif /* SRC_MON_NVMEOFGWMAP_H_ */ diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc new file mode 100644 index 000000000000..fb28844c4e11 --- /dev/null +++ b/src/mon/NVMeofGwMon.cc @@ -0,0 +1,567 @@ +/* + * NVMeGWMonitor.cc + * + * Created on: Oct 17, 2023 + * Author: + */ + + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + +using std::string; +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, this, this) +using namespace TOPNSPC::common; + +static std::ostream& _prefix(std::ostream *_dout, const NVMeofGwMon *h,//const Monitor &mon, + const NVMeofGwMon *hmon) { + return *_dout << "gw-mon." << hmon->mon.name << "@" << hmon->mon.rank; +} +#define MY_MON_PREFFIX " NVMeGW " + +void NVMeofGwMon::init(){ + dout(4) << MY_MON_PREFFIX << __func__ << "called " << dendl; +} + +void NVMeofGwMon::on_restart(){ + dout(4) << MY_MON_PREFFIX << __func__ << "called " << dendl; + last_beacon.clear(); + last_tick = ceph::coarse_mono_clock::now(); +} + +void NVMeofGwMon::on_shutdown() {} + +static int cnt ; +#define start_cnt 6 +void NVMeofGwMon::inject1(){ + //bool propose = false; + if( ++cnt == 4 ){// simulation that new configuration was added + std::string pool = "pool1"; + std::string group = "grp1"; + auto group_key = std::make_pair(pool, group); + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + pending_map.cfg_add_gw("GW3" ,group_key); + NONCE_VECTOR_T new_nonces = {"abc", "def","hij"}; + //ANA_GRP_ID_T grp = 1; + //pending_map.update_gw_nonce("GW1.g1.p1", grp, new_nonces); + pending_map.Created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + + // pending_map.update_gw_nonce("GW1.g1.p1", grp, new_nonces); + pending_map.Created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + GW_STATE_T gst1(1); + std::string nqn1 = "nqn2008.node1"; + pending_map.Gmap[group_key][nqn1]["GW2"] = gst1; + + GW_STATE_T gst2(2); + pending_map.Gmap[group_key][nqn1]["GW3"] = gst2; + dout(4) << pending_map << dendl; + + + pending_map.debug_encode_decode(); + dout(4) << "Dump map after decode encode:" <("mon_nvmeofgw_beacon_grace"); + dout(4) << MY_MON_PREFFIX << __func__ << "NVMeofGwMon leader got a real tick, pending epoch "<< pending_map.epoch << dendl; + + const auto mgr_tick_period = g_conf().get_val("mgr_tick_period"); + + if (last_tick != ceph::coarse_mono_clock::zero() + && (now - last_tick > (nvmegw_beacon_grace - mgr_tick_period))) { + // This case handles either local slowness (calls being delayed + // for whatever reason) or cluster election slowness (a long gap + // between calls while an election happened) + dout(4) << __func__ << ": resetting beacon timeouts due to mon delay " + "(slow election?) of " << now - last_tick << " seconds" << dendl; + for (auto &i : last_beacon) { + i.second = now; + } + } + + last_tick = now; + bool propose = false; + + pending_map.update_active_timers(propose); // Periodic: check active FSM timers + _propose_pending |= propose; + + + //TODO handle exception of tick overdued in order to avoid false detection of overdued beacons , see MgrMonitor::tick + + const auto cutoff = now - nvmegw_beacon_grace; + for(auto &itr : last_beacon){// Pass over all the stored beacons + auto& lb = itr.first; + auto last_beacon_time = itr.second; + if(last_beacon_time < cutoff){ + dout(4) << "beacon timeout for GW " << lb.gw_id << " nqn " << lb.nqn << dendl; + pending_map.process_gw_map_gw_down( lb.gw_id, lb.group_key, lb.nqn, propose); + _propose_pending |= propose; + last_beacon.erase(lb); + } + else { + dout(4) << "beacon live for GW key: " << lb.gw_id << " nqn " << lb.nqn << dendl; + } + } + + pending_map.handle_abandoned_ana_groups(propose); // Periodic: take care of not handled ANA groups + _propose_pending |= propose; + + if(_propose_pending){ + //pending_map.delay_propose = true; // not to send map to clients immediately in "update_from_paxos" + dout(4) << "decision to delayed_map" < &changed) +{ + dout(4) << __func__ << " " << changed << dendl; + + if (changed.count("nvmef_gw_mapdump")) { + dout(4) << "pending_map " << pending_map << dendl; + } + if (changed.count("nvmf_mon_log_level")){ + dout(4) << "TODO SET LOG LEVEL >= " << g_conf()->nvmf_mon_log_level << dendl; + } +} + +void NVMeofGwMon::create_pending(){ + + pending_map = map;// deep copy of the object + // TODO since "pending_map" can be reset each time during paxos re-election even in the middle of the changes ... + pending_map.epoch++; + dout(4) << MY_MON_PREFFIX << __func__ << " pending " << pending_map << dendl; +} + +void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t){ + + dout(4) << MY_MON_PREFFIX << __func__ << dendl; + bufferlist bl; + pending_map.encode(bl); + put_version(t, pending_map.epoch, bl); + put_last_committed(t, pending_map.epoch); +} + +void NVMeofGwMon::update_from_paxos(bool *need_bootstrap){ + version_t version = get_last_committed(); + + //dout(4) << MY_MON_PREFFIX << __func__ << " version " << version << " map.epoch " << map.epoch << dendl; + + if (version != map.epoch) { + dout(4) << " NVMeGW loading version " << version << " " << map.epoch << dendl; + + bufferlist bl; + int err = get_version(version, bl); + ceph_assert(err == 0); + + auto p = bl.cbegin(); + map.decode(p); + if(!mon.is_leader()) { + dout(4) << "leader map: " << map << dendl; + } + check_subs(true); + + } +} + +void NVMeofGwMon::check_sub(Subscription *sub) +{ + /* MgrMonitor::check_sub*/ + //if (sub->type == "NVMeofGw") { + dout(4) << "sub->next , map-epoch " << sub->next << " " << map.epoch << dendl; + if (sub->next <= map.epoch) + { + dout(4) << "Sending map to subscriber " << sub->session->con << " " << sub->session->con->get_peer_addr() << dendl; + sub->session->con->send_message2(make_message(map)); + + if (sub->onetime) { + mon.session_map.remove_sub(sub); + } else { + sub->next = map.epoch + 1; + } + } +} + +void NVMeofGwMon::check_subs(bool t) +{ + const std::string type = "NVMeofGw"; + dout(4) << MY_MON_PREFFIX << __func__ << " count " << mon.session_map.subs.count(type) << dendl; + + if (mon.session_map.subs.count(type) == 0){ + return; + } + for (auto sub : *(mon.session_map.subs[type])) { + dout(4) << "sub-type "<< sub->type << " delay_propose until next tick" << t << dendl; + if (t) map.delay_propose = true; + else check_sub(sub); + } +} + +bool NVMeofGwMon::preprocess_query(MonOpRequestRef op){ + dout(4) << MY_MON_PREFFIX <<__func__ << dendl; + + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return preprocess_beacon(op); + + case MSG_MON_COMMAND: + try { + return preprocess_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return true; + } + + default: + mon.no_reply(op); + derr << "Unhandled message type " << m->get_type() << dendl; + return true; + } + return false; +} + +bool NVMeofGwMon::prepare_update(MonOpRequestRef op){ + dout(4) << MY_MON_PREFFIX <<__func__ << dendl; + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return prepare_beacon(op); + + case MSG_MON_COMMAND: + try { + return prepare_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return false; /* nothing to propose! */ + } + + default: + mon.no_reply(op); + derr << "Unhandled message type " << m->get_type() << dendl; + return false; /* nothing to propose! */ + } + return true; +} + +bool NVMeofGwMon::preprocess_command(MonOpRequestRef op) +{ + dout(4) << MY_MON_PREFFIX << __func__ << dendl; + auto m = op->get_req(); + std::stringstream ss; + bufferlist rdata; + + cmdmap_t cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) + { + string rs = ss.str(); + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + MonSession *session = op->get_session(); + if (!session) + { + mon.reply_command(op, -EACCES, "access denied", rdata, + get_last_committed()); + return true; + } + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + + string prefix; + cmd_getval(cmdmap, "prefix", prefix); + dout(4) << "MonCommand : "<< prefix << dendl; + // TODO need to check formatter per preffix - if f is NULL + + return false; +} + +bool NVMeofGwMon::prepare_command(MonOpRequestRef op) +{ + dout(4) << MY_MON_PREFFIX << __func__ << dendl; + auto m = op->get_req(); + int rc; + std::stringstream ss; + bufferlist rdata; + string rs; + int err = 0; + cmdmap_t cmdmap; + + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) + { + string rs = ss.str(); + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + MonSession *session = op->get_session(); + if (!session) + { + mon.reply_command(op, -EACCES, "access denied", rdata, get_last_committed()); + return true; + } + + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + + const auto prefix = cmd_getval_or(cmdmap, "prefix", string{}); + + dout(4) << "MonCommand : "<< prefix << dendl; + if( prefix == "nvme-gw create" || prefix == "nvme-gw delete" ) { + std::string id, pool, group; + + cmd_getval(cmdmap, "id", id); + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); + + if(prefix == "nvme-gw create"){ + rc = pending_map.cfg_add_gw(id, group_key); + ceph_assert(rc!= -EINVAL); + } + else{ + rc = pending_map.cfg_delete_gw(id, group_key); + ceph_assert(rc!= -EINVAL); + } + if(rc != -EEXIST){ + propose_pending(); + goto update; + } + else { + goto reply_no_propose; + } + } + + reply_no_propose: + getline(ss, rs); + if (err < 0 && rs.length() == 0) + { + rs = cpp_strerror(err); + dout(4) << "Error command err : "<< err << " rs-len: " << rs.length() << dendl; + } + mon.reply_command(op, err, rs, rdata, get_last_committed()); + return false; /* nothing to propose */ + + update: + getline(ss, rs); + wait_for_commit(op, new Monitor::C_Command(mon, op, 0, rs, + get_last_committed() + 1)); + return true; +} + + +bool NVMeofGwMon::preprocess_beacon(MonOpRequestRef op){ + dout(4) << MY_MON_PREFFIX <<__func__ << dendl; + auto m = op->get_req(); + mon.no_reply(op); // we never reply to beacons + dout(4) << "beacon from " << m->get_type() << dendl; + MonSession *session = op->get_session(); + if (!session){ + dout(4) << "beacon no session " << dendl; + return true; + } + + return false; // allways return false to call leader's prepare beacon +} + + +#define BYPASS_GW_CREATE_CLI + +bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op){ + dout(4) << MY_MON_PREFFIX <<__func__ << dendl; + auto m = op->get_req(); + + dout(4) << "availability " << m->get_availability() << " GW : " <get_gw_id() << " subsystems " << m->get_subsystems() << " epoch " << m->get_version() << dendl; + + GW_ID_T gw_id = m->get_gw_id(); + GROUP_KEY group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group()); + GW_AVAILABILITY_E avail = m->get_availability(); + const GwSubsystems& subsystems = m->get_subsystems(); + bool propose = false; + ANA_GRP_ID_T ana_grp_id = 0; + std::vector configured_subsystems; + + if (avail == GW_AVAILABILITY_E::GW_CREATED){ + // in this special state GWs receive map with just "created_gws" vector + auto& created_gw = pending_map.Created_gws[group_key][gw_id]; + if(created_gw.ana_grp_id == ana_grp_id) {// GW is created administratively + dout(4) << "GW " << gw_id << " sent beacon being in state GW_WAIT_INITIAL_MAP" << dendl; + propose = true; + } + else{ + dout(4) << "GW " << gw_id << " sent beacon being in state GW_WAIT_INITIAL_MAP but it is not created yet!!! "<< dendl; +#ifdef BYPASS_GW_CREATE_CLI + pending_map.cfg_add_gw(gw_id ,group_key); + dout(4) << "GW " << gw_id << " created since mode is bypass-create-cli "<< dendl; + propose= true; +#endif + } + goto set_propose; + } + + // Validation gw is in the database + for (const NqnState &st : subsystems) + { + auto& nqn_gws_states = pending_map.Gmap[group_key][st.nqn]; + auto gw_state = nqn_gws_states.find(gw_id); + if (gw_state == nqn_gws_states.end()) + { + dout(4) << "GW + NQN pair is not in the database: " << gw_id << " " << st.nqn << dendl; + // if GW is created + auto& group_gws = pending_map.Created_gws[group_key]; + auto gw_state = group_gws.find(gw_id); + if (gw_state != group_gws.end()) { + GW_STATE_T gst(ana_grp_id); + pending_map.Gmap[group_key][st.nqn][gw_id] = gst; + GW_METADATA_T md; + pending_map.Gmetadata[group_key][st.nqn][gw_id] = md; + } + else { + //drop beacon on the floor silently discard + return 0; + } + } + configured_subsystems.push_back(st.nqn); + } + pending_map.handle_removed_subsystems( configured_subsystems, group_key, propose ); + + if(avail == GW_AVAILABILITY_E::GW_AVAILABLE) + { + auto now = ceph::coarse_mono_clock::now(); + // check pending_map.epoch vs m->get_version() - if different - drop the beacon + + for (const NqnState& st: subsystems) { + LastBeacon lb = { gw_id, group_key, st.nqn }; + last_beacon[lb] = now; + pending_map.process_gw_map_ka( gw_id, group_key, st.nqn, propose ); + } + } + else if(avail == GW_AVAILABILITY_E::GW_UNAVAILABLE){ // state set by GW client application + // TODO: remove from last_beacon if found . if gw was found in last_beacon call process_gw_map_gw_down + + for (const NqnState& st: subsystems) { + LastBeacon lb = { gw_id, group_key, st.nqn }; + + auto it = last_beacon.find(lb); + if (it != last_beacon.end()){ + last_beacon.erase(lb); + pending_map.process_gw_map_gw_down( gw_id, group_key, st.nqn, propose ); + } + } + } +set_propose: + if (propose){ + dout(4) << "decision to delayed_map in prepare_beacon" < last_beacon; + + + // when the mon was not updating us for some period (e.g. during slow + // election) to reset last_beacon timeouts + ceph::coarse_mono_clock::time_point last_tick; + + std::vector command_descs; + std::vector pending_command_descs; + +public: + NVMeofGwMon(Monitor &mn, Paxos &p, const std::string& service_name) + : PaxosService(mn, p, service_name) {map.mon = &mn; } + ~NVMeofGwMon() override {} + + + // config observer + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, const std::set &changed) override; + + // 3 pure virtual methods of the paxosService + void create_initial()override{}; + void create_pending()override ; + void encode_pending(MonitorDBStore::TransactionRef t)override ; + + void init() override; + void on_shutdown() override; + void on_restart() override; + void update_from_paxos(bool *need_bootstrap) override; + + + bool preprocess_query(MonOpRequestRef op) override; + bool prepare_update(MonOpRequestRef op) override; + + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); + + void encode_full(MonitorDBStore::TransactionRef t) override { } + + bool preprocess_beacon(MonOpRequestRef op); + bool prepare_beacon(MonOpRequestRef op); + + void tick() override; + + void print_summary(ceph::Formatter *f, std::ostream *ss) const; + + void check_subs(bool type); + void check_sub(Subscription *sub); +private: + void inject1(); + void get_gw_and_nqn_from_key(std::string key, GW_ID_T &gw_id , std::string& nqn); + +}; + +#endif /* SRC_MON_NVMEGWMONITOR_H_ */ diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h new file mode 100755 index 000000000000..bb90b733dfe0 --- /dev/null +++ b/src/mon/NVMeofGwSerialize.h @@ -0,0 +1,321 @@ +/* + * NVMeofGwSerialize.h + * + * Created on: Dec 29, 2023 + */ + +#ifndef MON_NVMEOFGWSERIALIZE_H_ +#define MON_NVMEOFGWSERIALIZE_H_ + +inline void encode(const GW_STATE_T& state, ceph::bufferlist &bl) { + for(int i = 0; i & created_gws, ceph::bufferlist &bl) { + encode (created_gws.size(), bl); // number of groups + for (auto& group_gws: created_gws) { + auto& group_key = group_gws.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + auto& gws = group_gws.second; + encode (gws, bl); // encode group gws + } +} + +inline void decode(std::map& created_gws, ceph::buffer::list::const_iterator &bl) { + created_gws.clear(); + size_t ngroups; + decode(ngroups, bl); + for(size_t i = 0; i + } +} + +inline void decode(GWMAP& nqn_gws_states, ceph::buffer::list::const_iterator &bl) { + size_t num_subsystems; + + decode(num_subsystems, bl); + SUBSYST_GWMAP gw_map; + nqn_gws_states.clear(); + + for (size_t i = 0; i < num_subsystems; i++) { + std::string nqn; + decode(nqn, bl); + SUBSYST_GWMAP gw_map; + decode(gw_map, bl); + nqn_gws_states[nqn] = gw_map; + } +} + + +inline void encode(const std::map& gmap, ceph::bufferlist &bl) { + encode (gmap.size(), bl); // number of groups + for (auto& group_state: gmap) { + auto& group_key = group_state.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_state.second, bl); + } +} + +inline void decode(std::map& gmap, ceph::buffer::list::const_iterator &bl) { + gmap.clear(); + size_t ngroups; + decode(ngroups, bl); + for(size_t i = 0; i& gmetadata, ceph::bufferlist &bl) { + encode (gmetadata.size(), bl); // number of groups + for (auto& group_md: gmetadata) { + auto& group_key = group_md.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_md.second, bl); + } +} + +inline void decode(std::map& gmetadata, ceph::buffer::list::const_iterator &bl) { + gmetadata.clear(); + size_t ngroups; + decode(ngroups, bl); + for(size_t i = 0; i "; + for (auto& gw_state: nqn_state.second) { + os << "\n (gw-mon) { gw_id: " << gw_state.first << " -> " << gw_state.second << "}"; + } + os << "}"; + } + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) { + os << "NVMeofGwMap [ Gmap: "; + for (auto& group_state: value.Gmap) { + os << " { " << group_state.first << " } -> { " << group_state.second << " }"; + } + os << " ] \n (gw-mon)[ Created_gws: "; + for (auto& group_gws: value.Created_gws) { + os << " { " << group_gws.first << " } -> { "; + for (auto& gw: group_gws.second) { + os << " \n (gw-mon) { gw_id " << gw.first << " } -> { ana-grp-id:" << gw.second.ana_grp_id << " nonces : " ; // << " }"; + // dump nonces map + if(gw.second.nonce_map.size()) + for( auto &nonce_map : gw.second.nonce_map){ + os << "ana_grp: " << nonce_map.first << " [ " ; + for (auto & nonces : nonce_map.second){ + os << nonces << " "; + } + os << "]" ; + } + + } + os << " }"; + } + os << "]"; + return os; +} +#endif /* SRC_MON_NVMEOFGWSERIALIZEP_H_ */ diff --git a/src/mon/NVMeofGwTypes.h b/src/mon/NVMeofGwTypes.h new file mode 100755 index 000000000000..4a759eeacb77 --- /dev/null +++ b/src/mon/NVMeofGwTypes.h @@ -0,0 +1,108 @@ +/* + * NVMeofGwTypes.h + * + * Created on: Dec 29, 2023 + */ + +#ifndef MON_NVMEOFGWTYPES_H_ +#define MON_NVMEOFGWTYPES_H_ +#include +#include +#include +#include + +using GW_ID_T = std::string; +using GROUP_KEY = std::pair; +using NQN_ID_T = std::string; +using ANA_GRP_ID_T = uint32_t; + + +enum class GW_STATES_PER_AGROUP_E { + GW_IDLE_STATE = 0, //invalid state + GW_STANDBY_STATE, + GW_ACTIVE_STATE, + GW_BLOCKED_AGROUP_OWNER, + GW_WAIT_FAILBACK_PREPARED +}; + +enum class GW_AVAILABILITY_E { + GW_CREATED = 0, + GW_AVAILABLE, + GW_UNAVAILABLE, + GW_DELETED +}; + +#define MAX_SUPPORTED_ANA_GROUPS 16 +#define INVALID_GW_TIMER 0xffff +#define REDUNDANT_GW_ANA_GROUP_ID 0xFF + +typedef GW_STATES_PER_AGROUP_E SM_STATE[MAX_SUPPORTED_ANA_GROUPS]; + +struct NqnState { + std::string nqn; // subsystem NQN + SM_STATE sm_state; // susbsystem's state machine state + uint16_t opt_ana_gid; // optimized ANA group index + + // Default constructor + NqnState(const std::string& _nqn) : nqn(_nqn), opt_ana_gid(0) { + for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + sm_state[i] = GW_STATES_PER_AGROUP_E::GW_IDLE_STATE; + } +}; + +typedef std::vector GwSubsystems; + +struct GW_STATE_T { + SM_STATE sm_state; // state machine states per ANA group + GW_ID_T failover_peer[MAX_SUPPORTED_ANA_GROUPS]; + ANA_GRP_ID_T optimized_ana_group_id; // optimized ANA group index as configured by Conf upon network entry, note for redundant GW it is FF + GW_AVAILABILITY_E availability; // in absence of beacon heartbeat messages it becomes inavailable + uint64_t version; // version per all GWs of the same subsystem. subsystem version + + GW_STATE_T(ANA_GRP_ID_T id): + optimized_ana_group_id(id), + availability(GW_AVAILABILITY_E::GW_CREATED), + version(0) + { + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + sm_state[i] = GW_STATES_PER_AGROUP_E::GW_IDLE_STATE; + }; + + GW_STATE_T() : GW_STATE_T(REDUNDANT_GW_ANA_GROUP_ID) {}; + + void standby_state(ANA_GRP_ID_T grpid) { + sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + failover_peer[grpid] = ""; + }; +}; + +struct GW_METADATA_T { + int anagrp_sm_tstamps[MAX_SUPPORTED_ANA_GROUPS]; // statemachine timer(timestamp) set in some state + + GW_METADATA_T() { + for (int i=0; i >; +using GWMETADATA = std::map >; +using SUBSYST_GWMAP = std::map; +using SUBSYST_GWMETA = std::map; + +using NONCE_VECTOR_T = std::vector; +using GW_ANA_NONCE_MAP = std::map ; + + +struct GW_CREATED_T { + ANA_GRP_ID_T ana_grp_id; // ana-group-id allocated for this GW, GW owns this group-id + GW_ANA_NONCE_MAP nonce_map; + + GW_CREATED_T(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {}; + GW_CREATED_T(ANA_GRP_ID_T id): ana_grp_id(id) {}; +}; + +using GW_CREATED_MAP = std::map; + +#endif /* SRC_MON_NVMEOFGWTYPES_H_ */ diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index cce9976f3c35..7f5ed911dacb 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -36,6 +36,7 @@ enum { PAXOS_HEALTH, PAXOS_CONFIG, PAXOS_KV, + PAXOS_NVMEGW, PAXOS_NUM }; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 70ac4ad13389..e8be64f63139 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -217,6 +217,9 @@ #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + #ifdef WITH_BLKIN #include "Messenger.h" #endif @@ -875,6 +878,10 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_BEACON: + m = make_message(); + break; + case MSG_MON_MGR_REPORT: m = make_message(); break; @@ -934,6 +941,9 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_MAP: + m = make_message(); + break; // -- simple messages without payload -- case CEPH_MSG_SHUTDOWN: diff --git a/src/msg/Message.h b/src/msg/Message.h index 40833744b67d..62f27109dce2 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -227,7 +227,7 @@ #define MSG_MGR_MAP 0x704 // *** ceph-mon(MgrMonitor) -> ceph-mgr -#define MSG_MGR_DIGEST 0x705 +#define MSG_MGR_DIGEST 0x705 // *** cephmgr -> ceph-mon #define MSG_MON_MGR_REPORT 0x706 #define MSG_SERVICE_MAP 0x707 @@ -237,7 +237,13 @@ #define MSG_MGR_COMMAND_REPLY 0x70a // *** ceph-mgr <-> MON daemons *** -#define MSG_MGR_UPDATE 0x70b +#define MSG_MGR_UPDATE 0x70b + +// *** nvmeof mon -> gw daemons *** +#define MSG_MNVMEOF_GW_MAP 0x800 + +// *** gw daemons -> nvmeof mon *** +#define MSG_MNVMEOF_GW_BEACON 0x801 // ====================================================== diff --git a/src/nvmeof/NVMeofGw.cc b/src/nvmeof/NVMeofGw.cc new file mode 100644 index 000000000000..0b97a45e92a5 --- /dev/null +++ b/src/nvmeof/NVMeofGw.cc @@ -0,0 +1,380 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include + +#include "common/errno.h" +#include "common/signal.h" +#include "common/ceph_argparse.h" +#include "include/compat.h" + +#include "include/stringify.h" +#include "global/global_context.h" +#include "global/signal_handler.h" + + +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" +#include "NVMeofGw.h" +#include "NVMeofGwClient.h" +#include "NVMeofGwMonitorGroupClient.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +using std::map; +using std::string; +using std::stringstream; +using std::vector; + +NVMeofGw::NVMeofGw(int argc, const char **argv) : + Dispatcher(g_ceph_context), + monc{g_ceph_context, poolctx}, + client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())), + objecter{g_ceph_context, client_messenger.get(), &monc, poolctx}, + client{client_messenger.get(), &monc, &objecter}, + finisher(g_ceph_context, "Nvmeof", "nvme-fin"), + timer(g_ceph_context, lock), + orig_argc(argc), + orig_argv(argv) +{ +} + +NVMeofGw::~NVMeofGw() = default; + +const char** NVMeofGw::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + NULL + }; + return KEYS; +} + +int NVMeofGw::init() +{ + dout(0) << dendl; + std::string val; + auto args = argv_to_vec(orig_argc, orig_argv); + + for (std::vector::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-name", (char*)NULL)) { + name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-pool", (char*)NULL)) { + pool = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-group", (char*)NULL)) { + group = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-address", (char*)NULL)) { + gateway_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--monitor-address", (char*)NULL)) { + monitor_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--server-key", (char*)NULL)) { + server_key = val; + } else if (ceph_argparse_witharg(args, i, &val, "--server-cert", (char*)NULL)) { + server_cert = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-cert", (char*)NULL)) { + client_cert = val; + } else { + ++i; + } + } + + dout(0) << "gateway name: " << name << + " pool:" << pool << + " group:" << group << + " address: " << gateway_address << dendl; + ceph_assert(name != "" && pool != "" && gateway_address != "" && monitor_address != ""); + + // todo + ceph_assert(server_key == "" && server_cert == "" && client_cert == ""); + + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + + std::lock_guard l(lock); + + // Start finisher + finisher.start(); + + // Initialize Messenger + client_messenger->add_dispatcher_tail(this); + client_messenger->add_dispatcher_head(&objecter); + client_messenger->add_dispatcher_tail(&client); + client_messenger->start(); + + poolctx.start(2); + + // Initialize MonClient + if (monc.build_initial_monmap() < 0) { + client_messenger->shutdown(); + client_messenger->wait(); + return -1; + } + + monc.sub_want("NVMeofGw", 0, 0); + + monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD + |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); + monc.set_messenger(client_messenger.get()); + + // We must register our config callback before calling init(), so + // that we see the initial configuration message + monc.register_config_callback([this](const std::string &k, const std::string &v){ + // removing value to hide sensitive data going into mgr logs + // leaving this for debugging purposes + dout(10) << "nvmeof config_callback: " << k << " : " << v << dendl; + + return false; + }); + monc.register_config_notify_callback([this]() { + dout(4) << "nvmeof monc config notify callback" << dendl; + }); + dout(4) << "nvmeof Registered monc callback" << dendl; + + int r = monc.init(); + if (r < 0) { + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(0) << "nvmeof Registered monc callback" << dendl; + + r = monc.authenticate(); + if (r < 0) { + derr << "Authentication failed, did you specify an ID with a valid keyring?" << dendl; + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(0) << "monc.authentication done" << dendl; + // only forward monmap updates after authentication finishes, otherwise + // monc.authenticate() will be waiting for MgrStandy::ms_dispatch() + // to acquire the lock forever, as it is already locked in the beginning of + // this method. + monc.set_passthrough_monmap(); + + client_t whoami = monc.get_global_id(); + client_messenger->set_myname(entity_name_t::MGR(whoami.v)); + objecter.set_client_incarnation(0); + objecter.init(); + objecter.start(); + client.init(); + timer.init(); + + tick(); + + dout(0) << "Complete." << dendl; + return 0; +} + +void NVMeofGw::send_beacon() +{ + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + //dout(0) << "sending beacon as gid " << monc.get_global_id() << dendl; + GW_AVAILABILITY_E gw_availability = GW_AVAILABILITY_E::GW_CREATED; + GwSubsystems subs; + if (map.epoch > 0) { // handled map already + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, grpc::InsecureChannelCredentials())); + subsystems_info gw_subsystems; + bool ok = gw_client.get_subsystems(gw_subsystems); + if (ok) { + for (int i = 0; i < gw_subsystems.subsystems_size(); i++) { + const subsystem& sub = gw_subsystems.subsystems(i); + struct NqnState nqn_state(sub.nqn()); + auto group_key = std::make_pair(pool, group); + GW_STATE_T& gw_state = map.Gmap[group_key][nqn_state.nqn][name]; + nqn_state.opt_ana_gid = gw_state.optimized_ana_group_id; + for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + nqn_state.sm_state[i] = gw_state.sm_state[i]; + subs.push_back(nqn_state); + } + } + gw_availability = ok ? GW_AVAILABILITY_E::GW_AVAILABLE : GW_AVAILABILITY_E::GW_UNAVAILABLE; + } + dout(0) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability << dendl; + auto m = ceph::make_message( + name, + pool, + group, + subs, + gw_availability, + map.epoch); + monc.send_mon_message(std::move(m)); +} + +void NVMeofGw::tick() +{ + dout(0) << dendl; + send_beacon(); + + timer.add_event_after( + g_conf().get_val("mgr_tick_period").count(), + new LambdaContext([this](int r){ + tick(); + } + )); +} + +void NVMeofGw::shutdown() +{ + finisher.queue(new LambdaContext([&](int) { + std::lock_guard l(lock); + + dout(4) << "nvmeof Shutting down" << dendl; + + + // stop sending beacon first, I use monc to talk with monitors + timer.shutdown(); + // client uses monc and objecter + client.shutdown(); + // Stop asio threads, so leftover events won't call into shut down + // monclient/objecter. + poolctx.finish(); + // stop monc, so mon won't be able to instruct me to shutdown/activate after + // the active_mgr is stopped + monc.shutdown(); + + // objecter is used by monc and active_mgr + objecter.shutdown(); + // client_messenger is used by all of them, so stop it in the end + client_messenger->shutdown(); + })); + + // Then stop the finisher to ensure its enqueued contexts aren't going + // to touch references to the things we're about to tear down + finisher.wait_for_empty(); + finisher.stop(); +} + +void NVMeofGw::handle_nvmeof_gw_map(ceph::ref_t mmap) +{ + dout(0) << "handle nvmeof gw map" << dendl; + auto &mp = mmap->get_map(); + dout(0) << "received map epoch " << mp.epoch << dendl; + dout(0) << "mp " << mp << dendl; + ana_info ai; + auto group_key = std::make_pair(pool, group); + if (map.epoch == 0){ // initial map + auto group_gws = mp.Created_gws.find(group_key); + if (group_gws == mp.Created_gws.end()) { + dout(0) << "Failed to find group key " << group_key << "created gw for " << name << dendl; + return; + } + auto gw = group_gws->second.find(name); + if(gw == group_gws->second.end()) + { + dout(0) << "Failed to find created gw for " << name << dendl; + return; + } + bool set_group_id = false; + while (!set_group_id) { + NVMeofGwMonitorGroupClient monitor_group_client( + grpc::CreateChannel(monitor_address, grpc::InsecureChannelCredentials())); + dout(0) << "GRPC set_group_id: " << gw->second.ana_grp_id << dendl; + set_group_id = monitor_group_client.set_group_id( gw->second.ana_grp_id); + if (!set_group_id) { + dout(0) << "GRPC set_group_id failed" << dendl; + usleep(1000); // TODO: conf options + } + } + } + + // Interate over NQNs + auto subsystems = mp.Gmap.find(group_key); + if (subsystems == mp.Gmap.end()) { + dout(0) << "Gmap find failed for group_key " << group_key << dendl; + } else { + for (const auto& subsystemPair: subsystems->second) { + const auto& nqn = subsystemPair.first; + const auto& idStateMap = subsystemPair.second; + nqn_ana_states nas; + nas.set_nqn(nqn); + + // This gateway state for the current subsystem / nqn + const auto& new_gateway_state = idStateMap.find(name); + + // There is no subsystem update for this gateway + if (new_gateway_state == idStateMap.end()) continue; + + // Previously monitor distributed state + GW_STATE_T& old_gw_state = map.Gmap[group_key][nqn][name]; + + // Iterate over possible ANA Groups + for (ANA_GRP_ID_T ana_grp_index = 0; ana_grp_index < MAX_SUPPORTED_ANA_GROUPS; ana_grp_index++) { + ana_group_state gs; + gs.set_grp_id(ana_grp_index + 1); // offset by 1, index 0 is ANAGRP1 + + // There is no state change for this ANA Group + auto old_state = old_gw_state.sm_state[ana_grp_index]; + if (old_state == new_gateway_state->second.sm_state[ana_grp_index]) continue; + + // detect was active, but not any more transition + if ((old_state == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || old_state == GW_STATES_PER_AGROUP_E::GW_IDLE_STATE ) && + new_gateway_state->second.sm_state[ana_grp_index] != GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + gs.set_state(INACCESSIBLE); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(0) << "nqn: " << nqn << " grpid " << (ana_grp_index + 1) << " INACCESSIBLE" <second.sm_state[ana_grp_index] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + gs.set_state(OPTIMIZED); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(0) << "nqn: " << nqn << " grpid " << (ana_grp_index + 1) << " OPTIMIZED" <Add(std::move(nas)); + } + } + if (ai.states_size()) { + bool set_ana_state = false; + while (!set_ana_state) { + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, grpc::InsecureChannelCredentials())); + set_ana_state = gw_client.set_ana_state(ai); + if (!set_ana_state) { + dout(0) << "GRPC set_ana_state failed" << dendl; + usleep(1000); // TODO conf option + } + } + } + map = mp; +} + +bool NVMeofGw::ms_dispatch2(const ref_t& m) +{ + std::lock_guard l(lock); + dout(0) << "got map type " << m->get_type() << dendl; + + if (m->get_type() == MSG_MNVMEOF_GW_MAP) { + handle_nvmeof_gw_map(ref_cast(m)); + } + bool handled = false; + return handled; +} + +int NVMeofGw::main(vector args) +{ + client_messenger->wait(); + + // Disable signal handlers + unregister_async_signal_handler(SIGHUP, sighup_handler); + shutdown_async_signal_handler(); + + return 0; +} diff --git a/src/nvmeof/NVMeofGw.h b/src/nvmeof/NVMeofGw.h new file mode 100644 index 000000000000..af5844c82f8a --- /dev/null +++ b/src/nvmeof/NVMeofGw.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef NVMEOFGW_H_ +#define NVMEOFGW_H_ + +#include "auth/Auth.h" +#include "common/async/context_pool.h" +#include "common/Finisher.h" +#include "common/Timer.h" +#include "common/LogClient.h" + +#include "client/Client.h" +#include "mon/MonClient.h" +#include "osdc/Objecter.h" +#include "messages/MNVMeofGwMap.h" + +class NVMeofGw : public Dispatcher, + public md_config_obs_t { +private: + std::string name; + std::string pool; + std::string group; + std::string gateway_address; + std::string monitor_address; + std::string server_key; + std::string server_cert; + std::string client_cert; + +protected: + ceph::async::io_context_pool poolctx; + MonClient monc; + std::unique_ptr client_messenger; + Objecter objecter; + Client client; + NVMeofGwMap map; + ceph::mutex lock = ceph::make_mutex("NVMeofGw::lock"); + Finisher finisher; + SafeTimer timer; + + int orig_argc; + const char **orig_argv; + + void send_config_beacon(); + void send_beacon(); + +public: + NVMeofGw(int argc, const char **argv); + ~NVMeofGw() override; + + // Dispatcher interface + bool ms_dispatch2(const ceph::ref_t& m) override; + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; }; + + // config observer bits + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override {}; + + int init(); + void shutdown(); + int main(std::vector args); + void tick(); + + void handle_nvmeof_gw_map(ceph::ref_t m); +}; + +#endif + diff --git a/src/nvmeof/NVMeofGwClient.cc b/src/nvmeof/NVMeofGwClient.cc new file mode 100644 index 000000000000..977e69f0ae28 --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.cc @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "NVMeofGwClient.h" + +bool NVMeofGwClient::get_subsystems(subsystems_info& reply) { + get_subsystems_req request; + ClientContext context; + + Status status = stub_->get_subsystems(&context, request, &reply); + + return status.ok(); +} + +bool NVMeofGwClient::set_ana_state(const ana_info& info) { + req_status reply; + ClientContext context; + + Status status = stub_->set_ana_state(&context, info, &reply); + + return status.ok() && reply.status(); +} diff --git a/src/nvmeof/NVMeofGwClient.h b/src/nvmeof/NVMeofGwClient.h new file mode 100644 index 000000000000..dcb16c65f76c --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWCLIENT_H__ +#define __NVMEOFGWCLIENT_H__ +#include +#include +#include + +#include + +#include "gateway.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwClient { + public: + NVMeofGwClient(std::shared_ptr channel) + : stub_(Gateway::NewStub(channel)) {} + + bool get_subsystems(subsystems_info& reply); + bool set_ana_state(const ana_info& info); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.cc b/src/nvmeof/NVMeofGwMonitorGroupClient.cc new file mode 100644 index 000000000000..cbea04e66f48 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "NVMeofGwMonitorGroupClient.h" + +bool NVMeofGwMonitorGroupClient::set_group_id(const uint32_t& id) { + group_id_req request; + request.set_id(id); + google::protobuf::Empty reply; + ClientContext context; + + Status status = stub_->group_id(&context, request, &reply); + + return status.ok(); +} diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.h b/src/nvmeof/NVMeofGwMonitorGroupClient.h new file mode 100644 index 000000000000..f4ca4c4f3d19 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWMONITORGROUPCLIENT_H__ +#define __NVMEOFGWMONITORGROUPCLIENT_H__ +#include +#include +#include + +#include + +#include "monitor.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwMonitorGroupClient { + public: + NVMeofGwMonitorGroupClient(std::shared_ptr channel) + : stub_(MonitorGroup::NewStub(channel)) {} + + bool set_group_id(const uint32_t& id); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/gateway b/src/nvmeof/gateway new file mode 160000 index 000000000000..53098bdc90fd --- /dev/null +++ b/src/nvmeof/gateway @@ -0,0 +1 @@ +Subproject commit 53098bdc90fdaa45990b0a2ff38c4ed82b7c6e25 diff --git a/src/pybind/mgr/cephadm/services/nvmeof.py b/src/pybind/mgr/cephadm/services/nvmeof.py index 7d2dd16cf0d6..1d957d85ae01 100644 --- a/src/pybind/mgr/cephadm/services/nvmeof.py +++ b/src/pybind/mgr/cephadm/services/nvmeof.py @@ -29,7 +29,7 @@ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonD host_ip = self.mgr.inventory.get_addr(daemon_spec.host) keyring = self.get_keyring_with_caps(self.get_auth_entity(nvmeof_gw_id), - ['mon', 'profile rbd', + ['mon', 'allow *', 'osd', 'allow all tag rbd *=*']) # TODO: check if we can force jinja2 to generate dicts with double quotes instead of using json.dumps @@ -52,6 +52,16 @@ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonD daemon_spec.extra_files = {'ceph-nvmeof.conf': gw_conf} daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) daemon_spec.deps = [] + # Notify monitor about this gateway creation + cmd = { + 'prefix': 'nvme-gw create', + 'id': name, + 'group': spec.group, + 'pool': spec.pool + } + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") return daemon_spec def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: @@ -83,6 +93,19 @@ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None Called after the daemon is removed. """ logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}') + spec = cast(NvmeofServiceSpec, self.mgr.spec_store[daemon.service_name].spec) + name = '{}.{}'.format(utils.name_to_config_section('nvmeof'), daemon.daemon_id) + # Notify monitor about this gateway deletion + cmd = { + 'prefix': 'nvme-gw delete', + 'id': name, + 'group': spec.group, + 'pool': spec.pool + } + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") + # TODO: remove config for dashboard nvmeof gateways if any # and any certificates being used for mTLS diff --git a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 index 72a3e5839edc..69b8332cde39 100644 --- a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 +++ b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 @@ -1,7 +1,7 @@ # {{ cephadm_managed }} [gateway] name = {{ name }} -group = {{ spec.group if spec.group is not none else '' }} +group = {{ spec.group }} addr = {{ addr }} port = {{ port }} enable_auth = {{ spec.enable_auth }} diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index 4181ee2563e4..0126f1957d5c 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -1168,7 +1168,7 @@ def __init__(self, #: ``name`` name of the nvmeof gateway self.name = name #: ``group`` name of the nvmeof gateway - self.group = group + self.group = group or '' #: ``enable_auth`` enables user authentication on nvmeof gateway self.enable_auth = enable_auth #: ``server_key`` gateway server key