Skip to content

Commit

Permalink
ldms_ls option to print decomposition templates
Browse files Browse the repository at this point in the history
This feature dumps a decomposition template for each unqique
schema published by an ldmsd. Although this template can be
used directly, it is intended to be edited by the user to
define indices of interest and rename/remove columns.

While developing this change it was discovered that the
type names produced by ldms_metric_type_to_str() were
not symmetric with those consumed by ldms_metric_str_to_type().
This change addresses this discrepency.

Also added is a #define for the minimum SHA256 schema digest
string length LDMS_DIGEST_STR_LENGTH.
  • Loading branch information
root authored and tom95858 committed Jul 3, 2023
1 parent 26f8c63 commit 5830abf
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 45 deletions.
70 changes: 41 additions & 29 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -2186,35 +2186,47 @@ static struct _ldms_type_name_map {
enum ldms_value_type type;
} type_name_map[] = {
/* This map needs to be sorted by name */
{ "CHAR", LDMS_V_CHAR },
{ "CHAR_ARRAY", LDMS_V_CHAR_ARRAY },
{ "D64", LDMS_V_D64, },
{ "D64_ARRAY", LDMS_V_D64_ARRAY},
{ "F32", LDMS_V_F32, },
{ "F32_ARRAY", LDMS_V_F32_ARRAY},
{ "LIST", LDMS_V_LIST },
{ "NONE", LDMS_V_NONE, },
{ "RECORD", LDMS_V_RECORD_INST },
{ "RECORD_ARRAY", LDMS_V_RECORD_ARRAY },
{ "RECORD_TYPE", LDMS_V_RECORD_TYPE },
{ "S16", LDMS_V_S16, },
{ "S16_ARRAY", LDMS_V_S16_ARRAY},
{ "S32", LDMS_V_S32, },
{ "S32_ARRAY", LDMS_V_S32_ARRAY},
{ "S64", LDMS_V_S64, },
{ "S64_ARRAY", LDMS_V_S64_ARRAY},
{ "S8", LDMS_V_S8, },
{ "S8_ARRAY", LDMS_V_S8_ARRAY},
{ "TIMESTAMP", LDMS_V_TIMESTAMP},
{ "TS", LDMS_V_TIMESTAMP},
{ "U16", LDMS_V_U16, },
{ "U16_ARRAY", LDMS_V_U16_ARRAY},
{ "U32", LDMS_V_U32, },
{ "U32_ARRAY", LDMS_V_U32_ARRAY},
{ "U64", LDMS_V_U64, },
{ "U64_ARRAY", LDMS_V_U64_ARRAY},
{ "U8", LDMS_V_U8, },
{ "U8_ARRAY", LDMS_V_U8_ARRAY},
{ "char", LDMS_V_CHAR },
{ "char[]", LDMS_V_CHAR_ARRAY },
{ "char_array", LDMS_V_CHAR_ARRAY },
{ "d64", LDMS_V_D64 },
{ "d64[]", LDMS_V_D64_ARRAY },
{ "d64_array", LDMS_V_D64_ARRAY },
{ "f32", LDMS_V_F32 },
{ "f32[]", LDMS_V_F32_ARRAY },
{ "f32_array", LDMS_V_F32_ARRAY },
{ "list", LDMS_V_LIST },
{ "list<>", LDMS_V_LIST },
{ "none", LDMS_V_NONE },
{ "record", LDMS_V_RECORD_INST },
{ "record_array",LDMS_V_RECORD_ARRAY },
{ "record_type",LDMS_V_RECORD_TYPE },
{ "s16", LDMS_V_S16 },
{ "s16[]", LDMS_V_S16_ARRAY },
{ "s16_array", LDMS_V_S16_ARRAY},
{ "s32", LDMS_V_S32 },
{ "s32[]", LDMS_V_S32_ARRAY },
{ "s32_array", LDMS_V_S32_ARRAY},
{ "s64", LDMS_V_S64 },
{ "s64[]", LDMS_V_S64_ARRAY },
{ "s64_array", LDMS_V_S64_ARRAY},
{ "s8", LDMS_V_S8 },
{ "s8[]", LDMS_V_S8_ARRAY },
{ "s8_array", LDMS_V_S8_ARRAY },
{ "timestamp", LDMS_V_TIMESTAMP },
{ "ts", LDMS_V_TIMESTAMP },
{ "u16", LDMS_V_U16 },
{ "u16[]", LDMS_V_U16_ARRAY },
{ "u16_array", LDMS_V_U16_ARRAY },
{ "u32", LDMS_V_U32 },
{ "u32[]", LDMS_V_U32_ARRAY },
{ "u32_array", LDMS_V_U32_ARRAY},
{ "u64", LDMS_V_U64 },
{ "u64[]", LDMS_V_U64_ARRAY },
{ "u64_array", LDMS_V_U64_ARRAY },
{ "u8", LDMS_V_U8 },
{ "u8[]", LDMS_V_U8_ARRAY },
{ "u8_array", LDMS_V_U8_ARRAY },
};

