From d076875d6d3f417c36999fb739f9396dbc705cf7 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 24 Jan 2025 13:06:34 +0100 Subject: [PATCH] Implement querier (#858) * Add querier API * Add querier example * Build fix * Mark querier API as unstable * Querier implementation * Fix modular build * Add test and update example * Add docs * Add querier session check --- CMakeLists.txt | 2 +- docs/api.rst | 40 +++++++ examples/CMakeLists.txt | 1 + examples/unix/c11/z_querier.c | 174 ++++++++++++++++++++++++++++ include/zenoh-pico/api/macros.h | 22 ++++ include/zenoh-pico/api/primitives.h | 70 +++++++++++ include/zenoh-pico/api/types.h | 42 +++++++ include/zenoh-pico/net/primitives.h | 31 +++++ include/zenoh-pico/net/query.h | 25 ++++ src/api/api.c | 143 ++++++++++++++++++++++- src/net/primitives.c | 32 ++++- src/net/query.c | 21 +++- tests/memory_leak.py | 4 + 13 files changed, 599 insertions(+), 8 deletions(-) create mode 100644 examples/unix/c11/z_querier.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 22760432f..e35bf455d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -241,7 +241,7 @@ set(Z_FEATURE_UNICAST_TRANSPORT 1 CACHE STRING "Toggle unicast transport") set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport") set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY") set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions") -set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check") +set(Z_FEATURE_SESSION_CHECK 1 CACHE STRING "Toggle publisher/querier session check") set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching") set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature") set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE") diff --git a/docs/api.rst b/docs/api.rst index c2b4be996..6d65fc516 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1213,6 +1213,46 @@ See details at :ref:`owned_types_concept` .. c:function:: void z_reply_clone(z_owned_reply_t * dst, const z_loaned_reply_t * reply) .. c:function:: const z_loaned_reply_t * z_reply_loan(const z_owned_reply_t * reply) +Querier +======= + +Represents a Zenoh Querier entity. + +Types +----- + +See details at :ref:`owned_types_concept` + +.. c:type:: z_owned_querier_t +.. c:type:: z_loaned_querier_t +.. c:type:: z_moved_querier_t + +Option Types +------------ + +.. autoctype:: types.h::z_querier_options_t +.. autoctype:: types.h::z_querier_get_options_t + +Constants +--------- + +Functions +--------- +.. autocfunction:: primitives.h::z_declare_querier +.. autocfunction:: primitives.h::z_undeclare_querier +.. autocfunction:: primitives.h::z_querier_get +.. autocfunction:: primitives.h::z_querier_keyexpr + +.. autocfunction:: primitives.h::z_querier_options_default +.. autocfunction:: primitives.h::z_querier_get_options_default + +Ownership Functions +------------------- + +See details at :ref:`owned_types_concept` + +.. c:function:: const z_loaned_querier_t * z_querier_loan(const z_owned_querier_t * closure) +.. c:function:: void z_querier_drop(z_moved_querier_t * closure) Scouting ======== diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 96309c42c..42ca68e34 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -47,6 +47,7 @@ if(UNIX) add_example(z_get_channel unix/c11/z_get_channel.c) add_example(z_get_attachment unix/c11/z_get_attachment.c) add_example(z_get_liveliness unix/c11/z_get_liveliness.c) + add_example(z_querier unix/c11/z_querier.c) add_example(z_queryable unix/c11/z_queryable.c) add_example(z_queryable_channel unix/c11/z_queryable_channel.c) add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c) diff --git a/examples/unix/c11/z_querier.c b/examples/unix/c11/z_querier.c new file mode 100644 index 000000000..39222e783 --- /dev/null +++ b/examples/unix/c11/z_querier.c @@ -0,0 +1,174 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include + +#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 && defined Z_FEATURE_UNSTABLE_API + +int main(int argc, char **argv) { + const char *selector = "demo/example/**"; + const char *mode = "client"; + const char *clocator = NULL; + const char *llocator = NULL; + const char *value = NULL; + int n = INT_MAX; + int timeout_ms = 0; + + int opt; + while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:")) != -1) { + switch (opt) { + case 's': + selector = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'v': + value = optarg; + break; + case 'n': + n = atoi(optarg); + break; + case 't': + timeout_ms = atoi(optarg); + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' || + optopt == 'n' || optopt == 't') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_session_drop(z_session_move(&s)); + return -1; + } + + const char *ke = selector; + size_t ke_len = strlen(ke); + const char *params = strchr(selector, '?'); + if (params != NULL) { + ke_len = params - ke; + params += 1; + } + + z_view_keyexpr_t keyexpr; + if (z_view_keyexpr_from_substr(&keyexpr, ke, ke_len) < 0) { + printf("%.*s is not a valid key expression", (int)ke_len, ke); + exit(-1); + } + + printf("Declaring Querier on '%s'...\n", ke); + z_owned_querier_t querier; + + z_querier_options_t opts; + z_querier_options_default(&opts); + opts.timeout_ms = timeout_ms; + + if (z_declare_querier(z_loan(s), &querier, z_loan(keyexpr), &opts) < 0) { + printf("Unable to declare Querier for key expression!\n"); + exit(-1); + } + + printf("Press CTRL-C to quit...\n"); + char buf[256]; + for (int idx = 0; idx != n; ++idx) { + z_sleep_s(1); + sprintf(buf, "[%4d] %s", idx, value ? value : ""); + printf("Querying '%s' with payload '%s'...\n", selector, buf); + z_querier_get_options_t get_options; + z_querier_get_options_default(&get_options); + + if (value != NULL) { + z_owned_bytes_t payload; + z_bytes_copy_from_str(&payload, buf); + get_options.payload = z_move(payload); + } + + z_owned_fifo_handler_reply_t handler; + z_owned_closure_reply_t closure; + z_fifo_channel_reply_new(&closure, &handler, 16); + + z_querier_get(z_loan(querier), params, z_move(closure), &get_options); + + z_owned_reply_t reply; + for (z_result_t res = z_recv(z_loan(handler), &reply); res == Z_OK; res = z_recv(z_loan(handler), &reply)) { + if (z_reply_is_ok(z_loan(reply))) { + const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply)); + + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); + + z_owned_string_t replystr; + z_bytes_to_string(z_sample_payload(sample), &replystr); + + printf(">> Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(keystr)), + z_string_data(z_loan(keystr)), (int)z_string_len(z_loan(replystr)), + z_string_data(z_loan(replystr))); + z_drop(z_move(replystr)); + } else { + printf(">> Received an error\n"); + } + z_drop(z_move(reply)); + } + z_drop(z_move(handler)); + } + + z_drop(z_move(querier)); + z_drop(z_move(s)); + + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index 24a49599e..ff1605b12 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -43,6 +43,7 @@ z_owned_session_t : z_session_loan, \ z_owned_subscriber_t : z_subscriber_loan, \ z_owned_publisher_t : z_publisher_loan, \ + z_owned_querier_t : z_querier_loan, \ z_owned_matching_listener_t : z_matching_listener_loan, \ z_owned_queryable_t : z_queryable_loan, \ z_owned_liveliness_token_t : z_liveliness_token_loan, \ @@ -82,6 +83,7 @@ z_owned_config_t : z_config_loan_mut, \ z_owned_session_t : z_session_loan_mut, \ z_owned_publisher_t : z_publisher_loan_mut, \ + z_owned_querier_t : z_querier_loan_mut, \ z_owned_matching_listener_t : z_matching_listener_loan_mut, \ z_owned_queryable_t : z_queryable_loan_mut, \ z_owned_liveliness_token_t : z_liveliness_token_loan_mut, \ @@ -116,6 +118,7 @@ z_moved_session_t* : z_session_drop, \ z_moved_subscriber_t* : z_subscriber_drop, \ z_moved_publisher_t* : z_publisher_drop, \ + z_moved_querier_t* : z_querier_drop, \ z_moved_matching_listener_t* : z_matching_listener_drop, \ z_moved_queryable_t* : z_queryable_drop, \ z_moved_liveliness_token_t* : z_liveliness_token_drop, \ @@ -165,6 +168,7 @@ z_owned_session_t : z_internal_session_check, \ z_owned_subscriber_t : z_internal_subscriber_check, \ z_owned_publisher_t : z_internal_publisher_check, \ + z_owned_querier_t : z_internal_querier_check, \ z_owned_matching_listener_t : z_internal_matching_listener_check, \ z_owned_queryable_t : z_internal_queryable_check, \ z_owned_liveliness_token_t : z_internal_liveliness_token_check, \ @@ -237,6 +241,7 @@ z_owned_session_t : z_session_move, \ z_owned_subscriber_t : z_subscriber_move, \ z_owned_publisher_t : z_publisher_move, \ + z_owned_querier_t : z_querier_move, \ z_owned_matching_listener_t: z_matching_listener_move, \ z_owned_queryable_t : z_queryable_move, \ z_owned_liveliness_token_t : z_liveliness_token_move, \ @@ -298,6 +303,7 @@ z_owned_keyexpr_t *: z_keyexpr_take, \ z_owned_mutex_t *: z_mutex_take, \ z_owned_publisher_t *: z_publisher_take, \ + z_owned_querier_t *: z_querier_take, \ z_owned_matching_listener_t *: z_matching_listener_take, \ z_owned_query_t *: z_query_take, \ z_owned_queryable_t *: z_queryable_take, \ @@ -351,6 +357,7 @@ #define z_internal_null(x) _Generic((x), \ z_owned_session_t * : z_internal_session_null, \ z_owned_publisher_t * : z_internal_publisher_null, \ + z_owned_querier_t * : z_internal_querier_null, \ z_owned_matching_listener_t * : z_internal_matching_listener_null, \ z_owned_keyexpr_t * : z_internal_keyexpr_null, \ z_owned_config_t * : z_internal_config_null, \ @@ -410,6 +417,7 @@ inline const z_loaned_config_t* z_loan(const z_owned_config_t& x) { return z_con inline const z_loaned_session_t* z_loan(const z_owned_session_t& x) { return z_session_loan(&x); } inline const z_loaned_subscriber_t* z_loan(const z_owned_subscriber_t& x) { return z_subscriber_loan(&x); } inline const z_loaned_publisher_t* z_loan(const z_owned_publisher_t& x) { return z_publisher_loan(&x); } +inline const z_loaned_querier_t* z_loan(const z_owned_querier_t& x) { return z_querier_loan(&x); } inline const z_loaned_matching_listener_t* z_loan(const z_owned_matching_listener_t& x) { return z_matching_listener_loan(&x); } inline const z_loaned_queryable_t* z_loan(const z_owned_queryable_t& x) { return z_queryable_loan(&x); } inline const z_loaned_liveliness_token_t* z_loan(const z_owned_liveliness_token_t& x) { return z_liveliness_token_loan(&x); } @@ -449,6 +457,7 @@ inline z_loaned_keyexpr_t* z_loan_mut(z_view_keyexpr_t& x) { return z_view_keyex inline z_loaned_config_t* z_loan_mut(z_owned_config_t& x) { return z_config_loan_mut(&x); } inline z_loaned_session_t* z_loan_mut(z_owned_session_t& x) { return z_session_loan_mut(&x); } inline z_loaned_publisher_t* z_loan_mut(z_owned_publisher_t& x) { return z_publisher_loan_mut(&x); } +inline z_loaned_querier_t* z_loan_mut(z_owned_querier_t& x) { return z_querier_loan_mut(&x); } inline z_loaned_matching_listener_t* z_loan_mut(z_owned_matching_listener_t& x) { return z_matching_listener_loan_mut(&x); } inline z_loaned_queryable_t* z_loan_mut(z_owned_queryable_t& x) { return z_queryable_loan_mut(&x); } inline z_loaned_liveliness_token_t* z_loan_mut(z_owned_liveliness_token_t& x) { return z_liveliness_token_loan_mut(&x); } @@ -474,6 +483,7 @@ inline ze_loaned_serializer_t* z_loan_mut(ze_owned_serializer_t& x) { return ze_ // z_drop definition inline void z_drop(z_moved_session_t* v) { z_session_drop(v); } inline void z_drop(z_moved_publisher_t* v) { z_publisher_drop(v); } +inline void z_drop(z_moved_querier_t* v) { z_querier_drop(v); } inline void z_drop(z_moved_matching_listener_t* v) { z_matching_listener_drop(v); } inline void z_drop(z_moved_keyexpr_t* v) { z_keyexpr_drop(v); } inline void z_drop(z_moved_config_t* v) { z_config_drop(v); } @@ -510,6 +520,7 @@ inline void z_drop(ze_moved_serializer_t* v) { ze_serializer_drop(v); } // z_internal_null definition inline void z_internal_null(z_owned_session_t* v) { z_internal_session_null(v); } inline void z_internal_null(z_owned_publisher_t* v) { z_internal_publisher_null(v); } +inline void z_internal_null(z_owned_querier_t* v) { z_internal_querier_null(v); } inline void z_internal_null(z_owned_matching_listener_t* v) { z_internal_matching_listener_null(v); } inline void z_internal_null(z_owned_keyexpr_t* v) { z_internal_keyexpr_null(v); } inline void z_internal_null(z_owned_config_t* v) { z_internal_config_null(v); } @@ -542,6 +553,7 @@ inline void z_internal_null(ze_owned_serializer_t* v) { return ze_internal_seria // z_internal_check definition inline bool z_internal_check(const z_owned_session_t& v) { return z_internal_session_check(&v); } inline bool z_internal_check(const z_owned_publisher_t& v) { return z_internal_publisher_check(&v); } +inline bool z_internal_check(const z_owned_querier_t& v) { return z_internal_querier_check(&v); } inline bool z_internal_check(const z_owned_matching_listener_t& v) { return z_internal_matching_listener_check(&v); } inline bool z_internal_check(const z_owned_keyexpr_t& v) { return z_internal_keyexpr_check(&v); } inline bool z_internal_check(const z_owned_config_t& v) { return z_internal_config_check(&v); } @@ -690,6 +702,7 @@ inline z_moved_reply_err_t* z_move(z_owned_reply_err_t& x) { return z_reply_err_ inline z_moved_hello_t* z_move(z_owned_hello_t& x) { return z_hello_move(&x); } inline z_moved_keyexpr_t* z_move(z_owned_keyexpr_t& x) { return z_keyexpr_move(&x); } inline z_moved_publisher_t* z_move(z_owned_publisher_t& x) { return z_publisher_move(&x); } +inline z_moved_querier_t* z_move(z_owned_querier_t& x) { return z_querier_move(&x); } inline z_moved_matching_listener_t* z_move(z_owned_matching_listener_t& x) { return z_matching_listener_move(&x); } inline z_moved_query_t* z_move(z_owned_query_t& x) { return z_query_move(&x); } inline z_moved_queryable_t* z_move(z_owned_queryable_t& x) { return z_queryable_move(&x); } @@ -717,6 +730,7 @@ inline ze_moved_serializer_t* z_move(ze_owned_serializer_t& x) { return ze_seria // z_take definition inline void z_take(z_owned_session_t* this_, z_moved_session_t* v) { return z_session_take(this_, v); } inline void z_take(z_owned_publisher_t* this_, z_moved_publisher_t* v) { return z_publisher_take(this_, v); } +inline void z_take(z_owned_querier_t* this_, z_moved_querier_t* v) { return z_querier_take(this_, v); } inline void z_take(z_owned_matching_listener_t* this_, z_moved_matching_listener_t* v) { return z_matching_listener_take(this_, v); } @@ -847,6 +861,14 @@ struct z_owned_to_loaned_type_t { typedef z_loaned_publisher_t type; }; template <> +struct z_loaned_to_owned_type_t { + typedef z_owned_querier_t type; +}; +template <> +struct z_owned_to_loaned_type_t { + typedef z_loaned_querier_t type; +}; +template <> struct z_loaned_to_owned_type_t { typedef z_owned_matching_listener_t type; }; diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 4b4e227b9..dd6d28429 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1145,6 +1145,7 @@ _Z_OWNED_FUNCTIONS_DEF(config) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(session) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(subscriber) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(publisher) +_Z_OWNED_FUNCTIONS_NO_COPY_DEF(querier) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(matching_listener) _Z_OWNED_FUNCTIONS_NO_COPY_DEF(queryable) _Z_OWNED_FUNCTIONS_DEF(hello) @@ -1775,6 +1776,75 @@ void z_get_options_default(z_get_options_t *options); */ z_result_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, const char *parameters, z_moved_closure_reply_t *callback, z_get_options_t *options); + +#ifdef Z_FEATURE_UNSTABLE_API +/** + * Constructs the default value for :c:type:`z_querier_get_options_t`. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +void z_querier_get_options_default(z_querier_get_options_t *options); + +/** + * Constructs the default value for :c:type:`z_querier_options_t`. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +void z_querier_options_default(z_querier_options_t *options); + +/** + * Constructs and declares a querier on the given key expression. + * + * The queries can be send with the help of the `z_querier_get()` function. + * + * Parameters: + * zs: The Zenoh session. + * querier: An uninitialized location in memory where querier will be constructed. + * keyexpr: The key expression to send queries on. + * options: Additional options for the querier. + * + * Return: + * ``0`` if put operation is successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ + +z_result_t z_declare_querier(const z_loaned_session_t *zs, z_owned_querier_t *querier, + const z_loaned_keyexpr_t *keyexpr, z_querier_options_t *options); + +/** + * Frees memory and resets querier to its gravestone state. + */ +z_result_t z_undeclare_querier(z_moved_querier_t *querier); + +/** + * Query data from the matching queryables in the system. + * + * Replies are provided through a callback function. + * + * Parameters: + * querier: The querier to make query from. + * parameters: The query's parameters, similar to a url's query segment. + * callback: The callback function that will be called on reception of replies for this query. It will be + * automatically dropped once all replies are processed. + * options: Additional options for the get. All owned fields will be consumed. + * + * Return: + * ``0`` if put operation is successful, ``negative value`` otherwise. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *parameters, z_moved_closure_reply_t *callback, + z_querier_get_options_t *options); + +/** + * Returns the key expression of the querier. + * + * .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release. + */ +const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier); +#endif // Z_FEATURE_UNSTABLE_API + /** * Checks if queryable answered with an OK, which allows this value to be treated as a sample. * diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 1712a2954..71b4d017e 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -112,6 +112,11 @@ _Z_OWNED_TYPE_VALUE(_z_subscriber_t, subscriber) */ _Z_OWNED_TYPE_VALUE(_z_publisher_t, publisher) +/** + * Represents a Zenoh Querier entity. + */ +_Z_OWNED_TYPE_VALUE(_z_querier_t, querier) + /** * Represents a Zenoh Matching listener entity. */ @@ -197,6 +202,43 @@ typedef struct { #endif } z_publisher_options_t; +/** + * Options passed to the :c:func:`z_declare_querier()` function. + * + * Members: + * z_query_target_t target: The Queryables that should be target of the querier queries. + * z_query_consolidation_t consolidation: The replies consolidation strategy to apply on replies to the querier + * queries. + * z_congestion_control_t congestion_control: The congestion control to apply when routing the querier queries. + * bool is_express: If set to ``true``, the querier queries will not be batched. This usually has a positive impact on + * latency but negative impact on throughput. + * z_priority_t priority: The priority of the querier queries. + * uint64_t timeout_ms: The timeout for the querier queries in milliseconds. 0 means default query timeout from zenoh + * configuration. + */ +typedef struct z_querier_options_t { + z_query_target_t target; + z_query_consolidation_t consolidation; + z_congestion_control_t congestion_control; + bool is_express; + z_priority_t priority; + uint64_t timeout_ms; +} z_querier_options_t; + +/** + * Options passed to the :c:func:`z_querier_get()` function. + * + * Members: + * z_moved_bytes_t *payload: An optional payload to attach to the query. + * z_moved_encoding_t *encoding: An optional encoding of the query payload and or attachment. + * z_moved_bytes_t *attachment: An optional attachment to attach to the query. + */ +typedef struct z_querier_get_options_t { + z_moved_bytes_t *payload; + z_moved_encoding_t *encoding; + z_moved_bytes_t *attachment; +} z_querier_get_options_t; + /** * Represents the configuration used to configure a queryable upon declaration :c:func:`z_declare_queryable`. * diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 269431bfe..1cf3047b7 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -243,6 +243,37 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr #endif #if Z_FEATURE_QUERY == 1 +/** + * Declare a :c:type:`_z_querier_t` for the given resource key. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to query. The callee gets the ownership of any + * allocated value. + * consolidation_mode: The kind of consolidation that should be applied on replies. + * congestion_control: The congestion control to apply when routing the querier queries. + * target: The kind of queryables that should be target of this query. + * priority: The priority of the query. + * is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth. + * timeout_ms: The timeout value of this query. + * Returns: + * The created :c:type:`_z_querier_t` (in null state if the declaration failed).. + */ +_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control, + z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms); + +/** + * Undeclare a :c:type:`_z_querier_t`. + * + * Parameters: + * querier: The :c:type:`_z_querier_t` to undeclare. The callee releases the + * querier upon successful return. + * Returns: + * 0 if success, or a negative value identifying the error. + */ +z_result_t _z_undeclare_querier(_z_querier_t *querier); + /** * Query data from the matching queryables in the system. * diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 675264744..489dd2411 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -51,6 +51,31 @@ void _z_query_free(_z_query_t **query); _Z_REFCOUNT_DEFINE(_z_query, _z_query) +/** + * Return type when declaring a querier. + */ +typedef struct _z_querier_t { + _z_keyexpr_t _key; + _z_zint_t _id; + _z_session_weak_t _zn; + _z_encoding_t _encoding; + z_consolidation_mode_t _consolidation_mode; + z_query_target_t _target; + z_congestion_control_t _congestion_control; + z_priority_t _priority; + z_reliability_t reliability; + bool _is_express; + uint64_t _timeout_ms; +} _z_querier_t; + +#if Z_FEATURE_QUERY == 1 +// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes. +static inline _z_querier_t _z_querier_null(void) { return (_z_querier_t){0}; } +static inline bool _z_querier_check(const _z_querier_t *querier) { return !_Z_RC_IS_NULL(&querier->_zn); } +void _z_querier_clear(_z_querier_t *querier); +void _z_querier_free(_z_querier_t **querier); +#endif + /** * Return type when declaring a queryable. */ diff --git a/src/api/api.c b/src/api/api.c index afbda16f8..3c9bacefe 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1020,7 +1020,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay _z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true); _z_session_t *session = NULL; -#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1 +#if Z_FEATURE_SESSION_CHECK == 1 // Try to upgrade session rc _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&pub->_zn); if (!_Z_RC_IS_NULL(&sess_rc)) { @@ -1054,7 +1054,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay ret = _Z_ERR_SESSION_CLOSED; } -#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1 +#if Z_FEATURE_SESSION_CHECK == 1 _z_session_rc_drop(&sess_rc); #endif @@ -1080,7 +1080,7 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher _z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true); _z_session_t *session = NULL; -#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1 +#if Z_FEATURE_SESSION_CHECK == 1 // Try to upgrade session rc _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&pub->_zn); if (!_Z_RC_IS_NULL(&sess_rc)) { @@ -1096,7 +1096,7 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher _z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability); -#if Z_FEATURE_PUBLISHER_SESSION_CHECK == 1 +#if Z_FEATURE_SESSION_CHECK == 1 // Clean up _z_session_rc_drop(&sess_rc); #endif @@ -1205,6 +1205,141 @@ z_result_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr return ret; } +void _z_querier_drop(_z_querier_t *querier) { + _z_undeclare_querier(querier); + _z_querier_clear(querier); +} + +_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_querier_t, querier, _z_querier_check, _z_querier_null, _z_querier_drop) + +#ifdef Z_FEATURE_UNSTABLE_API +void z_querier_get_options_default(z_querier_get_options_t *options) { + options->encoding = NULL; + options->attachment = NULL; + options->payload = NULL; +} + +void z_querier_options_default(z_querier_options_t *options) { + options->target = z_query_target_default(); + options->consolidation = z_query_consolidation_default(); + options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT; + options->priority = Z_PRIORITY_DEFAULT; + options->is_express = false; + options->timeout_ms = Z_GET_TIMEOUT_DEFAULT; +} + +z_result_t z_declare_querier(const z_loaned_session_t *zs, z_owned_querier_t *querier, + const z_loaned_keyexpr_t *keyexpr, z_querier_options_t *options) { + _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); + _z_keyexpr_t key = keyexpr_aliased; + + querier->_val = _z_querier_null(); + // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition + // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic + // resource declarations are only performed on unicast transports. + if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + _z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased); + if (r == NULL) { + uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &keyexpr_aliased); + key = _z_keyexpr_from_string(id, &keyexpr_aliased._suffix); + } + } + // Set options + z_querier_options_t opt; + z_querier_options_default(&opt); + if (options != NULL) { + opt = *options; + } + + // Set querier + _z_querier_t int_querier = _z_declare_querier(zs, key, opt.consolidation.mode, opt.congestion_control, opt.target, + opt.priority, opt.is_express, opt.timeout_ms); + + querier->_val = int_querier; + return _Z_RES_OK; +} + +z_result_t z_undeclare_querier(z_moved_querier_t *querier) { + z_result_t ret = _z_undeclare_querier(&querier->_this._val); + _z_querier_clear(&querier->_this._val); + return ret; +} + +z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *parameters, z_moved_closure_reply_t *callback, + z_querier_get_options_t *options) { + z_result_t ret = _Z_RES_OK; + + void *ctx = callback->_this._val.context; + callback->_this._val.context = NULL; + + z_querier_get_options_t opt; + z_querier_get_options_default(&opt); + if (options != NULL) { + opt = *options; + } + + _z_encoding_t encoding; + if (opt.encoding == NULL) { + _Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &querier->_encoding)); + } else { + encoding = _z_encoding_steal(&opt.encoding->_this._val); + } + // Remove potentially redundant ke suffix + _z_keyexpr_t querier_keyexpr = _z_keyexpr_alias_from_user_defined(querier->_key, true); + + _z_session_t *session = NULL; +#if Z_FEATURE_SESSION_CHECK == 1 + // Try to upgrade session rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn); + if (!_Z_RC_IS_NULL(&sess_rc)) { + session = _Z_RC_IN_VAL(&sess_rc); + } else { + ret = _Z_ERR_SESSION_CLOSED; + } +#else + session = _Z_RC_IN_VAL(&querier->_zn); +#endif + + z_consolidation_mode_t consolidation_mode = querier->_consolidation_mode; + if (consolidation_mode == Z_CONSOLIDATION_MODE_AUTO) { + const char *lp = (parameters == NULL) ? "" : parameters; + if (strstr(lp, Z_SELECTOR_TIME) != NULL) { + consolidation_mode = Z_CONSOLIDATION_MODE_NONE; + } else { + consolidation_mode = Z_CONSOLIDATION_MODE_LATEST; + } + } + + if (session != NULL) { + _z_value_t value = {.payload = _z_bytes_from_owned_bytes(&opt.payload->_this), + .encoding = _z_encoding_from_owned(&opt.encoding->_this)}; + + ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value, + callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms, + _z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control, + querier->_priority, querier->_is_express); + } else { + ret = _Z_ERR_SESSION_CLOSED; + } + +#if Z_FEATURE_SESSION_CHECK == 1 + _z_session_rc_drop(&sess_rc); +#endif + + // Clean-up + z_bytes_drop(opt.payload); + z_encoding_drop(opt.encoding); + z_bytes_drop(opt.attachment); + z_internal_closure_reply_null( + &callback->_this); // call and drop passed to _z_query, so we nullify the closure here + return ret; +} + +const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier) { + return (const z_loaned_keyexpr_t *)&querier->_key; +} +#endif // Z_FEATURE_UNSTABLE_API + bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->data._tag != _Z_REPLY_TAG_ERROR; } const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data._result.sample; } diff --git a/src/net/primitives.c b/src/net/primitives.c index adea1b3ed..a93915189 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -518,6 +518,33 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr #endif #if Z_FEATURE_QUERY == 1 +/*------------------ Querier Declaration ------------------*/ +_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control, + z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms) { + // Allocate querier + _z_querier_t ret; + // Fill querier + ret._key = _z_keyexpr_duplicate(&keyexpr); + ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + ret._consolidation_mode = consolidation_mode; + ret._congestion_control = congestion_control; + ret._target = target; + ret._priority = priority; + ret._is_express = is_express; + ret._timeout_ms = timeout_ms; + ret._zn = _z_session_rc_clone_as_weak(zn); + return ret; +} + +z_result_t _z_undeclare_querier(_z_querier_t *querier) { + if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) { + return _Z_ERR_ENTITY_UNKNOWN; + } + _z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id); + return _Z_RES_OK; +} + /*------------------ Query ------------------*/ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, const z_consolidation_mode_t consolidation, _z_value_t value, _z_closure_reply_callback_t callback, @@ -532,7 +559,7 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr); pq->_target = target; pq->_consolidation = consolidation; - pq->_anykey = (strstr(parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; + pq->_anykey = (parameters == NULL || strstr(parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; pq->_callback = callback; pq->_dropper = dropper; pq->_pending_replies = NULL; @@ -542,7 +569,8 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session if (ret == _Z_RES_OK) { - _z_slice_t params = _z_slice_alias_buf((uint8_t *)parameters, strlen(parameters)); + _z_slice_t params = + (parameters == NULL) ? _z_slice_null() : _z_slice_alias_buf((uint8_t *)parameters, strlen(parameters)); _z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, ¶ms, pq->_id, pq->_consolidation, &value, timeout_ms, attachment, cong_ctrl, priority, is_express); diff --git a/src/net/query.c b/src/net/query.c index 817e94a59..f01910adc 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -14,7 +14,6 @@ #include "zenoh-pico/net/query.h" #include "zenoh-pico/net/session.h" -#include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/utils/logging.h" @@ -57,6 +56,26 @@ void _z_query_free(_z_query_t **query) { } } +#if Z_FEATURE_QUERY == 1 +void _z_querier_clear(_z_querier_t *querier) { + _z_keyexpr_clear(&querier->_key); + _z_session_weak_drop(&querier->_zn); + _z_encoding_clear(&querier->_encoding); + *querier = _z_querier_null(); +} + +void _z_querier_free(_z_querier_t **querier) { + _z_querier_t *ptr = *querier; + + if (ptr != NULL) { + _z_querier_clear(ptr); + + z_free(ptr); + *querier = NULL; + } +} +#endif + #if Z_FEATURE_QUERYABLE == 1 void _z_queryable_clear(_z_queryable_t *qbl) { _z_session_weak_drop(&qbl->_zn); diff --git a/tests/memory_leak.py b/tests/memory_leak.py index 89e727f0a..74c159acf 100644 --- a/tests/memory_leak.py +++ b/tests/memory_leak.py @@ -177,6 +177,10 @@ def query_and_queryable(query_cmd, queryable_cmd): print("*** Query & queryable attachment test ***") if query_and_queryable('z_get_attachment -v Something', 'z_queryable_attachment -n 1') == 1: EXIT_STATUS = 1 + # Test querier and queryable examples + print("*** Querier & queryable test ***") + if query_and_queryable('z_querier -n 1', 'z_queryable -n 1') == 1: + EXIT_STATUS = 1 # Test liveliness query print("*** Get liveliness test ***") if query_and_queryable('z_get_liveliness', 'z_liveliness -t 3') == 1: