Skip to content

Commit

Permalink
Merge pull request #37 from yas-okadatech/expand-sutable-and-empty-fi…
Browse files Browse the repository at this point in the history
…elds

add expand_subtable and handling empty fields
  • Loading branch information
u110 authored Oct 23, 2023
2 parents 5493826 + d840140 commit 4c4cdf4
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 12 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ e.g. limit, offset are not supported.
- **basic_auth_username**: Kintone basic auth username Please see Kintone basic auth [here](https://jp.cybozu.help/general/en/admin/list_security/list_ip_basic/basic_auth.html) (string, optional)
- **basic_auth_password**: Kintone basic auth password (string, optional)
- **guest_space_id**: Kintone app belongs to guest space, guest space id is required. (integer, optional)
- **fields** (required)
- **expand_subtable**: Expand subtabble (boolean, default: `false`)
- **fields**: If fields is empty, include all available columns (required)
- **name** the field code of Kintone app record will be retrieved.
- **type** Column values are converted to this embulk type. Available values options are: boolean, long, double, string, json, timestamp) Kintone `SUBTABLE` type is loaded as json text.
- **format** Format of the timestamp if type is timestamp. The format for kintone DATETIME is `%Y-%m-%dT%H:%M:%S%z`.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repositories {
}

group = "io.trocco"
version = "0.1.8"
version = "0.1.9"

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand Down
64 changes: 59 additions & 5 deletions src/main/java/org/embulk/input/kintone/KintoneClient.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
package org.embulk.input.kintone;

import com.google.common.annotations.VisibleForTesting;
import com.kintone.client.AppClient;
import com.kintone.client.KintoneClientBuilder;
import com.kintone.client.RecordClient;
import com.kintone.client.api.record.CreateCursorRequest;
import com.kintone.client.api.record.CreateCursorResponseBody;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.exception.KintoneApiRuntimeException;
import com.kintone.client.model.app.field.FieldProperty;
import com.kintone.client.model.app.field.SubtableFieldProperty;
import com.kintone.client.model.record.FieldType;
import org.embulk.config.ConfigException;
import org.embulk.util.config.units.ColumnConfig;
import org.embulk.spi.Column;
import org.embulk.spi.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KintoneClient
{
private final Logger logger = LoggerFactory.getLogger(KintoneClient.class);
private static final int FETCH_SIZE = 500;
private RecordClient recordClient;
private AppClient appClient;

public KintoneClient() throws ConfigException
{
}

@VisibleForTesting
protected KintoneClient(AppClient appClient)
{
this.appClient = appClient;
}

@SuppressWarnings("StatementWithEmptyBody")
public void validateAuth(final PluginTask task) throws ConfigException
{
Expand Down Expand Up @@ -57,11 +73,12 @@ else if (task.getToken().isPresent()) {

com.kintone.client.KintoneClient client = builder.build();
this.recordClient = client.record();
this.appClient = client.app();
}

public GetRecordsByCursorResponseBody getResponse(final PluginTask task)
public GetRecordsByCursorResponseBody getResponse(final PluginTask task, final Schema schema)
{
CreateCursorResponseBody cursor = this.createCursor(task);
CreateCursorResponseBody cursor = this.createCursor(task, schema);
try {
return this.recordClient.getRecordsByCursor(cursor.getId());
}
Expand All @@ -84,12 +101,16 @@ public GetRecordsByCursorResponseBody getRecordsByCursor(String cursor)
}
}

public CreateCursorResponseBody createCursor(final PluginTask task)
public CreateCursorResponseBody createCursor(final PluginTask task, final Schema schema)
{
ArrayList<String> fields = new ArrayList<>();
for (ColumnConfig c : task.getFields().getColumns()) {
for (Column c : schema.getColumns()) {
fields.add(c.getName());
}
if (task.getExpandSubtable()) {
List<String> subTableFieldCodes = getFieldCodes(task, FieldType.SUBTABLE);
fields.addAll(subTableFieldCodes);
}
CreateCursorRequest request = new CreateCursorRequest();
request.setApp((long) task.getAppId());
request.setFields(fields);
Expand All @@ -113,4 +134,37 @@ public void deleteCursor(String cursor)
this.logger.error(e.toString());
}
}

public Map<String, FieldProperty> getFields(final PluginTask task)
{
Map<String, FieldProperty> fields = this.appClient.getFormFields(task.getAppId());
if (task.getExpandSubtable()) {
Map<String, FieldProperty> subtableFields = new HashMap<>();
List<String> subtableFieldCodes = new ArrayList<>();
for (Map.Entry<String, FieldProperty> fieldEntry : fields.entrySet()) {
if (fieldEntry.getValue().getType() == FieldType.SUBTABLE) {
subtableFields.putAll(((SubtableFieldProperty) fieldEntry.getValue()).getFields());
subtableFieldCodes.add(fieldEntry.getKey());
}
}
for (String subtableFieldCode : subtableFieldCodes) {
fields.remove(subtableFieldCode);
}
fields.putAll(subtableFields);
}

return fields;
}

public List<String> getFieldCodes(final PluginTask task, FieldType fieldType)
{
ArrayList<String> fieldCodes = new ArrayList<>();
Map<String, FieldProperty> fields = this.appClient.getFormFields(task.getAppId());
for (Map.Entry<String, FieldProperty> entry : fields.entrySet()) {
if (entry.getValue().getType() == fieldType) {
fieldCodes.add(entry.getKey());
}
}
return fieldCodes;
}
}
127 changes: 124 additions & 3 deletions src/main/java/org/embulk/input/kintone/KintoneInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.kintone.client.api.record.CreateCursorResponseBody;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.model.app.field.FieldProperty;
import com.kintone.client.model.record.FieldType;
import com.kintone.client.model.record.Record;
import com.kintone.client.model.record.TableRow;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
Expand All @@ -13,14 +16,21 @@
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.Schema.Builder;
import org.embulk.spi.type.Type;
import org.embulk.spi.type.Types;
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.config.modules.TimestampModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class KintoneInputPlugin
implements InputPlugin
Expand All @@ -41,6 +51,10 @@ public ConfigDiff transaction(ConfigSource config,
Schema schema = task.getFields().toSchema();
int taskCount = 1; // number of run() method calls

if (schema.isEmpty()) {
schema = buildSchema(task);
}

return resume(task.toTaskSource(), schema, taskCount, control);
}

Expand Down Expand Up @@ -74,14 +88,30 @@ public TaskReport run(TaskSource taskSource,
client.validateAuth(task);
client.connect(task);

CreateCursorResponseBody cursor = client.createCursor(task);
CreateCursorResponseBody cursor = client.createCursor(task, schema);
GetRecordsByCursorResponseBody cursorResponse = new GetRecordsByCursorResponseBody(true, null);

List<String> subTableFieldCodes = null;
if (task.getExpandSubtable()) {
subTableFieldCodes = client.getFieldCodes(task, FieldType.SUBTABLE);
}

while (cursorResponse.isNext()) {
cursorResponse = client.getRecordsByCursor(cursor.getId());
for (Record record : cursorResponse.getRecords()) {
schema.visitColumns(new KintoneInputColumnVisitor(new KintoneAccessor(record), pageBuilder, task));
pageBuilder.addRecord();
List<Record> records;
if (task.getExpandSubtable()) {
records = expandSubtable(record, subTableFieldCodes);
}
else {
records = new ArrayList<>();
records.add(record);
}

for (Record expandedRecord : records) {
schema.visitColumns(new KintoneInputColumnVisitor(new KintoneAccessor(expandedRecord), pageBuilder, task));
pageBuilder.addRecord();
}
}
pageBuilder.flush();
}
Expand Down Expand Up @@ -113,4 +143,95 @@ protected KintoneClient getKintoneClient()
{
return new KintoneClient();
}

private List<Record> expandSubtable(final Record originalRecord, final List<String> subTableFieldCodes)
{
ArrayList<Record> records = new ArrayList<>();
records.add(cloneRecord(originalRecord));
for (String fieldCode : subTableFieldCodes) {
List<TableRow> tableRows = originalRecord.getSubtableFieldValue(fieldCode);
for (int idx = 0; idx < tableRows.size(); idx++) {
if (records.size() < idx + 1) {
records.add(cloneRecord(originalRecord));
}

TableRow tableRow = tableRows.get(idx);
Record currentRecord = records.get(idx);
Set<String> tableFieldCodes = tableRow.getFieldCodes();
for (String tableFieldCode : tableFieldCodes) {
currentRecord.putField(tableFieldCode, tableRow.getFieldValue(tableFieldCode));
}
}
}
return records;
}

private Record cloneRecord(final Record src)
{
Record dst = new Record(src.getId(), src.getRevision());
for (String fieldCode : src.getFieldCodes(true)) {
dst.putField(fieldCode, src.getFieldValue(fieldCode));
}
return dst;
}

private Schema buildSchema(final PluginTask task)
{
KintoneClient client = getKintoneClient();
client.validateAuth(task);
client.connect(task);

Map<String, FieldProperty> fields = new TreeMap<>(client.getFields(task));
Builder builder = Schema.builder();

// built in schema
builder.add("$id", Types.LONG);
builder.add("$revision", Types.LONG);

for (Map.Entry<String, FieldProperty> fieldEntry : fields.entrySet()) {
builder.add(fieldEntry.getKey(), buildType(fieldEntry.getValue().getType()));
}

return builder.build();
}

private Type buildType(final FieldType fieldType)
{
switch(fieldType) {
case __ID__:
case __REVISION__:
case RECORD_NUMBER:
return Types.LONG;
case CALC:
case NUMBER:
return Types.DOUBLE;
case CREATED_TIME:
case DATETIME:
case UPDATED_TIME:
return Types.TIMESTAMP;
case SUBTABLE:
return Types.JSON;
case CATEGORY:
case CHECK_BOX:
case CREATOR:
case DATE:
case DROP_DOWN:
case FILE:
case GROUP_SELECT:
case LINK:
case MODIFIER:
case MULTI_LINE_TEXT:
case MULTI_SELECT:
case ORGANIZATION_SELECT:
case RADIO_BUTTON:
case RICH_TEXT:
case SINGLE_LINE_TEXT:
case STATUS:
case STATUS_ASSIGNEE:
case TIME:
case USER_SELECT:
default:
return Types.STRING;
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/input/kintone/PluginTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public interface PluginTask
@ConfigDefault("null")
Optional<String> getQuery();

@Config("expand_subtable")
@ConfigDefault("false")
boolean getExpandSubtable();

@Config("fields")
SchemaConfig getFields();
}
Loading

0 comments on commit 4c4cdf4

Please sign in to comment.