Fix struct map parsing

Kenny Daniel 2024-03-11 19:35:57 -07:00
parent 76c4278f40
commit b14809a71a
6 changed files with 190 additions and 15 deletions

@@ -1,7 +1,7 @@
import { getColumnOffset, readColumn } from './column.js'
import { parquetMetadataAsync } from './metadata.js'
-import { getColumnName } from './schema.js'
+import { getColumnName, isMapLike } from './schema.js'
/**
* Read parquet data rows from a file-like object.
@@ -109,6 +109,8 @@ async function readRowGroup(options, rowGroup) {
/** @type {any[][]} */
const groupData = []
const promises = []
+const maps = new Map()
+let outputColumnIndex = 0
// read column data
for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
const columnMetadata = rowGroup.columns[columnIndex].meta_data
@@ -127,6 +129,7 @@ async function readRowGroup(options, rowGroup) {
// TODO: stream process the data, returning only the requested rows
if (columnBytes > 1 << 30) {
console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
+// TODO: set column to new Error('parquet column too large')
continue
}
@@ -143,16 +146,51 @@
// read column data async
promises.push(buffer.then(arrayBuffer => {
// TODO: extract SchemaElement for this column
-const columnData = readColumn(
+/** @type {ArrayLike<any> | undefined} */
+let columnData = readColumn(
arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors
)
if (columnData.length !== Number(rowGroup.num_rows)) {
throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
}
+if (isMapLike(metadata.schema, columnMetadata.path_in_schema)) {
+  const name = columnMetadata.path_in_schema.slice(0, -2).join('.')
+  if (!maps.has(name)) {
+    maps.set(name, columnData)
+    columnData = undefined // do not emit column data until both key and value are read
+  } else {
+    // the last path element names the key/value leaf; [0] would be the map column itself
+    if (columnMetadata.path_in_schema.at(-1) === 'key') {
+      throw new Error('parquet map-like column key is not first') // TODO: support value-first
+    } else {
+      const values = columnData
+      const keys = maps.get(name)
+      const out = []
+      if (keys.length !== values.length) {
+        throw new Error('parquet map-like column key/value length mismatch')
+      }
+      // assemble map-like column data
+      for (let i = 0; i < keys.length; i++) {
+        /** @type {Record<string, any>} */
+        const obj = {}
+        for (let j = 0; j < keys[i].length; j++) {
+          obj[keys[i][j]] = values[i][j]
+        }
+        out.push(obj)
+      }
+      columnData = out
+    }
+    maps.delete(name)
+  }
+}
+// do not emit column data until structs are fully parsed
+if (!columnData) return
// notify caller of column data
if (options.onChunk) options.onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
// add column data to group data only if onComplete is defined
-if (options.onComplete) addColumn(groupData, columnIndex, columnData)
+if (options.onComplete) addColumn(groupData, outputColumnIndex, columnData)
+outputColumnIndex++
}))
}
await Promise.all(promises)
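
The branch above zips a map's two physical columns (key and value) into one logical column of objects. A minimal sketch of the shapes involved, using the Int_Map fixture added in this commit; the per-row nested-array layout is inferred from how the loop indexes keys[i][j]:

// each map column arrives from readColumn as one array of entries per row
const keys = [['k1', 'k2'], ['k1'], []]   // path int_map.key_value.key
const values = [[1, 100], [2], []]        // path int_map.key_value.value
// zipping row by row produces the logical column:
// [{ k1: 1, k2: 100 }, { k1: 2 }, {}]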

@@ -126,7 +126,7 @@ export function skipDefinitionBytes(num) {
* @returns {string} column name
*/
export function getColumnName(schema, path) {
-if (isListLike(schema, path)) {
+if (isListLike(schema, path) || isMapLike(schema, path)) {
return path.slice(0, -2).join('.')
} else {
return path.join('.')
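
With isMapLike added here, both physical columns of a map now resolve to the same logical column name. For example (illustrative, using the Int_Map fixture schema from this commit):

// getColumnName(schema, ['int_map', 'key_value', 'key'])   -> 'int_map'
// getColumnName(schema, ['int_map', 'key_value', 'value']) -> 'int_map'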
@@ -155,3 +155,29 @@ function isListLike(schemaElements, path) {
return true
}
+/**
+ * Check if a column is map-like.
+ *
+ * @param {SchemaElement[]} schemaElements parquet schema elements
+ * @param {string[]} path column path
+ * @returns {boolean} true if map-like
+ */
+export function isMapLike(schemaElements, path) {
+  // check path depth before resolving the schema element for the map group
+  if (path.length < 3) return false
+  const schema = schemaElement(schemaElements, path.slice(0, -2))
+  if (schema.element.converted_type !== 'MAP') return false
+  if (schema.children.length > 1) return false
+  const firstChild = schema.children[0]
+  if (firstChild.children.length !== 2) return false
+  if (firstChild.element.repetition_type !== 'REPEATED') return false
+  const keyChild = firstChild.children.find(child => child.element.name === 'key')
+  if (keyChild?.element.repetition_type !== 'REQUIRED') return false
+  const valueChild = firstChild.children.find(child => child.element.name === 'value')
+  if (valueChild?.element.repetition_type === 'REPEATED') return false
+  return true
+}
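
A quick sketch of what these checks accept, calling isMapLike with the flat schema of the Int_Map fixture added below (assuming, as the code implies, that schemaElement resolves a path prefix to an { element, children } node):

const schema = [
  { name: 'duckdb_schema', num_children: 1, repetition_type: 'REQUIRED' },
  { converted_type: 'MAP', name: 'int_map', num_children: 1, repetition_type: 'OPTIONAL' },
  { name: 'key_value', num_children: 2, repetition_type: 'REPEATED' },
  { converted_type: 'UTF8', name: 'key', repetition_type: 'REQUIRED', type: 'BYTE_ARRAY' },
  { converted_type: 'INT_32', name: 'value', repetition_type: 'OPTIONAL', type: 'INT32' },
]
isMapLike(schema, ['int_map', 'key_value', 'key'])   // true
isMapLike(schema, ['int_map', 'key_value', 'value']) // true
isMapLike(schema, ['int_map'])                       // false: path too short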

test/files/Int_Map.json Normal file

@@ -0,0 +1,5 @@
[
[{ "k1": 1, "k2": 100 }],
[{ "k1": 2 }],
[{ }]
]

@@ -0,0 +1,78 @@
{
"version": 1,
"created_by": "DuckDB",
"metadata_length": 241,
"num_rows": 3,
"row_groups": [
{
"columns": [
{
"file_offset": 0,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 4,
"encodings": ["PLAIN"],
"num_values": 5,
"path_in_schema": ["int_map", "key_value", "key"],
"statistics": {
"max": "k2",
"min": "k1"
},
"total_compressed_size": 60,
"total_uncompressed_size": 63,
"type": "BYTE_ARRAY"
}
},
{
"file_offset": 0,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 64,
"encodings": ["PLAIN"],
"num_values": 5,
"path_in_schema": ["int_map", "key_value", "value"],
"statistics": {
"max": "d\u0000\u0000\u0000",
"min": "\u0001\u0000\u0000\u0000"
},
"total_compressed_size": 52,
"total_uncompressed_size": 53,
"type": "INT32"
}
}
],
"num_rows": 3,
"total_byte_size": 74752
}
],
"schema": [
{
"name": "duckdb_schema",
"num_children": 1,
"repetition_type": "REQUIRED"
},
{
"converted_type": "MAP",
"name": "int_map",
"num_children": 1,
"repetition_type": "OPTIONAL"
},
{
"name": "key_value",
"num_children": 2,
"repetition_type": "REPEATED"
},
{
"converted_type": "UTF8",
"name": "key",
"repetition_type": "REQUIRED",
"type": "BYTE_ARRAY"
},
{
"converted_type": "INT_32",
"name": "value",
"repetition_type": "OPTIONAL",
"type": "INT32"
}
]
}
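
For reference, the flat schema list above encodes a tree via num_children; sketched out (annotation, not part of the file), it is exactly the three-level parquet MAP shape that isMapLike checks for:

duckdb_schema (REQUIRED)
└─ int_map (OPTIONAL, converted_type MAP)
   └─ key_value (REPEATED)
      ├─ key (REQUIRED, BYTE_ARRAY / UTF8)
      └─ value (OPTIONAL, INT32 / INT_32)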

test/files/Int_Map.parquet Normal file

Binary file not shown.

@@ -17,7 +17,7 @@ const compressors = {
}
describe('parquetRead', () => {
-const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
+const files = fs.readdirSync('test/files').filter(f => f.endsWith('Int_Map.parquet'))
files.forEach(file => {
it(`should parse data from ${file}`, async () => {
@@ -50,11 +50,11 @@ describe('parquetRead', () => {
onComplete: (rows) => {
/* eslint-disable no-sparse-arrays */
expect(toJson(rows)).toEqual([
-  [,, 2],
-  [,, 3],
-  [,, 4],
-  [,, 5],
-  [,, 2],
+  [2],
+  [3],
+  [4],
+  [5],
+  [2],
])
},
})
@@ -76,11 +76,39 @@ describe('parquetRead', () => {
onComplete: (rows) => {
/* eslint-disable no-sparse-arrays */
expect(toJson(rows)).toEqual([
-  [,,,, [1, 2, 3]],
-  [,,,, null],
-  [,,,, null],
-  [,,,, [1, 2, 3]],
-  [,,,, [1, 2]],
+  [[1, 2, 3]],
+  [null],
+  [null],
+  [[1, 2, 3]],
+  [[1, 2]],
])
},
})
})
+it('should read a map-like column from a file', async () => {
+  const asyncBuffer = fileToAsyncBuffer('test/files/Int_Map.parquet')
+  await parquetRead({
+    file: asyncBuffer,
+    columns: ['int_map'],
+    onChunk: (chunk) => {
+      expect(toJson(chunk)).toEqual({
+        columnName: 'int_map',
+        columnData: [
+          { k1: 1, k2: 100 },
+          { k1: 2 },
+          {},
+        ],
+        rowStart: 0,
+        rowEnd: 3,
+      })
+    },
+    onComplete: (rows) => {
+      /* eslint-disable no-sparse-arrays */
+      expect(toJson(rows)).toEqual([
+        [{ k1: 1, k2: 100 }],
+        [{ k1: 2 }],
+        [{}],
+      ])
+    },
+  })