Skip to content

Commit

Permalink
apacheGH-36141: [Go] Support large and fixed types in csv (apache#36142)
Browse files Browse the repository at this point in the history
### Rationale for this change

### What changes are included in this PR?

### Are these changes tested?
Yes

### Are there any user-facing changes?
Yes

* Closes: apache#36141

Authored-by: izveigor <izveigor@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
izveigor authored Jun 21, 2023
1 parent ea4f03a commit d7b3d4f
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 33 deletions.
6 changes: 3 additions & 3 deletions go/arrow/csv/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ func validate(schema *arrow.Schema) {
case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type:
case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type:
case *arrow.StringType:
case *arrow.StringType, *arrow.LargeStringType:
case *arrow.TimestampType:
case *arrow.Date32Type, *arrow.Date64Type:
case *arrow.Decimal128Type, *arrow.Decimal256Type:
case *arrow.ListType:
case *arrow.BinaryType:
case *arrow.ListType, *arrow.LargeListType, *arrow.FixedSizeListType:
case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType:
case arrow.ExtensionType:
default:
panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft))
Expand Down
134 changes: 133 additions & 1 deletion go/arrow/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,21 @@ func (r *Reader) initFieldConverter(bldr array.Builder) func(string) {
bldr.(*array.StringBuilder).Append(str)
}
}
case *arrow.LargeStringType:
// specialize the implementation when we know we cannot have nulls
if r.stringsCanBeNull {
return func(str string) {
if r.isNull(str) {
bldr.AppendNull()
} else {
bldr.(*array.LargeStringBuilder).Append(str)
}
}
} else {
return func(str string) {
bldr.(*array.LargeStringBuilder).Append(str)
}
}
case *arrow.TimestampType:
return func(str string) {
r.parseTimestamp(bldr, str, dt.Unit)
Expand All @@ -475,10 +490,26 @@ func (r *Reader) initFieldConverter(bldr array.Builder) func(string) {
return func(s string) {
r.parseList(bldr, s)
}
case *arrow.LargeListType:
return func(s string) {
r.parseLargeList(bldr, s)
}
case *arrow.FixedSizeListType:
return func(s string) {
r.parseFixedSizeList(bldr, s, int(dt.Len()))
}
case *arrow.BinaryType:
return func(s string) {
r.parseBinaryType(bldr, s)
}
case *arrow.LargeBinaryType:
return func(s string) {
r.parseLargeBinaryType(bldr, s)
}
case *arrow.FixedSizeBinaryType:
return func(s string) {
r.parseFixedSizeBinaryType(bldr, s, dt.Bytes())
}
case arrow.ExtensionType:
return func(s string) {
r.parseExtension(bldr, s)
Expand Down Expand Up @@ -783,6 +814,68 @@ func (r *Reader) parseList(field array.Builder, str string) {
}
}

func (r *Reader) parseLargeList(field array.Builder, str string) {
if r.isNull(str) {
field.AppendNull()
return
}
if !(strings.HasPrefix(str, "{") && strings.HasSuffix(str, "}")) {
r.err = errors.New("invalid list format. should start with '{' and end with '}'")
return
}
str = strings.Trim(str, "{}")
largeListBldr := field.(*array.LargeListBuilder)
largeListBldr.Append(true)
if len(str) == 0 {
// we don't want to create the csv reader if we already know the
// string is empty
return
}
valueBldr := largeListBldr.ValueBuilder()
reader := csv.NewReader(strings.NewReader(str))
items, err := reader.Read()
if err != nil {
r.err = err
return
}
for _, str := range items {
r.initFieldConverter(valueBldr)(str)
}
}

func (r *Reader) parseFixedSizeList(field array.Builder, str string, n int) {
if r.isNull(str) {
field.AppendNull()
return
}
if !(strings.HasPrefix(str, "{") && strings.HasSuffix(str, "}")) {
r.err = errors.New("invalid list format. should start with '{' and end with '}'")
return
}
str = strings.Trim(str, "{}")
fixedSizeListBldr := field.(*array.FixedSizeListBuilder)
fixedSizeListBldr.Append(true)
if len(str) == 0 {
// we don't want to create the csv reader if we already know the
// string is empty
return
}
valueBldr := fixedSizeListBldr.ValueBuilder()
reader := csv.NewReader(strings.NewReader(str))
items, err := reader.Read()
if err != nil {
r.err = err
return
}
if len(items) == n {
for _, str := range items {
r.initFieldConverter(valueBldr)(str)
}
} else {
r.err = fmt.Errorf("%w: fixed size list items should match the fixed size list length, expected %d, got %d", arrow.ErrInvalid, n, len(items))
}
}

func (r *Reader) parseBinaryType(field array.Builder, str string) {
// specialize the implementation when we know we cannot have nulls
if r.isNull(str) {
Expand All @@ -791,11 +884,50 @@ func (r *Reader) parseBinaryType(field array.Builder, str string) {
}
decodedVal, err := base64.StdEncoding.DecodeString(str)
if err != nil {
panic("cannot decode base64 string " + str)
r.err = fmt.Errorf("cannot decode base64 string %s", str)
field.AppendNull()
return
}

field.(*array.BinaryBuilder).Append(decodedVal)
}

func (r *Reader) parseLargeBinaryType(field array.Builder, str string) {
// specialize the implementation when we know we cannot have nulls
if r.isNull(str) {
field.AppendNull()
return
}
decodedVal, err := base64.StdEncoding.DecodeString(str)
if err != nil {
r.err = fmt.Errorf("cannot decode base64 string %s", str)
field.AppendNull()
return
}

field.(*array.BinaryBuilder).Append(decodedVal)
}

func (r *Reader) parseFixedSizeBinaryType(field array.Builder, str string, byteWidth int) {
// specialize the implementation when we know we cannot have nulls
if r.isNull(str) {
field.AppendNull()
return
}
decodedVal, err := base64.StdEncoding.DecodeString(str)
if err != nil {
r.err = fmt.Errorf("cannot decode base64 string %s", str)
field.AppendNull()
return
}

if len(decodedVal) == byteWidth {
field.(*array.FixedSizeBinaryBuilder).Append(decodedVal)
} else {
r.err = fmt.Errorf("%w: the length of fixed size binary value should match the fixed size binary byte width, expected %d, got %d", arrow.ErrInvalid, byteWidth, len(decodedVal))
}
}

func (r *Reader) parseExtension(field array.Builder, str string) {
if r.isNull(str) {
field.AppendNull()
Expand Down
22 changes: 21 additions & 1 deletion go/arrow/csv/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,14 @@ func testCSVReader(t *testing.T, filepath string, withHeader bool, stringsCanBeN
{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
{Name: "str", Type: arrow.BinaryTypes.String},
{Name: "large_str", Type: arrow.BinaryTypes.LargeString},
{Name: "ts", Type: arrow.FixedWidthTypes.Timestamp_ms},
{Name: "list(i64)", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)},
{Name: "large_list(i64)", Type: arrow.LargeListOf(arrow.PrimitiveTypes.Int64)},
{Name: "fixed_size_list(i64)", Type: arrow.FixedSizeListOf(3, arrow.PrimitiveTypes.Int64)},
{Name: "binary", Type: arrow.BinaryTypes.Binary},
{Name: "large_binary", Type: arrow.BinaryTypes.LargeBinary},
{Name: "fixed_size_binary", Type: &arrow.FixedSizeBinaryType{ByteWidth: 3}},
{Name: "uuid", Type: types.NewUUIDType()},
},
nil,
Expand Down Expand Up @@ -406,9 +411,14 @@ rec[0]["f16"]: [1.0996094]
rec[0]["f32"]: [1.1]
rec[0]["f64"]: [1.1]
rec[0]["str"]: ["str-1"]
rec[0]["large_str"]: ["str-1"]
rec[0]["ts"]: [1652054461000]
rec[0]["list(i64)"]: [[1 2 3]]
rec[0]["large_list(i64)"]: [[1 2 3]]
rec[0]["fixed_size_list(i64)"]: [[1 2 3]]
rec[0]["binary"]: ["\x00\x01\x02"]
rec[0]["large_binary"]: ["\x00\x01\x02"]
rec[0]["fixed_size_binary"]: ["\x00\x01\x02"]
rec[0]["uuid"]: ["00000000-0000-0000-0000-000000000001"]
rec[1]["bool"]: [false]
rec[1]["i8"]: [-2]
Expand All @@ -423,9 +433,14 @@ rec[1]["f16"]: [2.1992188]
rec[1]["f32"]: [2.2]
rec[1]["f64"]: [2.2]
rec[1]["str"]: [%s]
rec[1]["large_str"]: [%s]
rec[1]["ts"]: [1652140799000]
rec[1]["list(i64)"]: [[]]
rec[1]["large_list(i64)"]: [[]]
rec[1]["fixed_size_list(i64)"]: [[4 5 6]]
rec[1]["binary"]: [(null)]
rec[1]["large_binary"]: [(null)]
rec[1]["fixed_size_binary"]: [(null)]
rec[1]["uuid"]: ["00000000-0000-0000-0000-000000000002"]
rec[2]["bool"]: [(null)]
rec[2]["i8"]: [(null)]
Expand All @@ -440,11 +455,16 @@ rec[2]["f16"]: [(null)]
rec[2]["f32"]: [(null)]
rec[2]["f64"]: [(null)]
rec[2]["str"]: [%s]
rec[2]["large_str"]: [%s]
rec[2]["ts"]: [(null)]
rec[2]["list(i64)"]: [(null)]
rec[2]["large_list(i64)"]: [(null)]
rec[2]["fixed_size_list(i64)"]: [(null)]
rec[2]["binary"]: [(null)]
rec[2]["large_binary"]: [(null)]
rec[2]["fixed_size_binary"]: [(null)]
rec[2]["uuid"]: [(null)]
`, str1Value, str2Value)
`, str1Value, str1Value, str2Value, str2Value)
got, want := out.String(), want
require.Equal(t, want, got)

Expand Down
8 changes: 4 additions & 4 deletions go/arrow/csv/testdata/header.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
bool;i8;i16;i32;i64;u8;u16;u32;u64;f16;f32;f64;str;ts;list(i64);binary;uuid
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;2022-05-09T00:01:01;{1,2,3};AAEC;00000000-0000-0000-0000-000000000001
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;2022-05-09T23:59:59;{};;00000000-0000-0000-0000-000000000002
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null
bool;i8;i16;i32;i64;u8;u16;u32;u64;f16;f32;f64;str;large_str;ts;list(i64);large_list(i64);fixed_size_list(i64);binary;large_binary;fixed_size_binary;uuid
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;str-1;2022-05-09T00:01:01;{1,2,3};{1,2,3};{1,2,3};AAEC;AAEC;AAEC;00000000-0000-0000-0000-000000000001
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;;2022-05-09T23:59:59;{};{};{4,5,6};;;;00000000-0000-0000-0000-000000000002
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null
8 changes: 4 additions & 4 deletions go/arrow/csv/testdata/types.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
## supported types: bool;int8;int16;int32;int64;uint8;uint16;uint32;uint64;float16;float32;float64;string;timestamp;binary;uuid
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;2022-05-09T00:01:01;{1,2,3};AAEC;00000000-0000-0000-0000-000000000001
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;2022-05-09T23:59:59;{};;00000000-0000-0000-0000-000000000002
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null
## supported types: bool;int8;int16;int32;int64;uint8;uint16;uint32;uint64;float16;float32;float64;string;large_string;timestamp;list(i64);large_list(i64);fixed_size_list(i64);binary;large_binary;fixed_size_binary;uuid
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;str-1;2022-05-09T00:01:01;{1,2,3};{1,2,3};{1,2,3};AAEC;AAEC;AAEC;00000000-0000-0000-0000-000000000001
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;;2022-05-09T23:59:59;{};{};{4,5,6};;;;00000000-0000-0000-0000-000000000002
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null
65 changes: 65 additions & 0 deletions go/arrow/csv/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array) []
res[i] = w.nullValue
}
}
case *arrow.LargeStringType:
arr := col.(*array.LargeString)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
res[i] = arr.Value(i)
} else {
res[i] = w.nullValue
}
}
case *arrow.Date32Type:
arr := col.(*array.Date32)
for i := 0; i < arr.Len(); i++ {
Expand Down Expand Up @@ -225,6 +234,44 @@ func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array) []
res[i] = w.nullValue
}
}
case *arrow.LargeListType:
arr := col.(*array.LargeList)
listVals, offsets := arr.ListValues(), arr.Offsets()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
list := array.NewSlice(listVals, int64(offsets[i]), int64(offsets[i+1]))
var b bytes.Buffer
b.Write([]byte{'{'})
writer := csv.NewWriter(&b)
writer.Write(w.transformColToStringArr(list.DataType(), list))
writer.Flush()
b.Truncate(b.Len() - 1)
b.Write([]byte{'}'})
res[i] = b.String()
list.Release()
} else {
res[i] = w.nullValue
}
}
case *arrow.FixedSizeListType:
arr := col.(*array.FixedSizeList)
listVals := arr.ListValues()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
list := array.NewSlice(listVals, int64((arr.Len()-1)*i), int64((arr.Len()-1)*(i+1)))
var b bytes.Buffer
b.Write([]byte{'{'})
writer := csv.NewWriter(&b)
writer.Write(w.transformColToStringArr(list.DataType(), list))
writer.Flush()
b.Truncate(b.Len() - 1)
b.Write([]byte{'}'})
res[i] = b.String()
list.Release()
} else {
res[i] = w.nullValue
}
}
case *arrow.BinaryType:
arr := col.(*array.Binary)
for i := 0; i < arr.Len(); i++ {
Expand All @@ -234,6 +281,24 @@ func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array) []
res[i] = w.nullValue
}
}
case *arrow.LargeBinaryType:
arr := col.(*array.LargeBinary)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
res[i] = base64.StdEncoding.EncodeToString(arr.Value(i))
} else {
res[i] = w.nullValue
}
}
case *arrow.FixedSizeBinaryType:
arr := col.(*array.FixedSizeBinary)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
res[i] = base64.StdEncoding.EncodeToString(arr.Value(i))
} else {
res[i] = w.nullValue
}
}
case arrow.ExtensionType:
arr := col.(array.ExtensionArray)
for i := 0; i < arr.Len(); i++ {
Expand Down
Loading

0 comments on commit d7b3d4f

Please sign in to comment.