int comparator(const void *a, const void *b)
Expand Down
17 changes: 8 additions & 9 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ ldms_schema_t ldms_schema_from_template(const char *name,
struct ldms_metric_template_s tmp[],
int mid[]);

/**
/**
* \brief Write a JSON representation of the schema to a file
*
* \param schema The schema handle
Expand Down Expand Up @@ -1423,21 +1423,19 @@ int ldms_record_metric_add_template(ldms_record_t rec_def,
struct ldms_metric_template_s tmp[], int mid[]);

/**
* Get the size (bytes) required in the heap for a record instance.
* \brief Return the heap memory required by a record type
*
* This function is useful for estimating the minimum heap size required to a
* record instance of the given record type definition. To determine the minimum
* heap size supporting \c N record instances, simply multiply the returned
* number with \c N.
* This function returns the heap size required by an instance of a
* record type.
*
* \param rec_def The handle returned by \c ldms_record_create().
*
* \retval bytes The size of the record instance in the heap.
* \retval bytes The size of the record type in the heap.
*/
size_t ldms_record_heap_size_get(ldms_record_t rec_def);

/**
* Get the size (bytes) of the heap memory storing the record metric values.
* \brief Get the size (bytes) of the heap memory storing the record metric values.
*
* \param rec_def The handle returned by \c ldms_record_create().
*
Expand Down Expand Up @@ -1740,6 +1738,7 @@ extern uint32_t ldms_set_card_get(ldms_set_t s);
* \return The schema digest
*/
#define LDMS_DIGEST_LENGTH SHA256_DIGEST_LENGTH
#define LDMS_DIGEST_STR_LENGTH ((2 * LDMS_DIGEST_LENGTH) + 1)
struct ldms_digest_s {
unsigned char digest[LDMS_DIGEST_LENGTH];
};
Expand All @@ -1751,7 +1750,7 @@ extern ldms_digest_t ldms_set_digest_get(ldms_set_t s);
*
* \param digest The digest
* \param buf The output buffer
* \param buf_len The buffer length
* \param buf_len The buffer length - must be >= LDMS_DIGEST_STR_LENGTH
*
* \retval NULL If there is an error (\c errno describing the error)
* \retval buf If succeeded, the output buffer containing formatted digest
Expand Down
149 changes: 142 additions & 7 deletions ldms/src/ldmsd/ldms_ls.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ static int done;
static sem_t conn_sem;

static int schema;
static int print_decomp;

