diff --git a/src/column.js b/src/column.js
index f4f7089..03b5796 100644
--- a/src/column.js
+++ b/src/column.js
@@ -7,44 +7,33 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 import { snappyUncompress } from './snappy.js'
 import { concat } from './utils.js'
 
-/**
- * @typedef {import('./types.js').SchemaTree} SchemaTree
- * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
- * @typedef {import('./types.js').Compressors} Compressors
- * @typedef {import('./types.js').RowGroup} RowGroup
- */
-
 /**
  * Parse column data from a buffer.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @param {number} columnOffset offset to start reading from
- * @param {RowGroup} rowGroup row group metadata
+ * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
+ * @param {import('./types.js').DataReader} reader
+ * @param {import('./types.js').RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
- * @param {SchemaTree[]} schemaPath schema path for the column
- * @param {Compressors} [compressors] custom decompressors
+ * @param {import('./types.js').SchemaTree[]} schemaPath schema path for the column
+ * @param {import('./hyparquet.js').ParquetReadOptions} options read options
  * @returns {any[]} array of values
  */
-export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schemaPath, compressors) {
+export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compressors, utf8 }) {
+  const { element } = schemaPath[schemaPath.length - 1]
   /** @type {ArrayLike | undefined} */
   let dictionary = undefined
   let seen = 0
   /** @type {any[]} */
   const rowData = []
-  const { element } = schemaPath[schemaPath.length - 1]
-
-  // column reader:
-  const reader = { view: new DataView(arrayBuffer, columnOffset), offset: 0 }
 
   while (seen < rowGroup.num_rows) {
     // parse column header
     const header = parquetHeader(reader)
-    if (header.compressed_page_size === undefined) {
-      throw new Error('parquet compressed page size is undefined')
-    }
+    // assert(header.compressed_page_size !== undefined)
 
     // read compressed_page_size bytes starting at offset
     const compressedBytes = new Uint8Array(
-      arrayBuffer, columnOffset + reader.offset, header.compressed_page_size
+      reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
     )
 
     // parse page data by type
@@ -54,16 +43,14 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       const daph = header.data_page_header
       if (!daph) throw new Error('parquet data page header is undefined')
 
-      const page = decompressPage(
-        compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
-      )
+      const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
       const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
       seen += daph.num_values
       // assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
 
       // construct output values: skip nulls and construct lists
       dereferenceDictionary(dictionary, dataPage)
-      values = convert(dataPage, element)
+      values = convert(dataPage, element, utf8)
       if (repetitionLevels.length || definitionLevels?.length) {
         // Use repetition levels to construct lists
         const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
@@ -92,7 +79,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       seen += daph2.num_values
 
       dereferenceDictionary(dictionary, dataPage)
-      values = convert(dataPage, element)
+      values = convert(dataPage, element, utf8)
       if (repetitionLevels.length || definitionLevels?.length) {
         // Use repetition levels to construct lists
         const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
@@ -155,7 +142,7 @@ export function getColumnOffset({ dictionary_page_offset, data_page_offset }) {
  * @param {Uint8Array} compressedBytes
  * @param {number} uncompressed_page_size
  * @param {import('./types.js').CompressionCodec} codec
- * @param {Compressors | undefined} compressors
+ * @param {import('./types.js').Compressors | undefined} compressors
  * @returns {Uint8Array}
  */
 export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) {
diff --git a/src/convert.js b/src/convert.js
index 86230ca..a911e25 100644
--- a/src/convert.js
+++ b/src/convert.js
@@ -6,18 +6,11 @@ const dayMillis = 86400000 // 1 day in milliseconds
  * @typedef {import('./types.js').DecodedArray} DecodedArray
  * @param {DecodedArray} data series of primitive types
  * @param {import('./types.js').SchemaElement} schemaElement schema element for the data
+ * @param {boolean | undefined} utf8 decode bytes as utf8?
  * @returns {DecodedArray} series of rich types
  */
-export function convert(data, schemaElement) {
+export function convert(data, schemaElement, utf8 = true) {
   const ctype = schemaElement.converted_type
-  if (ctype === 'UTF8') {
-    const decoder = new TextDecoder()
-    const arr = new Array(data.length)
-    for (let i = 0; i < arr.length; i++) {
-      arr[i] = data[i] && decoder.decode(data[i])
-    }
-    return arr
-  }
   if (ctype === 'DECIMAL') {
     const scale = schemaElement.scale || 0
     const factor = Math.pow(10, -scale)
@@ -50,12 +43,19 @@ export function convert(data, schemaElement) {
   if (ctype === 'INTERVAL') {
     throw new Error('parquet interval not supported')
   }
+  if (ctype === 'UTF8' || utf8 && schemaElement.type === 'BYTE_ARRAY') {
+    const decoder = new TextDecoder()
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = data[i] && decoder.decode(data[i])
+    }
+    return arr
+  }
   // TODO: ctype UINT
   const logicalType = schemaElement.logical_type?.type
   if (logicalType === 'FLOAT16') {
     return Array.from(data).map(parseFloat16)
   }
-  // TODO: logical types
   return data
 }
 
diff --git a/src/hyparquet.d.ts b/src/hyparquet.d.ts
index 4194056..7979c56 100644
--- a/src/hyparquet.d.ts
+++ b/src/hyparquet.d.ts
@@ -100,6 +100,7 @@ export interface ParquetReadOptions {
   onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
   onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
   compressors?: Compressors // custom decompressors
+  utf8?: boolean // decode byte arrays as utf8 strings (default true)
 }
 
 /**
diff --git a/src/read.js b/src/read.js
index 3f483fa..c1dedd1 100644
--- a/src/read.js
+++ b/src/read.js
@@ -81,7 +81,7 @@ export async function parquetRead(options) {
  * @returns {Promise} resolves to row data
  */
 async function readRowGroup(options, rowGroup, groupStart) {
-  const { file, metadata, columns, compressors } = options
+  const { file, metadata, columns } = options
   if (!metadata) throw new Error('parquet metadata not found')
 
   // loop through metadata to find min/max bytes to read
@@ -151,10 +151,9 @@ async function readRowGroup(options, rowGroup, groupStart) {
     // read column data async
     promises.push(buffer.then(arrayBuffer => {
       const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
+      const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
       /** @type {any[] | undefined} */
-      let columnData = readColumn(
-        arrayBuffer, bufferOffset, rowGroup, columnMetadata, schemaPath, compressors
-      )
+      let columnData = readColumn(reader, rowGroup, columnMetadata, schemaPath, options)
       // assert(columnData.length === Number(rowGroup.num_rows)
 
       // TODO: fast path for non-nested columns
diff --git a/test/convert.test.js b/test/convert.test.js
index 23d947b..1d596ac 100644
--- a/test/convert.test.js
+++ b/test/convert.test.js
@@ -13,11 +13,27 @@ describe('convert function', () => {
     expect(convert(data, schemaElement)).toEqual(data)
   })
 
-  it('converts byte arrays to UTF8 strings', () => {
-    const data = [new TextEncoder().encode('test'), new TextEncoder().encode('vitest')]
+  it('converts byte arrays to utf8', () => {
+    const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
     /** @type {SchemaElement} */
     const schemaElement = { name, converted_type: 'UTF8' }
-    expect(convert(data, schemaElement)).toEqual(['test', 'vitest'])
+    expect(convert(data, schemaElement)).toEqual(['foo', 'bar'])
+  })
+
+  it('converts byte arrays to utf8 default true', () => {
+    const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
+    /** @type {SchemaElement} */
+    const schemaElement = { name, type: 'BYTE_ARRAY' }
+    expect(convert(data, schemaElement)).toEqual(['foo', 'bar'])
+  })
+
+  it('preserves byte arrays utf8=false', () => {
+    const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
+    /** @type {SchemaElement} */
+    const schemaElement = { name, type: 'BYTE_ARRAY' }
+    expect(convert(data, schemaElement, false)).toEqual([
+      new Uint8Array([102, 111, 111]), new Uint8Array([98, 97, 114]),
+    ])
   })
 
   it('converts numbers to DECIMAL', () => {
diff --git a/test/files/brotli_compressed.json b/test/files/brotli_compressed.json
index 9956d35..b3b0100 100644
--- a/test/files/brotli_compressed.json
+++ b/test/files/brotli_compressed.json
@@ -1,22 +1,22 @@
 [
   [
     1593604800,
-    [97, 98, 99],
+    "abc",
     42
   ],
   [
     1593604800,
-    [100, 101, 102],
+    "def",
     7.7
   ],
   [
     1593604801,
-    [97, 98, 99],
+    "abc",
     42.125
   ],
   [
     1593604801,
-    [100, 101, 102],
+    "def",
     7.7
   ]
 ]
diff --git a/test/files/hadoop_lz4_compressed.json b/test/files/hadoop_lz4_compressed.json
index 9956d35..b3b0100 100644
--- a/test/files/hadoop_lz4_compressed.json
+++ b/test/files/hadoop_lz4_compressed.json
@@ -1,22 +1,22 @@
 [
   [
     1593604800,
-    [97, 98, 99],
+    "abc",
     42
   ],
   [
     1593604800,
-    [100, 101, 102],
+    "def",
     7.7
   ],
   [
     1593604801,
-    [97, 98, 99],
+    "abc",
     42.125
   ],
   [
     1593604801,
-    [100, 101, 102],
+    "def",
     7.7
   ]
 ]
diff --git a/test/files/lz4_raw_compressed.json b/test/files/lz4_raw_compressed.json
index 9956d35..b3b0100 100644
--- a/test/files/lz4_raw_compressed.json
+++ b/test/files/lz4_raw_compressed.json
@@ -1,22 +1,22 @@
 [
   [
     1593604800,
-    [97, 98, 99],
+    "abc",
     42
   ],
   [
     1593604800,
-    [100, 101, 102],
+    "def",
     7.7
   ],
   [
     1593604801,
-    [97, 98, 99],
+    "abc",
     42.125
   ],
   [
     1593604801,
-    [100, 101, 102],
+    "def",
     7.7
   ]
 ]
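
Usage note (not part of the patch): a minimal sketch of how the new utf8 option could be passed through parquetRead, assuming the package entry point 'hyparquet', an AsyncBuffer-style file input of the shape { byteLength, slice }, and a placeholder URL 'example.parquet'.

import { parquetRead } from 'hyparquet'

// Fetch the file and wrap it in an assumed { byteLength, slice } AsyncBuffer shape.
const arrayBuffer = await fetch('example.parquet').then(res => res.arrayBuffer())
const file = {
  byteLength: arrayBuffer.byteLength,
  slice: (start, end) => arrayBuffer.slice(start, end),
}

await parquetRead({
  file,
  utf8: false, // skip utf8 decoding so BYTE_ARRAY values stay as Uint8Array (default is true)
  onComplete: rows => console.log(rows), // rows of parsed column values
})

With utf8 left at its default, BYTE_ARRAY columns without a converted_type are decoded to strings, which is what the updated test fixtures ("abc" / "def" instead of byte arrays) reflect.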