Skip to content

Commit 2eae61d

Browse files
authored
feat: Support BULK 2.0 API (#63)
The BULK 2.0 API is faster and simpler to use. It's also specifically built to work with larger data sets. From a user's point of view, using the classic BULK API is the same. All you need to do is change the API type to "BULK2".
1 parent d5269a7 commit 2eae61d

File tree

4 files changed

+100
-5
lines changed

4 files changed

+100
-5
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pip install git+https://github.com/MeltanoLabs/tap-salesforce.git
3030
**Required**
3131
```
3232
{
33-
"api_type": "BULK",
33+
"api_type": "BULK2",
3434
"select_fields_by_default": true,
3535
}
3636
```
@@ -67,7 +67,7 @@ The `client_id` and `client_secret` keys are your OAuth Salesforce App secrets.
6767

6868
The `start_date` is used by the tap as a bound on SOQL queries when searching for records. This should be an [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) formatted date-time, like "2018-01-08T00:00:00Z". For more details, see the [Singer best practices for dates](https://github.com/singer-io/getting-started/blob/master/BEST_PRACTICES.md#dates).
6969

70-
The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST" and "BULK" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default.
70+
The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST", "BULK", and "BULK 2.0" (config value "BULK2") APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default.
7171

7272
The `state_message_threshold` is used to throttle how often STATE messages are generated when the tap is using the "REST" API. This is a balance between not slowing down execution due to too many STATE messages produced and how many records must be fetched again if a tap fails unexpectedly. Defaults to 1000 (generate a STATE message every 1000 records).
7373

tap_salesforce/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def do_discover(sf: Salesforce, streams: list[str]):
186186
f, mdata)
187187

188188
# Compound Address fields cannot be queried by the Bulk API
189-
if f['type'] in ("address", "location") and sf.api_type == tap_salesforce.salesforce.BULK_API_TYPE:
189+
if f['type'] in ("address", "location") and sf.api_type in [tap_salesforce.salesforce.BULK_API_TYPE, tap_salesforce.salesforce.BULK2_API_TYPE]:
190190
unsupported_fields.add(
191191
(field_name, 'cannot query compound address fields with bulk API'))
192192

tap_salesforce/salesforce/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from singer import metadata, metrics
1010

1111
from tap_salesforce.salesforce.bulk import Bulk
12+
from tap_salesforce.salesforce.bulk2 import Bulk2
1213
from tap_salesforce.salesforce.rest import Rest
1314
from tap_salesforce.salesforce.exceptions import (
1415
TapSalesforceException,
@@ -20,6 +21,7 @@
2021
LOGGER = singer.get_logger()
2122

2223
BULK_API_TYPE = "BULK"
24+
BULK2_API_TYPE = "BULK2"
2325
REST_API_TYPE = "REST"
2426

2527
STRING_TYPES = set([
@@ -388,6 +390,9 @@ def query(self, catalog_entry, state):
388390
if self.api_type == BULK_API_TYPE:
389391
bulk = Bulk(self)
390392
return bulk.query(catalog_entry, state)
393+
elif self.api_type == BULK2_API_TYPE:
394+
bulk = Bulk2(self)
395+
return bulk.query(catalog_entry, state)
391396
elif self.api_type == REST_API_TYPE:
392397
rest = Rest(self)
393398
return rest.query(catalog_entry, state)
@@ -397,7 +402,7 @@ def query(self, catalog_entry, state):
397402
self.api_type))
398403

399404
def get_blacklisted_objects(self):
400-
if self.api_type == BULK_API_TYPE:
405+
if self.api_type in [BULK_API_TYPE, BULK2_API_TYPE]:
401406
return UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS.union(
402407
QUERY_RESTRICTED_SALESFORCE_OBJECTS).union(QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS)
403408
elif self.api_type == REST_API_TYPE:
@@ -409,7 +414,7 @@ def get_blacklisted_objects(self):
409414

410415
# pylint: disable=line-too-long
411416
def get_blacklisted_fields(self):
412-
if self.api_type == BULK_API_TYPE:
417+
if self.api_type == BULK_API_TYPE or self.api_type == BULK2_API_TYPE:
413418
return {('EntityDefinition', 'RecordTypesSupported'): "this field is unsupported by the Bulk API."}
414419
elif self.api_type == REST_API_TYPE:
415420
return {}

tap_salesforce/salesforce/bulk2.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import time
2+
import csv
3+
import sys
4+
import json
5+
import singer
6+
from singer import metrics
7+
8+
9+
BATCH_STATUS_POLLING_SLEEP = 20
10+
DEFAULT_CHUNK_SIZE = 50000
11+
12+
LOGGER = singer.get_logger()
13+
14+
class Bulk2():
    """Query Salesforce objects through the Bulk API 2.0 (`/jobs/query`).

    Bulk 2.0 handles batching server-side: we create one query job, poll it
    until it reaches a terminal state, then page through the CSV results
    following the ``Sforce-Locator`` header.
    """

    # Endpoint template for Bulk 2.0 query jobs; formatted with the
    # instance URL (and, for sub-resources, the job id).
    bulk_url = '{}/services/data/v60.0/jobs/query'

    def __init__(self, sf):
        # Result rows arrive as CSV; lift the parser's field size limit so
        # very large field values (e.g. long text areas) don't raise.
        csv.field_size_limit(sys.maxsize)
        self.sf = sf

    def query(self, catalog_entry, state):
        """Run a Bulk 2.0 query for the stream and yield each row as a dict."""
        job_id = self._create_job(catalog_entry, state)
        self._wait_for_job(job_id)

        for batch in self._get_next_batch(job_id):
            reader = csv.DictReader(batch.decode('utf-8').splitlines())

            for row in reader:
                yield row

    def _get_bulk_headers(self):
        """Return auth headers plus the JSON content type Bulk 2.0 requires."""
        return {**self.sf.auth.rest_headers, "Content-Type": "application/json"}

    def _create_job(self, catalog_entry, state):
        """Create a Bulk 2.0 query job for the stream and return its job id."""
        url = self.bulk_url.format(self.sf.instance_url)
        start_date = self.sf.get_start_date(state, catalog_entry)

        # Build the SOQL without an ORDER BY clause for the bulk query.
        query = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=False)

        body = {
            "operation": "query",
            "query": query,
        }

        with metrics.http_request_timer("create_job") as timer:
            timer.tags['sobject'] = catalog_entry['stream']
            resp = self.sf._make_request(
                'POST',
                url,
                headers=self._get_bulk_headers(),
                body=json.dumps(body))

        job = resp.json()

        return job['id']

    def _wait_for_job(self, job_id):
        """Poll the job's status until it reaches ``JobComplete``.

        Raises:
            Exception: if the job ends in the ``Failed`` or ``Aborted`` state.
        """
        status_url = self.bulk_url + '/{}'
        url = status_url.format(self.sf.instance_url, job_id)

        while True:
            # .json() already parses the body, so `resp` is a plain dict.
            resp = self.sf._make_request('GET', url, headers=self._get_bulk_headers()).json()
            status = resp['state']

            if status == 'JobComplete':
                return

            # Bug fix: the original called resp.json() on the already-parsed
            # dict here, raising AttributeError instead of surfacing the job
            # failure. Also treat 'Aborted' as terminal so an aborted job
            # doesn't make us poll forever.
            if status in ('Failed', 'Aborted'):
                raise Exception("Job failed: {}".format(resp))

            time.sleep(BATCH_STATUS_POLLING_SLEEP)

    def _get_next_batch(self, job_id):
        """Yield raw CSV result pages for the job, following the locator."""
        url = (self.bulk_url + '/{}/results').format(self.sf.instance_url, job_id)
        locator = ''

        # Salesforce marks the last page with the literal header value
        # 'null'. Bug fix: a missing Sforce-Locator header made .get()
        # return None, which never equals 'null' and re-fetched the first
        # page forever; treat an absent header as end-of-results too.
        while locator != 'null':
            params = {"maxRecords": DEFAULT_CHUNK_SIZE}

            if locator:
                params['locator'] = locator

            resp = self.sf._make_request('GET', url, headers=self._get_bulk_headers(), params=params)
            locator = resp.headers.get('Sforce-Locator') or 'null'

            yield resp.content

0 commit comments

Comments
 (0)