diff --git a/src/read.js b/src/read.js
index c2cdc91..f95a9ee 100644
--- a/src/read.js
+++ b/src/read.js
@@ -1,7 +1,7 @@
 import { getColumnOffset, readColumn } from './column.js'
 import { parquetMetadataAsync } from './metadata.js'
-import { getColumnName } from './schema.js'
+import { getColumnName, isMapLike } from './schema.js'

 /**
  * Read parquet data rows from a file-like object.
@@ -109,6 +109,8 @@ async function readRowGroup(options, rowGroup) {
   /** @type {any[][]} */
   const groupData = []
   const promises = []
+  const maps = new Map()
+  let outputColumnIndex = 0
   // read column data
   for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
     const columnMetadata = rowGroup.columns[columnIndex].meta_data
@@ -127,6 +129,7 @@ async function readRowGroup(options, rowGroup) {
     // TODO: stream process the data, returning only the requested rows
     if (columnBytes > 1 << 30) {
       console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
+      // TODO: set column to new Error('parquet column too large')
       continue
     }

@@ -143,16 +146,51 @@ async function readRowGroup(options, rowGroup) {
     // read column data async
     promises.push(buffer.then(arrayBuffer => {
       // TODO: extract SchemaElement for this column
-      const columnData = readColumn(
+      /** @type {ArrayLike<any> | undefined} */
+      let columnData = readColumn(
         arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors
       )
       if (columnData.length !== Number(rowGroup.num_rows)) {
         throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
       }
+
+      if (isMapLike(metadata.schema, columnMetadata.path_in_schema)) {
+        const name = columnMetadata.path_in_schema.slice(0, -2).join('.')
+        if (!maps.has(name)) {
+          maps.set(name, columnData)
+          columnData = undefined // do not emit column data until both key and value are read
+        } else {
+          if (columnMetadata.path_in_schema[columnMetadata.path_in_schema.length - 1] === 'key') {
+            throw new Error('parquet map-like column key is not first') // TODO: support value-first
+          } else {
+            const values = columnData
+            const keys = maps.get(name)
+            const out = []
+            if (keys.length !== values.length) {
+              throw new Error('parquet map-like column key/value length mismatch')
+            }
+            // assemble map-like column data
+            for (let i = 0; i < keys.length; i++) {
+              /** @type {Record<string, any>} */
+              const obj = {}
+              for (let j = 0; j < keys[i].length; j++) {
+                obj[keys[i][j]] = values[i][j]
+              }
+              out.push(obj)
+            }
+            columnData = out
+          }
+          maps.delete(name)
+        }
+      }
+
+      // do not emit column data until structs are fully parsed
+      if (!columnData) return
       // notify caller of column data
       if (options.onChunk) options.onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
       // add column data to group data only if onComplete is defined
-      if (options.onComplete) addColumn(groupData, columnIndex, columnData)
+      if (options.onComplete) addColumn(groupData, outputColumnIndex, columnData)
+      outputColumnIndex++
     }))
   }
   await Promise.all(promises)
diff --git a/src/schema.js b/src/schema.js
index c25b535..39038a6 100644
--- a/src/schema.js
+++ b/src/schema.js
@@ -126,7 +126,7 @@ export function skipDefinitionBytes(num) {
  * @returns {string} column name
  */
 export function getColumnName(schema, path) {
-  if (isListLike(schema, path)) {
+  if (isListLike(schema, path) || isMapLike(schema, path)) {
     return path.slice(0, -2).join('.')
   } else {
     return path.join('.')
@@ -155,3 +155,29 @@

   return true
 }
+
+/**
+ * Check if a column is map-like.
+ *
+ * @param {SchemaElement[]} schemaElements parquet schema elements
+ * @param {string[]} path column path
+ * @returns {boolean} true if map-like
+ */
+export function isMapLike(schemaElements, path) {
+  const schema = schemaElement(schemaElements, path.slice(0, -2))
+  if (path.length < 3) return false
+  if (schema.element.converted_type !== 'MAP') return false
+  if (schema.children.length > 1) return false
+
+  const firstChild = schema.children[0]
+  if (firstChild.children.length !== 2) return false
+  if (firstChild.element.repetition_type !== 'REPEATED') return false
+
+  const keyChild = firstChild.children.find(child => child.element.name === 'key')
+  if (keyChild?.element.repetition_type !== 'REQUIRED') return false
+
+  const valueChild = firstChild.children.find(child => child.element.name === 'value')
+  if (valueChild?.element.repetition_type === 'REPEATED') return false
+
+  return true
+}
diff --git a/test/files/Int_Map.json b/test/files/Int_Map.json
new file mode 100644
index 0000000..d689f50
--- /dev/null
+++ b/test/files/Int_Map.json
@@ -0,0 +1,5 @@
+[
+  [{ "k1": 1, "k2": 100 }],
+  [{ "k1": 2 }],
+  [{ }]
+]
diff --git a/test/files/Int_Map.metadata.json b/test/files/Int_Map.metadata.json
new file mode 100644
index 0000000..0a1d93e
--- /dev/null
+++ b/test/files/Int_Map.metadata.json
@@ -0,0 +1,78 @@
+{
+  "version": 1,
+  "created_by": "DuckDB",
+  "metadata_length": 241,
+  "num_rows": 3,
+  "row_groups": [
+    {
+      "columns": [
+        {
+          "file_offset": 0,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 4,
+            "encodings": ["PLAIN"],
+            "num_values": 5,
+            "path_in_schema": ["int_map", "key_value", "key"],
+            "statistics": {
+              "max": "k2",
+              "min": "k1"
+            },
+            "total_compressed_size": 60,
+            "total_uncompressed_size": 63,
+            "type": "BYTE_ARRAY"
+          }
+        },
+        {
+          "file_offset": 0,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 64,
+            "encodings": ["PLAIN"],
+            "num_values": 5,
+            "path_in_schema": ["int_map", "key_value", "value"],
+            "statistics": {
+              "max": "d\u0000\u0000\u0000",
+              "min": "\u0001\u0000\u0000\u0000"
+            },
+            "total_compressed_size": 52,
+            "total_uncompressed_size": 53,
+            "type": "INT32"
+          }
+        }
+      ],
+      "num_rows": 3,
+      "total_byte_size": 74752
+    }
+  ],
+  "schema": [
+    {
+      "name": "duckdb_schema",
+      "num_children": 1,
+      "repetition_type": "REQUIRED"
+    },
+    {
+      "converted_type": "MAP",
+      "name": "int_map",
+      "num_children": 1,
+      "repetition_type": "OPTIONAL"
+    },
+    {
+      "name": "key_value",
+      "num_children": 2,
+      "repetition_type": "REPEATED"
+    },
+    {
+      "converted_type": "UTF8",
+      "name": "key",
+      "repetition_type": "REQUIRED",
+      "type": "BYTE_ARRAY"
+    },
+    {
+      "converted_type": "INT_32",
+      "name": "value",
+      "repetition_type": "OPTIONAL",
+      "type": "INT32"
+    }
+  ]
+}
diff --git a/test/files/Int_Map.parquet b/test/files/Int_Map.parquet
new file mode 100644
index 0000000..28e8825
Binary files /dev/null and b/test/files/Int_Map.parquet differ
diff --git a/test/read.test.js b/test/read.test.js
index 4ff5026..fe10a25 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -17,7 +17,7 @@ const compressors = {
 }

 describe('parquetRead', () => {
-  const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
+  const files = fs.readdirSync('test/files').filter(f => f.endsWith('Int_Map.parquet'))

   files.forEach(file => {
     it(`should parse data from ${file}`, async () => {
@@ -50,11 +50,11 @@
       onComplete: (rows) => {
         /* eslint-disable no-sparse-arrays */
         expect(toJson(rows)).toEqual([
-          [,, 2],
-          [,, 3],
-          [,, 4],
-          [,, 5],
-          [,, 2],
+          [2],
+          [3],
+          [4],
+          [5],
+          [2],
         ])
       },
     })
@@ -76,11 +76,39 @@
       onComplete: (rows) => {
         /* eslint-disable no-sparse-arrays */
         expect(toJson(rows)).toEqual([
-          [,,,, [1, 2, 3]],
-          [,,,, null],
-          [,,,, null],
-          [,,,, [1, 2, 3]],
-          [,,,, [1, 2]],
+          [[1, 2, 3]],
+          [null],
+          [null],
+          [[1, 2, 3]],
+          [[1, 2]],
+        ])
+      },
+    })
+  })
+
+  it('should read a map-like column from a file', async () => {
+    const asyncBuffer = fileToAsyncBuffer('test/files/Int_Map.parquet')
+    await parquetRead({
+      file: asyncBuffer,
+      columns: ['int_map'],
+      onChunk: (rows) => {
+        expect(toJson(rows)).toEqual({
+          columnName: 'int_map',
+          columnData: [
+            { k1: 1, k2: 100 },
+            { k1: 2 },
+            { },
+          ],
+          rowStart: 0,
+          rowEnd: 3,
+        })
+      },
+      onComplete: (rows) => {
+        /* eslint-disable no-sparse-arrays */
+        expect(toJson(rows)).toEqual([
+          [{ k1: 1, k2: 100 }],
+          [{ k1: 2 }],
+          [{}],
         ])
       },
     })
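For reviewers, a minimal usage sketch of the new map-like column support, mirroring the Int_Map test above. The import paths are assumptions (the library and test-helper entry points are not shown in this diff); the file path, column name, and expected output come straight from the test:

    // sketch: read the 'int_map' column and receive one plain object per row
    import { parquetRead } from '../src/hyparquet.js' // assumed library entry point
    import { fileToAsyncBuffer } from './helpers.js' // assumed test helper location

    const file = fileToAsyncBuffer('test/files/Int_Map.parquet')
    await parquetRead({
      file,
      columns: ['int_map'],
      onChunk: ({ columnName, columnData }) => {
        // once both the key and value columns have been read and merged,
        // columnData is [{ k1: 1, k2: 100 }, { k1: 2 }, {}]
        console.log(columnName, columnData)
      },
    })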