Skip to content

Commit

Permalink
feat: use jsforce v3/improve perf on commands (#843)
Browse files Browse the repository at this point in the history
originally created to QA for W-14996843

### What does this PR do?
- continued refactoring work for jsforce3 (esp async query to not return results or check even once)
- use jsforce3 bulk2 polling events
- ~~allow --use-tooling-api with --bulk on query (forcedotcom/cli#2667
- reorganize the reporters, mostly using functions.  If I created a bug, it's somewhere in there.  split them into different files, along with their tests
- more UT for reporter logic
- 1 nut fix where bulk delete resume human output wasn't being tested 
- drop support for "sfdx" and ":" in help/message output.  only sf and spaces 
- improve the perf and reduce API call usages for monitoring bulk jobs
- improve the error result if you send an uparseable csv file
- improve the output of the bulk job event updates
- remove the unused chai-as-promised devDep

`Job a07EE00001l5rHsYAI Status Job Complete Records processed 500 Records failed 0.` => 
`Job a07EE00001l5rHsYAI | Status Job Complete | Records processed 500 | Records failed 0.` 

there are some // ts-expect-error because `jsforce-node.Connection` isn't **exactly** identical to `sfdx-core.Connection` that extends jsforce v2 

one obvious difference is that they have a different implementation of `conn.bulk/.bulk2` but there are others about private members (ex: sfdx-core inserts its own logger)

When jsforce-node is rolled out in sfdx-core and sfCommand those will start failing, hopefully, and we can remove them
  • Loading branch information
mshanemc authored Mar 21, 2024
1 parent 7628c10 commit 5bc7157
Show file tree
Hide file tree
Showing 61 changed files with 78,861 additions and 2,722 deletions.
6 changes: 3 additions & 3 deletions messages/batcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Batch Status

Will poll the batch statuses every %s seconds.
To fetch the status on your own, press CTRL+C and use the command:
%s force%sdata%sbulk%sstatus -i %s -b [<batchId>]
sf force data bulk status -i %s -b [<batchId>]

# ExternalIdRequired

Expand All @@ -27,12 +27,12 @@ An External ID is required on %s to perform an upsert.
# TimeOut

The operation timed out. Check the status with command:
%s force%sdata%sbulk%sstatus -i %s -b %s
sf force data bulk status -i %s -b %s

# CheckStatusCommand

Check batch #%s’s status with the command:
%s force%sdata%sbulk%sstatus -i %s -b %s
sf force data bulk status -i %s -b %s

# BatchQueued

Expand Down
8 changes: 4 additions & 4 deletions messages/bulk.base.command.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ Bulk %s request %s started successfully

# checkStatus

Run command %s data %s resume -i %s -o %s to check status.
Run command sf data %s resume -i %s -o %s to check status.

# checkJobViaUi

To review the details of this job, run:
%s org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s"
sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s"

# remainingTimeStatus

Remaining time: %d minutes.
Remaining time: %d minutes

# remainingRecordsStatus

%d/%d/%d records successful/failed/processed.
Processed %d | Success %d | Fail %d

# bulkJobFailed

Expand Down
4 changes: 0 additions & 4 deletions messages/bulk.resume.command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ ID of the job you want to resume.

Use the ID of the most recently-run bulk job.

# flags.targetOrg.summary

Org alias or username to use for the target org.

# flags.wait.summary

Number of minutes to wait for the command to complete before displaying the results.
4 changes: 2 additions & 2 deletions messages/importApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ Data plan file %s did not validate against the schema. Errors: %s.

# dataPlanValidationErrorActions

- Did you run the "%s data%sexport%stree" command with the --plan flag?
- Did you run the "sf data export tree" command with the --plan flag?

- Make sure you're importing a plan definition file.

- Get help with the import plan schema by running "%s data%simport%stree --config-help".
- Get help with the import plan schema by running "sf data import tree --config-help".

# FlsError

Expand Down
2 changes: 1 addition & 1 deletion messages/reporter.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# bulkV2Result

Job %s Status %s Records processed %d. Records failed %d.
Job %s | Status %s | Records processed %d | Records failed %d
4 changes: 0 additions & 4 deletions messages/soql.query.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ Include deleted records. By default, deleted records are not returned.

Time to wait for the command to finish, in minutes.

# flags.targetOrg.summary

Org alias or username to use for the target org.

# displayQueryRecordsRetrieved

Total number of records retrieved: %s.
Expand Down
6 changes: 2 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"version": "oclif readme"
},
"dependencies": {
"@jsforce/jsforce-node": "^3.0.0-next.2",
"@oclif/core": "^3.23.0",
"@salesforce/core": "^6.7.1",
"@salesforce/kit": "^3.0.15",
Expand All @@ -116,16 +117,13 @@
"chalk": "^5.3.0",
"change-case": "^5.4.3",
"csv-parse": "^4.16.3",
"csv-stringify": "^6.4.6",
"jsforce": "^2.0.0-beta.29"
"csv-stringify": "^6.4.6"
},
"devDependencies": {
"@oclif/plugin-command-snapshot": "^5.1.1",
"@salesforce/cli-plugins-testkit": "^5.1.11",
"@salesforce/dev-scripts": "^8.4.2",
"@salesforce/plugin-command-reference": "^3.0.70",
"@types/chai-as-promised": "^7.1.8",
"chai-as-promised": "^7.1.1",
"eslint-plugin-sf-plugin": "^1.17.4",
"oclif": "^4.5.5",
"ts-node": "^10.9.2",
Expand Down
214 changes: 105 additions & 109 deletions src/BulkBaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,128 +5,124 @@
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/


import { SfCommand } from '@salesforce/sf-plugins-core';
import { BulkOperation, IngestJobV2, IngestOperation, JobInfoV2, JobStateV2 } from 'jsforce/lib/api/bulk.js';
import { SfCommand, Spinner } from '@salesforce/sf-plugins-core';
import { IngestJobV2, JobInfoV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js';
import { Duration } from '@salesforce/kit';
import { capitalCase } from 'change-case';
import { Connection, Lifecycle, Messages } from '@salesforce/core';
import { Schema } from 'jsforce';
import { getResultMessage } from './reporters.js';
import { BulkResultV2 } from './types.js';
import { Messages } from '@salesforce/core';
import { Schema } from '@jsforce/jsforce-node';
import { getResultMessage } from './reporters/reporters.js';
import { BulkDataRequestCache } from './bulkDataRequestCache.js';

Messages.importMessagesDirectoryFromMetaUrl(import.meta.url)
Messages.importMessagesDirectoryFromMetaUrl(import.meta.url);
const messages = Messages.loadMessages('@salesforce/plugin-data', 'bulk.base.command');

export abstract class BulkBaseCommand extends SfCommand<BulkResultV2> {
public static readonly enableJsonFlag = true;
protected lifeCycle = Lifecycle.getInstance();
protected job!: IngestJobV2<Schema, IngestOperation>;
protected connection: Connection | undefined;
protected cache: BulkDataRequestCache | undefined;
protected isAsync = false;
protected operation!: BulkOperation;
protected endWaitTime = 0;
protected wait = 0;
private numberRecordsProcessed = 0;
private numberRecordsFailed = 0;
private numberRecordSucceeded = 0;
private timeout = false;
export const setupLifecycleListeners = ({
job,
cache,
username,
apiVersion,
cmd,
isAsync,
endWaitTime,
}: {
job: IngestJobV2<Schema>;
cache?: BulkDataRequestCache;
username?: string;
apiVersion?: string;
cmd: SfCommand<unknown>;
isAsync: boolean;
endWaitTime: number;
}): void => {
// the event emitted by jsforce's polling function
job.on('inProgress', (jobInfo: JobInfoV2) => {
cmd.spinner.status = formatSpinnerProgress(isAsync, endWaitTime, jobInfo);
});
// the event emitted other places in the plugin
job.on('jobProgress', () => {
const handler = async (): Promise<void> => {
const jobInfo = await job.check();
cmd.spinner.status = formatSpinnerProgress(isAsync, endWaitTime, jobInfo);
};
handler().catch((err) => eventListenerErrorHandler(err));
});

job.on('failed', throwAndStopSpinner(cmd.spinner));
job.on('error', throwAndStopSpinner(cmd.spinner));

job.once('jobTimeout', () => {
const handler = async (): Promise<void> => {
await cache?.createCacheEntryForRequest(job.id ?? '', username, apiVersion);
displayBulkV2Result({ jobInfo: await job.check(), username, isAsync, cmd });
};
handler().catch((err) => eventListenerErrorHandler(err));
});
};

protected displayBulkV2Result(jobInfo: JobInfoV2): void {
if (this.isAsync) {
this.logSuccess(messages.getMessage('success', [this.operation, jobInfo.id]));
this.info(
messages.getMessage('checkStatus', [
this.config.bin,
this.operation,
jobInfo.id,
this.connection?.getUsername(),
])
);
} else {
this.log();
this.info(getResultMessage(jobInfo));
if ((jobInfo.numberRecordsFailed ?? 0) > 0 || jobInfo.state === 'Failed') {
this.info(messages.getMessage('checkJobViaUi', [this.config.bin, this.connection?.getUsername(), jobInfo.id]));
process.exitCode = 1;
}
if (jobInfo.state === 'InProgress' || jobInfo.state === 'Open') {
this.info(
messages.getMessage('checkStatus', [
this.config.bin,
this.operation,
jobInfo.id,
this.connection?.getUsername(),
])
);
}
if (jobInfo.state === 'Failed') {
throw messages.createError('bulkJobFailed', [jobInfo.id]);
}
export const displayBulkV2Result = ({
jobInfo,
isAsync,
cmd,
username = 'unspecified user',
}: {
jobInfo: JobInfoV2;
isAsync: boolean;
cmd: SfCommand<unknown>;
username?: string;
}): void => {
if (isAsync && jobInfo.state !== 'JobComplete' && jobInfo.state !== 'Failed') {
cmd.logSuccess(messages.getMessage('success', [jobInfo.operation, jobInfo.id]));
cmd.info(messages.getMessage('checkStatus', [jobInfo.operation, jobInfo.id, username]));
} else {
cmd.log();
cmd.info(getResultMessage(jobInfo));
if ((jobInfo.numberRecordsFailed ?? 0) > 0 || jobInfo.state === 'Failed') {
cmd.info(messages.getMessage('checkJobViaUi', [username, jobInfo.id]));
process.exitCode = 1;
}
if (jobInfo.state === 'InProgress' || jobInfo.state === 'Open') {
cmd.info(messages.getMessage('checkStatus', [jobInfo.operation, jobInfo.id, username]));
}
if (jobInfo.state === 'Failed') {
throw messages.createError('bulkJobFailed', [jobInfo.id]).setData(jobInfo);
}
}
};

protected setupLifecycleListeners(): void {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.job.on('jobProgress', async () => {
const jobInfo = await this.job.check();
this.numberRecordsProcessed = jobInfo.numberRecordsProcessed ?? 0;
this.numberRecordsFailed = jobInfo.numberRecordsFailed ?? 0;
this.numberRecordSucceeded = this.numberRecordsProcessed - this.numberRecordsFailed;
this.spinner.status = `${this.getRemainingTimeStatus()}${this.getStage(
jobInfo.state
)}${this.getRemainingRecordsStatus()}`;
});
const eventListenerErrorHandler = (err: unknown): never => {
throw err instanceof Error || typeof err === 'string' ? err : JSON.stringify(err);
};

this.job.on('failed', (err: Error) => {
try {
this.error(err);
} finally {
this.spinner.stop();
}
});
const throwAndStopSpinner =
(spinner: Spinner) =>
(err: Error): void => {
try {
throw err;
} finally {
spinner.stop();
}
};

this.job.on('error', (message: string) => {
try {
this.error(message);
} finally {
this.spinner.stop();
}
});
export const getRemainingTimeStatus = ({ isAsync, endWaitTime }: { isAsync: boolean; endWaitTime: number }): string =>
isAsync ? '' : messages.getMessage('remainingTimeStatus', [Duration.milliseconds(endWaitTime - Date.now()).minutes]);

// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.job.on('jobTimeout', async () => {
if (!this.timeout) {
this.timeout = true;
await this.cache?.createCacheEntryForRequest(
this.job.id ?? '',
this.connection?.getUsername(),
this.connection?.getApiVersion()
);
this.displayBulkV2Result(await this.job.check());
}
});
}
const formatSpinnerProgress = (isAsync: boolean, endWaitTime: number, jobInfo: JobInfoV2): string =>
`${getRemainingTimeStatus({
isAsync,
endWaitTime,
})} | ${getStage(jobInfo.state)} | ${getRemainingRecordsStatus(jobInfo)}`;

protected getRemainingTimeStatus(): string {
return this.isAsync
? ''
: messages.getMessage('remainingTimeStatus', [Duration.milliseconds(this.endWaitTime - Date.now()).minutes]);
}
const getStage = (state: JobInfoV2['state']): string => ` Stage: ${capitalCase(state)}`;

protected getRemainingRecordsStatus(): string {
// the leading space is intentional
return ` ${messages.getMessage('remainingRecordsStatus', [
this.numberRecordSucceeded,
this.numberRecordsFailed,
this.numberRecordsProcessed,
])}`;
}
const getRemainingRecordsStatus = (jobInfo: JobInfoV2): string => {
const numberRecordsProcessed = jobInfo.numberRecordsProcessed ?? 0;
const numberRecordsFailed = jobInfo.numberRecordsFailed ?? 0;
const numberRecordSucceeded = numberRecordsProcessed - numberRecordsFailed;

// eslint-disable-next-line class-methods-use-this
protected getStage(state: JobStateV2): string {
return ` Stage: ${capitalCase(state)}.`;
}
}
// the leading space is intentional
return ` ${messages.getMessage('remainingRecordsStatus', [
numberRecordsProcessed,
numberRecordSucceeded,
numberRecordsFailed,
])}`;
};
3 changes: 1 addition & 2 deletions src/api/data/tree/exportApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import path from 'node:path';
import fs from 'node:fs';

import { Logger, Messages, Org, SfError, Lifecycle } from '@salesforce/core';
import { DescribeSObjectResult, QueryResult } from 'jsforce';
import { DescribeSObjectResult, QueryResult } from '@jsforce/jsforce-node';
import { Ux } from '@salesforce/sf-plugins-core';
import {
BasicRecord,
Expand Down Expand Up @@ -273,7 +273,6 @@ export class ExportApi {
treeRecord[key] = ref && ref !== id ? `@${ref}` : id;
return;
}
// TODO: what to do if ref not found?
const recordId: string = record['Id'] as string;
this.logger.error(`Reference ${relTo} not found for ${key}. Skipping record ${recordId}.`);
return;
Expand Down
11 changes: 2 additions & 9 deletions src/api/data/tree/importApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class ImportApi {
private config!: ImportConfig;
private importPlanConfig: DataPlanPart[] = [];

public constructor(private readonly org: Org, private readonly cli: string, private readonly separator: string) {
public constructor(private readonly org: Org) {
this.logger = Logger.childFromRoot(this.constructor.name);
this.schemaValidator = new SchemaValidator(this.logger, importPlanSchemaFile);
}
Expand Down Expand Up @@ -274,14 +274,7 @@ export class ImportApi {
throw new SfError(
messages.getMessage('dataPlanValidationError', [planPath, error.message]),
INVALID_DATA_IMPORT_ERR_NAME,
messages.getMessages('dataPlanValidationErrorActions', [
this.cli,
this.separator,
this.separator,
this.cli,
this.separator,
this.separator,
])
messages.getMessages('dataPlanValidationErrorActions')
);
}
throw SfError.wrap(error);
Expand Down
Loading

0 comments on commit 5bc7157

Please sign in to comment.