struct ls_set {
struct ldms_dir_set_s *set_data;
Expand Down Expand Up @@ -132,7 +133,7 @@ const char *auth_name = "none";
struct attr_value_list *auth_opt = NULL;
const int auth_opt_max = 128;

#define FMT "h:p:x:w:m:ESIlvua:A:VP"
#define FMT "h:p:x:w:m:ESIlvua:A:VPd"
void usage(char *argv[])
{
printf("%s -h <hostname> -x <transport> [ name ... ]\n"
Expand All @@ -144,24 +145,25 @@ void usage(char *argv[])
" localhost unless -h is specified in which case it is sock.\n"
"\n -w <secs> The time to wait before giving up on the server.\n"
" The default is 10 seconds.\n"
"\n -v Show detail information about the metric set. Specifying\n"
"\n -v Show detail information about the metric set. Specifying\n"
" this option multiple times increases the verbosity.\n"
"\n -E The <name> arguments are regular expressions.\n"
"\n -S The <name>s refers to the schema name.\n"
"\n -I The <name>s refer to the instance name (default).\n",
"\n -I The <name>s refer to the instance name (default).\n"
"\n -d Output a decomposition template for matching schema.\n",
argv[0]);
printf("\n -m <memory size> Maximum size of pre-allocated memory for metric sets.\n"
" The given size must be less than 1 petabytes.\n"
" The default is %s.\n"
" For example, 20M or 20mb are 20 megabytes.\n"
" - The environment variable %s could be set\n"
" The environment variable %s could be set\n"
" instead of giving the -m option. If both are given,\n"
" this option takes precedence over the environment variable.\n"
"\n -a <auth> LDMS Authentication plugin to be used (default: 'none').\n"
"\n -A <key>=<value> (repeatable) LDMS Authentication plugin parameters.\n"
, LDMS_LS_MAX_MEM_SZ_STR, LDMS_LS_MEM_SZ_ENVVAR);
printf("\n -V Print LDMS version and exit.\n");
printf("\n -P Register for push updates.\n");
printf("\n -V Print LDMS version and exit.\n");
printf("\n -P Register for push updates.\n");
exit(1);
}

Expand Down Expand Up @@ -671,6 +673,90 @@ static int is_matched(char *inst_name, char *schema_name)
static int verbose = 0;
static int long_format = 0;

void fprint_record(FILE *fp, int indent, const char *lname, ldms_mval_t rval)
{
int i;
size_t array_len;
enum ldms_value_type type;
for (i = 0; i < ldms_record_card(rval); i++) {
const char *mname = ldms_record_metric_name_get(rval, i);
type = ldms_record_metric_type_get(rval, i, &array_len);
fprintf(fp, "%*s{ \"src\" : \"%s\", ", indent, "", lname);
fprintf(fp, "\"rec_member\" : \"%s\", ", mname);
fprintf(fp, "\"dst\" : \"%s\", ", mname);
fprintf(fp, "\"type\" : \"%s\"", ldms_metric_type_to_str(type));
if (ldms_type_is_array(type))
fprintf(fp, ", \"array_len\" : %zu }", array_len);
else
fprintf(fp, " }");
if (i < ldms_record_card(rval)-1)
fprintf(fp, ",\n");
else
fprintf(fp, "\n");
}
}

void fprint_decomp(FILE *fp, ldms_set_t s)
{
int i;
enum ldms_value_type type;
fprintf(fp, " \"%s_decomp\" : {\n", ldms_set_schema_name_get(s));
fprintf(fp, " \"type\" : \"static\",\n");
fprintf(fp, " \"rows\" : [\n");
fprintf(fp, " {\n");
fprintf(fp, " \"schema\" : \"%s\",\n", ldms_set_schema_name_get(s));
fprintf(fp, " \"cols\" : [\n");
i = ldms_metric_by_name(s, "timestamp");
if (i < 0) {
/* Add a transaction timestamp if a metric named 'timestamp' is not present in the schema */
fprintf(fp, " { \"src\" : \"timestamp\", \"dst\" : \"timestamp\", \"type\" : \"ts\" },\n");
}
for (i = 0; i < ldms_set_card_get(s); i++) {
type = ldms_metric_type_get(s, i);
if (type == LDMS_V_RECORD_TYPE)
continue;
if (type == LDMS_V_LIST) {
enum ldms_value_type rtype;
ldms_mval_t lval = ldms_metric_get(s, i);
ldms_mval_t rval = ldms_list_first(s, lval, &rtype, NULL);
fprint_record(fp, 14, ldms_metric_name_get(s, i), rval);
} else if (ldms_type_is_array(type)) {
fprintf(fp, " { \"src\" : \"%s\", \"dst\" : \"%s\", "
"\"type\" : \"%s\", \"array_len\" : %d }",
ldms_metric_name_get(s, i), ldms_metric_name_get(s, i),
ldms_metric_type_to_str(type),
ldms_type_is_array(type) ? ldms_metric_array_get_len(s, i) : 0);
} else {
fprintf(fp, " { \"src\" : \"%s\", \"dst\" : \"%s\", "
"\"type\" : \"%s\" }",
ldms_metric_name_get(s, i), ldms_metric_name_get(s, i),
ldms_metric_type_to_str(type));
}
if (i < ldms_set_card_get(s)-1)
fprintf(fp, ",\n");
else
fprintf(fp, "\n");
}
fprintf(fp, " ],\n"); /* terminate cols list */
fprintf(fp, " \"indices\" : [\n");
fprintf(fp, " ]\n");
fprintf(fp, " }\n"); /* terminate row */
fprintf(fp, " ]\n"); /* terminate rows list */
fprintf(fp, " }"); /* terminate decomposition */
}

int digest_cmp(void *a, const void *b)
{
return strcmp(a, b);
}

struct rbt digest_tree = RBT_INITIALIZER( digest_cmp );
struct digest_entry {
char digest_str[LDMS_DIGEST_STR_LENGTH];
char *schema_name;
struct rbn rbn;
};

void print_cb(ldms_t t, ldms_set_t s, int rc, void *arg)
{
int err;
Expand All @@ -691,6 +777,26 @@ void print_cb(ldms_t t, ldms_set_t s, int rc, void *arg)
return;
}
}
if (print_decomp) {
char digest_str[LDMS_DIGEST_STR_LENGTH];
struct digest_entry *de;
struct rbn *rbn;
ldms_digest_str(ldms_set_digest_get(s), digest_str, sizeof(digest_str));
rbn = rbt_find(&digest_tree, digest_str);
if (rbn)
goto out;
if (rbt_card(&digest_tree))
fprintf(stdout, ",\n");
else
fprintf(stdout, "\n");
de = malloc(sizeof(*de));
strcpy(de->digest_str, digest_str);
de->schema_name = strdup(ldms_set_schema_name_get(s));
rbn_init(&de->rbn, de->digest_str);
rbt_ins(&digest_tree, &de->rbn);
fprint_decomp(stdout, s);
goto out;
}
struct ldms_timestamp _ts = ldms_transaction_timestamp_get(s);
struct ldms_timestamp const *ts = &_ts;
int consistent = ldms_set_is_consistent(s);
Expand All @@ -716,7 +822,8 @@ void print_cb(ldms_t t, ldms_set_t s, int rc, void *arg)
if ((rc == 0) || (rc & LDMS_UPD_F_PUSH_LAST))
ldms_set_delete(s);
out:
printf("\n");
if (!print_decomp)
printf("\n");
if (last) {
pthread_mutex_lock(&print_lock);
print_done = 1;
Expand Down Expand Up @@ -1028,6 +1135,10 @@ int main(int argc, char *argv[])
case 'I':
schema = 0;
break;
case 'd':
print_decomp = 1;
long_format = 1;
break;
case 'h':
free(hostname);
hostname = strdup(optarg);
Expand Down Expand Up @@ -1333,6 +1444,11 @@ int main(int argc, char *argv[])
if (verbose && long_format)
printf("\n=======================================================================\n\n");

if (print_decomp) {
fprintf(stdout, "{\n");
fprintf(stdout, " \"type\" : \"flex\",\n");
fprintf(stdout, " \"decomposition\" : {");
}
/*
* Handle the long format (-l)
*/
Expand All @@ -1359,6 +1475,25 @@ int main(int argc, char *argv[])
pthread_mutex_unlock(&print_lock);
free(lss);
}
if (print_decomp) {
struct digest_entry *de;
fprintf(stdout, "\n },\n"); /* terminate decomposition */
fprintf(stdout, " \"digest\" : {\n");
while (!rbt_empty(&digest_tree)) {
struct rbn *rbn = rbt_min(&digest_tree);
rbt_del(&digest_tree, rbn);
de = container_of(rbn, struct digest_entry, rbn);
fprintf(stdout, " \"%s\" : \"%s_decomp\"", de->digest_str, de->schema_name);
if (rbt_empty(&digest_tree))
fprintf(stdout, "\n");
else
fprintf(stdout, ",\n");
free(de->schema_name);
free(de);
}
fprintf(stdout, " }\n"); /* terminate digest dictionary */
fprintf(stdout, "}\n"); /* terminate flex decomposition */
}
done = 1;
done:
pthread_mutex_lock(&done_lock);
Expand Down

0 comments on commit 5830abf

Please sign in to comment.