From 062c248615b5155c9dea5b140e5d35d924033076 Mon Sep 17 00:00:00 2001 From: Michael R Sweet Date: Fri, 27 Sep 2024 17:05:28 -0400 Subject: [PATCH] Rework proxy job updating. --- pappl/printer-private.h | 1 + pappl/printer-proxy.c | 171 +++++++++++++++++++++++++++++----------- pappl/printer.c | 2 + 3 files changed, 127 insertions(+), 47 deletions(-) diff --git a/pappl/printer-private.h b/pappl/printer-private.h index d8ea0f15..d7b2c24e 100644 --- a/pappl/printer-private.h +++ b/pappl/printer-private.h @@ -106,6 +106,7 @@ struct _pappl_printer_s // Printer data *proxy_uri, // Proxy printer-uri value *proxy_uuid; // Proxy output-device-uuid value cups_array_t *proxy_jobs; // Proxy jobs + cups_mutex_t proxy_jobs_mutex; // Mutex for proxy jobs array }; diff --git a/pappl/printer-proxy.c b/pappl/printer-proxy.c index e1b73423..d372ce1e 100644 --- a/pappl/printer-proxy.c +++ b/pappl/printer-proxy.c @@ -36,6 +36,7 @@ static void free_proxy_job(_pappl_proxy_job_t *pj, void *data); static int subscribe_events(pappl_printer_t *printer, http_t *http); static void unsubscribe_events(pappl_printer_t *printer, http_t *http, int sub_id); static bool update_active_jobs(pappl_printer_t *printer, http_t *http); +static bool update_proxy_job_no_lock(pappl_printer_t *printer, int job_id, ipp_jstate_t job_state); static bool update_proxy_jobs(pappl_printer_t *printer); static bool wait_events(pappl_printer_t *printer, http_t *http, int sub_id, int *seq_number, time_t *next_wait_events); @@ -77,7 +78,7 @@ _papplPrinterRunProxy( http_t *http = NULL; // Connection to server int sub_id = 0; // Event subscription ID int seq_number = 0; // Event sequence number - bool check_jobs = true, // Check for new jobs to accept? + bool fetch_jobs = true, // Check for new jobs to accept? update_jobs = true; // Do an Update-Active-Jobs request? time_t next_wait_events = 0; // Next update time @@ -93,7 +94,7 @@ _papplPrinterRunProxy( while (!printer->proxy_terminate && !papplPrinterIsDeleted(printer) && papplSystemIsRunning(printer->system)) { // See if we have anything to do... - if (sub_id > 0 && !update_jobs && !check_jobs && time(NULL) < next_wait_events) + if (sub_id > 0 && !update_jobs && !fetch_jobs && time(NULL) < next_wait_events) { // Nothing to do, sleep for 1 second and then continue... sleep(1); @@ -117,11 +118,11 @@ _papplPrinterRunProxy( sub_id = subscribe_events(printer, http); // Check for new jobs as needed... - if (check_jobs) + if (fetch_jobs) check_fetchable_jobs(printer, http); // Wait for new jobs/state changes... - check_jobs = wait_events(printer, http, sub_id, &seq_number, &next_wait_events); + fetch_jobs = wait_events(printer, http, sub_id, &seq_number, &next_wait_events); } // Unsubscribe from events and close the connection to the Infrastructure Printer... @@ -152,13 +153,13 @@ _papplPrinterUpdateProxyDocument( // Find the proxy job, if any... - _papplRWLockRead(printer); + cupsMutexLock(&printer->proxy_jobs_mutex); for (pjob = (_pappl_proxy_job_t *)cupsArrayGetFirst(printer->proxy_jobs); pjob; pjob = (_pappl_proxy_job_t *)cupsArrayGetNext(printer->proxy_jobs)) { if (pjob->job == job) break; } - _papplRWUnlock(printer); + cupsMutexUnlock(&printer->proxy_jobs_mutex); if (!pjob) return; @@ -201,13 +202,13 @@ _papplPrinterUpdateProxyJobNoLock( // Find the proxy job, if any... - _papplRWLockRead(printer); + cupsMutexLock(&printer->proxy_jobs_mutex); for (pjob = (_pappl_proxy_job_t *)cupsArrayGetFirst(printer->proxy_jobs); pjob; pjob = (_pappl_proxy_job_t *)cupsArrayGetNext(printer->proxy_jobs)) { if (pjob->job == job) break; } - _papplRWUnlock(printer); + cupsMutexUnlock(&printer->proxy_jobs_mutex); if (!pjob) return; @@ -439,37 +440,8 @@ update_active_jobs( if (ippGetGroupTag(job_ids) == IPP_TAG_OPERATION && ippGetGroupTag(job_states) == IPP_TAG_OPERATION && ippGetCount(job_ids) == ippGetCount(job_states)) { // Got a list of jobs with different states... - ipp_jstate_t local_state, // Local job state - remote_state; // Remote job state - - _papplRWLockWrite(printer); - for (i = 0, count = ippGetCount(job_ids); i < count; i ++) - { - key.parent_job_id = ippGetInteger(job_ids, i); - remote_state = (ipp_jstate_t)ippGetInteger(job_states, i); - - if ((job = (_pappl_proxy_job_t *)cupsArrayFind(printer->proxy_jobs, &key)) != NULL) - { - local_state = papplJobGetState(job->job); - - if (remote_state >= IPP_JSTATE_CANCELED && local_state < IPP_JSTATE_CANCELED) - { - // Cancel local job - _papplRWLockWrite(job->job); - _papplJobCancelNoLock(job->job); - _papplRWUnlock(job->job); - } - else if (remote_state == IPP_JSTATE_PENDING && local_state == IPP_JSTATE_HELD) - { - // Release held job - _papplRWLockWrite(job->job); - _papplJobReleaseNoLock(job->job, /*username*/NULL); - check_jobs = true; - _papplRWUnlock(job->job); - } - } - } + check_jobs |= update_proxy_job_no_lock(printer, ippGetInteger(job_ids, i), (ipp_jstate_t)ippGetInteger(job_states, i)); // Get the jobs that no longer exist on the Infrastructure Printer... if ((job_ids = ippFindNextAttribute(response, "job-ids", IPP_TAG_INTEGER)) != NULL && ippGetGroupTag(job_ids) == IPP_TAG_UNSUPPORTED_GROUP) @@ -505,6 +477,57 @@ update_active_jobs( } +// +// 'update_proxy_job_no_lock()' - Update a proxy job's state without changing the printer's rwlock. +// + +static bool // O - Check jobs? +update_proxy_job_no_lock( + pappl_printer_t *printer, // I - Printer + int job_id, // I - Job ID + ipp_jstate_t remote_state) // I - Remote job state value +{ + bool check_jobs = false; + // Check for jobs to process? + _pappl_proxy_job_t key, // Search key + *job; // Proxy job + ipp_jstate_t local_state; // Local jost state value + + + // Find the proxy job... + cupsMutexLock(&printer->proxy_jobs_mutex); + + key.parent_job_id = job_id; + job = (_pappl_proxy_job_t *)cupsArrayFind(printer->proxy_jobs, &key); + + cupsMutexUnlock(&printer->proxy_jobs_mutex); + + if (job) + { + // Update the local job as needed... + local_state = papplJobGetState(job->job); + + if (remote_state >= IPP_JSTATE_CANCELED && local_state < IPP_JSTATE_CANCELED) + { + // Cancel local job + _papplRWLockWrite(job->job); + _papplJobCancelNoLock(job->job); + _papplRWUnlock(job->job); + } + else if (remote_state == IPP_JSTATE_PENDING && local_state == IPP_JSTATE_HELD) + { + // Release held job + _papplRWLockWrite(job->job); + _papplJobReleaseNoLock(job->job, /*username*/NULL); + check_jobs = true; + _papplRWUnlock(job->job); + } + } + + return (check_jobs); +} + + // // 'update_proxy_jobs()' - Update the available proxy jobs. // @@ -521,6 +544,8 @@ update_proxy_jobs( _pappl_proxy_job_t pj; // Proxy job // Create the proxy jobs array... + cupsMutexLock(&printer->proxy_jobs_mutex); + printer->proxy_jobs = cupsArrayNew((cups_array_cb_t)compare_proxy_jobs, /*cbdata*/NULL, /*hash_cb*/NULL, /*hash_size*/0, (cups_acopy_cb_t)copy_proxy_job, (cups_afree_cb_t)free_proxy_job); // Scan existing jobs for parent-job-xxx attributes... @@ -542,6 +567,8 @@ update_proxy_jobs( } } } + + cupsMutexUnlock(&printer->proxy_jobs_mutex); } @@ -561,11 +588,15 @@ wait_events( int *seq_number, // IO - Event sequence number time_t *next_wait_events) // O - notify-get-interval time { - bool check_jobs = false; // Return value + bool check_jobs = false, // Return value + fetch_jobs = false; // Fetch jobs? ipp_t *request, // IPP request *response; // IPP response ipp_attribute_t *attr; // IPP attribute int get_interval; // "notify-get-interval" value + int job_id = 0; // Remote job ID + ipp_jstate_t job_state = IPP_JSTATE_ABORTED; + // Remote job state // Send a Get-Notifications request @@ -586,20 +617,66 @@ wait_events( else *next_wait_events = time(NULL) + 5; - // Find the last event... - for (attr = ippFindAttribute(response, "notify-sequence-number", IPP_TAG_INTEGER); attr; attr = ippFindNextAttribute(response, "notify-sequence-number", IPP_TAG_INTEGER)) + // Process events... + _papplRWLockWrite(printer); + + for (attr = ippGetFirstAttribute(response); attr; attr = ippGetNextAttribute(response)) { - // All events for this subscription are job events, so check jobs... - check_jobs = true; + // Look at the current attribute/group... + const char *name; // Attribute name + ipp_tag_t value_tag; // Value type + const char *keyword; // "notify-event" value + int number; // "notify-sequence-number" value + + if (ippGetGroupTag(attr) != IPP_TAG_EVENT_NOTIFICATION) + { + // Update proxy (remote) print job... + if (job_id > 0) + check_jobs |= update_proxy_job_no_lock(printer, job_id, job_state); + + job_id = 0; + job_state = IPP_JSTATE_ABORTED; + continue; + } + + // In the middle of an event... + name = ippGetName(attr); + value_tag = ippGetValueTag(attr); + + if (!strcmp(name, "notify-job-id") && value_tag == IPP_TAG_INTEGER) + { + job_id = ippGetInteger(attr, 0); + } + else if (!strcmp(name, "notify-sequence-number") && value_tag == IPP_TAG_INTEGER && (number = ippGetInteger(attr, 0)) > *seq_number) + { + *seq_number = number; + } + else if (!strcmp(name, "notify-subscribed-event") && value_tag == IPP_TAG_KEYWORD) + { + // See what kind of a job event this is... + keyword = ippGetString(attr, 0, NULL); - // See if this is a higher number than before... - if (ippGetInteger(attr, 0) > *seq_number) - *seq_number = ippGetInteger(attr, 0); + if (!strcmp(keyword, "job-fetchable")) + fetch_jobs = true; + } + else if (!strcmp(name, "job-state") && value_tag == IPP_TAG_ENUM) + { + job_state = (ipp_jstate_t)ippGetInteger(attr, 0); + } } + if (job_id > 0) + check_jobs |= update_proxy_job_no_lock(printer, job_id, job_state); + + // If any jobs were released, see if they can be started now... + if (check_jobs) + _papplPrinterCheckJobsNoLock(printer); + + _papplRWUnlock(printer); + // Free memory and return... ippDelete(response); - return (check_jobs); + return (fetch_jobs); } diff --git a/pappl/printer.c b/pappl/printer.c index e97082a4..6744e494 100644 --- a/pappl/printer.c +++ b/pappl/printer.c @@ -237,6 +237,7 @@ _papplPrinterDelete( cupsRWDestroy(&printer->output_rwlock); cupsRWDestroy(&printer->rwlock); + cupsMutexDestroy(&printer->proxy_jobs_mutex); free(printer); } @@ -749,6 +750,7 @@ create_printer( // Initialize printer structure and attributes... cupsRWInit(&printer->rwlock); cupsRWInit(&printer->output_rwlock); + cupsMutexInit(&printer->proxy_jobs_mutex); printer->system = system; printer->name = strdup(printer_name);