Skip to content

Commit

Permalink
Rework proxy job updating.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelrsweet committed Sep 27, 2024
1 parent c9e0736 commit 062c248
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 47 deletions.
1 change: 1 addition & 0 deletions pappl/printer-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ struct _pappl_printer_s // Printer data
*proxy_uri, // Proxy printer-uri value
*proxy_uuid; // Proxy output-device-uuid value
cups_array_t *proxy_jobs; // Proxy jobs
cups_mutex_t proxy_jobs_mutex; // Mutex for proxy jobs array
};


Expand Down
171 changes: 124 additions & 47 deletions pappl/printer-proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static void free_proxy_job(_pappl_proxy_job_t *pj, void *data);
static int subscribe_events(pappl_printer_t *printer, http_t *http);
static void unsubscribe_events(pappl_printer_t *printer, http_t *http, int sub_id);
static bool update_active_jobs(pappl_printer_t *printer, http_t *http);
static bool update_proxy_job_no_lock(pappl_printer_t *printer, int job_id, ipp_jstate_t job_state);
static bool update_proxy_jobs(pappl_printer_t *printer);
static bool wait_events(pappl_printer_t *printer, http_t *http, int sub_id, int *seq_number, time_t *next_wait_events);

Expand Down Expand Up @@ -77,7 +78,7 @@ _papplPrinterRunProxy(
http_t *http = NULL; // Connection to server
int sub_id = 0; // Event subscription ID
int seq_number = 0; // Event sequence number
bool check_jobs = true, // Check for new jobs to accept?
bool fetch_jobs = true, // Check for new jobs to accept?
update_jobs = true; // Do an Update-Active-Jobs request?
time_t next_wait_events = 0; // Next update time

Expand All @@ -93,7 +94,7 @@ _papplPrinterRunProxy(
while (!printer->proxy_terminate && !papplPrinterIsDeleted(printer) && papplSystemIsRunning(printer->system))
{
// See if we have anything to do...
if (sub_id > 0 && !update_jobs && !check_jobs && time(NULL) < next_wait_events)
if (sub_id > 0 && !update_jobs && !fetch_jobs && time(NULL) < next_wait_events)
{
// Nothing to do, sleep for 1 second and then continue...
sleep(1);
Expand All @@ -117,11 +118,11 @@ _papplPrinterRunProxy(
sub_id = subscribe_events(printer, http);

// Check for new jobs as needed...
if (check_jobs)
if (fetch_jobs)
check_fetchable_jobs(printer, http);

// Wait for new jobs/state changes...
check_jobs = wait_events(printer, http, sub_id, &seq_number, &next_wait_events);
fetch_jobs = wait_events(printer, http, sub_id, &seq_number, &next_wait_events);
}

// Unsubscribe from events and close the connection to the Infrastructure Printer...
Expand Down Expand Up @@ -152,13 +153,13 @@ _papplPrinterUpdateProxyDocument(


// Find the proxy job, if any...
_papplRWLockRead(printer);
cupsMutexLock(&printer->proxy_jobs_mutex);
for (pjob = (_pappl_proxy_job_t *)cupsArrayGetFirst(printer->proxy_jobs); pjob; pjob = (_pappl_proxy_job_t *)cupsArrayGetNext(printer->proxy_jobs))
{
if (pjob->job == job)
break;
}
_papplRWUnlock(printer);
cupsMutexUnlock(&printer->proxy_jobs_mutex);

if (!pjob)
return;
Expand Down Expand Up @@ -201,13 +202,13 @@ _papplPrinterUpdateProxyJobNoLock(


// Find the proxy job, if any...
_papplRWLockRead(printer);
cupsMutexLock(&printer->proxy_jobs_mutex);
for (pjob = (_pappl_proxy_job_t *)cupsArrayGetFirst(printer->proxy_jobs); pjob; pjob = (_pappl_proxy_job_t *)cupsArrayGetNext(printer->proxy_jobs))
{
if (pjob->job == job)
break;
}
_papplRWUnlock(printer);
cupsMutexUnlock(&printer->proxy_jobs_mutex);

if (!pjob)
return;
Expand Down Expand Up @@ -439,37 +440,8 @@ update_active_jobs(
if (ippGetGroupTag(job_ids) == IPP_TAG_OPERATION && ippGetGroupTag(job_states) == IPP_TAG_OPERATION && ippGetCount(job_ids) == ippGetCount(job_states))
{
// Got a list of jobs with different states...
ipp_jstate_t local_state, // Local job state
remote_state; // Remote job state

_papplRWLockWrite(printer);

for (i = 0, count = ippGetCount(job_ids); i < count; i ++)
{
key.parent_job_id = ippGetInteger(job_ids, i);
remote_state = (ipp_jstate_t)ippGetInteger(job_states, i);

if ((job = (_pappl_proxy_job_t *)cupsArrayFind(printer->proxy_jobs, &key)) != NULL)
{
local_state = papplJobGetState(job->job);

if (remote_state >= IPP_JSTATE_CANCELED && local_state < IPP_JSTATE_CANCELED)
{
// Cancel local job
_papplRWLockWrite(job->job);
_papplJobCancelNoLock(job->job);
_papplRWUnlock(job->job);
}
else if (remote_state == IPP_JSTATE_PENDING && local_state == IPP_JSTATE_HELD)
{
// Release held job
_papplRWLockWrite(job->job);
_papplJobReleaseNoLock(job->job, /*username*/NULL);
check_jobs = true;
_papplRWUnlock(job->job);
}
}
}
check_jobs |= update_proxy_job_no_lock(printer, ippGetInteger(job_ids, i), (ipp_jstate_t)ippGetInteger(job_states, i));

// Get the jobs that no longer exist on the Infrastructure Printer...
if ((job_ids = ippFindNextAttribute(response, "job-ids", IPP_TAG_INTEGER)) != NULL && ippGetGroupTag(job_ids) == IPP_TAG_UNSUPPORTED_GROUP)
Expand Down Expand Up @@ -505,6 +477,57 @@ update_active_jobs(
}


//
// 'update_proxy_job_no_lock()' - Update a proxy job's state without changing the printer's rwlock.
//

static bool // O - Check jobs?
update_proxy_job_no_lock(
pappl_printer_t *printer, // I - Printer
int job_id, // I - Job ID
ipp_jstate_t remote_state) // I - Remote job state value
{
bool check_jobs = false;
// Check for jobs to process?
_pappl_proxy_job_t key, // Search key
*job; // Proxy job
ipp_jstate_t local_state; // Local jost state value


// Find the proxy job...
cupsMutexLock(&printer->proxy_jobs_mutex);

key.parent_job_id = job_id;
job = (_pappl_proxy_job_t *)cupsArrayFind(printer->proxy_jobs, &key);

cupsMutexUnlock(&printer->proxy_jobs_mutex);

if (job)
{
// Update the local job as needed...
local_state = papplJobGetState(job->job);

if (remote_state >= IPP_JSTATE_CANCELED && local_state < IPP_JSTATE_CANCELED)
{
// Cancel local job
_papplRWLockWrite(job->job);
_papplJobCancelNoLock(job->job);
_papplRWUnlock(job->job);
}
else if (remote_state == IPP_JSTATE_PENDING && local_state == IPP_JSTATE_HELD)
{
// Release held job
_papplRWLockWrite(job->job);
_papplJobReleaseNoLock(job->job, /*username*/NULL);
check_jobs = true;
_papplRWUnlock(job->job);
}
}

return (check_jobs);
}


//
// 'update_proxy_jobs()' - Update the available proxy jobs.
//
Expand All @@ -521,6 +544,8 @@ update_proxy_jobs(
_pappl_proxy_job_t pj; // Proxy job

// Create the proxy jobs array...
cupsMutexLock(&printer->proxy_jobs_mutex);

printer->proxy_jobs = cupsArrayNew((cups_array_cb_t)compare_proxy_jobs, /*cbdata*/NULL, /*hash_cb*/NULL, /*hash_size*/0, (cups_acopy_cb_t)copy_proxy_job, (cups_afree_cb_t)free_proxy_job);

// Scan existing jobs for parent-job-xxx attributes...
Expand All @@ -542,6 +567,8 @@ update_proxy_jobs(
}
}
}

cupsMutexUnlock(&printer->proxy_jobs_mutex);
}


Expand All @@ -561,11 +588,15 @@ wait_events(
int *seq_number, // IO - Event sequence number
time_t *next_wait_events) // O - notify-get-interval time
{
bool check_jobs = false; // Return value
bool check_jobs = false, // Return value
fetch_jobs = false; // Fetch jobs?
ipp_t *request, // IPP request
*response; // IPP response
ipp_attribute_t *attr; // IPP attribute
int get_interval; // "notify-get-interval" value
int job_id = 0; // Remote job ID
ipp_jstate_t job_state = IPP_JSTATE_ABORTED;
// Remote job state


// Send a Get-Notifications request
Expand All @@ -586,20 +617,66 @@ wait_events(
else
*next_wait_events = time(NULL) + 5;

// Find the last event...
for (attr = ippFindAttribute(response, "notify-sequence-number", IPP_TAG_INTEGER); attr; attr = ippFindNextAttribute(response, "notify-sequence-number", IPP_TAG_INTEGER))
// Process events...
_papplRWLockWrite(printer);

for (attr = ippGetFirstAttribute(response); attr; attr = ippGetNextAttribute(response))
{
// All events for this subscription are job events, so check jobs...
check_jobs = true;
// Look at the current attribute/group...
const char *name; // Attribute name
ipp_tag_t value_tag; // Value type
const char *keyword; // "notify-event" value
int number; // "notify-sequence-number" value

if (ippGetGroupTag(attr) != IPP_TAG_EVENT_NOTIFICATION)
{
// Update proxy (remote) print job...
if (job_id > 0)
check_jobs |= update_proxy_job_no_lock(printer, job_id, job_state);

job_id = 0;
job_state = IPP_JSTATE_ABORTED;
continue;
}

// In the middle of an event...
name = ippGetName(attr);
value_tag = ippGetValueTag(attr);

if (!strcmp(name, "notify-job-id") && value_tag == IPP_TAG_INTEGER)
{
job_id = ippGetInteger(attr, 0);
}
else if (!strcmp(name, "notify-sequence-number") && value_tag == IPP_TAG_INTEGER && (number = ippGetInteger(attr, 0)) > *seq_number)
{
*seq_number = number;
}
else if (!strcmp(name, "notify-subscribed-event") && value_tag == IPP_TAG_KEYWORD)
{
// See what kind of a job event this is...
keyword = ippGetString(attr, 0, NULL);

// See if this is a higher number than before...
if (ippGetInteger(attr, 0) > *seq_number)
*seq_number = ippGetInteger(attr, 0);
if (!strcmp(keyword, "job-fetchable"))
fetch_jobs = true;
}
else if (!strcmp(name, "job-state") && value_tag == IPP_TAG_ENUM)
{
job_state = (ipp_jstate_t)ippGetInteger(attr, 0);
}
}

if (job_id > 0)
check_jobs |= update_proxy_job_no_lock(printer, job_id, job_state);

// If any jobs were released, see if they can be started now...
if (check_jobs)
_papplPrinterCheckJobsNoLock(printer);

_papplRWUnlock(printer);

// Free memory and return...
ippDelete(response);

return (check_jobs);
return (fetch_jobs);
}

2 changes: 2 additions & 0 deletions pappl/printer.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ _papplPrinterDelete(

cupsRWDestroy(&printer->output_rwlock);
cupsRWDestroy(&printer->rwlock);
cupsMutexDestroy(&printer->proxy_jobs_mutex);

free(printer);
}
Expand Down Expand Up @@ -749,6 +750,7 @@ create_printer(
// Initialize printer structure and attributes...
cupsRWInit(&printer->rwlock);
cupsRWInit(&printer->output_rwlock);
cupsMutexInit(&printer->proxy_jobs_mutex);

printer->system = system;
printer->name = strdup(printer_name);
Expand Down

0 comments on commit 062c248

Please sign in to comment.